Commit e8087e9c6857aff97d527782104fa248d8d13bb3

Authored by Joanne
Exists in master

Merge branch 'master' of ssh://gitlab.pikicast.com:2222/data-science-team/dsquality

Showing 6 changed files

app/com/piki_ds/ver2ggh/expConTime.scala
  1 +/**
  2 + * Created by Evan on 2016. 5. 10..
  3 + */
  4 +package com.piki_ds.ver2ggh
  5 +import org.apache.spark.SparkContext
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  8 +import org.apache.spark.sql.functions._
  9 +import org.apache.spark.sql.types.{StructField, StructType, DataTypes, TimestampType}
  10 +
  11 +import scala.collection.immutable.Iterable
  12 +
  13 +object expConTime {
  14 +
  15 + val sc = SparkContext.getOrCreate()
  16 + val sqlContext = SQLContext.getOrCreate(sc)
  17 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  18 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  19 + java.net.URI("hdfs://pikinn"), hadoopConf)
  20 + import sqlContext.implicits._
  21 +
  22 + /**
  23 + * Loads the exposure, open and consume events per uuid
  24 + *
  25 + * @param date
  26 + * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume)
  27 + */
  28 + def getLogParse(date: String): DataFrame = {
  29 + val current = date
  30 + val (ymd, year, month, day) = (current, current.slice(0, 4), current.slice(4, 6), current.slice(6, 8))
  31 + val log = sc.textFile(s"hdfs://pikinn/log/dev=app/y=$year/mo=$month/d=$day/h=*/mi=*/*.gz")
  32 +
  33 + val format = new java.text.SimpleDateFormat("yyyyMMdd")
  34 + val yester = format.format(format.parse(current).getTime - (24 * 1000 * 60 * 60))
  35 + val yesterday = yester.slice(0, 4) + "-" + yester.slice(4,6) + "-" + yester.slice(6,8)
  36 + val currentday = year + "-" + month + "-" + day
  37 +
  38 + val action = log.map(x => {
  39 + try {
  40 + val s = x.split("\\|", -1)
  41 + var mapping = scala.collection.mutable.Map.empty[String, String]
  42 + if (s.size > 4 && (
  43 + s(2).forall(Character.isDigit(_)) &&
  44 + s(3).forall(Character.isDigit(_)) &&
  45 + s(4).forall(Character.isDigit(_)))) {
  46 + val s2 = if (s(2).size == 10) s(2) + "000" else s(2)
  47 + mapping = mapping ++ Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1), "time" -> s2, "uuid" -> s(3), "cid" -> s(4))
  48 + if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
  49 + && s(9).forall(Character.isDigit(_)) && s(9).size < 4) {
  50 + def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
  51 + mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(9), "dwell" -> dwell())
  52 + Some(mapping)
  53 + } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
  54 + && s(9).forall(Character.isLetter(_)) && s(8).size < 4) {
  55 + def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
  56 + mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(8), "dwell" -> dwell())
  57 + Some(mapping)
  58 + } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("OPEN") && s(7).size < 4) {
  59 + mapping = mapping ++ Map("fromKey" -> s(5), "position" -> s(7))
  60 + Some(mapping)
  61 + } else if (s.size > 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
  62 + def consumeTime(i: Long) = if (s(7).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 1000L * 0
  63 + mapping = mapping ++ Map("consume" -> consumeTime(30).toString)
  64 + Some(mapping)
  65 + } else if (s.size == 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
  66 + def consumeTime2(i:Long) = if(s(7).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 1000L * 0
  67 + mapping = mapping ++ Map("consume" -> consumeTime2(30).toString)
  68 + Some(mapping)
  69 + } else {
  70 + None
  71 + }
  72 + }
  73 + else {
  74 + None
  75 + }
  76 + } catch {
  77 + case e: Exception =>
  78 + None
  79 + }
  80 + }).flatMap(x => x)
  81 +
  82 + val df = action.filter(x => x != None).filter(x => x.getOrElse("ERROR", null) == null).map { x =>
  83 + try {
  84 + Some(x("event"), x("time").toLong, x("uuid").toLong, x("cid").toLong,
  85 + x.getOrElse("fromKey", null), x.getOrElse("position", null), x.getOrElse("dwell", "0").toInt, x.getOrElse("consume", "0").toInt)
  86 + } catch {
  87 + case e: Exception =>
  88 + None
  89 + }
  90 + }.filter(x => x != None).map(x => x.get)
  91 +
  92 + // Logs arrive late: in the log for 5/16, 90.9% of records are from 5/16, 6.5% from 5/15, and 214 other days (errors included) also appear, so only the data for the base day t (about 91%) is kept
  93 + val df1 = df.toDF("event","time","uuid","cid","fromKey","position","dwell","consume").where("uuid != 0").
  94 + where("event != 'EXPOSURE' or dwell != 0").where("event != 'CONSUME' or consume != 0").withColumn("timeStamp", column("time").cast(DataTypes.TimestampType).as("time")).
  95 + where(s"timeStamp >= '$currentday'")
  96 +
  97 + df1
  98 + }
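
getLogParse above splits each pipe-delimited record, keeps only well-formed EXPOSURE, OPEN and CONSUME events, and then drops everything not stamped with the base day. A minimal sketch of that split; the sample record and the field layout (category, event, time, uuid, cid, two time fields that bound dwell, fromKey, and position in the last field) are assumptions inferred from the guards above, not taken from real logs:

    val line = "CONTENT|EXPOSURE|1463360400000|12345|67890|5000|1000|m|x|3"
    val s = line.split("\\|", -1)
    val dwell = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L)  // clamped to 0..10000 ms
    // (category, event, uuid, cid, fromKey, position, dwell); replace() strips the version prefix when present
    println((s(0).replace("version=2&logArray=", ""), s(1), s(3), s(4), s(7), s(9), dwell))
    // prints (CONTENT,EXPOSURE,12345,67890,m,3,4000)
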
  99 +
  100 + /**
  101 + * Maps card-consume events to the content open they belong to, per uuid.
  102 + * Logs are laid out in time order per uuid, and card consumption normally happens after a content open,
  103 + * so when a content is opened its open path, position, cid and open time are stored and mapped onto the consume events of that cid that follow.
  104 + *
  105 + * (uses the OPEN and CONSUME events)
  106 + *
  107 + * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
  108 + */
  109 + def consumeIndexing(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
  110 + val out = base.map { y =>
  111 + val info = y._2.map { x =>
  112 + val (event, time, key, position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
  113 + (event, time, key, position, cid)
  114 + }.filter(x => x._1 == "OPEN")
  115 +
  116 + var idx = -1
  117 + val result1 = y._2.map { x =>
  118 + def idxExc(idx: Int) = if (idx == -1) 0 else idx
  119 + if (x._1 == "OPEN") {
  120 + idx += 1
  121 + Some(x.copy(_9 = info(idx)._2, _8 = 0))
  122 + } else if (x._1 == "CONSUME" && x._4 == info(idxExc(idx))._5) {
  123 + Some(x.copy(_9 = info(idxExc(idx))._2, _5 = info(idxExc(idx))._3, _6 = info(idxExc(idx))._4))
  124 + } else None
  125 + }
  126 + val result2 = result1.filter(x => x != None).map(x => x.get)
  127 + (y._1, result2)
  128 + }.flatMap(x => x._2).toDF("event","time","uuid","cid","fromKey","position", "dwell", "consume","openTime")
  129 +
  130 + val StringToInt = udf[Int, String]( _.toInt)
  131 +
  132 + val out1 = out.withColumn("position", StringToInt(out("position")))
  133 +
  134 + out1
  135 + }
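
To make the carry-forward concrete, here is a minimal non-Spark sketch of the same idea on a made-up per-uuid sequence (the tuple is simplified to event, time, cid, fromKey, position, consume, openTime; the real method runs on the full nine-field tuples built in main):

    val rows = Array(
      ("OPEN",    100L, 10L, "m",  "3",  0,   0L),
      ("CONSUME", 101L, 10L, null, null, 500, 0L),
      ("CONSUME", 102L, 10L, null, null, 700, 0L))
    val opens = rows.filter(_._1 == "OPEN")
    var idx = -1
    val mapped = rows.flatMap { r =>
      if (r._1 == "OPEN") { idx += 1; Some(r.copy(_7 = opens(idx)._2)) }          // remember this open's time
      else if (idx >= 0 && r._3 == opens(idx)._3)                                 // consume of the same cid
        Some(r.copy(_4 = opens(idx)._4, _5 = opens(idx)._5, _7 = opens(idx)._2))
      else None
    }
    // both CONSUME rows now carry fromKey "m", position "3" and openTime 100 from the preceding OPEN
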
  136 +
  137 + /**
  138 + * Computes exposure time per uuid and content.
  139 + * Is the exposure time of a content that is shown again after the uuid already consumed it still meaningful?
  140 + * Both variants are therefore computed: exposure time per uuid, content and position over all exposures, and exposure time per uuid, content and position counted only up to the last open of that content.
  141 + *
  142 + * (uses the EXPOSURE and OPEN events)
  143 + *
  144 + * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
  145 + */
  146 + def exposureTime(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]) = {
  147 + val expTime = base.flatMap { x =>
  148 + try {
  149 + val ar = x._2.groupBy(y => (y._4, y._5, y._6)).map(y => (y._1, y._2)).map { y =>
  150 + val sortY = y._2.sortWith(_._2 < _._2)
  151 +
  152 + val lastIdx = y._2.lastIndexWhere(y => y._1 == "OPEN")
  153 + val out = if (lastIdx == -1) {
  154 + y._2
  155 + } else {
  156 + y._2.take(lastIdx + 1)
  157 + }
  158 +
  159 + val result1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
  160 + val result2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
  161 + val size1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
  162 + val size2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
  163 +
  164 + (y._1._1, y._1._2, y._1._3, result1, size1, result2, size2)
  165 + }.map(y => (x._1, y._1, y._2, y._3.toInt, y._4, y._5, y._6, y._7))
  166 + ar
  167 + } catch {
  168 + case e:Exception => None
  169 + }
  170 + }.filter(x => x != None).toDF("uuid","cid","fromKey","position","expTime1","expSize1","expTime2","expSize2").
  171 + where("expTime1 !=0 and expSize1 != 0").where("expTime2 != 0 and expSize2 != 0")
  172 +
  173 + expTime
  174 + }
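
The two sums above differ only in whether exposures after the content's last OPEN are counted. A minimal sketch on made-up (event, dwell) pairs, where only exposures dwelling longer than 1000 ms contribute:

    val evs = Array(("EXPOSURE", 1500), ("OPEN", 0), ("EXPOSURE", 2000), ("EXPOSURE", 900))
    val lastOpen = evs.lastIndexWhere(_._1 == "OPEN")
    val upToLastOpen = if (lastOpen == -1) evs else evs.take(lastOpen + 1)
    val expTime1 = evs.filter(e => e._1 == "EXPOSURE" && e._2 > 1000).map(_._2).sum           // 3500 (all exposures)
    val expTime2 = upToLastOpen.filter(e => e._1 == "EXPOSURE" && e._2 > 1000).map(_._2).sum  // 1500 (up to last OPEN)
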
  175 +
  176 + def main(args: Array[String]): Unit = {
  177 +
  178 + // Parse the logs
  179 + val saveDay = if(args.length == 1) args(0) else "20160518"
  180 + val df1 = getLogParse(saveDay)
  181 +
  182 + // Split the logs into (open, consume) events and (exposure, open) events
  183 + val base1 = df1.map { x =>
  184 + (x.getAs[String]("event"), x.getAs[Long]("time"), x.getAs[Long]("uuid"), x.getAs[Long]("cid"),
  185 + x.getAs[String]("fromKey"), x.getAs[String]("position"), x.getAs[Int]("dwell"), x.getAs[Int]("consume"), 0L)
  186 + }.groupBy(x => x._3).map(x => (x._1, x._2.toArray.sortWith(_._2 < _._2)))
  187 +
  188 + val base2 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("OPEN") || y._1.equals("CONSUME")))).
  189 + filter { x => x._2.map(y => y._1).contains("OPEN") }
  190 +
  191 + val base3 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("EXPOSURE") || y._1.equals("OPEN"))))
  192 +
  193 + // From the (open, consume) events, use the per-uuid log time to map position, open path and open time onto consumes of the same content that follow the open
  194 + val openConsumeIdxed = consumeIndexing(base2)
  195 +
  196 + val openConsume = openConsumeIdxed.where("fromKey in ('h','m') and position != -1 and event = 'CONSUME'").
  197 + groupBy("uuid", "cid", "fromKey", "position").
  198 + agg(expr("sum(consume) as consume"))
  199 +
  200 + // From the (exposure, open) events, compute per-uuid exposure time by content open position and path
  201 + val expTime = exposureTime(base3)
  202 +
  203 + val exposureInfo = expTime.where("fromKey in ('h','m') and position != -1")
  204 +
  205 + val expCon = exposureInfo.join(openConsume, exposureInfo("uuid") === openConsume("uuid") && exposureInfo("cid") === openConsume("cid") &&
  206 + exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
  207 + drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))
  208 +
  209 + expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")
  210 +
  211 + }
  212 +}
  213 +
  214 + ///////////////////////
  215 +
  216 + /* Transform the logs and aggregate
  217 + val tst3 = base3.flatMap { x =>
  218 + val ar2: Array[(String, Long, Long, Long, String, String, Int, Int, Long)] = x._2.groupBy(x => (x._4, x._5, x._6)).map(x => (x._1, x._2)).map { x =>
  219 + val lastIdx = x._2.lastIndexWhere(x => x._1 == "OPEN")
  220 + val out = if (lastIdx == -1) {
  221 + x._2
  222 + } else {
  223 + x._2.take(lastIdx + 1)
  224 + }
  225 + (x._1, out)
  226 + }.flatMap(x => x._2).toArray.sortWith(_._2 < _._2)
  227 + ar2
  228 + }.toDF("event","time","uuid","cid","fromKey","position","dwell","consume","openTime")
  229 + val tst5 = tst3.where("fromKey in ('h','m') and position != -1 and position < 1000 and event = 'EXPOSURE'").
  230 + withColumn("position", StringToInt(tst3("position"))).
  231 + selectExpr("*", "case when fromKey = 'h' then 0 when fromKey = 'm' then position + 1 else null end as ranking").
  232 + groupBy("uuid","cid","ranking").
  233 + agg(expr("sum(dwell) as dwell"))
  234 +
  235 + val ar = Array( ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  236 + ("2",21,"m",2, 0), ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  237 + ("1",23,"m",4, 10), ("2",23,"m",4, 0), ("1",21,"m",2, 10), ("1",23,"m",4, 10) )
  238 +
  239 + sqlContext.createDataFrame(ar).show
  240 +
  241 + val gg = ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map { x =>
  242 + val lastIdx = x._2.lastIndexWhere(x => x._1 == "2")
  243 + val out = if( lastIdx == -1) {
  244 + x._2
  245 + } else x._2.take(lastIdx+1)
  246 + (x._1, out)
  247 + }.flatMap(x => x._2).toArray
  248 +
  249 + sqlContext.createDataFrame(gg).show
  250 +
  251 + ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map(x => x)
  252 +
  253 + val tt = Array(("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10))
  254 +
  255 + val tt1 = tt.take(lastIdx+1)
  256 +
  257 + val tt = consumeIndexing(df2)
  258 + val dff = base2.take
  259 +
  260 + val tmp2 = df2._2.map { x =>
  261 + val (event,time,key,position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
  262 + (event, time, key, position, cid)
  263 + }.filter(x => x._1 == "OPEN")
  264 +
  265 + var idx = -1
  266 + val gg = df2._2.map { x =>
  267 + println(x)
  268 + println(idx)
  269 + def idxExc(idx: Int) = if (idx == -1) 0 else idx
  270 + if (x._1 == "OPEN") {
  271 + idx += 1
  272 + Some(x.copy(_8 = tmp2(idx)._2))
  273 + } else if (x._1 == "CONSUME" && x._4 == tmp2(idxExc(idx))._5) {
  274 + Some(x.copy(_8 = tmp2(idxExc(idx))._2, _5 = tmp2(idxExc(idx))._3, _6 = tmp2(idxExc(idx))._4))
  275 + } else None
  276 + }
  277 +
  278 + val a = gg.filter(x => x != None).map(x => x.get)
  279 + sqlContext.createDataFrame(a).show(100)
  280 +
  281 + val ar = Array( ("1",20,"h",0), ("2",21,null,0), ("2",22,null,0), ("2",23,null,0),
  282 + ("1",30,"h",0), ("2",31,null,0), ("2",32,null,0), ("2",33,null,0),
  283 + ("1",40,"h",0), ("2",41,null,0), ("2",42,null,0), ("2",43,null,0) )
  284 +
  285 + //val tst = sc.parallelize(ar)
  286 +
  287 +
  288 + tst.map {x=>
  289 + val (opentime, fromkey, position) = if(x._1 == "1") {
  290 + val opentime = x._2
  291 + val fromkey = x._3
  292 + val position = x._4
  293 + (opentime, fromkey, position)
  294 + }
  295 +
  296 + if(x._1 == "1") {
  297 + x.copy(_4 = opentime)
  298 + } else {
  299 + x.copy(_2 = fromkey, _3 = position, _4 = opentime)
  300 + }
  301 + }
  302 +
  303 + val t1 = ar.map { x =>
  304 + val (event,time,key) = if (x._1 == "1") (x._1, x._2, x._3) else ("2", 0, "0")
  305 + (event, time, key)
  306 + }.filter(x => x._1 == "1")
  307 +
  308 + var idx = -1
  309 + ar.map{ x=>
  310 + println(x)
  311 + println(idx)
  312 + if (x._1 == "1") {
  313 + idx += 1
  314 + x.copy(_4 = t1(idx)._2)
  315 + } else {
  316 + x.copy(_4 = t1(idx)._2, _3=t1(idx)._3)
  317 + }
  318 + }
  319 + */
app/com/piki_ds/ver2ggh/gghScore.scala
  1 +/**
  2 + * Created by Evan on 2016. 5. 18..
  3 + */
  4 +
  5 +package com.piki_ds.ver2ggh
  6 +
  7 +import com.piki_ds.ver2ggh
  8 +import com.piki_ds.data.contents.report.Util
  9 +import java.util.Date
  10 +import org.apache.hadoop.fs.Path
  11 +import org.apache.spark.SparkContext
  12 +import org.apache.spark.mllib.linalg.Vectors
  13 +import org.apache.spark.mllib.regression.LabeledPoint
  14 +import org.apache.spark.mllib.tree.model.RandomForestModel
  15 +import org.apache.spark.rdd.RDD
  16 +import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  17 +import org.apache.spark.sql.functions._
  18 +
  19 +
  20 +object gghScore {
  21 +
  22 + val sc = SparkContext.getOrCreate()
  23 + val sqlContext = SQLContext.getOrCreate(sc)
  24 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  25 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  26 + java.net.URI("hdfs://pikinn"), hadoopConf)
  27 + import sqlContext.implicits._
  28 +
  29 + def main(args: Array[String]): Unit = {
  30 +
  31 + val format = new java.text.SimpleDateFormat("yyyyMMdd")
  32 + val currentTime = new Date()
  33 +
  34 + val day_delm = 24 * 1000 * 60 * 60L
  35 + val saveDay = if (args.length >= 1) args(0) else format.format(currentTime.getTime - day_delm)
  36 + val ind = -6 to 0
  37 + val dateSet = ind.map(x => {
  38 + format.format(format.parse(saveDay).getTime + day_delm * x)
  39 + })
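
As a worked example (the saveDay value is assumed), the window above spans the six days before saveDay plus saveDay itself:

    val format = new java.text.SimpleDateFormat("yyyyMMdd")
    val saveDay = "20160518"                              // assumed example value
    val dayMs = 24 * 1000 * 60 * 60L
    val dateSet = (-6 to 0).map(x => format.format(format.parse(saveDay).getTime + dayMs * x))
    // Vector(20160512, 20160513, 20160514, 20160515, 20160516, 20160517, 20160518)
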
  40 +
  41 + val hadoopConf = sc.hadoopConfiguration
  42 + val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  43 +
  44 + dateSet.map{x =>
  45 + val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
  46 + val out = if(!fs.exists((new Path(addrs)))) {x} else "null"
  47 + out
  48 + }.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))
  49 +
  50 + val expConsume = dateSet.map { x =>
  51 + val expConsumeOut = sqlContext.read.parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}").drop("table").drop("dt").
  52 + where("fromKey = 'm' or (fromKey = 'h' and position = 0)")
  53 + expConsumeOut
  54 + }.reduce((x,y) => x.unionAll(y))
  55 +
  56 + /// Build a card group per content from its card count, in buckets of 5
  57 + val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
  58 + map(x => (x.getAs[Long]("contents_id"), x.getAs[Long]("cardSize"), x.getAs[Long]("cardSize")/5)).toDF("cid1","cardSize","sizeGroup").
  59 + selectExpr("*","if(sizeGroup>19, 20, sizeGroup) as cardGroup").drop("sizeGroup")
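
The bucketing above is integer division of the card count by 5, clamped at 20; restated as a plain function for clarity:

    def cardGroup(cardSize: Long): Long = {
      val sizeGroup = cardSize / 5             // buckets of 5 cards (integer division)
      if (sizeGroup > 19) 20 else sizeGroup    // contents with 100 or more cards all land in group 20
    }
    // cardGroup(7) == 1, cardGroup(23) == 4, cardGroup(104) == 20, cardGroup(120) == 20
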
  60 + /// Compute the average content consume time per card group from the kpi3 metrics
  61 + //val kpi3 = Util.readDashboardTable("kpi3","*","{*}","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
  62 + // join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null").cache
  63 + //val cardGroupConsume = kpi3.where("consumeTime > 10.0").groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
  64 + // kpi3.map(x => s"${x(0)}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(5)}|${x(6)}").coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/kpi3")
  65 + // cardGroupConsume.stat.corr("cardGroup1","cardGroupConTime") correlation between cardGroup and mean consume time: 0.89
  66 +
  67 + ///
  68 + val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
  69 + "if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")
  70 +
  71 + val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
  72 + expr("cid"),
  73 + expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
  74 + expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
  75 + expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
  76 + expr("case when card_type = 'SNS' then count else 0 end as SNS"),
  77 + expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
  78 + expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
  79 + expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
  80 + expr("case when card_type = 'INTR' then count else 0 end as INTR"),
  81 + expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
  82 + ).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
  83 + expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
  84 + expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))
  85 +
  86 +
  87 + val contentsType = Util.tables("MG_CONTENTS").where("status='ACTV'").select(
  88 + expr("contents_id as cid"),
  89 + expr("case when contents_type = 'ALBUM' then 1 else 0 end as ALBUM"),
  90 + expr("case when contents_type = 'ALBUM.A' then 1 else 0 end as ALBUM_A"),
  91 + expr("case when contents_type = 'CHST' then 1 else 0 end as CHST"),
  92 + expr("case when contents_type = 'CHST.A' then 1 else 0 end as CHST_A"),
  93 + expr("case when contents_type = 'TOON' then 1 else 0 end as TOON"),
  94 + expr("case when contents_type = 'LIVE' then 1 else 0 end as LIVE")
  95 + )
  96 +
  97 + val cidCardTypeSize = cidCardType.join(cardSize, cidCardType("cid")===cardSize("cid1"),"leftouter").drop(cardSize("cid1")).drop(cardSize("cardGroup")).
  98 + join(contentsType, cidCardType("cid")===contentsType("cid")).drop(contentsType("cid"))
  99 +
  100 + val predData = cidCardTypeSize.map { line =>
  101 + LabeledPoint(line.getAs[Long]("cid"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
  102 + line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble,
  103 + line.getAs[Long]("TEXT").toDouble, line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble,
  104 + line.getAs[Int]("ALBUM").toDouble, line.getAs[Int]("ALBUM_A").toDouble, line.getAs[Int]("CHST").toDouble, line.getAs[Int]("CHST_A").toDouble,
  105 + line.getAs[Int]("TOON").toDouble, line.getAs[Int]("LIVE").toDouble
  106 + ))
  107 + }
  108 +
  109 + val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  110 + val predResult = predData.collect.map { point =>
  111 + val prediction = RFmodel.predict(point.features)
  112 + (point.label.toLong, prediction)
  113 + }
  114 +
  115 + val cidPredConsume = sc.parallelize(predResult).toDF("cid1","predConsume").withColumn("predConsume", column("predConsume")*1000)
  116 +
  117 + /// Average exposure time until a content gets consumed, per exposure position
  118 + val posExpTime = expConsume.groupBy("fromKey","position").agg(expr("sum(expTime2)/sum(expSize2) as posExpTime")).
  119 + selectExpr("fromKey as fromKey1", "position as position1", "posExpTime")
  120 +
  121 + /// CTR per position
  122 + val positionCtr = expConsume.groupBy("fromKey","position").agg(expr("sum(expSize2) as expSize"), expr("count(consume) as consumeCnt")).
  123 + withColumn("rankingCtr", column("consumeCnt")/column("expSize")).selectExpr("fromKey as fromKey1","position as position1","rankingCtr")
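
With made-up totals for one (fromKey, position) cell, the two position-level statistics above work out as:

    val expTime2Sum = 600000.0   // assumed total exposure ms at this position
    val expSize2Sum = 300.0      // assumed number of exposures
    val consumeCnt  = 15.0       // assumed number of rows where a consume was recorded
    val posExpTime  = expTime2Sum / expSize2Sum  // 2000.0 ms of exposure per impression
    val rankingCtr  = consumeCnt / expSize2Sum   // 0.05
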
  124 +
  125 + /// Exposure time and consume time per content and exposure position
  126 + val cidPositionInfo = expConsume.groupBy("cid","fromKey","position").
  127 + agg(//expr("sum(expTime1) as expTime1"), expr("count(expSize1) as expSize1"),
  128 + expr("sum(expTime2) as expTime2"), expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
  129 + join(positionCtr, column("fromKey")===positionCtr("fromKey1") && column("position")===positionCtr("position1"), "leftouter").
  130 + drop(positionCtr("fromKey1")).drop(positionCtr("position1")).
  131 + join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).na.fill(0, Seq("consume")).
  132 + where("expSize2 > 200 and cardSize is not null")
  133 +
  134 + val gghtmp = cidPositionInfo.join(posExpTime, cidPositionInfo("fromKey")===posExpTime("fromKey1") && cidPositionInfo("position")===posExpTime("position1"), "leftouter").
  135 + drop(posExpTime("fromKey1")).drop(posExpTime("position1")).
  136 + join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1"))
  137 +
  138 + /*
  139 + val gghBase3 = gghtmp.selectExpr("*", "consume/(expSize2*rankingCtr*predConsume) as consumeEff", "expTime2/(expSize2*posExpTime) as expEff").
  140 + withColumn("ggh", column("consumeEff")/column("expEff"))
  141 + val gghVer2 = gghBase3.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consumeEff)/count(*) as consumeEff"),
  142 + expr("sum(expEff)/count(*) as expEff")).withColumn("ggh", column("consumeEff")/column("expEff")).where("totalExpSize2 > 1000")
  143 + */
  144 +
  145 + val gghBase4 = gghtmp.selectExpr("*", "expSize2*rankingCtr as expectConCnt", "expTime2/(expSize2*posExpTime) as expEff")
  146 +
  147 + val gghVer3 = gghBase4.groupBy("cid").agg(expr("sum(expSize2) as totalExpSize2"), expr("sum(consume) as consume"),
  148 + expr("sum(expectConCnt) as expectConCnt"), expr("sum(expEff)/count(*) as expEff")).
  149 + join(cidPredConsume, cidPositionInfo("cid")===cidPredConsume("cid1"), "leftouter").drop(cidPredConsume("cid1")).
  150 + selectExpr("cid","totalExpSize2","consume/(expectConCnt * predConsume) as consumeEff", "expEff").withColumn("ggh", column("consumeEff")/column("expEff")).
  151 + where("totalExpSize2 > 1000")
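
Per content, consumeEff compares the consume time actually observed with the expected consume time (the expected consume count, expSize2 * rankingCtr, times the consume time per open predicted by the random forest), expEff compares the exposure time with the position average, and ggh is their ratio. A small numeric sketch with assumed aggregates:

    val consume      = 900000.0  // assumed ms actually consumed
    val expectConCnt = 30.0      // assumed sum(expSize2 * rankingCtr)
    val predConsume  = 40000.0   // assumed ms per open predicted by the RF model
    val expEff       = 1.2       // assumed sum(expTime2 / (expSize2 * posExpTime)) / count(*)
    val consumeEff   = consume / (expectConCnt * predConsume)  // 0.75
    val ggh          = consumeEff / expEff                     // 0.625
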
  152 +
  153 + val gghMean = gghVer3.describe().where("summary = 'mean'").drop("summary").take(1)(0)(4).toString.toDouble
  154 + val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
  155 +
  156 + val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
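
The last step standardizes ggh over all contents and squashes it through a logistic curve into a 0..1000 score; for example, with assumed population statistics:

    val gghMean = 0.6                              // assumed mean of ggh
    val gghStd  = 0.2                              // assumed stddev of ggh
    val ggh     = 0.625
    val z         = (ggh - gghMean) / gghStd       // 0.125
    val scaledGgh = 1000.0 / (1 + math.exp(-z))    // about 531.2
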
  157 +
  158 + gghScaled.map{x =>
  159 + s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
  160 + }.saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
  161 +
  162 + /*//////// Compute CTR and CTR time
  163 + val ctr = expConsume.groupBy("cid").
  164 + agg(expr("sum(expTime1) as expTime1"), expr("sum(expSize1) as expSize1"), expr("sum(expTime2) as expTime2"),
  165 + expr("sum(expSize2) as expSize2"), expr("sum(consume) as consume"), expr("count(consume) as conCount")).
  166 + join(cardSize, column("cid")===cardSize("cid1"), "leftouter").drop(cardSize("cid1")).
  167 + withColumn("ctr",column("conCount")/column("expSize2")).
  168 + withColumn("ctrTime",column("consume")/column("expTime2")).
  169 + withColumnRenamed("cid","cid1")
  170 +
  171 + //////////////////////////
  172 + /// Load contentsInfo
  173 + val ContentsInfo = Util.tables("MG_CONTENTS").selectExpr("contents_id", "udate","title" ,"uid")
  174 + val userInfo = Util.tables("USER").selectExpr("uid","name")
  175 + val ciduser = ContentsInfo.join(userInfo, ContentsInfo("uid")===userInfo("uid"), "leftouter").drop(userInfo("uid"))
  176 +
  177 + val result2 = gghScaled.join(ciduser, gghScaled("cid")===ciduser("contents_id"), "leftouter").drop(ciduser("contents_id"))//.where("udate > '2016-05-12'")
  178 + ////////// Compare with the ctr and ctrTime metrics
  179 + val gghResult = result2.join(ctr.selectExpr("cid1","ctr as noRankCtr","ctrTime as noRankCtrTime","expTime2 as expTime","expSize2 as expSize","consume","conCount"),
  180 + result2("cid")===ctr("cid1"), "leftouter").drop(ctr("cid1")).drop("uid").drop("totalExpSize2")
  181 +
  182 + //gghResult.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/preprocess/ggh/$saveDay")
  183 +
  184 + gghResult.map{x=>
  185 + val title = x.getAs[String]("title").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
  186 + val editor = x.getAs[String]("name").replaceAll("\n", " ").replaceAll("\r", " ").replaceAll("\\|", " ").replaceAll("\\,", " ").trim
  187 + s"${x(0)}|${title}|${x(5)}|${editor}|${x(1)}|${x(2)}|${x(3)}|${x(4)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}"}.
  188 + coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
  189 + */
  190 +
  191 + }
  192 +
  193 +}
app/com/piki_ds/ver2ggh/simContentsModel.scala
  1 +/**
  2 + * Created by Evan on 2016. 6. 2..
  3 + */
  4 +package com.piki_ds.ver2ggh
  5 +
  6 +import com.piki_ds.data.contents.report.Util
  7 +import org.apache.spark.SparkContext
  8 +import org.apache.spark.rdd.RDD
  9 +import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
  10 +import org.apache.spark.sql.{Row, DataFrame, SaveMode, SQLContext}
  11 +import org.apache.spark.sql.functions._
  12 +
  13 +import org.apache.spark.mllib.regression.LabeledPoint
  14 +import org.apache.spark.mllib.linalg.Vectors
  15 +import org.apache.spark.mllib.tree.RandomForest
  16 +import org.apache.spark.mllib.tree.model.RandomForestModel
  17 +
  18 +object simContentsModel {
  19 +
  20 + val sc = SparkContext.getOrCreate()
  21 + val sqlContext = SQLContext.getOrCreate(sc)
  22 + val hadoopConf = new org.apache.hadoop.conf.Configuration()
  23 + val hdfs = org.apache.hadoop.fs.FileSystem.get(new
  24 + java.net.URI("hdfs://pikinn"), hadoopConf)
  25 + import sqlContext.implicits._
  26 +
  27 +
  28 + def main(args: Array[String]): Unit = {
  29 +
  30 + /// Load the card count per content
  31 + val cardSize = Util.tables("MG_CARD").where("status='ACTV'").groupBy("contents_id").agg(expr("count(ordering) as cardSize")).
  32 + selectExpr("contents_id as cid1", "cardSize")
  33 +
  34 + /// Compute the average content consume time per card group from the kpi3 metrics
  35 + val kpi = Util.readDashboardTable("kpi3","*","*","*", "*").selectExpr("cid","udate","appView","consumeTime","numberOfCard").dropDuplicates(Seq("cid")).
  36 + join(cardSize, column("cid")===cardSize("cid1")).drop(cardSize("cid1")).where("numberOfCard is not null and consumeTime > 10.0")
  37 +
  38 + //val cardGroupConsume = kpi3.groupBy("cardGroup").agg(expr("avg(consumeTime) * 1000 as cardGroupConTime")).selectExpr("cardGroup as cardGroup1", "cardGroupConTime")
  39 +
  40 + //////////
  41 + val cardtype = Util.tables("MG_CARD").where("status='ACTV'").selectExpr("contents_id as cid","card_id as card_id",
  42 + "if(card_type = 'DYNAMIC', 'PHOTO', if(card_type like '%SNS%', 'SNS', card_type)) as card_type")
  43 +
  44 + val cidCardType = cardtype.groupBy("cid","card_type").agg(expr("count(*) as count")).select(
  45 + expr("cid"),
  46 + expr("case when card_type = 'LANDING' then count else 0 end as LANDING"),
  47 + expr("case when card_type = 'SHOPPING' then count else 0 end as SHOPPING"),
  48 + expr("case when card_type = 'PHOTO' then count else 0 end as PHOTO"),
  49 + expr("case when card_type = 'SNS' then count else 0 end as SNS"),
  50 + expr("case when card_type = 'PANORAMA' then count else 0 end as PANORAMA"),
  51 + expr("case when card_type = 'TEXT' then count else 0 end as TEXT"),
  52 + expr("case when card_type = 'YOUTUBE' then count else 0 end as YOUTUBE"),
  53 + expr("case when card_type = 'INTR' then count else 0 end as INTR"),
  54 + expr("case when card_type = 'VIDEO' then count else 0 end as VIDEO")
  55 + ).groupBy("cid").agg(expr("sum(LANDING) as LANDING"), expr("sum(SHOPPING) as SHOPPING"), expr("sum(PHOTO) as PHOTO"),
  56 + expr("sum(SNS) as SNS"),expr("sum(PANORAMA) as PANORAMA"),expr("sum(TEXT) as TEXT"),expr("sum(YOUTUBE) as YOUTUBE"),
  57 + expr("sum(INTR) as INTR"),expr("sum(VIDEO) as VIDEO"))
  58 +
  59 + val contentsType = Util.tables("MG_CONTENTS").where("status='ACTV'").select(
  60 + expr("contents_id as cid"),
  61 + expr("case when contents_type = 'ALBUM' then 1 else 0 end as ALBUM"),
  62 + expr("case when contents_type = 'ALBUM.A' then 1 else 0 end as ALBUM_A"),
  63 + expr("case when contents_type = 'CHST' then 1 else 0 end as CHST"),
  64 + expr("case when contents_type = 'CHST.A' then 1 else 0 end as CHST_A"),
  65 + expr("case when contents_type = 'TOON' then 1 else 0 end as TOON"),
  66 + expr("case when contents_type = 'LIVE' then 1 else 0 end as LIVE")
  67 + )
  68 +
  69 + val simConTrn = kpi.join(cidCardType, kpi("cid")===cidCardType("cid")).drop(cidCardType("cid")).
  70 + join(contentsType, kpi("cid")===contentsType("cid")).drop(contentsType("cid"))
  71 +
  72 + //simConTrn.map(x => s"${x(5)}|${x(7)}|${x(8)}|${x(9)}|${x(10)}|${x(11)}|${x(12)}|${x(13)}|${x(14)}|${x(15)}|${x(3)}").
  73 + // coalesce(1, shuffle = true).saveAsTextFile(s"hdfs://pikinn/user/evan/Features/cardTypeConsume1")
  74 +
  75 + //val tstdf = sc.textFile(s"hdfs://pikinn/user/evan/Features/cardTypeConsume1").map(x => x.split("\\|")).
  76 + // map(x => (x(0).toDouble,x(1).toDouble,x(2).toDouble,x(3).toDouble,x(4).toDouble,x(5).toDouble,x(6).toDouble,x(7).toDouble,x(8).toDouble,x(9).toDouble,x(10).toDouble)).
  77 + // toDF("cardSize","LANDING","SHOPPING","PHOTO","SNS","PANORAMA","TEXT","YOUTUBE","INTR","VIDEO","consumeTime")
  78 +
  79 + // Load and parse the data
  80 + val parsedData1 = simConTrn.map { line =>
  81 + LabeledPoint(line.getAs[Double]("consumeTime"), Vectors.dense(line.getAs[Long]("cardSize").toDouble, line.getAs[Long]("LANDING").toDouble,
  82 + line.getAs[Long]("SHOPPING").toDouble, line.getAs[Long]("PHOTO").toDouble, line.getAs[Long]("SNS").toDouble, line.getAs[Long]("PANORAMA").toDouble, line.getAs[Long]("TEXT").toDouble,
  83 + line.getAs[Long]("YOUTUBE").toDouble, line.getAs[Long]("INTR").toDouble, line.getAs[Long]("VIDEO").toDouble, line.getAs[Int]("ALBUM").toDouble,
  84 + line.getAs[Int]("ALBUM_A").toDouble, line.getAs[Int]("CHST").toDouble, line.getAs[Int]("CHST_A").toDouble, line.getAs[Int]("TOON").toDouble, line.getAs[Int]("LIVE").toDouble
  85 + ))
  86 + }.cache()
  87 +
  88 + /*val parsedData2 = tstdf.map { line =>
  89 + LabeledPoint(line.getAs[Double]("consumeTime"), Vectors.dense(line.getAs[Double]("cardSize"), line.getAs[Double]("LANDING"),
  90 + line.getAs[Double]("SHOPPING"), line.getAs[Double]("PHOTO"), line.getAs[Double]("SNS"), line.getAs[Double]("PANORAMA"), line.getAs[Double]("TEXT"),
  91 + line.getAs[Double]("YOUTUBE"), line.getAs[Double]("INTR"), line.getAs[Double]("VIDEO")
  92 + ))
  93 + }.cache()*/
  94 +
  95 + val splits = parsedData1.randomSplit(Array(0.7, 0.3))
  96 + val (trainingData, testData) = (splits(0), splits(1))
  97 +
  98 + val numClasses = 2 // defined but not passed to trainRegressor
  99 + val categoricalFeaturesInfo = Map[Int, Int]()
  100 + val numTrees = 100 // Use more in practice.
  101 + val featureSubsetStrategy = "auto" // Let the algorithm choose.
  102 + val impurity = "variance"
  103 + val maxDepth = 10
  104 + val maxBins = 32
  105 +
  106 + val model = RandomForest.trainRegressor(parsedData1, categoricalFeaturesInfo,
  107 + numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
  108 +
  109 + val trnPredictions = trainingData.collect.map { point =>
  110 + val prediction = model.predict(point.features)
  111 + (point.label, prediction)
  112 + }
  113 + trnPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / trnPredictions.size
  114 +
  115 + val labelsAndPredictions = testData.collect.map { point =>
  116 + val prediction = model.predict(point.features)
  117 + (point.label, prediction)
  118 + }
  119 + labelsAndPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / labelsAndPredictions.size
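
This expression (like the matching ones on the training and full sets) is a mean relative error: the average of |actual - predicted| / actual over the evaluated points. With two made-up points:

    val pairs = Seq((100.0, 80.0), (50.0, 60.0))  // (actual, predicted), made-up values
    val meanRelErr = pairs.map { case (v, p) => math.abs(v - p) / v }.sum / pairs.size
    // (0.2 + 0.2) / 2 = 0.2
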
  120 +
  121 + try {
  122 + hdfs.delete(new org.apache.hadoop.fs.Path(s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel"), true)
  123 + } catch {
  124 + case _: Throwable => {}
  125 + }
  126 + model.save(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  127 +
  128 + val RFmodel = RandomForestModel.load(sc, s"hdfs://pikinn/user/evan/Features/cardTypeConsume/RFModel")
  129 +
  130 + val allPredictions = parsedData1.collect.map { point =>
  131 + val prediction = RFmodel.predict(point.features)
  132 + (point.label, prediction)
  133 + }
  134 + allPredictions.map{case(v, p) => math.abs(v - p)/v}.sum / allPredictions.size
  135 +
  136 + val tst = sc.parallelize(allPredictions).toDF("act","pred").selectExpr("*", "abs(act - pred) as gap").selectExpr("*","gap/act as ErrorRate")
  137 +
  138 + }
  139 +}
... ... @@ -83,8 +83,8 @@
83 83 "com.piki_ds" %% "dsutils" % "0.1.8",
84 84 "com.piki_ds" %% "dsutils_hbase" % "0.3.6-SNAPSHOT",
85 85 "org.codehaus.jackson" % "jackson-core-asl" % "1.9.13",
86   - "org.jblas" % "jblas" % "1.2.4"
87   -
  86 + "org.jblas" % "jblas" % "1.2.4",
  87 + "com.piki_ds" %% "dscontentreport" % "0.1.0-SNAPSHOT"
88 88 )
89 89  
90 90  
  1 +#!/usr/bin/env bash
  2 +
  3 +BASEDIR=$(dirname $0)
  4 +
  5 +if [ $# != '1' ] && [ $# != '2' ];
  6 +then
  7 + echo "usage : sh score.sh <fullyQualifiedClassName> [date]"
  8 + echo "ex) sh score.sh com.piki_ds.ver2ggh.gghScore 20160518"
  9 + exit
  10 +fi
  11 +
  12 +HOME=$BASEDIR
  13 +LOG_DIR=$HOME/logs
  14 +mkdir -p $LOG_DIR
  15 +TODAY=`date +"%Y%m%d"`
  16 +DATE_SUB="7"
  17 +DAY_TO_DELETE="$(date "+%Y%m%d" -d "$DATE_SUB days ago")"
  18 +DELETE_LOG="${LOG_DIR}/$1_$DAY_TO_DELETE.log"
  19 +LOG="${LOG_DIR}/$1_$2_$TODAY.log"
  20 +
  21 +#HADOOP_CONF_DIR=/etc/hadoop/conf
  22 +
  23 +/data/spark/bin/spark-submit \
  24 +--class $1 \
  25 +--master yarn-client \
  26 +--conf "spark.default.parallelism=250" \
  27 +$BASEDIR/target/scala-2.11/dsquality-assembly-0.1.0-SNAPSHOT.jar $2 >> $LOG 2>&1
  28 +#target/scala-2.11/dsmakingscore-assembly-0.1.0-SNAPSHOT.jar >> $LOG 2>&1
  29 +#target/scala-2.11/dsmakingscore_2.11-0.1.0-SNAPSHOT.jar >> $LOG 2>&1
  30 +#--jars "lib_managed/jars/mysql/mysql-connector-java/mysql-connector-java-5.1.36.jar" \
  31 +
  32 +
  33 +echo "END END END END END" >> $LOG
  34 +echo "END END END END END" >> $LOG
  35 +echo "END END END END END" >> $LOG
  36 +echo "END END END END END" >> $LOG
  37 +echo "END END END END END" >> $LOG
  38 +
  39 + echo "IF LOG FILE $DELETE_LOG EXISTS DELETE" >> $LOG
  40 +# Log rotation
  41 +if [ -f $DELETE_LOG ]
  42 +then
  43 + rm -f $DELETE_LOG >> $LOG 2>&1
  44 + echo "$DELETE_LOG deleted" >> $LOG
  45 +fi
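
A hedged usage example, assuming the assembly jar has already been built: "sh score.sh com.piki_ds.ver2ggh.gghScore 20160518" submits the gghScore job for 2016-05-18. The class name must be fully qualified because it is passed straight to --class, and the second argument is forwarded to the job (here the target date).
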
... ... @@ -21,7 +21,7 @@
21 21 #HADOOP_CONF_DIR=/etc/hadoop/conf
22 22  
23 23 /data/spark/bin/spark-submit \
24   ---class com.piki_ds.ver1.$1 \
  24 +--class $1 \
25 25 --master yarn-client \
26 26 --conf "spark.default.parallelism=250" \
27 27 $BASEDIR/target/scala-2.11/dsquality-assembly-0.1.0-SNAPSHOT.jar >> $LOG 2>&1