Commit 82c6cdc3dcc4d5e655e4bceb49330718f961e39b

Authored by evan
1 parent a7900b4e32
Exists in master

gghVer2: calculate cumulative score

Showing 3 changed files with 45 additions and 30 deletions

app/com/piki_ds/ver2ggh/expConTime.scala
... ... @@ -182,6 +182,7 @@
182 182 val day_delm = 24 * 1000 * 60 * 60L
183 183 // parse the logs
184 184 val saveDay = if(args.length == 1) args(0) else format.format(currentTime.getTime - day_delm)
  185 + val (year, month, day) = (saveDay.slice(0, 4), saveDay.slice(4, 6), saveDay.slice(6, 8))
185 186 val df1 = getLogParse(saveDay)
186 187  
187 188 // split the log into (open, consume) and (consume, open) records
... ... @@ -211,7 +212,7 @@
211 212 exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
212 213 drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))
213 214  
214   - expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")
  215 + expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/y=$year/mo=$month/dt=$saveDay")
215 216  
216 217 }
217 218 }
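
The change above partitions the expConTime parquet output by year and month: the y/mo components are sliced out of the yyyyMMdd date string, while dt keeps the full date. A minimal standalone sketch of the path construction (PartitionPathSketch is a hypothetical helper, not part of the commit):

    object PartitionPathSketch {
      // Derive partition components from a "yyyyMMdd" date string, mirroring
      // the slice-based extraction added in this commit.
      def partitionPath(saveDay: String): String = {
        val (year, month) = (saveDay.slice(0, 4), saveDay.slice(4, 6))
        // dt keeps the full yyyyMMdd string, so a per-day read still resolves
        // to a unique leaf directory under the new layout
        s"hdfs://pikinn/user/evan/Features/table=expConTime/y=$year/mo=$month/dt=$saveDay"
      }

      def main(args: Array[String]): Unit =
        println(partitionPath("20160607")) // .../y=2016/mo=06/dt=20160607
    }
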
app/com/piki_ds/ver2ggh/gghScore.scala
... ... @@ -17,7 +17,6 @@
17 17 import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
18 18 import org.apache.spark.sql.functions._
19 19  
20   -
21 20 object gghScore {
22 21  
23 22 val sc = SparkContext.getOrCreate()
24 23  
25 24  
26 25  
... ... @@ -31,19 +30,19 @@
31 30  
32 31 val format = new java.text.SimpleDateFormat("yyyyMMdd")
33 32 val currentTime = new Date()
34   -
35 33 val day_delm = 24 * 1000 * 60 * 60L
36 34 val saveDay = if (args.length >= 1) args(0) else format.format(currentTime.getTime - day_delm)
37   - val ind = -6 to 0
  35 + val ind = -29 to 0
38 36 val dateSet = ind.map(x => {
39 37 format.format(format.parse(saveDay).getTime + day_delm * x)
40   - })
  38 + }).map(x => (x, x.slice(0, 4), x.slice(4, 6), x.slice(6, 8)))
41 39  
42 40 val hadoopConf = sc.hadoopConfiguration
43 41 val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
44 42  
45 43 dateSet.map{x =>
46   - val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
  44 + println(x)
  45 + val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/y=${x._2}/mo=${x._3}/dt=${x._1}"
47 46 val out = if(!fs.exists(new Path(addrs))) {
48 47 x
49 48 } else if (fs.listStatus((new Path(addrs))).size < 100) {
50 49  
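
Two things change in this hunk: the lookback window grows from 7 days (-6 to 0) to 30 days (-29 to 0), and each date is carried as a (date, year, month, day) tuple so the partitioned y=/mo=/dt= paths can be probed day by day. A self-contained sketch of the date-set construction, assuming the same yyyyMMdd format as the job:

    import java.text.SimpleDateFormat

    object DateSetSketch {
      def main(args: Array[String]): Unit = {
        val format = new SimpleDateFormat("yyyyMMdd")
        val dayMs = 24 * 1000 * 60 * 60L
        val saveDay = "20160607" // stand-in for args(0)
        // 30 dates ending at saveDay, each paired with its (yyyy, MM, dd) slices
        val dateSet = (-29 to 0).map { x =>
          val d = format.format(format.parse(saveDay).getTime + dayMs * x)
          (d, d.slice(0, 4), d.slice(4, 6), d.slice(6, 8))
        }
        dateSet.foreach(println) // (20160509,2016,05,09) ... (20160607,2016,06,07)
      }
    }
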
... ... @@ -52,12 +51,14 @@
52 51 out
53 52 }.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))
54 53  
55   - val expConsume = dateSet.map { x =>
56   - val expConsumeOut = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}").drop("table").drop("dt").
57   - where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
58   - expConsumeOut
59   - }.reduce((x,y) => x.unionAll(y))
  54 + val expConsume = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/y=${dateSet.last._2}/mo=${dateSet.last._3}/dt=$saveDay").
  55 + drop("table").drop("dt").where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
60 56  
  57 + val (ySet, moSet, dtSet) = (dateSet.map(x => x._2).distinct.mkString(","), dateSet.map(x => x._3).distinct.mkString(","),
  58 + dateSet.map(x => x._1).mkString(","))
  59 + val expConsume30 = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/y={$ySet}/mo={$moSet}/dt={$dtSet}").drop("table").drop("dt").
  60 + where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
  61 +
61 62 /// create per-content card-count groups, bucketed by 5 cards
62 63 val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
63 64 map(x => (x.getAs[Long]("contents_id"), x.getAs[Long]("cardSize"), x.getAs[Long]("cardSize")/5)).toDF("cid1","cardSize","sizeGroup").
64 65  
... ... @@ -120,11 +121,11 @@
120 121 val cidPredConsume = sc.parallelize(predResult).toDF("cid1","predConsume").withColumn("predConsume", column("predConsume")*1000)
121 122  
122 123 /// average exposure time until content consumption, per exposure position
123   - val posExpTime = expConsume.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
  124 + val posExpTime = expConsume30.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
124 125 selectExpr("fromKey as fromKey1", "position as position1", "posExpTime")
125 126  
126 127 /// CTR per position
127   - val positionCtr = expConsume.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
  128 + val positionCtr = expConsume30.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
128 129 withColumn("rankingCtr", column("consumeCnt")/column("expSize")).selectExpr("fromKey as fromKey1","position as position1","rankingCtr")
129 130  
130 131 /// exposure time and consumption time per content and exposure position
131 132  
132 133  
133 134  
134 135  
135 136  
136 137  
137 138  
138 139  
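
Both position-level statistics now aggregate over the 30-day expConsume30 rather than a single day, which stabilizes the estimates: posExpTime is the mean exposure time per exposure at each (fromKey, position), and rankingCtr divides consumption events by exposures, where count(consume) counts only rows with a non-null consume. Condensed, the CTR aggregation over the 30-day frame is:

    // CTR per slot: consumptions (non-null consume) over exposures, 30-day window
    val positionCtr = expConsume30
      .groupBy("fromKey", "position")
      .agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt"))
      .withColumn("rankingCtr", column("consumeCnt") / column("expSize"))
      .selectExpr("fromKey as fromKey1", "position as position1", "rankingCtr")
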
... ... @@ -134,31 +135,46 @@
134 135 join(positionCtr, column("fromKey")===positionCtr("fromKey1") && column("position")===positionCtr("position1"), "leftouter").
135 136 drop(positionCtr("fromKey1")).drop(positionCtr("position1")).
136 137 join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).na.fill(0, Seq("consume")).
137   - where("expSize2 > 200 and cardSize is not null")
  138 + where("cardSize is not null")
138 139  
139 140 val gghtmp = cidPositionInfo.join(posExpTime, cidPositionInfo("fromKey")===posExpTime("fromKey1") && cidPositionInfo("position")===posExpTime("position1"), "leftouter").
140   - drop(posExpTime("fromKey1")).drop(posExpTime("position1")).
141   - join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1"))
  141 + drop(posExpTime("fromKey1")).drop(posExpTime("position1"))
142 142  
143 143 /*
144   - val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
145   - withColumn("ggh", column("consumeEff")/column("expEff"))
146   - val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
147   - expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
148   - */
  144 + val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
  145 + withColumn("ggh", column("consumeEff")/column("expEff"))
  146 + val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
  147 + expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
  148 + */
149 149  
150   - val gghBase4 = gghtmp.selectExpr("*", "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")
  150 + val gghBase4 = gghtmp.selectExpr("cid","fromKey","position","expSize2","consume",
  151 + "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")
151 152  
152   - val gghVer3 = gghBase4.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
  153 + val gghBase4Past = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/cumulate/${dateSet(dateSet.size - 2)._1}")
  154 +
  155 + val gghCum = gghBase4.unionAll(gghBase4Past).groupBy("cid","fromKey","position").agg(
  156 + expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"),
  157 + expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff) as expEff")
  158 + )
  159 +
  160 + gghCum.coalesce(25).write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/cumulate/$saveDay")
  161 +
  162 + val contStat = Util.tables("MG_CONTENTS").where("status = 'ACTV'").selectExpr("contents_id as cid","status","udate")
  163 + val gghVer2 = gghCum.join(contStat, gghCum("cid")===contStat("cid")).drop(contStat("cid")).
  164 + groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
153 165 expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff)/count(*) as expEff")).
154   - join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
  166 + join(cidPredConsume, gghCum("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
155 167 selectExpr("cid","totalExpSize2","consume/(expectConCnt * predConsume) as consumeEff", "expEff").withColumn("ggh", column("consumeEff")/column("expEff")).
156   - where("totalExpSize2 > 1000")
  168 + where("totalExpSize2 > 10000")
157 169  
158   - val gghMean = gghVer3.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
159   - val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
  170 + val gghMean = gghVer2.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
  171 + val gghStd = gghVer2.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
160 172  
161   - val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
  173 + val gghScaled = gghVer2.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
  174 +
  175 + /*gghScaled.map{x =>
  176 + s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
  177 + }.coalesce(25).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/table=expConTime/reusult/$saveDay")*/
162 178  
163 179 gghScaled.map{x =>
164 180 s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
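
The heart of the commit is a rolling accumulation: today's per-(cid, fromKey, position) aggregates (gghBase4) are unioned with the previous day's cumulate parquet, re-summed, and written back under cumulate/$saveDay, so each day's file carries the running totals. This is also why gghBase4 switched from selectExpr("*", ...) to an explicit column list: unionAll in this Spark version matches columns by position, so both sides need an identical, stable schema. The final score then restricts to ACTV contents, raises the exposure floor from totalExpSize2 > 1000 to > 10000 (presumably to match the much larger accumulated counts), and rescales with scaledGgh = 1000 / (1 + exp(-(ggh - mean) / std)). A self-contained sketch of that scaling step (hypothetical helper names):

    object GghScaleSketch {
      // z-score the raw ggh, then squash through a logistic onto (0, 1000)
      def scaled(ggh: Double, mean: Double, std: Double): Double = {
        val z = (ggh - mean) / std
        1000.0 / (1.0 + math.exp(-z))
      }

      def main(args: Array[String]): Unit = {
        println(scaled(ggh = 1.2, mean = 1.0, std = 0.5)) // z = 0.4, ≈ 598.7
        println(scaled(ggh = 1.0, mean = 1.0, std = 0.5)) // z = 0.0, exactly 500.0
      }
    }
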
app/com/piki_ds/ver2ggh/simContentsModel.scala
... ... @@ -24,7 +24,6 @@
24 24 java.net.URI("hdfs://pikinn"), hadoopConf)
25 25 import sqlContext.implicits._
26 26  
27   -
28 27 def main(args: Array[String]): Unit = {
29 28  
30 29 /// load content card-count info