Commit 856dd1fc5bed5fe635baf8ab577cc982ab33e19c

Authored by evan
1 parent 2a033a39da
Exists in master

ggh ver2 first commit

Showing 5 changed files with 659 additions and 6 deletions

app/com/piki_ds/ver2ggh/expConTime.scala
  1 +/**
  2 + * Created by Evan on 2016. 5. 10..
  3 + */
  4 +package com.piki_ds.ver2ggh
  5 +import org.apache.spark.SparkContext
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  8 +import org.apache.spark.sql.functions._
  9 +import org.apache.spark.sql.types.{StructField, StructType, DataTypes, TimestampType}
  10 +
  11 +import scala.collection.immutable.Iterable
  12 +
  13 +object expConTime {
  14 +
  15 + val sc = SparkContext.getOrCreate()
  16 + val sqlContext = SQLContext.getOrCreate(sc)
  17 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  18 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  19 + java.net.URI("hdfs://pikinn"), hadoopConf)
  20 + import sqlContext.implicits._
  21 +
  22 + /**
  23 + * Loads per-uuid exposure, open, and consume events for the given date.
  24 + *
  25 + * @param date : target date in yyyyMMdd format
  26 + * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, timeStamp)
  27 + */
  28 + def getLogParse(date: String): DataFrame = {
  29 + val current = date
  30 + val (ymd, year, month, day) = (current, current.slice(0, 4), current.slice(4, 6), current.slice(6, 8))
  31 + val log = sc.textFile(s"hdfs://pikinn/log/dev=app/y=$year/mo=$month/d=$day/h=*/mi=*/*.gz")
  32 +
  33 + val format = new java.text.SimpleDateFormat("yyyyMMdd")
  34 + val yester = format.format(format.parse(current).getTime - (24 * 1000 * 60 * 60))
  35 + val yesterday = yester.slice(0, 4) + "-" + yester.slice(4,6) + "-" + yester.slice(6,8)
  36 + val currentday = year + "-" + month + "-" + day
  37 +
  38 + val action = log.map(x => {
  39 + try {
  40 + val s = x.split("\\|", -1)
  41 + var mapping = scala.collection.mutable.Map.empty[String, String]
  42 + if (s.size > 4 && (
  43 + s(2).forall(Character.isDigit(_)) &&
  44 + s(3).forall(Character.isDigit(_)) &&
  45 + s(4).forall(Character.isDigit(_)))) {
  46 + val s2 = if (s(2).size == 10) s(2) + "000" else s(2)
  47 + mapping = mapping ++ Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1), "time" -> s2, "uuid" -> s(3), "cid" -> s(4))
  48 + if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
  49 + && s(9).forall(Character.isDigit(_)) && s(9).size < 4) {
  50 + def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
  51 + mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(9), "dwell" -> dwell())
  52 + Some(mapping)
  53 + } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
  54 + && s(9).forall(Character.isLetter(_)) && s(8).size < 4) {
  55 + def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
  56 + mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(8), "dwell" -> dwell())
  57 + Some(mapping)
  58 + } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("OPEN") && s(7).size < 4) {
  59 + mapping = mapping ++ Map("fromKey" -> s(5), "position" -> s(7))
  60 + Some(mapping)
  61 + } else if (s.size > 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
  62 + def consumeTime(i: Long) = if (s(7).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 0L
  63 + mapping = mapping ++ Map("consume" -> consumeTime(30).toString)
  64 + Some(mapping)
  65 + } else if (s.size == 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
  66 + def consumeTime2(i: Long) = if (s(6).forall(Character.isDigit(_))) math.min(s(6).toLong, 1000L * i) else 0L // consume value assumed to sit in s(6) for 8-field lines
  67 + mapping = mapping ++ Map("consume" -> consumeTime2(30).toString)
  68 + Some(mapping)
  69 + } else {
  70 + None
  71 + }
  72 + }
  73 + else {
  74 + None
  75 + }
  76 + } catch {
  77 + case e: Exception =>
  78 + None
  79 + }
  80 + }).flatMap(x => x)
  81 +
  82 + val df = action.filter(x => x.getOrElse("ERROR", null) == null).map { x =>
  83 + (x("event"), x("time").toLong, x("uuid").toLong, x("cid").toLong,
  84 + x.getOrElse("fromKey", null), x.getOrElse("position", null), x.getOrElse("dwell", "0").toInt,x.getOrElse("consume", "0").toInt)}
  85 +
  86 + // Log delivery lags: in the 5/16 files, 90.9% of records are from 5/16, 6.5% from 5/15, and 214 other dates (including errors) appear, so only records stamped with the target date t (~91%) are kept
  87 + val df1 = df.toDF("event","time","uuid","cid","fromKey","position","dwell","consume").where("uuid != 0").
  88 + where("event != 'EXPOSURE' or dwell != 0").where("event != 'CONSUME' or consume != 0").withColumn("timeStamp", column("time").cast(DataTypes.TimestampType).as("time")).
  89 + where(s"timeStamp >= '$currentday'")
  90 +
  91 + df1
  92 + }
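 + // Rough shape of a raw log line as inferred from the indices used above (field names are assumptions):
 + //   version=2&logArray=CATEGORY|EVENT|time|uuid|cid|... with fromKey/position/dwell-related fields in s(5)..s(9)
 + //   depending on the event type; 10-digit second timestamps are padded with "000" to milliseconds.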
  93 +
  94 + /**
  95 + * Maps card-consume events to the content open they belong to, per uuid.
  96 + * Events are ordered by log time per uuid; since card consumption normally follows a content open,
  97 + * each OPEN stores its open path, position, cid, and open time, which are then attached to the matching CONSUME events.
  98 + *
  99 + * (uses OPEN and CONSUME events)
  100 + *
  101 + * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
  102 + */
  103 + def consumeIndexing(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
  104 + val out = base.map { y =>
  105 + val info = y._2.map { x =>
  106 + val (event, time, key, position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
  107 + (event, time, key, position, cid)
  108 + }.filter(x => x._1 == "OPEN")
  109 +
  110 + var idx = -1
  111 + val result1 = y._2.map { x =>
  112 + def idxExc(idx: Int) = if (idx == -1) 0 else idx
  113 + if (x._1 == "OPEN") {
  114 + idx += 1
  115 + Some(x.copy(_9 = info(idx)._2, _8 = 0))
  116 + } else if (x._1 == "CONSUME" && x._4 == info(idxExc(idx))._5) {
  117 + Some(x.copy(_9 = info(idxExc(idx))._2, _5 = info(idxExc(idx))._3, _6 = info(idxExc(idx))._4))
  118 + } else None
  119 + }
  120 + val result2 = result1.filter(x => x != None).map(x => x.get)
  121 + (y._1, result2)
  122 + }.flatMap(x => x._2).toDF("event","time","uuid","cid","fromKey","position", "dwell", "consume","openTime")
  123 +
  124 + val StringToInt = udf[Int, String]( _.toInt)
  125 +
  126 + val out1 = out.withColumn("position", StringToInt(out("position")))
  127 +
  128 + out1
  129 + }
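 + // Minimal worked example of the indexing above (values are hypothetical): for one uuid with the time-ordered events
 + //   OPEN(cid=1, time=100, fromKey=h, position=0), CONSUME(cid=1), OPEN(cid=2, time=200, fromKey=m, position=3), CONSUME(cid=2)
 + // a CONSUME whose cid matches the most recent OPEN inherits its openTime, fromKey and position (100/h/0 and 200/m/3
 + // here); a CONSUME that does not match the most recent OPEN is dropped.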
  130 +
  131 + /**
  132 + * Computes per-uuid exposure time for each content.
  133 + * Is the exposure time of a content that is shown again after the uuid has already consumed it still meaningful?
  134 + * Two variants are computed: total exposure time per exposed content and position, and exposure time accumulated only up to the last open of that content and position.
  135 + *
  136 + * (uses EXPOSURE and OPEN events)
  137 + *
  138 + * @return : DataFrame(uuid, cid, fromKey, position, expTime1, expSize1, expTime2, expSize2)
  139 + */
  140 + def exposureTime(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]) = {
  141 + val expTime = base.flatMap { x =>
  142 + try {
  143 + val ar = x._2.groupBy(y => (y._4, y._5, y._6)).map(y => (y._1, y._2)).map { y =>
  144 + val sortY = y._2.sortWith(_._2 < _._2)
  145 +
  146 + val lastIdx = y._2.lastIndexWhere(y => y._1 == "OPEN")
  147 + val out = if (lastIdx == -1) {
  148 + y._2
  149 + } else {
  150 + y._2.take(lastIdx + 1)
  151 + }
  152 +
  153 + val result1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
  154 + val result2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
  155 + val size1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
  156 + val size2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
  157 +
  158 + (y._1._1, y._1._2, y._1._3, result1, size1, result2, size2)
  159 + }.map(y => (x._1, y._1, y._2, y._3.toInt, y._4, y._5, y._6, y._7))
  160 + ar
  161 + } catch {
  162 + case e:Exception => None
  163 + }
  164 + }.filter(x => x != None).toDF("uuid","cid","fromKey","position","expTime1","expSize1","expTime2","expSize2").
  165 + where("expTime1 !=0 and expSize1 != 0").where("expTime2 != 0 and expSize2 != 0")
  166 +
  167 + expTime
  168 + }
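 + // expTime1/expSize1 sum every EXPOSURE with dwell > 1000 ms per (cid, fromKey, position), while expTime2/expSize2
 + // only count exposures up to (and including) the last OPEN of that key; the second variant is the one the ggh
 + // score in gghScore is built on.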
  169 +
  170 + def main(args: Array[String]): Unit = {
  171 +
  172 + // Parse the raw logs
  173 + val saveDay = if(args.length == 1) args(0) else "20160518"
  174 + val df1 = getLogParse(saveDay)
  175 +
  176 + // Split the log into (open, consume) events and (exposure, open) events
  177 + val base1 = df1.map { x =>
  178 + (x.getAs[String]("event"), x.getAs[Long]("time"), x.getAs[Long]("uuid"), x.getAs[Long]("cid"),
  179 + x.getAs[String]("fromKey"), x.getAs[String]("position"), x.getAs[Int]("dwell"), x.getAs[Int]("consume"), 0L)
  180 + }.groupBy(x => x._3).map(x => (x._1, x._2.toArray.sortWith(_._2 < _._2)))
  181 +
  182 + val base2 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("OPEN") || y._1.equals("CONSUME")))).
  183 + filter { x => x._2.map(y => y._1).contains("OPEN") }
  184 +
  185 + val base3 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("EXPOSURE") || y._1.equals("OPEN"))))
  186 +
  187 + // From the (open, consume) events, use per-uuid log time to attach position, open path, and open time to consumes of the same content that follow an open
  188 + val openConsumeIdxed = consumeIndexing(base2)
  189 +
  190 + //val openConsume = openConsumeIdxed.where("fromKey in ('h','m') and position != -1 and event = 'CONSUME'").
  191 + val openConsume = openConsumeIdxed.where("event = 'CONSUME'").
  192 + groupBy("uuid", "cid", "fromKey", "position").
  193 + agg(expr("sum(consume) as consume"))
  194 +
  195 + // From the (exposure, open) events, compute per-uuid exposure time by content open position and path
  196 + val expTime = exposureTime(base3)
  197 +
  198 + //val exposureInfo = expTime.where("fromKey in ('h','m') and position != -1")
  199 + val exposureInfo = expTime
  200 +
  201 + val expCon = exposureInfo.join(openConsume, exposureInfo("uuid") === openConsume("uuid") && exposureInfo("cid") === openConsume("cid") &&
  202 + exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
  203 + drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))
  204 +
  205 + expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")
  206 +
  207 + }
  208 +}
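 + // Hypothetical invocation, assuming a yyyyMMdd date argument (defaults to 20160518 above):
 + //   com.piki_ds.ver2ggh.expConTime.main(Array("20160518"))
 + // writes the joined exposure/consume table as parquet to hdfs://pikinn/user/evan/Features/table=expConTime/dt=20160518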
  209 +
  210 + ///////////////////////
  211 +
  212 + /* Scratch: transform the logs and aggregate
  213 + val tst3 = base3.flatMap { x =>
  214 + val ar2: Array[(String, Long, Long, Long, String, String, Int, Int, Long)] = x._2.groupBy(x => (x._4, x._5, x._6)).map(x => (x._1, x._2)).map { x =>
  215 + val lastIdx = x._2.lastIndexWhere(x => x._1 == "OPEN")
  216 + val out = if (lastIdx == -1) {
  217 + x._2
  218 + } else {
  219 + x._2.take(lastIdx + 1)
  220 + }
  221 + (x._1, out)
  222 + }.flatMap(x => x._2).toArray.sortWith(_._2 < _._2)
  223 + ar2
  224 + }.toDF("event","time","uuid","cid","fromKey","position","dwell","consume","openTime")
  225 + val tst5 = tst3.where("fromKey in ('h','m') and position != -1 and position < 1000 and event = 'EXPOSURE'").
  226 + withColumn("position", StringToInt(tst3("position"))).
  227 + selectExpr("*", "case when fromKey = 'h' then 0 when fromKey = 'm' then position + 1 else null end as ranking").
  228 + groupBy("uuid","cid","ranking").
  229 + agg(expr("sum(dwell) as dwell"))
  230 +
  231 + val ar = Array( ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  232 + ("2",21,"m",2, 0), ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  233 + ("1",23,"m",4, 10), ("2",23,"m",4, 0), ("1",21,"m",2, 10), ("1",23,"m",4, 10) )
  234 +
  235 + sqlContext.createDataFrame(ar).show
  236 +
  237 + val gg = ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map { x =>
  238 + val lastIdx = x._2.lastIndexWhere(x => x._1 == "2")
  239 + val out = if( lastIdx == -1) {
  240 + x._2
  241 + } else x._2.take(lastIdx+1)
  242 + (x._1, out)
  243 + }.flatMap(x => x._2).toArray
  244 +
  245 + sqlContext.createDataFrame(gg).show
  246 +
  247 + ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map(x => x)
  248 +
  249 + val tt = Array(("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10))
  250 +
  251 + val tt1 = tt.take(lastIdx+1)
  252 +
  253 + val tt = consumeIndexing(df2)
  254 + val dff = base2.take
  255 +
  256 + val tmp2 = df2._2.map { x =>
  257 + val (event,time,key,position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
  258 + (event, time, key, position, cid)
  259 + }.filter(x => x._1 == "OPEN")
  260 +
  261 + var idx = -1
  262 + val gg = df2._2.map { x =>
  263 + println(x)
  264 + println(idx)
  265 + def idxExc(idx: Int) = if (idx == -1) 0 else idx
  266 + if (x._1 == "OPEN") {
  267 + idx += 1
  268 + Some(x.copy(_8 = tmp2(idx)._2))
  269 + } else if (x._1 == "CONSUME" && x._4 == tmp2(idxExc(idx))._5) {
  270 + Some(x.copy(_8 = tmp2(idxExc(idx))._2, _5 = tmp2(idxExc(idx))._3, _6 = tmp2(idxExc(idx))._4))
  271 + } else None
  272 + }
  273 +
  274 + val a = gg.filter(x => x != None).map(x => x.get)
  275 + sqlContext.createDataFrame(a).show(100)
  276 +
  277 + val ar = Array( ("1",20,"h",0), ("2",21,null,0), ("2",22,null,0), ("2",23,null,0),
  278 + ("1",30,"h",0), ("2",31,null,0), ("2",32,null,0), ("2",33,null,0),
  279 + ("1",40,"h",0), ("2",41,null,0), ("2",42,null,0), ("2",43,null,0) )
  280 +
  281 + //val tst = sc.parallelize(ar)
  282 +
  283 +
  284 + tst.map {x=>
  285 + val (opentime, fromkey, position) = if(x._1 == "1") {
  286 + val opentime = x._2
  287 + val fromkey = x._3
  288 + val position = x._4
  289 + (opentime, fromkey, position)
  290 + }
  291 +
  292 + if(x._1 == "1") {
  293 + x.copy(_4 = opentime)
  294 + } else {
  295 + x.copy(_2 = fromkey, _3 = position, _4 = opentime)
  296 + }
  297 + }
  298 +
  299 + val t1 = ar.map { x =>
  300 + val (event,time,key) = if (x._1 == "1") (x._1, x._2, x._3) else ("2", 0, "0")
  301 + (event, time, key)
  302 + }.filter(x => x._1 == "1")
  303 +
  304 + var idx = -1
  305 + ar.map{ x=>
  306 + println(x)
  307 + println(idx)
  308 + if (x._1 == "1") {
  309 + idx += 1
  310 + x.copy(_4 = t1(idx)._2)
  311 + } else {
  312 + x.copy(_4 = t1(idx)._2, _3=t1(idx)._3)
  313 + }
  314 + }
  315 + */
app/com/piki_ds/ver2ggh/gghScore.scala
  1 +/**
  2 + * Created by Evan on 2016. 5. 18..
  3 + */
  4 +package com.piki_ds.ver2ggh
  5 +
  6 +import java.util.Date
  7 +
  8 +import com.piki_ds.ver2ggh
  9 +import com.piki_ds.data.contents.report.Util
  10 +import org.apache.hadoop.fs.Path
  11 +import org.apache.spark.SparkContext
  12 +import org.apache.spark.mllib.linalg.Vectors
  13 +import org.apache.spark.mllib.regression.LabeledPoint
  14 +import org.apache.spark.mllib.tree.model.RandomForestModel
  15 +import org.apache.spark.rdd.RDD
  16 +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  17 +import org.apache.spark.sql.functions._
  18 +
  19 +
  20 +object gghScore {
  21 +
  22 + val sc = SparkContext.getOrCreate()
  23 + val sqlContext = SQLContext.getOrCreate(sc)
  24 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  25 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  26 + java.net.URI("hdfs://pikinn"), hadoopConf)
  27 + import sqlContext.implicits._
  28 +
  29 + def main(args: Array[String]): Unit = {
  30 +
  31 + val format = new java.text.SimpleDateFormat("yyyyMMdd")
  32 + val currentTime = new Date()
  33 +
  34 + val day_delm = 24 * 1000 * 60 * 60L
  35 + val saveDay = if (args.length >= 1) args(0) else ""
  36 + val ind = -6 to 0
  37 + val dateSet = ind.map(x => {
  38 + format.format(format.parse(saveDay).getTime + day_delm * x)
  39 + })
  40 +
  41 + val hadoopConf = sc.hadoopConfiguration
  42 + val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  43 +
  44 + dateSet.map{x =>
  45 + val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
  46 + val out = if(!fs.exists((new Path(addrs)))) {x} else "null"
  47 + out
  48 + }.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))
  49 +
  50 + val expConsume = dateSet.map { x =>
  51 + val expConsumeOut = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}").drop("table").drop("dt").
  52 + where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
  53 + expConsumeOut
  54 + }.reduce((x,y) => x.unionAll(y))
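 + // A seven-day window dt in [saveDay-6, saveDay] is read back; any missing daily partition is generated first by
 + // invoking expConTime.main for that date (the dateSet loop above), and the daily DataFrames are then unioned.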
  55 +
  56 + /// Build card-count groups per content, in buckets of 5 cards
  57 + val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
  58 + map(x => (x.getAs[Long]("contents_id"), x.getAs[Long]("cardSize"), x.getAs[Long]("cardSize")/5)).toDF("cid1","cardSize","sizeGroup").
  59 + selectExpr("*","if(sizeGroup>19, 20, sizeGroup) as cardGroup").drop("sizeGroup")
  60 + /// Compute average content consume time per card group from the kpi3 metrics
  61 + val kpi3 = Util.readDashboardTable("kpi3","*","{*}","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
  62 + join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null").cache
  63 + val cardGroupConsume = kpi3.where("consumeTime > 10.0").groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
  64 + // kpi3.map(x => s"${x(0)}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(5)}|${x(6)}").coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/kpi3")
  65 + // cardGroupConsume.stat.corr("cardGroup1","cardGroupConTime") correlation between cardGroup and average consume time: 0.89
  66 +
  67 + ///
  68 + val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
  69 + "if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")
  70 +
  71 + val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
  72 + expr("cid"),
  73 + expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
  74 + expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
  75 + expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
  76 + expr("case when card_type = 'SNS' then count else 0 end as SNS"),
  77 + expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
  78 + expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
  79 + expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
  80 + expr("case when card_type = 'INTR' then count else 0 end as INTR"),
  81 + expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
  82 + ).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
  83 + expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
  84 + expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))
  85 +
  86 + val cidCardTypeSize = cidCardType.join(cardSize, cidCardType("cid")===cardSize("cid1"),"leftouter").drop(cardSize("cid1")).drop(cardSize("cardGroup"))
  87 + val predData = cidCardTypeSize.map { line =>
  88 + LabeledPoint(line.getAs[Long]("cid"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
  89 + line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble,
  90 + line.getAs[Long]("TEXT").toDouble, line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble
  91 + ))
  92 + }
  93 +
  94 + val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  95 + val predResult = predData.collect.map { point =>
  96 + val prediction = RFmodel.predict(point.features)
  97 + (point.label.toLong, prediction)
  98 + }
  99 +
  100 + val cidPredConsume = sc.parallelize(predResult).toDF("cid1","predConsume").withColumn("predConsume", column("predConsume")*1000)
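 + // The loaded RandomForest model (trained in simContentsModel) predicts an expected consume time per content from its
 + // card composition; the *1000 appears to convert seconds to milliseconds so the prediction is comparable with the
 + // log-based consume values (an assumption based on the kpi3 consumeTime scaling above).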
  101 +
  102 + /// Average exposure time until a content consume occurs, per exposure position
  103 + val posExpTime = expConsume.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
  104 + selectExpr("fromKey as fromKey1", "position as position1", "posExpTime")
  105 +
  106 + /// CTR per position
  107 + val positionCtr = expConsume.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
  108 + withColumn("rankingCtr", column("consumeCnt")/column("expSize")).selectExpr("fromKey as fromKey1","position as position1","rankingCtr")
  109 +
  110 + /// Exposure time and consume time per content and exposure position
  111 + val cidPositionInfo = expConsume.groupBy("cid","fromKey","position").
  112 + agg(//expr("sum(expTime1) as expTime1"), expr("count(expSize1) as expSize1"),
  113 + expr("sum(expTime2) as expTime2"), expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
  114 + join(positionCtr, column("fromKey")===positionCtr("fromKey1") && column("position")===positionCtr("position1"), "leftouter").
  115 + drop(positionCtr("fromKey1")).drop(positionCtr("position1")).
  116 + join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).na.fill(0, Seq("consume")).
  117 + where("expSize2 > 200 and cardSize is not null")
  118 +
  119 + val gghtmp = cidPositionInfo.join(posExpTime, cidPositionInfo("fromKey")===posExpTime("fromKey1") && cidPositionInfo("position")===posExpTime("position1"), "leftouter").
  120 + drop(posExpTime("fromKey1")).drop(posExpTime("position1")).
  121 + join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1"))
  122 +
  123 + /*
  124 + val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
  125 + withColumn("ggh", column("consumeEff")/column("expEff"))
  126 + val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
  127 + expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
  128 + */
  129 +
  130 + val gghBase4 = gghtmp.selectExpr("*", "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")
  131 +
  132 + val gghVer3 = gghBase4.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
  133 + expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff)/count(*) as expEff")).
  134 + join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
  135 + selectExpr("cid","totalExpSize2","consume/(expectConCnt * predConsume) as consumeEff", "expEff").withColumn("ggh", column("consumeEff")/column("expEff")).
  136 + where("totalExpSize2 > 1000")
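 + // ggh per content, as a sketch of the expressions above:
 + //   expectConCnt = sum(expSize2 * rankingCtr)               expected number of consumes given the position CTRs
 + //   consumeEff   = consume / (expectConCnt * predConsume)   observed vs. expected consume time
 + //   expEff       = avg(expTime2 / (expSize2 * posExpTime))  observed vs. expected exposure time per position
 + //   ggh          = consumeEff / expEff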
  137 +
  138 + val gghMean = gghVer3.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
  139 + val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
  140 +
  141 + val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
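 + // The standardized ggh is squashed into (0, 1000) with a logistic:
 + //   scaledGgh = 1000 / (1 + exp(-(ggh - mean(ggh)) / stddev(ggh)))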
  142 +
  143 + //////// Compute CTR and CTR time
  144 + val ctr = expConsume.groupBy("cid").
  145 + agg(expr("sum(expTime1) as expTime1"), expr("sum(expSize1) as expSize1"), expr("sum(expTime2) as expTime2"),
  146 + expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
  147 + join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).
  148 + withColumn("ctr",column("conCount")/column("expSize2")).
  149 + withColumn("ctrTime",column("consume")/column("expTime2")).
  150 + withColumnRenamed("cid","cid1")
  151 +
  152 + //////////////////////////
  153 + /// Load contents info
  154 + val ContentsInfo = Util.tables("MG_CONTENTS").selectExpr("contents_id", "udate","title" ,"uid")
  155 + val userInfo = Util.tables("USER").selectExpr("uid","name")
  156 + val ciduser = ContentsInfo.join(userInfo, ContentsInfo("uid")===userInfo("uid"), "leftouter").drop(userInfo("uid"))
  157 +
  158 + val result2 = gghScaled.join(ciduser, gghScaled("cid")===ciduser("contents_id"), "leftouter").drop(ciduser("contents_id"))//.where("udate > '2016-05-12'")
  159 + ////////// Compare against the ctr and ctrTime metrics
  160 + val gghResult = result2.join(ctr.selectExpr("cid1","ctr as noRankCtr","ctrTime as noRankCtrTime","expTime2 as expTime","expSize2 as expSize","consume","conCount"),
  161 + result2("cid")===ctr("cid1"), "leftouter").drop(ctr("cid1")).drop("uid").drop("totalExpSize2")
  162 +
  163 + gghResult.map{x=>
  164 + val title = x.getAs[String]("title").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
  165 + val editor = x.getAs[String]("name").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
  166 + s"${x(0)}|${title}|${x(5)}|${editor}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}"}.
  167 + coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/table=ggh/dt=${saveDay}")
  168 + }
  169 +
  170 +}
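 + // Hypothetical run, assuming a yyyyMMdd date argument: gghScore.main(Array("20160518")) writes a pipe-delimited text
 + // dump of per-content ggh scores to hdfs://pikinn/user/evan/Features/table=ggh/dt=20160518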
app/com/piki_ds/ver2ggh/simContentsModel.scala
  1 +/**
  2 + * Created by Evan on 2016. 6. 2..
  3 + */
  4 +package com.piki_ds.ver2ggh
  5 +
  6 +import com.piki_ds.data.contents.report.Util
  7 +import org.apache.spark.SparkContext
  8 +import org.apache.spark.rdd.RDD
  9 +import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
  10 +import org.apache.spark.sql.{Row, DataFrame, SaveMode, SQLContext}
  11 +import org.apache.spark.sql.functions._
  12 +
  13 +import org.apache.spark.mllib.regression.LabeledPoint
  14 +import org.apache.spark.mllib.linalg.Vectors
  15 +import org.apache.spark.mllib.tree.RandomForest
  16 +import org.apache.spark.mllib.tree.model.RandomForestModel
  17 +
  18 +object simContentsModel {
  19 +
  20 + val sc = SparkContext.getOrCreate()
  21 + val sqlContext = SQLContext.getOrCreate(sc)
  22 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  23 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  24 + java.net.URI("hdfs://pikinn"), hadoopConf)
  25 + import sqlContext.implicits._
  26 +
  27 +
  28 + def main(args: Array[String]): Unit = {
  29 +
  30 + /// Load per-content card counts
  31 + val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
  32 + selectExpr("contents_id as cid1", "cardSize")
  33 +
  34 + /// kpi3 consume-time metrics joined with per-content card counts (training data for the consume-time model)
  35 + val kpi = Util.readDashboardTable("kpi3","*","*","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
  36 + join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null and consumeTime > 10.0")
  37 +
  38 + //val cardGroupConsume = kpi3.groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
  39 +
  40 + //////////
  41 + val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
  42 + "if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")
  43 +
  44 + val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
  45 + expr("cid"),
  46 + expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
  47 + expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
  48 + expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
  49 + expr("case when card_type = 'SNS' then count else 0 end as SNS"),
  50 + expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
  51 + expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
  52 + expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
  53 + expr("case when card_type = 'INTR' then count else 0 end as INTR"),
  54 + expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
  55 + ).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
  56 + expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
  57 + expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))
  58 +
  59 + val contentsType = Util.tables("MG_CONTENTS").where("status='ACTV'").select(
  60 + expr("contents_id as cid"),
  61 + expr("case when contents_type = 'ALBUM' then 1 else 0 end as ALBUM"),
  62 + expr("case when contents_type = 'ALBUM.A' then 1 else 0 end as ALBUM_A"),
  63 + expr("case when contents_type = 'CHST' then 1 else 0 end as CHST"),
  64 + expr("case when contents_type = 'CHST.A' then 1 else 0 end as CHST_A"),
  65 + expr("case when contents_type = 'TOON' then 1 else 0 end as TOON"),
  66 + expr("case when contents_type = 'LIVE' then 1 else 0 end as LIVE")
  67 + )
  68 +
  69 + val simConTrn = kpi.join(cidCardType, kpi("cid")===cidCardType("cid")).drop(cidCardType("cid")).
  70 + join(contentsType, kpi("cid")===contentsType("cid")).drop(contentsType("cid"))
  71 +
  72 + //simConTrn.map(x => s"${x(5)}|${x(7)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}|${x(14)}|${x(15)}|${x(3)}").
  73 + // coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/cardTypeConsume1")
  74 +
  75 + //val tstdf = sc.textFile(s"hdfs://pikinn/user/evan/Features/cardTypeConsume1").map(x => x.split("\\|")).
  76 + // map(x => (x(0).toDouble,x(1).toDouble,x(2).toDouble,x(3).toDouble,x(4).toDouble,x(5).toDouble,x(6).toDouble,x(7).toDouble,x(8).toDouble,x(9).toDouble,x(10).toDouble)).
  77 + // toDF("cardSize","LANDING","SHOPPING","PHOTO","SNS","PANORAMA","TEXT","YOUTUBE","INTR","VIDEO","consumeTime")
  78 +
  79 + // Load and parse the data
  80 + val parsedData1 = simConTrn.map { line =>
  81 + LabeledPoint(line.getAs[Double]("consumeTime"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
  82 + line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble, line.getAs[Long]("TEXT").toDouble,
  83 + line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble, line.getAs[Int]("ALBUM").toDouble,
  84 + line.getAs[Int]("ALBUM_A").toDouble, line.getAs[Int]("CHST").toDouble, line.getAs[Int]("CHST_A").toDouble, line.getAs[Int]("TOON").toDouble, line.getAs[Int]("LIVE").toDouble
  85 + ))
  86 + }.cache()
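 + // Each LabeledPoint: label = kpi3 consumeTime, features = [cardSize, 9 card-type counts, 6 content-type flags].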
  87 +
  88 + /*val parsedData2 = tstdf.map { line =>
  89 + LabeledPoint(line.getAs[Double]("consumeTime"), Vectors.dense(line.getAs[Double]("cardSize"), line.getAs[Double]("LANDING"),
  90 + line.getAs[Double]("SHOPPING"), line.getAs[Double]("PHOTO"), line.getAs[Double]("SNS"), line.getAs[Double]("PANORAMA"), line.getAs[Double]("TEXT"),
  91 + line.getAs[Double]("YOUTUBE"), line.getAs[Double]("INTR"), line.getAs[Double]("VIDEO")
  92 + ))
  93 + }.cache()*/
  94 +
  95 + val splits = parsedData1.randomSplit(Array(0.7, 0.3))
  96 + val (trainingData, testData) = (splits(0), splits(1))
  97 +
  98 + val numClasses = 2 // not used by trainRegressor
  99 + val categoricalFeaturesInfo = Map[Int, Int]()
  100 + val numTrees = 100 // Use more in practice.
  101 + val featureSubsetStrategy = "auto" // Let the algorithm choose.
  102 + val impurity = "variance"
  103 + val maxDepth = 10
  104 + val maxBins = 32
  105 +
  106 + val model = RandomForest.trainRegressor(parsedData1, categoricalFeaturesInfo,
  107 + numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
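 + // Note that the forest is trained on the full parsedData1 rather than trainingData, so the testData error computed
 + // below is not a true hold-out estimate.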
  108 +
  109 + val trnPredictions = trainingData.collect.map { point =>
  110 + val prediction = model.predict(point.features)
  111 + (point.label, prediction)
  112 + }
  113 + trnPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / trnPredictions.size
  114 +
  115 + val labelsAndPredictions = testData.collect.map { point =>
  116 + val prediction = model.predict(point.features)
  117 + (point.label, prediction)
  118 + }
  119 + labelsAndPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / labelsAndPredictions.size
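 + // Both expressions above compute the mean absolute percentage error, |label - prediction| / label averaged over the
 + // set; the results are not assigned, so they are only informative when run interactively.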
  120 +
  121 + try {
  122 + hdfs.delete(new org.apache.hadoop.fs.Path(s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel"), true)
  123 + } catch {
  124 + case _: Throwable => {}
  125 + }
  126 + model.save(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  127 + val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  128 +
  129 + val allPredictions = parsedData1.collect.map { point =>
  130 + val prediction = RFmodel.predict(point.features)
  131 + (point.label, prediction)
  132 + }
  133 + allPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / allPredictions.size
  134 +
  135 + val tst = sc.parallelize(allPredictions).toDF("act","pred").selectExpr("*", "abs(act - pred) as gap").selectExpr("*","gap/act as ErrorRate")
  136 +
  137 + }
  138 +}
... ... @@ -83,8 +83,8 @@
83 83 "com.piki_ds" %% "dsutils" % "0.1.8",
84 84 "com.piki_ds" %% "dsutils_hbase" % "0.3.6-SNAPSHOT",
85 85 "org.codehaus.jackson" % "jackson-core-asl" % "1.9.13",
86   - "org.jblas" % "jblas" % "1.2.4"
87   -
  86 + "org.jblas" % "jblas" % "1.2.4",
  87 + "com.piki_ds" %% "dscontentreport" % "0.1.0-SNAPSHOT"
88 88 )
89 89  
90 90  
... ... @@ -2,7 +2,7 @@
2 2  
3 3 BASEDIR=$(dirname $0)
4 4  
5   -if [ $# != '1' ];
  5 +if [ $# != '1' ] && [ $# != '2' ];
6 6 then
7 7 echo "usage : sh score.sh <scalaClassName>"
8 8 echo "ex) sh score.sh EditorScore"
9 9  
10 10  
... ... @@ -16,15 +16,15 @@
16 16 DATE_SUB="7"
17 17 DAY_TO_DELETE="$(date "+%Y%m%d" -d "$DATE_SUB days ago")"
18 18 DELETE_LOG="${LOG_DIR}/$1_$DAY_TO_DELETE.log"
19   -LOG="${LOG_DIR}/$1_$TODAY.log"
  19 +LOG="${LOG_DIR}/$1_$2_$TODAY.log"
20 20  
21 21 #HADOOP_CONF_DIR=/etc/hadoop/conf
22 22  
23 23 /data/spark/bin/spark-submit \
24   ---class com.piki_ds.ver1.$1 \
  24 +--class $1 \
25 25 --master yarn-client \
26 26 --conf "spark.default.parallelism=250" \
27   -$BASEDIR/target/scala-2.11/dsquality-assembly-0.1.0-SNAPSHOT.jar >> $LOG 2>&1
  27 +$BASEDIR/target/scala-2.11/dsquality-assembly-0.1.0-SNAPSHOT.jar $2 >> $LOG 2>&1
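  + # Assumed usage: $1 is the fully qualified class name, $2 an optional date argument
  + # e.g. sh score.sh com.piki_ds.ver2ggh.gghScore 20160518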
28 28 #target/scala-2.11/dsmakingscore-assembly-0.1.0-SNAPSHOT.jar >> $LOG 2>&1
29 29 #target/scala-2.11/dsmakingscore_2.11-0.1.0-SNAPSHOT.jar >> $LOG 2>&1
30 30 #--jars "lib_managed/jars/mysql/mysql-connector-java/mysql-connector-java-5.1.36.jar" \