gghScore.scala 12 KB
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
/**
* Created by Evan on 2016. 5. 18..
*/

package com.piki_ds.ver2ggh

import com.piki_ds.utils.hbase.HbaseInserter
import com.piki_ds.ver2ggh
import com.piki_ds.data.contents.report.Util
import java.util.Date
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.functions._


/**
 * Batch job that computes a per-content "ggh" quality score and writes it to HBase.
 *
 * Pipeline (driven from `main`):
 *   1. Backfill missing daily `expConTime` feature partitions on HDFS for the last 7 days.
 *   2. Load exposure/consume events, restricted to main feed ('m') and home position 0 ('h').
 *   3. Predict an expected consume time per content with a pre-trained RandomForest model,
 *      using card-type counts, card count and contents type as features.
 *   4. Compare actual consume behaviour against position-normalised expectations
 *      (position CTR, position exposure time) to get the raw `ggh` ratio.
 *   5. Z-score the ratio and squash it through a sigmoid into (0, 1000), then insert
 *      (cid, score) pairs into the HBase table "cid_ggh2".
 *
 * NOTE(review): this object eagerly grabs a SparkContext at construction time
 * (`SparkContext.getOrCreate()`), so merely referencing it requires a live Spark driver.
 */
object gghScore {

// Driver-wide handles, created when the object is first touched.
val sc = SparkContext.getOrCreate()
val sqlContext = SQLContext.getOrCreate(sc)
val hadoopConf = new org.apache.hadoop.conf.Configuration()
// NOTE(review): `hdfs` appears unused below (main builds its own FileSystem); candidate for removal.
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://pikinn"), hadoopConf)
import sqlContext.implicits._

/**
 * Job entry point.
 *
 * @param args optional; args(0) is the target day as "yyyyMMdd". Defaults to yesterday
 *             (current time minus one day) when no argument is given.
 */
def main(args: Array[String]): Unit = {

val format = new java.text.SimpleDateFormat("yyyyMMdd")
val currentTime = new Date()

// One day in milliseconds.
val day_delm = 24 * 1000 * 60 * 60L
val saveDay = if (args.length >= 1) args(0) else format.format(currentTime.getTime - day_delm)
// 7-day window ending on saveDay (inclusive): saveDay-6 .. saveDay.
val ind = -6 to 0
val dateSet = ind.map(x => {
format.format(format.parse(saveDay).getTime + day_delm * x)
})

// NOTE(review): shadows the object-level `hadoopConf`; intentional? The shadowing copy is
// the SparkContext's configuration, which is what the FileSystem lookup below actually uses.
val hadoopConf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

// Backfill: for each day in the window whose feature partition is missing on HDFS,
// run the expConTime job to (re)generate it.
// NOTE(review): `.map` is used purely for side effects here — `foreach` would state the
// intent; the "null" sentinel string could be an Option instead.
dateSet.map{x =>
val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
val out = if(!fs.exists((new Path(addrs)))) {x} else "null"
out
}.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))

// Union of the 7 daily exposure/consume feature partitions, keeping only events from the
// main feed ('m') or the top slot of the home feed ('h', position 0).
val expConsume = dateSet.map { x =>
val expConsumeOut = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}").drop("table").drop("dt").
where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
expConsumeOut
}.reduce((x,y) => x.unionAll(y))

/// Per-content card count, bucketed into groups of 5 cards; groups above 19 are capped at 20.
val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
map(x => (x.getAs[Long]("contents_id"), x.getAs[Long]("cardSize"), x.getAs[Long]("cardSize")/5)).toDF("cid1","cardSize","sizeGroup").
selectExpr("*","if(sizeGroup>19, 20, sizeGroup) as cardGroup").drop("sizeGroup")
/// (unused experiment) Average content consume time per card group, derived from the kpi3 metric.
//val kpi3 = Util.readDashboardTable("kpi3","*","{*}","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
// join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null").cache
//val cardGroupConsume = kpi3.where("consumeTime > 10.0").groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
// kpi3.map(x => s"${x(0)}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(5)}|${x(6)}").coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/kpi3")
// cardGroupConsume.stat.corr("cardGroup1","cardGroupConTime") — correlation between cardGroup and mean consume time was 0.89

/// Per-card card type, with DYNAMIC folded into PHOTO and any '%SNS%' type folded into SNS.
val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
"if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")

// Pivot card types into one row per content with a count column per type
// (manual pivot: case-per-type then sum — Spark 1.5-era idiom).
val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
expr("cid"),
expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
expr("case when card_type = 'SNS' then count else 0 end as SNS"),
expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
expr("case when card_type = 'INTR' then count else 0 end as INTR"),
expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))


// One-hot contents type per content (ALBUM / ALBUM.A / CHST / CHST.A / TOON / LIVE).
val contentsType = Util.tables("MG_CONTENTS").where("status='ACTV'").select(
expr("contents_id as cid"),
expr("case when contents_type = 'ALBUM' then 1 else 0 end as ALBUM"),
expr("case when contents_type = 'ALBUM.A' then 1 else 0 end as ALBUM_A"),
expr("case when contents_type = 'CHST' then 1 else 0 end as CHST"),
expr("case when contents_type = 'CHST.A' then 1 else 0 end as CHST_A"),
expr("case when contents_type = 'TOON' then 1 else 0 end as TOON"),
expr("case when contents_type = 'LIVE' then 1 else 0 end as LIVE")
)

// Full feature row per content: card-type counts + cardSize + contents-type one-hots.
val cidCardTypeSize = cidCardType.join(cardSize, cidCardType("cid")===cardSize("cid1"),"leftouter").drop(cardSize("cid1")).drop(cardSize("cardGroup")).
join(contentsType, cidCardType("cid")===contentsType("cid")).drop(contentsType("cid"))

// LabeledPoint per content; the "label" slot carries the cid so it survives prediction
// (not a training label). Feature order must match the order the RF model was trained with.
val predData = cidCardTypeSize.map { line =>
LabeledPoint(line.getAs[Long]("cid"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble,
line.getAs[Long]("TEXT").toDouble, line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble,
line.getAs[Int]("ALBUM").toDouble, line.getAs[Int]("ALBUM_A").toDouble, line.getAs[Int]("CHST").toDouble, line.getAs[Int]("CHST_A").toDouble,
line.getAs[Int]("TOON").toDouble, line.getAs[Int]("LIVE").toDouble
))
}

// Load the pre-trained model and predict expected consume time per content.
// NOTE(review): `.collect` pulls every feature row to the driver and predicts there —
// fine only while the content catalogue stays small; `RFmodel.predict(rdd)` would scale.
val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
val predResult = predData.collect.map { point =>
val prediction = RFmodel.predict(point.features)
(point.label.toLong, prediction)
}

// Predicted consume time per content, converted to milliseconds (model output assumed in
// seconds — TODO confirm against the training job).
val cidPredConsume = sc.parallelize(predResult).toDF("cid1","predConsume").withColumn("predConsume", column("predConsume")*1000)

/// Average exposure time before a consume happens, per exposure slot (fromKey, position).
val posExpTime = expConsume.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
selectExpr("fromKey as fromKey1", "position as position1", "posExpTime")

/// CTR per exposure slot (consume count / exposure count).
val positionCtr = expConsume.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
withColumn("rankingCtr", column("consumeCnt")/column("expSize")).selectExpr("fromKey as fromKey1","position as position1","rankingCtr")

/// Exposure time and consume time per (content, exposure slot); low-exposure rows
/// (expSize2 <= 200) and contents without a known cardSize are filtered out.
val cidPositionInfo = expConsume.groupBy("cid","fromKey","position").
agg(//expr("sum(expTime1) as expTime1"), expr("count(expSize1) as expSize1"),
expr("sum(expTime2) as expTime2"), expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
join(positionCtr, column("fromKey")===positionCtr("fromKey1") && column("position")===positionCtr("position1"), "leftouter").
drop(positionCtr("fromKey1")).drop(positionCtr("position1")).
join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).na.fill(0, Seq("consume")).
where("expSize2 > 200 and cardSize is not null")

// Attach the per-slot expected exposure time and the per-content predicted consume time.
val gghtmp = cidPositionInfo.join(posExpTime, cidPositionInfo("fromKey")===posExpTime("fromKey1") && cidPositionInfo("position")===posExpTime("position1"), "leftouter").
drop(posExpTime("fromKey1")).drop(posExpTime("position1")).
join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1"))

/* (superseded ggh v2 formulation, kept for reference)
val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
withColumn("ggh", column("consumeEff")/column("expEff"))
val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
*/

// ggh v3: expected consume count (exposures * slot CTR) and exposure efficiency per row.
val gghBase4 = gghtmp.selectExpr("*", "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")

// Aggregate to one row per content: ggh = consumeEff / expEff where consumeEff is actual
// consume time over expected (expected consume count * predicted consume time).
// Contents below 1000 total exposures are dropped as statistically unreliable.
val gghVer3 = gghBase4.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff)/count(*) as expEff")).
join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
selectExpr("cid","totalExpSize2","consume/(expectConCnt * predConsume) as consumeEff", "expEff").withColumn("ggh", column("consumeEff")/column("expEff")).
where("totalExpSize2 > 1000")

// Mean / stddev of the ggh column via describe().
// NOTE(review): fragile — `(0)(4)` is the positional index of the 5th column ("ggh") in
// describe()'s output and silently breaks if the schema changes; also describe() is
// computed twice (two full passes) where one cached call would do.
val gghMean = gghVer3.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble

// Z-score then logistic squash: scaledGgh = 1000 / (1 + e^(-z)), i.e. a score in (0, 1000).
val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
/* (disabled) plain-text dump of the final scores to HDFS
gghScaled.map{x =>
s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
}.saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
*/
// Collect (cid, score) pairs to the driver and insert into HBase table "cid_ggh2".
// NOTE(review): no close/flush is called on the inserter here — confirm HbaseInserter
// releases its connection internally, otherwise this leaks one per run.
val finalScore = gghScaled.map(x=>(x(0),x.getAs[Double]("scaledGgh").toInt))
val insertArray = finalScore.map(x=>(x._1.toString, x._2.toString)).collect()
val test = new HbaseInserter("cid_ggh2")
test.insert(insertArray)
/*//////// (disabled) CTR and CTR-time computation, for comparing ggh against raw CTR metrics
val ctr = expConsume.groupBy("cid").
agg(expr("sum(expTime1) as expTime1"), expr("sum(expSize1) as expSize1"), expr("sum(expTime2) as expTime2"),
expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).
withColumn("ctr",column("conCount")/column("expSize2")).
withColumn("ctrTime",column("consume")/column("expTime2")).
withColumnRenamed("cid","cid1")

//////////////////////////
/// load contents metadata (title, editor)
val ContentsInfo = Util.tables("MG_CONTENTS").selectExpr("contents_id", "udate","title" ,"uid")
val userInfo = Util.tables("USER").selectExpr("uid","name")
val ciduser = ContentsInfo.join(userInfo, ContentsInfo("uid")===userInfo("uid"), "leftouter").drop(userInfo("uid"))

val result2 = gghScaled.join(ciduser, gghScaled("cid")===ciduser("contents_id"), "leftouter").drop(ciduser("contents_id"))//.where("udate > '2016-05-12'")
////////// compare against the ctr and ctrTime metrics
val gghResult = result2.join(ctr.selectExpr("cid1","ctr as noRankCtr","ctrTime as noRankCtrTime","expTime2 as expTime","expSize2 as expSize","consume","conCount"),
result2("cid")===ctr("cid1"), "leftouter").drop(ctr("cid1")).drop("uid").drop("totalExpSize2")

//gghResult.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/preprocess/ggh/$saveDay")

gghResult.map{x=>
val title = x.getAs[String]("title").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
val editor = x.getAs[String]("name").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
s"${x(0)}|${title}|${x(5)}|${editor}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}"}.
coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
*/

}

}