Blame view

app/com/piki_ds/ver2ggh/gghScore.scala 12 KB
  /**
    * Created by Evan on 2016. 5. 18..
    */

  package com.piki_ds.ver2ggh

  import com.piki_ds.utils.hbase.HbaseInserter
  import com.piki_ds.ver2ggh
  import com.piki_ds.data.contents.report.Util
  import java.util.Date
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.tree.model.RandomForestModel
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  import org.apache.spark.sql.functions._
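
  /**
    * Computes the ver2 GGH score per content: loads the last 7 days of exposure/consumption
    * features, predicts an expected consume time with a pre-trained random-forest model,
    * compares actual consumption and exposure against position-level baselines, rescales the
    * resulting ratio to a 0-1000 score, and writes it to the HBase table cid_ggh2.
    */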
  
  
  object gghScore {
  
    val sc = SparkContext.getOrCreate()
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new
        java.net.URI("hdfs://pikinn"), hadoopConf)
    import sqlContext.implicits._
  
    def main(args: Array[String]): Unit = {
  
      val format = new java.text.SimpleDateFormat("yyyyMMdd")
      val currentTime = new Date()
  
      val day_delm = 24 * 1000 * 60 * 60L
      val saveDay = if (args.length >= 1) args(0) else format.format(currentTime.getTime - day_delm)
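
      // saveDay (yyyyMMdd) is the scoring date: args(0) when given, otherwise yesterday.
      // A 7-day window ending at saveDay is built below; any day whose expConTime feature
      // table is missing on HDFS is first regenerated via ver2ggh.expConTime.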
      val ind = -6 to 0
      val dateSet = ind.map(x => {
        format.format(format.parse(saveDay).getTime + day_delm * x)
      })
  
      val hadoopConf = sc.hadoopConfiguration
      val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  
      dateSet.map{x =>
        val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
        val out = if (!fs.exists(new Path(addrs))) x else "null"
        out
      }.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))
  
      val expConsume = dateSet.map { x =>
        val expConsumeOut = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}").drop("table").drop("dt").
          where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
        expConsumeOut
      }.reduce((x,y) => x.unionAll(y))
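
      // expConsume: the 7 days of exposure/consumption events unioned together, keeping only
      // rows with fromKey = 'm', or fromKey = 'h' at position 0.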
  
      /// Card-count group per content, in buckets of 5 cards
      val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
        map(x => (x.getAs[Long]("contents_id"), x.getAs[Long]("cardSize"), x.getAs[Long]("cardSize")/5)).toDF("cid1","cardSize","sizeGroup").
        selectExpr("*","if(sizeGroup>19, 20, sizeGroup) as cardGroup").drop("sizeGroup")
      /// Average content consume time per card group, computed from the kpi3 metric
      //val kpi3 = Util.readDashboardTable("kpi3","*","{*}","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
      //  join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null").cache
      //val cardGroupConsume = kpi3.where("consumeTime > 10.0").groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
      // kpi3.map(x => s"${x(0)}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(5)}|${x(6)}").coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/kpi3")
      // cardGroupConsume.stat.corr("cardGroup1","cardGroupConTime")  correlation between cardGroup and average consume time: 0.89
  
      /// Card-type distribution per content
      val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
        "if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")
  
      val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
        expr("cid"),
        expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
        expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
        expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
        expr("case when card_type = 'SNS' then count else 0 end as SNS"),
        expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
        expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
        expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
        expr("case when card_type = 'INTR' then count else 0 end as INTR"),
        expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
      ).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
        expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
        expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))
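
      // cidCardType above: card types are normalized (DYNAMIC -> PHOTO, any '%SNS%' type -> SNS)
      // and the counts are pivoted into one row per content with one column per card type.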
  
      val contentsType = Util.tables("MG_CONTENTS").where("status='ACTV'").select(
        expr("contents_id as cid"),
        expr("case when contents_type = 'ALBUM' then 1 else 0 end as ALBUM"),
        expr("case when contents_type = 'ALBUM.A' then 1 else 0 end as ALBUM_A"),
        expr("case when contents_type = 'CHST' then 1 else 0 end as CHST"),
        expr("case when contents_type = 'CHST.A' then 1 else 0 end as CHST_A"),
        expr("case when contents_type = 'TOON' then 1 else 0 end as TOON"),
        expr("case when contents_type = 'LIVE' then 1 else 0 end as LIVE")
      )
  
      val cidCardTypeSize = cidCardType.join(cardSize, cidCardType("cid")===cardSize("cid1"),"leftouter").drop(cardSize("cid1")).drop(cardSize("cardGroup")).
        join(contentsType, cidCardType("cid")===contentsType("cid")).drop(contentsType("cid"))
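
      // cidCardTypeSize: one feature row per content (card-type counts, total card count, and a
      // one-hot encoding of the contents type), the input to the consume-time model below.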
      val predData = cidCardTypeSize.map { line =>
        LabeledPoint(line.getAs[Long]("cid"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
          line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble,
          line.getAs[Long]("TEXT").toDouble, line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble,
          line.getAs[Int]("ALBUM").toDouble, line.getAs[Int]("ALBUM_A").toDouble, line.getAs[Int]("CHST").toDouble, line.getAs[Int]("CHST_A").toDouble,
          line.getAs[Int]("TOON").toDouble, line.getAs[Int]("LIVE").toDouble
        ))
      }
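
      // The LabeledPoint label carries the cid rather than a target value, so each prediction
      // can be joined back to its content afterwards.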
  
      val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
      val predResult = predData.collect.map { point =>
        val prediction = RFmodel.predict(point.features)
        (point.label.toLong, prediction)
      }
  
      val cidPredConsume = sc.parallelize(predResult).toDF("cid1","predConsume").withColumn("predConsume", column("predConsume")*1000)
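
      // cidPredConsume: the model-predicted consume time per content; the *1000 factor puts the
      // prediction on the same scale as the measured consume times (cf. the kpi3 block above,
      // where the average consume time is likewise multiplied by 1000).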
  
      /// Average exposure time before a consumption occurs, per exposure position
      val posExpTime = expConsume.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
        selectExpr("fromKey as fromKey1", "position as position1", "posExpTime")
  
      /// CTR per exposure position
      val positionCtr = expConsume.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
        withColumn("rankingCtr", column("consumeCnt")/column("expSize")).selectExpr("fromKey as fromKey1","position as position1","rankingCtr")
  
      /// Exposure time and consume time per content and exposure position
      val cidPositionInfo = expConsume.groupBy("cid","fromKey","position").
        agg(//expr("sum(expTime1) as expTime1"), expr("count(expSize1) as expSize1"),
          expr("sum(expTime2) as expTime2"), expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
        join(positionCtr, column("fromKey")===positionCtr("fromKey1") && column("position")===positionCtr("position1"), "leftouter").
        drop(positionCtr("fromKey1")).drop(positionCtr("position1")).
        join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).na.fill(0, Seq("consume")).
        where("expSize2 > 200 and cardSize is not null")
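
      // Only (cid, fromKey, position) cells with more than 200 exposures and a known card count
      // are kept.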
  
      val gghtmp = cidPositionInfo.join(posExpTime, cidPositionInfo("fromKey")===posExpTime("fromKey1") && cidPositionInfo("position")===posExpTime("position1"), "leftouter").
        drop(posExpTime("fromKey1")).drop(posExpTime("position1")).
        join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1"))
  
      /*
      val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
        withColumn("ggh", column("consumeEff")/column("expEff"))
      val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
        expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
      */
  
      val gghBase4 = gghtmp.selectExpr("*", "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")
  
      val gghVer3 = gghBase4.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
        expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff)/count(*) as expEff")).
        join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
        selectExpr("cid","totalExpSize2","consume/(expectConCnt * predConsume) as consumeEff", "expEff").withColumn("ggh", column("consumeEff")/column("expEff")).
        where("totalExpSize2 > 1000")
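
      // GGH per content (gghVer3):
      //   consumeEff = consume / (expectConCnt * predConsume)      actual vs. expected consumption
      //   expEff     = mean over slots of expTime2 / (expSize2 * posExpTime)
      //   ggh        = consumeEff / expEff
      // Contents with totalExpSize2 <= 1000 are dropped.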
  
      val gghMean = gghVer3.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
      val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
  
      val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
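
      // gghMean / gghStd are the mean and standard deviation of the ggh column (index 4 after
      // dropping "summary" from describe()); the z-scored ggh is then squashed with a logistic
      // into (0, 1000), so a content whose ggh equals the mean scores 500.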
  /*
      gghScaled.map{x =>
        s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
      }.saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
  */
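
      // Final output: (cid, scaledGgh.toInt) pairs, bulk-inserted into the HBase table cid_ggh2.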
      val finalScore = gghScaled.map(x=>(x(0),x.getAs[Double]("scaledGgh").toInt))
      val insertArray = finalScore.map(x=>(x._1.toString,  x._2.toString)).collect()
      val test = new HbaseInserter("cid_ggh2")
      test.insert(insertArray)
      /*//////// CTR and CTR Time computation
      val ctr = expConsume.groupBy("cid").
        agg(expr("sum(expTime1) as expTime1"), expr("sum(expSize1) as expSize1"), expr("sum(expTime2) as expTime2"),
          expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
        join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).
        withColumn("ctr",column("conCount")/column("expSize2")).
        withColumn("ctrTime",column("consume")/column("expTime2")).
        withColumnRenamed("cid","cid1")
  
      //////////////////////////
      /// Load contents info
      val ContentsInfo = Util.tables("MG_CONTENTS").selectExpr("contents_id", "udate","title" ,"uid")
      val userInfo = Util.tables("USER").selectExpr("uid","name")
      val ciduser = ContentsInfo.join(userInfo, ContentsInfo("uid")===userInfo("uid"), "leftouter").drop(userInfo("uid"))
  
      val result2 = gghScaled.join(ciduser, gghScaled("cid")===ciduser("contents_id"), "leftouter").drop(ciduser("contents_id"))//.where("udate > '2016-05-12'")
      ////////// Compare against the ctr and ctrTime metrics
      val gghResult = result2.join(ctr.selectExpr("cid1","ctr as noRankCtr","ctrTime as noRankCtrTime","expTime2 as expTime","expSize2 as expSize","consume","conCount"),
        result2("cid")===ctr("cid1"), "leftouter").drop(ctr("cid1")).drop("uid").drop("totalExpSize2")
      //gghResult.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/preprocess/ggh/$saveDay")

      gghResult.map{x=>
        val title = x.getAs[String]("title").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
        val editor = x.getAs[String]("name").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
        s"${x(0)}|${title}|${x(5)}|${editor}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}"}.
        coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
      */
    }
  
  }