Commit 282422a4501f62158e3bf0cc0f907a3ee8ee8ef9

Authored by Joanne
1 parent 311adf0a10
Exists in master

first commit of project-related files

Showing 16 changed files with 1397 additions and 0 deletions

  1 +# Created by .ignore support plugin (hsz.mobi)
app/com/piki_ds/preprocess/CidValidation.scala View file @ 282422a
  1 +package com.piki_ds.preprocess
  2 +
  3 +import org.apache.spark.sql.SQLContext
  4 +
  5 +import com.piki_ds.utils.GetTextFile.getDashDump
  6 +
  7 +
  8 +/**
  9 + * Created by jungwon on 4/21/16.
  10 + */
  11 +
  12 +object CidValidation {
  13 +
  14 + def getCidByStatus(sQLContext: SQLContext, filterStatus:Array[String]) = {
  15 + import org.apache.spark.sql.functions._
  16 + val mgc = getDashDump(sQLContext,"MG_CONTENTS").where(s"status in (${filterStatus.map(x=>s"'$x'").mkString(",")})")
  17 + val mgContents = mgc.select(mgc("contents_id"),mgc("status"), unix_timestamp(mgc("udate")))
  18 + mgContents.map(x=>{
  19 + val ts = x.getAs[Long]("unixtimestamp(udate,yyyy-MM-dd HH:mm:ss)")
  20 + val status = if (x.getAs[String]("status").equals("ACTV")) 1 else 0
  21 + (x.getAs[Long]("contents_id"), (status, ts))
  22 + }).reduceByKey((a,b) => {
  23 + import math.{min,max}
  24 + (max(a._1,b._1), min(a._2,b._2))
  25 + })
  26 + }
  27 +}
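Note: a minimal call sketch for CidValidation (not part of this commit). It assumes a locally created SparkContext/SQLContext, and the status values passed are illustrative; getCidByStatus returns RDD[(contents_id, (1 if any matching row is ACTV else 0, earliest udate as unix timestamp))].

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import com.piki_ds.preprocess.CidValidation.getCidByStatus

    object CidValidationCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("CidValidationCheck"))
        val sqlContext = SQLContext.getOrCreate(sc)
        // keep contents whose status is one of the given values
        val cidStatus = getCidByStatus(sqlContext, Array("ACTV", "HOLD"))
        cidStatus.take(10).foreach(println)
      }
    }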
app/com/piki_ds/preprocess/HourlyCommentCnt.scala View file @ 282422a
  1 +package com.piki_ds.preprocess
  2 +
  3 +import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
  4 +import org.apache.spark.sql.functions._
  5 +import org.apache.spark.{SparkContext, SparkConf}
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.sql._
  8 +
  9 +import com.piki_ds.utils.SqlContextConf._
  10 +
  11 +/**
  12 + * Preprocessing for comment score calculation
  13 + * output:
  14 + * RDD[((cid, yyyy-MM-dd HH), (countAComment, countPComment, countUComment))]
  15 + *
  16 + * Output location:
  17 + * /preprocess/cmtCountByHour/y=$yyyy/mo=$MM/d=$dd/h=$HH
  18 + *
  19 + * Created by jungwon on 8/19/15.
  20 + */
  21 +
  22 +object HourlyCommentCnt {
  23 +
  24 + def getSparkConf= {
  25 + //System.setProperty("SPARK_YARN_MODE", "true")
  26 + val conf = new SparkConf().setAppName("HourlyCommentCnt")
  27 + conf.setMaster("yarn-client")
  28 + conf.set("master", "yarn-client")
  29 + conf.set("spark.app.name", "HourlyCommentCnt")
  30 + conf.set("spark.akka.frameSize", "1024")
  31 + }
  32 +
  33 + val sc = new SparkContext(getSparkConf)
  34 + val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  35 + val hadoopConf = sc.hadoopConfiguration
  36 + val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  37 +
  38 + def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
  39 + val list = hdfs.listStatus(new Path(path))
  40 + list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  41 + }
  42 +
  43 +
  44 + /**
  45 + * Fetch comment info from MG_COMMENT one hour at a time
  46 + *
  47 + * @param sttDate : yyyy-MM-dd HH
  48 + * @param endDate : yyyy-MM-dd HH
  49 + * @return : RDD[
  50 + * Map[column, value]
  51 + * ]
  52 + */
  53 +
  54 + def getCommentDB(sttDate:String,endDate:String) = {
  55 + val filterStr = s"where (cdate between '$sttDate:00' and '$endDate:00') and status ='ACTV'"
  56 + val fetchCol = Array("contents_id", "comment_id", "parent_comment_id", "uid", "cdate")
  57 + val maxCmtId = getMaxId(sqlContext,"comment")
  58 + val totalTable = readPartialTableBig(sqlContext,"REPL","new_pikicast_common","MG_COMMENT",
  59 + fetchCol,filterStr, "comment_id", maxCmtId,1000)
  60 + val selectedRDD: RDD[Map[String, Long]] = totalTable.select(totalTable("contents_id"), totalTable("parent_comment_id"),
  61 + totalTable("uid"), unix_timestamp(totalTable("cdate"))).map(x=>{
  62 + Map(
  63 + "contents_id"-> x.getAs[Long]("contents_id"),
  64 + "parent_comment_id" -> x.getAs[Long]("parent_comment_id"),
  65 + "uid" -> x.getAs[Long]("uid"),
  66 + "cdate" -> x.getAs[Long]("unixtimestamp(cdate,yyyy-MM-dd HH:mm:ss)")
  67 + )
  68 + })
  69 + selectedRDD.map(x=>{
  70 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  71 + val pid: String = try {
  72 + x("parent_comment_id").toString
  73 + } catch {
  74 + case e:Exception => "0"
  75 + }
  76 + Map(
  77 + "contents_id"-> x("contents_id").toString,
  78 + "parent_comment_id" -> pid,
  79 + "uid" -> x("uid").toString,
  80 + "cdate" -> format_date.format(x("cdate"))
  81 + )
  82 + })
  83 + }
  84 +
  85 + def getCommentParquet(sttDate:String, endDate:String) = {
  86 + val filePath = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/db/table=MG_COMMENT",true,fs).getPath.toString
  87 + val totalTable = sqlContext.read.format("parquet").load(filePath)
  88 +
  89 + val selectedTable = totalTable.where(s"(cdate between '$sttDate:00' and '$endDate:00') and status ='ACTV'").select(totalTable("contents_id"), totalTable("parent_comment_id"),
  90 + totalTable("uid"), unix_timestamp(totalTable("cdate"))).na.fill(0)
  91 +
  92 + val selectedRDD = selectedTable.map(x=>{
  93 + Map(
  94 + "contents_id"-> x.getAs[Long]("contents_id"),
  95 + "parent_comment_id" -> x.getAs[Long]("parent_comment_id"),
  96 + "uid" -> x.getAs[Long]("uid"),
  97 + "cdate" -> x.getAs[Long]("unixtimestamp(cdate,yyyy-MM-dd HH:mm:ss)")
  98 + )
  99 + })
  100 +
  101 + selectedRDD.map(x=>{
  102 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  103 + val pid = x("parent_comment_id").toString
  104 + Map(
  105 + "contents_id"-> x("contents_id").toString,
  106 + "parent_comment_id" -> pid,
  107 + "uid" -> x("uid").toString,
  108 + "cdate" -> format_date.format(x("cdate"))
  109 + )
  110 + })
  111 + }
  112 +
  113 + /**
  114 + * Total comment counts
  115 + *
  116 + * @param parsedDB : data fetched by getCommentDB
  117 + * @return : RDD[
  118 + * ((cid, yyyy-MM-dd HH),
  119 + * (total comment count, parent comment count, distinct commenting user count)
  120 + * )]
  121 + */
  122 + def dailyComment_DB(parsedDB:RDD[Map[String, String]]) = {
  123 + parsedDB.groupBy(x => {
  124 + val datehr = x("cdate").take(13)
  125 + (datehr, x("contents_id"))
  126 + }).map(x =>{
  127 + val countAComment = x._2.size.toLong
  128 + val countPComment = x._2.count(a => a("parent_comment_id")== "0").toLong
  129 + val countUComment = x._2.map(a=>a("uid")).toSet.size.toLong
  130 + (x._1, (countAComment, countPComment, countUComment))
  131 + })
  132 + }
  133 +
  134 + /**
  135 + * Compute and write the full set of hourly counts for yesterday
  136 + *
  137 + * @param nowTS: currentTimeMillis
  138 + */
  139 + def updateYesterdays(nowTS:Long) {
  140 + val format_d = new java.text.SimpleDateFormat("yyyy-MM-dd")
  141 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH")
  142 + val c = java.util.Calendar.getInstance
  143 + val midnight = format_d.format(nowTS)+" 00"
  144 + c.setTime(format_d.parse(midnight))
  145 +
  146 + val timestamps = (1 to 24).map(x => {
  147 + val cur = format_date.format(c.getTime)
  148 + c.add(java.util.Calendar.HOUR_OF_DAY, -1)
  149 + (cur, format_date.format(c.getTime))
  150 + })
  151 +
  152 + timestamps.map(s=>{
  153 + val stt = s._2
  154 + val edt = s._1
  155 + val byHour: RDD[((String, String), (Long, Long, Long))] = dailyComment_DB(getCommentParquet(stt,edt))
  156 + val stdate = stt.split(" ")(0).split("-")
  157 + val sttime = stt.split(" ")(1)
  158 + val filepath = s"hdfs://pikinn/preprocess/cmtCountByHour/y=${stdate(0)}/mo=${stdate(1)}/d=${stdate(2)}/h=$sttime/"
  159 + import org.apache.spark.sql.types.{StructType,StructField, LongType}
  160 + val schema = StructType(Array("contents_id", "all_comment", "parent_comment", "unique_comment").map(fieldName =>{
  161 + StructField(fieldName, LongType, true)
  162 + }))
  163 + val byHourFmt= byHour.map{
  164 + case ((datehr,cid),(a,p,u)) =>{
  165 + Row(cid.toLong, a, p, u)
  166 + }
  167 + }.coalesce(1)
  168 + val byHourDF = sqlContext.createDataFrame(byHourFmt,schema)
  169 + byHourDF.write.parquet(filepath)
  170 + })
  171 + }
  172 +
  173 + def main (args: Array[String]) {
  174 + val nowTS: Long = System.currentTimeMillis
  175 + /*
  176 + makeDateList("20151202",13,true,"yyyyMMdd").map(d=>{
  177 + val format_d = new java.text.SimpleDateFormat("yyyyMMdd")
  178 + val ts = format_d.parse(d).getTime
  179 + updateYesterdays(ts)
  180 + })
  181 + */
  182 + updateYesterdays(nowTS)
  183 + }
  184 +
  185 +}
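The job writes one parquet file per hour under /preprocess/cmtCountByHour with columns contents_id, all_comment, parent_comment, unique_comment. A read-back sketch (assumes an existing sqlContext; the date and hour below are illustrative):

    val hour = sqlContext.read.parquet("hdfs://pikinn/preprocess/cmtCountByHour/y=2016/mo=04/d=20/h=13/")
    hour.select("contents_id", "all_comment", "parent_comment", "unique_comment").show(10)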
app/com/piki_ds/preprocess/MakingLapse.scala View file @ 282422a
  1 +package com.piki_ds.preprocess
  2 +
  3 +import org.apache.hadoop.io.compress.GzipCodec
  4 +import org.apache.spark.rdd.RDD
  5 +import org.apache.spark.{SparkConf, SparkContext}
  6 +
  7 +import com.piki_ds.utils.GetTextFile.getLog
  8 +import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  9 +
  10 +/**
  11 + * making lapse
  12 + *
  13 + * Created by jungwon on 6/16/15.
  14 + */
  15 +
  16 +object MakingLapse {
  17 +
  18 + def getSparkConf = {
  19 + //System.setProperty("SPARK_YARN_MODE", "true")
  20 + val conf = new SparkConf().setAppName("MakingLapse")
  21 + conf.setMaster("yarn-client")
  22 + conf.set("master", "yarn-client")
  23 + conf.set("spark.app.name", "MakingLapse")
  24 + }
  25 +
  26 + val sc = new SparkContext(getSparkConf)
  27 +
  28 + def main(args:Array[String]) {
  29 + val nowTS: Long = System.currentTimeMillis
  30 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  31 + println(sc.getConf.toDebugString)
  32 + val a1 = if (args.length >0) args(0) else ""
  33 + val yesterday = if(a1.length == 8) a1 else yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  34 +
  35 + //optional: filter out oversized logs (e.g. uuids with more than 15000 lines)
  36 + val log = filterBig(getLog(sc, yesterday), countUser(getLog(sc, yesterday), 15000))
  37 + val groupedAndSorted = groupAndSort(log)
  38 + val lapse = addLapse(groupedAndSorted)
  39 + //map -> csv
  40 + val csvLapse = lapse.map(x=>{
  41 + s"${x("category")}|${x("eventType")}|${x("actionTime")}|${x("uuid")}|${x("field0")}|${x("field1")}|${x("lapse")}"
  42 + })
  43 + csvLapse.coalesce(100).saveAsTextFile(s"hdfs://pikinn/preprocess/lapse/$yesterday", classOf[GzipCodec])
  44 + }
  45 +
  46 +
  47 + /**
  48 + * Count log lines per uuid to find excessively heavy users
  49 + *
  50 + * @param log
  51 + * @param dungchi : how big counts as "big" (threshold on log lines per uuid)
  52 + * @return
  53 + */
  54 + def countUser(log:RDD[Array[String]], dungchi: Long): scala.collection.Map[String, Long] = {
  55 + log.filter(x => x.length > 3).map(x => (x(3), "-")).countByKey().filter(x => x._2 > dungchi)
  56 + }
  57 +
  58 + /**
  59 + * Filter the log using the heavy-user map computed above
  60 + *
  61 + * @param log
  62 + * @param bigUserMap
  63 + * @return
  64 + */
  65 + def filterBig(log:RDD[Array[String]], bigUserMap:scala.collection.Map[String, Long]) ={
  66 + log.filter(x=> !bigUserMap.contains(x(3)))
  67 + }
  68 +
  69 + /**
  70 + * groupBy uuid, then sortBy action time
  71 + *
  72 + * @param log
  73 + * @return
  74 + */
  75 + def groupAndSort(log:RDD[Array[String]]) = {
  76 + val groupedRanker = log.groupBy(x => x(3))
  77 + groupedRanker.map(x => (x._1, x._2.toSeq.sortBy(a => a(2))))
  78 + }
  79 +
  80 + /** Time elapsed from each action to the next
  81 + *
  82 + * @param gsLog : grouped and sorted log
  83 + * @return
  84 + */
  85 + def addLapse(gsLog: RDD[(String, Seq[Array[String]])]) ={
  86 + gsLog.map(x=>{
  87 + val c = x._2
  88 + val withLapse = c.indices.map(i => {
  89 + val l = c(i)
  90 + val lapse = if (i > 0) {
  91 + try {
  92 + l(2).toLong - c(i - 1)(2).toLong
  93 + } catch {
  94 + case e: Exception => 0L
  95 + }
  96 + } else 0L
  97 +
  98 + val category = l(0)
  99 + val eventType = l(1)
  100 + val actionTime = l(2)
  101 + val uuid = l(3)
  102 + val field0 = l(4)
  103 + val field1 = if (l.length > 5) l(5) else ""
  104 +
  105 + if (category.nonEmpty && eventType.nonEmpty && actionTime.nonEmpty && uuid.nonEmpty){
  106 + Some(
  107 + Map("category" -> category,
  108 + "eventType" ->eventType,
  109 + "actionTime" -> actionTime,
  110 + "uuid" -> uuid,
  111 + "lapse" -> math.max(0, math.min(lapse,1000L * 60L * 1L)).toString,
  112 + "field0"->field0,
  113 + "field1"->field1))
  114 + } else None
  115 + }).flatMap(x=>x)
  116 + withLapse
  117 + }).flatMap(x=>x)
  118 + }
  119 +
  120 +}
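For reference, the lapse attached to each event is the gap to the user's previous event, clamped to [0, 60000] ms. A small local sketch of that arithmetic (illustration only; it deliberately avoids touching the object above, which builds a yarn-client SparkContext on first reference):

    val times = Seq(1450000000000L, 1450000004000L, 1450000090000L)
    val lapses = 0L +: times.sliding(2).map { case Seq(a, b) =>
      math.max(0L, math.min(b - a, 1000L * 60L)) // same clamp as addLapse
    }.toSeq
    // lapses == Seq(0, 4000, 60000)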
app/com/piki_ds/preprocess/MapWithRank.scala View file @ 282422a
  1 +package com.piki_ds.preprocess
  2 +
  3 +import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  4 +import com.piki_ds.utils.GetTextFile.{getLapse,getLog}
  5 +
  6 +import org.apache.spark.{SparkContext, SparkConf}
  7 +import org.apache.spark.rdd.RDD
  8 +import org.json4s.jackson.JsonMethods.parse
  9 +
  10 +
  11 +/**
  12 + * Preprocessing for ggh
  13 + * Created by jungwon on 8/25/15.
  14 + */
  15 +object MapWithRank {
  16 + def getSparkConf = {
  17 + //System.setProperty("SPARK_YARN_MODE", "true")
  18 + val conf = new SparkConf().setAppName("MapWithRank")
  19 + conf.setMaster("yarn-client")
  20 + conf.set("master", "yarn-client")
  21 + conf.set("spark.app.name", "MapWithRank")
  22 + }
  23 +
  24 + val sc = new SparkContext(getSparkConf)
  25 +
  26 + def main(args: Array[String]) {
  27 + val nowTS: Long = System.currentTimeMillis
  28 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  29 +
  30 + val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  31 +
  32 + val log = getLog(sc, doi)
  33 + val joinedLog = joinLogWithLapse(log, getLapse(sc, doi, "CONSUME"))
  34 + val mapped = mapActions(joinedLog, countUser(log, 10000))
  35 + val ecWithRank: RDD[(String, (Int, String), (Long, Long))] = mapRanges(mapped)
  36 +
  37 + val csvMapRank = ecWithRank.map(x=>s"${x._1},${x._2._1},${x._2._2},${x._3._1},${x._3._2}")
  38 + csvMapRank.saveAsTextFile(s"hdfs://pikinn/preprocess/ecWithRank/$doi/")
  39 +
  40 + }
  41 +
  42 + /**
  43 + * filter users with log line size larger than limit
  44 + *
  45 + * @param log : the input log
  46 + * @param limit : threshold on log lines per uuid (after groupBy uuid)
  47 + * @return : the offending uuids and their counts as a Map[String, Long]
  48 + */
  49 + def countUser(log: RDD[Array[String]], limit: Int) = {
  50 + log.filter(x => x.length > 3).map(x => (x(3), "-")).countByKey().filter(x => x._2 > limit)
  51 + }
  52 +
  53 +
  54 + /**
  55 + * Join the log with the precomputed lapse
  56 + *
  57 + * @param log : same doi
  58 + * @param lapse : same doi
  59 + * @return : log with the duration replaced for CONSUME events
  60 + */
  61 +
  62 + def joinLogWithLapse(log:RDD[Array[String]],lapse: RDD[((String, String, String, String), String)]) ={
  63 + val tempJoined = log.filter(_.size>5).map(x=>((x(1),x(2),x(3),x(5)),x)).leftOuterJoin(lapse)
  64 + tempJoined.map(x=>{
  65 + val l: Array[String] = x._2._1
  66 + if (x._2._2.isEmpty) {
  67 + Some(l)
  68 + }
  69 + else {
  70 + if (l.length==6){
  71 + Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), x._2._2.get))
  72 + } else if (l.length<9 && l.length>6 && l(6).forall(_.isDigit)){
  73 + Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), scala.math.min(x._2._2.get.toLong,l(6).toLong).toString))
  74 + } else if (l.length==9 && l(7).forall(_.isDigit)){
  75 + Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), scala.math.min(x._2._2.get.toLong,l(7).toLong).toString))
  76 + }
  77 + else None
  78 + }})
  79 + .flatMap(x=>x)
  80 + }
  81 +
  82 +
  83 + def splitByVersion(joinedLog: RDD[Array[String]]) = {
  84 +
  85 + }
  86 +
  87 + /**
  88 + * Refine the processed log: formatting, min-max clamping, filtering out heavy users
  89 + *
  90 + * @param log: processed log
  91 + * @param bigUser: heavy users from countUser, to be excluded
  92 + * @return : (
  93 + * mapping: Map[String,String],
  94 + * Some(loadInfo: List[(timeStamp:String, rank:Int)]),
  95 + * Some((expTime:Long, conTime:Long))
  96 + * )
  97 + */
  98 +
  99 + def mapActions(log:RDD[Array[String]], bigUser:scala.collection.Map[String,Long]) = {
  100 + log.map(s=>{
  101 + if (s.length > 5 && !bigUser.contains(s(3))) {
  102 + try {
  103 +
  104 + val format_date = new java.text.SimpleDateFormat("yyyyMMdd HHmm")
  105 + val genTime = if(s(2).length == 10) s(2)+"000" else s(2)
  106 + val mapping = Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1),
  107 + "time" -> genTime, "uuid" -> s(3),
  108 + "dt" -> format_date.format(new java.util.Date(genTime.toLong)).take(11))
  109 + if (s(0).equals("COMMON") && s(1).equals("LOAD") && s(4).equals("m")){
  110 + val jsonString = s(5).replace("\"content\"", "\"contents\"")
  111 + val parsed = parse(jsonString).values.asInstanceOf[Map[String, AnyRef]]("contents")
  112 + val inner: List[(BigInt, Int)] = parsed.asInstanceOf[List[Map[String, List[BigInt]]]].flatMap(x=>x.filter(x=> !x._1.startsWith("r")).values).flatten.zipWithIndex
  113 + Some((mapping, inner.map(a=>(a._1, scala.math.min(a._2, 50))), null)) //exposure ranks above 50 are treated as meaningless
  114 + }
  115 + else if (s(4).forall(_.isDigit)) {
  116 + import scala.math.{min,max}
  117 + //handle each log version
  118 + if (s.length == 10 && s(1).equals("EXPOSURE")) {
  119 + Some((mapping, null, (s(4), (min(max(0L,s(5).toLong-s(6).toLong),1000*10), 0L), Some(s(9).toInt))))
  120 + }
  121 + else if (s.length > 6 && s(1).equals("EXPOSURE")){
  122 + Some((mapping, null, (s(4), (min(max(0L,s(5).toLong-s(6).toLong),1000*10), 0L), None)))
  123 + }
  124 + else if (s.length > 6 && s(1).equals("CONSUME")) {
  125 + Some((mapping, null, (s(4), (0L, min(1000L*30, if (s(6).isEmpty) 0L else s(6).toLong)), None)))
  126 + }
  127 + else None
  128 + }
  129 + else None
  130 + } catch {
  131 + case e: Exception => None
  132 + }
  133 + }
  134 + else None
  135 + }
  136 + ).flatMap(x => x)
  137 + }
  138 +
  139 +
  140 + /**
  141 + * Map logs onto the exposure positions carried by LOAD events
  142 + *
  143 + * @param mappedLog:
  144 + * RDD[
  145 + * (mapping: Map[String,String]
  146 + * , Some(loadInfo: List[(timeStamp:String, rank:Int)])
  147 + * , Some((expTime:Long, conTime:Long)))]
  148 + * @return :
  149 + * RDD[
  150 + * ((rank: Int, cid: String)
  151 + * , total exposure time: Long)]
  152 + */
  153 +
  154 + def mapRanges(mappedLog: RDD[(Map[String, String], List[(BigInt, Int)], (String, (Long, Long), Option[Int]))]) = {
  155 + mappedLog.groupBy(x => x._1("uuid")).map(x => {
  156 + try {
  157 + // take only the COMMON|LOAD logs and map each content rank
  158 + val loads = x._2.filter(a => a._3 == null)
  159 + if (loads.nonEmpty) {
  160 + val loadList = loads.map(a => (a._1("time").toLong, a))
  161 + val rangeLoads = loadList.foldLeft((
  162 + scala.collection.mutable.ArrayBuffer.empty[(Long, Long, List[(BigInt, Int)])], 0L))(
  163 + (a, b) => {
  164 + a._1 += ((a._2, b._1, b._2._2))
  165 + (a._1, b._1)
  166 + })._1
  167 + //find the latest LOAD and append the final open-ended range (max ~ ) to rangeLoads
  168 + val max = loads.maxBy(a => a._1("time").toLong)
  169 + rangeLoads += ((max._1("time").toLong, Long.MaxValue, max._2))
  170 +
  171 + //map the remaining non-LOAD logs (currently EXPOSURE & CONSUME) onto rangeLoads
  172 + val expcon = x._2.filter(a => a._2 == null)
  173 + val mapWithLoad = expcon.map(a => {
  174 + val cid = a._3._1
  175 + if (a._3._3.isEmpty){
  176 + val location: List[(BigInt, Int)] = rangeLoads.filter(b => {
  177 + b._1 <= a._1("time").toLong && b._2 > a._1("time").toLong
  178 + }).head._3.filter(c => c._1.toString().equals(cid))
  179 + (location, (a._1, a._3))
  180 + }
  181 + else (List((BigInt(0),a._3._3.get)),(a._1, a._3))
  182 + }).filter(_._1.nonEmpty).map(a => (a._1.head._2, (a._2._2._1, a._2._2._2))) //._2 of expcon is always null, so drop it when mapping
  183 +
  184 + //for each cid, accumulate exposure time per position (rank)
  185 + val rankedMapping = mapWithLoad.groupBy(m => (m._1, m._2._1)).map(m=>{
  186 + val cuml = m._2.map(a=>a._2._2).reduce((q,w)=>(q._1+w._1,q._2+w._2))
  187 + val consumeTime = scala.math.min(cuml._2, 4000000L)
  188 + (x._1, m._1, (cuml._1, consumeTime))
  189 + })
  190 + Some(rankedMapping)
  191 + }
  192 + else None
  193 + }
  194 + catch {
  195 + case e: Exception =>
  196 + None
  197 + }
  198 + }).flatMap(x => x).flatMap(x => x)
  199 + //final form: (uuid, (rank, cid), (exposure time, consume time))
  200 + }
  201 +
  202 +}
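The saved ecWithRank lines are plain CSV in the order uuid, rank, cid, exposure time, consume time. A read-back sketch (assumes an existing sc; the date is illustrative):

    val ecWithRank = sc.textFile("hdfs://pikinn/preprocess/ecWithRank/20160420/")
      .map(_.split(",", -1))
      .map(x => (x(0), (x(1).toInt, x(2)), (x(3).toLong, x(4).toLong)))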
app/com/piki_ds/utils/DateTimeEtc.scala View file @ 282422a
  1 +package com.piki_ds.utils
  2 +
  3 +/**
  4 + *
  5 + * Created by jungwon on 8/17/15.
  6 + */
  7 +object DateTimeEtc {
  8 + def intoTimeMillis(yr:Int, mo:Int, d:Int, hr:Int, min:Int) = {
  9 + val c = java.util.Calendar.getInstance()
  10 + c.set(yr,mo,d,hr,min,0)
  11 + c.getTimeInMillis()
  12 + }
  13 +
  14 + def intoTimeString(yr:Int, mo:Int, d:Int, hr:Int, min:Int, dType:String): String = {
  15 + val format_date = new java.text.SimpleDateFormat(dType)
  16 + val c = java.util.Calendar.getInstance()
  17 + c.set(yr,mo,d,hr,min,0)
  18 + format_date.format(c.getTimeInMillis)
  19 + }
  20 +
  21 + def getToday(timeMillis:Long) = {
  22 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd")
  23 + format_date.format(timeMillis)
  24 + }
  25 +
  26 + def intoYesterdayMN (nowTS:Long) = {
  27 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd")
  28 + val c = java.util.Calendar.getInstance
  29 + c.setTimeInMillis(nowTS)
  30 + c.add(java.util.Calendar.DAY_OF_WEEK, -1)
  31 + val midnight = format_date.format(c.getTime)
  32 + (format_date.parse(midnight).getTime,midnight+" 00:00:00")
  33 + }
  34 +
  35 + def intoNDaysTS (nowTS:Long, n:Int, goUP:Boolean) = {
  36 + val incre = if (goUP) n else n*(-1)
  37 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  38 + val c = java.util.Calendar.getInstance
  39 + c.setTimeInMillis(nowTS)
  40 + c.add(java.util.Calendar.DAY_OF_WEEK, incre)
  41 + val beforeNh = format_date.format(c.getTime)
  42 + (format_date.parse(beforeNh).getTime, beforeNh)
  43 + }
  44 +
  45 + def intoNDaysDT (nowDT:String, n:Int, goUP:Boolean) = {
  46 + val incre = if (goUP) n else n*(-1)
  47 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  48 + val c = java.util.Calendar.getInstance
  49 + c.setTimeInMillis(format_date.parse(nowDT).getTime())
  50 + c.add(java.util.Calendar.DAY_OF_WEEK, incre)
  51 + val beforeNh = format_date.format(c.getTime)
  52 + (format_date.parse(beforeNh).getTime, beforeNh)
  53 + }
  54 +
  55 + def into24hBefore (nowTS:Long) = {
  56 + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  57 + val c = java.util.Calendar.getInstance
  58 + c.setTimeInMillis(nowTS)
  59 + c.add(java.util.Calendar.DAY_OF_WEEK, -1)
  60 + val before24h = format_date.format(c.getTime)
  61 + (format_date.parse(before24h).getTime, before24h)
  62 + }
  63 +
  64 + def makeDateList(limit:String, range:Int, goUp:Boolean, intoFormat:String) = {
  65 + val c = java.util.Calendar.getInstance
  66 + val format_date = new java.text.SimpleDateFormat(intoFormat)
  67 + c.setTimeInMillis(format_date.parse(limit).getTime)
  68 + (1 to range).map(x=>{
  69 + if (goUp) c.add(java.util.Calendar.DAY_OF_MONTH,1) else c.add(java.util.Calendar.DAY_OF_MONTH,-1)
  70 + format_date.format(c.getTime)
  71 + })
  72 + }
  73 +
  74 + def makeHourList(start:String, limit:Long, goUp:Boolean, intoFormat:String) = {
  75 + val c = java.util.Calendar.getInstance
  76 + val format_date = new java.text.SimpleDateFormat(intoFormat)
  77 + c.setTimeInMillis(format_date.parse(start).getTime)
  78 + var hrList = scala.collection.mutable.ListBuffer.empty[String]
  79 + while (c.getTimeInMillis < limit){
  80 + if (goUp) c.add(java.util.Calendar.HOUR_OF_DAY,1) else c.add(java.util.Calendar.HOUR_OF_DAY,-1)
  81 + hrList += format_date.format(c.getTime)
  82 + }
  83 + hrList.toList
  84 + }
  85 +}
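Expected behavior of the two helpers the jobs above rely on most (the dates shown assume the call happens on 2016-04-21):

    import com.piki_ds.utils.DateTimeEtc._

    val (yMidnightTs, yMidnightStr) = intoYesterdayMN(System.currentTimeMillis)
    // yMidnightStr == "2016-04-20 00:00:00"

    val lastWeek = makeDateList("20160421", 7, goUp = false, intoFormat = "yyyyMMdd")
    // Vector("20160420", "20160419", ..., "20160414")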
app/com/piki_ds/utils/GeneralTransformers.scala View file @ 282422a
  1 +package com.piki_ds.utils
  2 +
  3 +import org.apache.spark.SparkContext
  4 +import org.apache.spark.rdd.RDD
  5 +
  6 +/**
  7 + * Created by jungwon on 7/29/15.
  8 + * frequently used transformations
  9 + */
  10 +
  11 +object GeneralTransformers {
  12 +
  13 + /**
  14 + * logit transformation
  15 + * @param num: Double
  16 + * @return : Double
  17 + */
  18 + def make_logit(num:Double) ={
  19 + 1.0D/(1+scala.math.exp((-1)*num+1))
  20 + }
  21 +
  22 + /**
  23 + * scaling doubles into doubles ranging [0,1]
  24 + * @param rawInp: RDD[(key, value to be transformed)]
  25 + * @return : RDD[(key, scaled value)]
  26 + */
  27 + def make_0to1(rawInp:RDD[(String,Double)]) = {
  28 + val inp = rawInp.filter(! _._2.isNaN)
  29 + val mini = inp.map(x=>x._2).min()
  30 + val maxi = inp.map(x=>x._2).max()
  31 + inp.map(x=>{
  32 + val scaled = (x._2 - mini)/(maxi-mini+0.000001)
  33 + (x._1, scaled)
  34 + })
  35 + }
  36 +
  37 + def make_0to1_2Key(rawInp:RDD[((String,String),Double)]) = {
  38 + val inp = rawInp.filter(! _._2.isNaN)
  39 + val mini = inp.map(x=>x._2).min()
  40 + val maxi = inp.map(x=>x._2).max()
  41 + inp.map(x=>{
  42 + val scaled = (x._2 - mini+0.01)/(maxi-mini+0.01)
  43 + (x._1, scaled)
  44 + })
  45 + }
  46 +
  47 + def make_0to1Rating(rawInp:RDD[(String,Double)]) = {
  48 + val inp = rawInp.filter(! _._2.isNaN)
  49 + val mini = inp.map(x=>x._2).min()
  50 + val maxi = inp.map(x=>x._2).max()
  51 + inp.map(x=>{
  52 + val scaled = (x._2 - mini)/(maxi-mini)
  53 + (x._1, scaled)
  54 + })
  55 + }
  56 +
  57 + def cDFWei(inp:RDD[(String,Double)],l:Double,k:Double) = {
  58 + inp.map(x=>{
  59 + val tr = 1-scala.math.exp(-scala.math.pow(x._2.toDouble/l,k))
  60 + (x._1, tr)
  61 + })
  62 + }
  63 +
  64 + def genLogic(inp:Long) = {
  65 + import scala.math.{pow,sqrt}
  66 + inp.toDouble/sqrt(pow(inp,2)+8000)
  67 + }
  68 +
  69 + /**
  70 + * transform into Weibull distribution
  71 + * @param raw: RDD[(key, value to be transformed)]
  72 + * @param lambd: lambda the scale parameter
  73 + * @param k: the shape parameter
  74 + * @return : RDD[(key, transformed value)]
  75 + */
  76 + def make_Weibull(raw:RDD[(String, Double)], lambd:Int, k:Double) = {
  77 + raw.map(x=>{
  78 + import scala.math.{exp, pow}
  79 + val transformed = (k/lambd)*pow(x._2/lambd,k-1)*exp(-pow(x._2/lambd,k))
  80 + (x._1,transformed)
  81 + })
  82 + }
  83 +
  84 + def stringToRdd(sc:SparkContext, str: String): RDD[(String, Long)] = {
  85 + sc.parallelize(str.split("\n").map(x=> {
  86 + try{
  87 + Some(x.split(",")(0), x.split(",")(1).toLong)
  88 + }
  89 + catch { case e:Exception => Some((x,0L))}
  90 + })).flatMap(x=>x)
  91 + }
  92 +
  93 + def zscore(raw:RDD[(String, Long)]) = {
  94 + import org.apache.spark.mllib.linalg._
  95 + import org.apache.spark.mllib.stat.Statistics
  96 + val vec = raw.map(x => {
  97 + Vectors.dense(x._2.toDouble)
  98 + })
  99 + val sumStat = Statistics.colStats(vec)
  100 + val mean = sumStat.mean(0)
  101 + val stdev = scala.math.sqrt(sumStat.variance(0))
  102 + raw.map(x=>{
  103 + val z= (x._2.toDouble - mean)/stdev
  104 + (x._1, z)
  105 + })
  106 + }
  107 +
  108 + def trTanh(raw:Double, a:Double, b:Double) = {
  109 + val en2 = scala.math.exp((-2)*raw)
  110 + (1-en2)/(1+en2)
  111 + }
  112 +}
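Worked values for the transforms the scoring jobs lean on (assumes an existing SparkContext sc):

    import com.piki_ds.utils.GeneralTransformers._

    make_logit(1.0)   // 1 / (1 + exp(-1 + 1)) = 0.5
    make_logit(3.0)   // 1 / (1 + exp(-2)) ≈ 0.88

    val scaled = make_0to1(sc.parallelize(Seq(("a", 2.0), ("b", 4.0), ("c", 6.0))))
    // ("a", ~0.0), ("b", ~0.5), ("c", ~1.0); the +0.000001 in the denominator guards against division by zero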
app/com/piki_ds/utils/GetTextFile.scala View file @ 282422a
  1 +package com.piki_ds.utils
  2 +
  3 +import org.apache.spark.SparkContext
  4 +import org.apache.spark.rdd.RDD
  5 +import org.apache.spark.sql.{SQLContext,DataFrame}
  6 +
  7 +/**
  8 + * Module for loading text files into RDDs; used to fetch logs and lapse data
  9 + *
  10 + * Created by jungwon on 7/14/15.
  11 + */
  12 +
  13 +object GetTextFile {
  14 +
  15 + def getDashDump(sQLContext: SQLContext, tableName:String) = {
  16 + sQLContext.read.parquet(s"hdfs://pikinn/preprocess/db/table=$tableName/")
  17 + }
  18 +
  19 + /**
  20 + * Fetch the log for the date of interest
  21 + *
  22 + * @param dateOfInterest : yyyyMMdd
  23 + * @return : the log rows, split on the delimiter
  24 + */
  25 + def getLog(sc:SparkContext, dateOfInterest:String) = {
  26 + println("DEBUG getLog : " + dateOfInterest )
  27 + val yr = dateOfInterest.take(4)
  28 + val mo = dateOfInterest.slice(4,6)
  29 + val day = dateOfInterest.slice(6,8)
  30 + sc.textFile(s"hdfs://pikinn/log/dev=app/y=$yr/mo=$mo/d=$day/h=*/mi=*/*.gz").map(x=>x.split("\\|", -1)).filter(x=>{
  31 + x.size>4 && x(2).forall(_.isDigit) && x(2).nonEmpty &&
  32 + x(3).forall(_.isDigit) && x(3).nonEmpty}).map(x=>{
  33 + val genTime = if(x(2).length == 10) x(2)+"000" else x(2)
  34 + x(2) = genTime
  35 + x
  36 + })
  37 + }
  38 +
  39 + /**
  40 + * Fetch the precomputed lapse
  41 + *
  42 + * @param dateKey : yyyyMMdd
  43 + * @return : ((eventType, actionTime, uuid, field0), lapse)
  44 + */
  45 +
  46 + def getLapse(sc:SparkContext, dateKey:String, action:String)= {
  47 + val parseFilter = sc.textFile(s"hdfs://pikinn/preprocess/lapse/$dateKey/*.gz").map(x=>x.split("\\|", -1)).filter(x=>{
  48 + x.size==7 && x(2).forall(_.isDigit) && x(3).forall(_.isDigit) && x(4).forall(_.isDigit)
  49 + })
  50 + parseFilter.map(x=>{
  51 + ((x(1),x(2),x(3),x(4)),x(6))
  52 + })
  53 + }
  54 +
  55 + def getUserInfo(sc:SparkContext, dateOfInterest:String) = {
  56 + val c = java.util.Calendar.getInstance
  57 + val format_date = new java.text.SimpleDateFormat("yyyyMMdd")
  58 + val startingPoint = format_date.parse(dateOfInterest)
  59 + c.setTime(startingPoint)
  60 + c.add(java.util.Calendar.DATE, -1)
  61 + val fetchDate = format_date.format(c.getTime)
  62 + val textF:RDD[(String,Long)] = sc.objectFile(s"hdfs://pikinn/user/joanne/userReadAmt/$fetchDate/part*")
  63 + textF
  64 + }
  65 +
  66 + def getCardSize(sc:SparkContext, doi: String) = {
  67 + val cidAndCS: RDD[(String, Int)] = sc.objectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$doi/part*")
  68 + cidAndCS
  69 + }
  70 +
  71 + def parseHueDB(sc: SparkContext, dbDate:String, tableName:String, dbLabel:Array[String]) = {
  72 + val dbDir = sc.textFile(s"/data/db/$dbDate/$tableName/\\*.gz")
  73 + val dbRaw = dbDir.map(x=>{
  74 + import au.com.bytecode.opencsv.CSVParser
  75 + val parser = new CSVParser(',','\'')
  76 + val s = parser.parseLine(x)
  77 + if (s.size==dbLabel.length) Some(dbLabel.zip(s).toMap) else None
  78 + }).flatMap(x=>x)
  79 + dbRaw
  80 + }
  81 +
  82 + def getContentsViewTable(sqlContext:SQLContext, table:String, saveDate:String) = {
  83 + val pathStr = s"/user/dashboard/db/${table.toLowerCase}/y=${saveDate.take(4)}/mo=${saveDate.slice(4, 6)}/d=${saveDate.slice(6, 8)}/*.json"
  84 + val df = sqlContext.read.json(pathStr)
  85 + df
  86 + }
  87 +
  88 + def getDashTable(sqlContext:SQLContext, table:String, saveDate:String): DataFrame= {
  89 + val pathStr = s"/user/dashboard/db/${table.toLowerCase}/y=${saveDate.take(4)}/mo=${saveDate.slice(4, 6)}/$saveDate.json"
  90 + val df = sqlContext.read.json(pathStr)
  91 + df
  92 + }
  93 +}
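A quick orientation sketch for the two most-used readers (assumes an existing sc and that the day's files exist; the date is illustrative):

    import com.piki_ds.utils.GetTextFile._

    val log = getLog(sc, "20160420")
    // each row: x(0)=category, x(1)=event, x(2)=actionTime normalized to 13-digit ms, x(3)=uuid, x(4)=field0, x(5)=field1, ...

    val lapse = getLapse(sc, "20160420", "CONSUME")
    // keyed as ((eventType, actionTime, uuid, field0), lapse)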
app/com/piki_ds/utils/SqlContextConf.scala View file @ 282422a
  1 +package com.piki_ds.utils
  2 +
  3 +import org.apache.spark.rdd.RDD
  4 +import org.apache.spark.sql._
  5 +
  6 +/**
  7 + * Created by jungwon on 11/26/15.
  8 + */
  9 +
  10 +object SqlContextConf{
  11 +
  12 + def readAllTable(sqlContext:SQLContext, dbName:String, scheme:String, tableName:String,
  13 + partitionCol:String, upperBound:Long,
  14 + numPartition:Int): DataFrame = {
  15 + // val url = if(dbName.equals("REPL")) "jdbc:mysql://192.168.30.61:3306" else "jdbc:mysql://192.168.20.61:3306"
  16 + val url = if(dbName.equals("REPL")) "jdbc:mysql:replication://b3.piki.work,b3.piki.work,b2.piki.info,b1.piki.work" else "jdbc:mysql://192.168.20.61:3306"
  17 + val user = if(dbName.equals("REPL")) "pikids" else "ds"
  18 + sqlContext.read.format("jdbc").options(
  19 + Map(
  20 + "driver"->"com.mysql.jdbc.Driver",
  21 + "dbtable"-> (scheme + "." + tableName),
  22 + "user" -> user,
  23 + "password" -> "zmfflr2715",
  24 + "partitionColumn" -> partitionCol,
  25 + "lowerBound" -> "0",
  26 + "upperBound" -> upperBound.toString,
  27 + "numPartitions" -> numPartition.toString,
  28 + "url"->
  29 + s"""$url/$scheme?charset=utf8&use_unicode=0&characterSetResults=utf8&characterEncoding=utf8&roundRobinLoadBalance=true&failOverReadOnly=true&allowMasterDownConnections=true&autoReconnect=true&autoReconnect=true""")
  30 + ).load
  31 + }
  32 +
  33 + def readPartialTable(sqlContext:SQLContext, dbName:String, scheme:String, tableName:String,
  34 + select:Array[String], additional:String):DataFrame = {
  35 + // val url = if(dbName.equals("REPL")) "jdbc:mysql://192.168.30.61:3306" else "jdbc:mysql://192.168.20.61:3306"
  36 + val url = if(dbName.equals("REPL")) "jdbc:mysql:replication://b3.piki.work,b3.piki.work,b2.piki.info,b1.piki.work" else "jdbc:mysql://192.168.20.61:3306"
  37 + val user = if(dbName.equals("REPL")) "pikids" else "ds"
  38 + val tableQuery = s"(select ${select.mkString(",")} from ${scheme + "." + tableName} $additional) named"
  39 + sqlContext.read.format("jdbc").options(
  40 + Map(
  41 + "driver"->"com.mysql.jdbc.Driver",
  42 + "dbtable"-> tableQuery,
  43 + "user" -> user,
  44 + "password" -> "zmfflr2715",
  45 + "url"->
  46 + s"""$url/$scheme?charset=utf8&use_unicode=0&characterSetResults=utf8&characterEncoding=utf8&roundRobinLoadBalance=true&failOverReadOnly=true&allowMasterDownConnections=true&autoReconnect=true&autoReconnect=true""")
  47 + ).load
  48 + }
  49 +
  50 + def readPartialTableBig(sqlContext:SQLContext, dbName:String, scheme:String, tableName:String,
  51 + select:Array[String], additional:String,
  52 + partitionCol:String, upperBound:Long,
  53 + numPartition:Int):DataFrame = {
  54 + // val url = if(dbName.equals("REPL")) "jdbc:mysql://192.168.30.61:3306" else "jdbc:mysql://192.168.20.61:3306"
  55 + val url = if(dbName.equals("REPL")) "jdbc:mysql:replication://b3.piki.work,b3.piki.work,b2.piki.info,b1.piki.work" else "jdbc:mysql://192.168.20.61:3306"
  56 + val user = if(dbName.equals("REPL")) "pikids" else "ds"
  57 + val tableQuery = s"(select ${select.mkString(",")} from ${scheme + "." + tableName} $additional) named"
  58 + sqlContext.read.format("jdbc").options(
  59 + Map(
  60 + "driver"->"com.mysql.jdbc.Driver",
  61 + "dbtable"-> tableQuery,
  62 + "user" -> user,
  63 + "password" -> "zmfflr2715",
  64 + "partitionColumn" -> partitionCol,
  65 + "lowerBound" -> "0",
  66 + "upperBound" -> upperBound.toString,
  67 + "numPartitions" -> numPartition.toString,
  68 + "url"->
  69 + s"""$url/$scheme?charset=utf8&use_unicode=0&characterSetResults=utf8&characterEncoding=utf8&roundRobinLoadBalance=true&failOverReadOnly=true&allowMasterDownConnections=true&autoReconnect=true&autoReconnect=true""")
  70 + ).load
  71 + }
  72 +
  73 + def getMaxId(sqlContext:SQLContext, idType:String) = {
  74 + val tableIdMap = Map("content" -> ("MG_CONTENTS","contents_id"), "comment" -> ("MG_COMMENT","comment_id"),
  75 + "user" -> ("USER","uid"), "uniqueuser" -> ("UNIQUEUSER","uuid"))
  76 + val tableName = tableIdMap(idType)._1
  77 + val idName = tableIdMap(idType)._2
  78 +
  79 + val maxIdTableGet = readPartialTable(sqlContext, "REPL", "new_pikicast_common", tableName, Array(idName), s"order by $idName desc limit 1")
  80 + maxIdTableGet.take(1).head.getAs[Long](idName)+100L
  81 + }
  82 +
  83 + def dFToRDDMap(df:DataFrame,fields:Array[String]): RDD[Map[String, String]] = {
  84 + df.rdd.map(x=>{
  85 + (0 until x.size).map(a=>x.getAs[String](a))
  86 + }).map(x=>fields.zip(x.toSeq).toMap)
  87 + }
  88 +
  89 + def selectTable(table:DataFrame, selectVar:Seq[String], where:String): DataFrame = {
  90 + table.select(selectVar(0), selectVar.drop(1): _*).where(where)
  91 + }
  92 +
  93 +}
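A minimal call sketch mirroring how HourlyCommentCnt uses these readers (assumes an existing sqlContext; the column list and filter are illustrative):

    import com.piki_ds.utils.SqlContextConf._

    val maxCmtId = getMaxId(sqlContext, "comment")
    val comments = readPartialTableBig(sqlContext, "REPL", "new_pikicast_common", "MG_COMMENT",
      Array("contents_id", "comment_id", "uid", "cdate"),
      "where status = 'ACTV'", "comment_id", maxCmtId, 1000)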
app/com/piki_ds/utils/TempScoreSaveLoad.scala View file @ 282422a
  1 +package com.piki_ds.utils
  2 +
  3 +import org.apache.spark.rdd.RDD
  4 +import org.apache.spark.SparkContext
  5 +
  6 +/**
  7 + * Created by jungwon on 11/18/15.
  8 + */
  9 +
  10 +object TempScoreSaveLoad {
  11 + def scoreSave(doi:String, scoreType:String, subScore: String, input:RDD[(String,Long)], coal:Int) = {
  12 + input.map{
  13 + case (cid, score) =>
  14 + s"$cid,$score"
  15 + }.coalesce(coal).saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/$doi")
  16 + }
  17 + def scoreLoad(sc:SparkContext, doi:String, scoreType:String, subScore: String) = {
  18 + val scoreTxt = sc.textFile(s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/$doi")
  19 + scoreTxt.map(x=>{
  20 + val s = x.split(",")
  21 + (s(0).toInt, s(1).toInt)
  22 + })
  23 + }
  24 +}
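Round-trip sketch (assumes an existing sc and an RDD[(String, Long)] of cid/score pairs named scores; the scoreType/subScore values are illustrative). Note the asymmetry: scoreSave writes (String, Long) pairs while scoreLoad hands them back as (Int, Int).

    TempScoreSaveLoad.scoreSave("20160420", "content", "click", scores, 1)
    val reloaded = TempScoreSaveLoad.scoreLoad(sc, "20160420", "content", "click") // RDD[(Int, Int)]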
app/com/piki_ds/ver1/ConClickScore.scala View file @ 282422a
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.rdd.RDD
  4 +import com.piki_ds.utils.GeneralTransformers.make_logit
  5 +
  6 +/**
  7 + * Created by jungwon on 8/4/15.
  8 + * Update click score using the Dashboard data
  9 + */
  10 +
  11 +//check uview
  12 +
  13 +object ConClickScore {
  14 +
  15 + def rawDashClick(rawDash: RDD[Map[String, String]]) = {
  16 + rawDash.map(x => (x("cid"), (x("expCnt").toDouble.toLong, x("view").toDouble.toLong)))
  17 + }
  18 +
  19 + def updateClickInfo(cidOfInterest: RDD[(Long, (Int, Long))], rawClick: RDD[(String, (Long, Long))]) = {
  20 + val reduced = rawClick.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x=>(x._1.toLong,x))
  21 + cidOfInterest.join(reduced).map(_._2._2)
  22 + }
  23 +
  24 + def findNormalStat(raw: RDD[(String, (Long, Long))]): Map[String, Array[Long]] = {
  25 + import org.apache.spark.mllib.linalg._
  26 + import org.apache.spark.mllib.stat.Statistics
  27 + val vec = raw.map(x => Vectors.dense(x._2._1, x._2._2))
  28 + val sumStat = Statistics.colStats(vec)
  29 + val mean = sumStat.mean.toArray.map(_.toLong)
  30 + val std = sumStat.variance.toArray.map(x => math.sqrt(x).toLong)
  31 + Map("mean" -> mean, "std" -> std)
  32 + }
  33 +
  34 + def refineForScoring(raw: RDD[(String, (Long, Long))], outlierInfo: Map[String, Array[Long]]) = {
  35 + val expOutlier = outlierInfo("mean")(0) + 3 * outlierInfo("std")(0)
  36 + val conOutlier = outlierInfo("mean")(1) + 3 * outlierInfo("std")(1)
  37 + raw.map(x => (x._1, (math.min(x._2._1, expOutlier), math.min(x._2._2, conOutlier))))
  38 + }
  39 +
  40 + def clickScoreCalc(cidOfInterest: RDD[(Long, (Int, Long))], rawDash: RDD[Map[String, String]]): RDD[(String, Long)] = {
  41 + val raw = updateClickInfo(cidOfInterest, rawDashClick(rawDash))
  42 + val outlierInfo = findNormalStat(raw)
  43 + val refined = refineForScoring(raw, outlierInfo)
  44 + refined.map(x => (x._1, x._2._2.toDouble / x._2._1)).map(x => (x._1, math.min(999L, (1000 * make_logit(x._2)).toLong)))
  45 + }
  46 +
  47 +}
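After exposure and view counts are capped at mean + 3*std, the final click score reduces to min(999, 1000 * make_logit(view / expCnt)). A tiny worked value for that last step (numbers are illustrative):

    import com.piki_ds.utils.GeneralTransformers.make_logit

    val ctr = 5000.0 / 10000.0                                   // capped view / capped expCnt
    val score = math.min(999L, (1000 * make_logit(ctr)).toLong)
    // make_logit(0.5) = 1 / (1 + exp(0.5)) ≈ 0.378, so score == 377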
app/com/piki_ds/ver1/ConDwellScore.scala View file @ 282422a
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.rdd.RDD
  4 +
  5 +import com.piki_ds.utils.GeneralTransformers._
  6 +
  7 +/**
  8 + * Dwell-time weighted score
  9 + * Created by jungwon on 8/5/15.
  10 + */
  11 +
  12 +object ConDwellScore {
  13 +
  14 +
  15 + def rawDashDwell(rawDash: RDD[Map[String, String]]): RDD[(String, (Long, Long))] = {
  16 + rawDash.map(x => (x("cid"), (x("expTime").toDouble.toLong, x("consumeTime").toDouble.toLong)))
  17 + }
  18 +
  19 + def updateDwellInfo(cidOfInterest: RDD[(Long, (Int, Long))], rawDwell: RDD[(String, (Long, Long))]) = {
  20 + val reduced = rawDwell.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x=>(x._1.toLong,x))
  21 + cidOfInterest.join(reduced).map(_._2._2)
  22 + }
  23 +
  24 + def findNormalStat(raw: RDD[(String, (Long, Long))]): Map[String, Array[Long]] = {
  25 + import org.apache.spark.mllib.linalg._
  26 + import org.apache.spark.mllib.stat.Statistics
  27 + val vec = raw.map(x => Vectors.dense(x._2._1, x._2._2))
  28 + val sumStat = Statistics.colStats(vec)
  29 + val mean = sumStat.mean.toArray.map(_.toLong)
  30 + val std = sumStat.variance.toArray.map(x => math.sqrt(x).toLong)
  31 + Map("mean" -> mean, "std" -> std)
  32 + }
  33 +
  34 + def refineForScoring(raw: RDD[(String, (Long, Long))], outlierInfo: Map[String, Array[Long]]) = {
  35 + val expOutlier = outlierInfo("mean")(0) + 3 * outlierInfo("std")(0)
  36 + val conOutlier = outlierInfo("mean")(1) + 3 * outlierInfo("std")(1)
  37 + raw.map(x => (x._1, (math.min(x._2._1, expOutlier), math.min(x._2._2, conOutlier))))
  38 + }
  39 +
  40 + def dwellScoreCalc(cidOfInterest: RDD[(Long, (Int, Long))], rawDash: RDD[Map[String, String]]): RDD[(String, Long)] = {
  41 + val raw = updateDwellInfo(cidOfInterest, rawDashDwell(rawDash))
  42 + val outlierInfo = findNormalStat(raw)
  43 + val refined = refineForScoring(raw, outlierInfo)
  44 + refined.map(x => (x._1, x._2._2.toDouble / x._2._1)).map(x => (x._1, math.min(999L, (1000 * make_logit(x._2)).toLong)))
  45 + }
  46 +}
app/com/piki_ds/ver1/ConPostActionScore.scala View file @ 282422a
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.rdd.RDD
  4 +
  5 +import com.piki_ds.utils.GeneralTransformers._
  6 +
  7 +/**
  8 + *
  9 + * Created by jungwon on 8/5/15.
  10 + */
  11 +object ConPostActionScore {
  12 +
  13 + def rawDashPA(rawDash: RDD[Map[String, String]]): RDD[(String, (Long, Long, Long, Long, Long))] = {
  14 + rawDash.map(x => {
  15 + (x("cid"),
  16 + (x("view").toDouble.toLong, x("share").toDouble.toLong, x("bookmark").toDouble.toLong, x("likes").toDouble.toLong, x("comment").toDouble.toLong))
  17 + })
  18 + }
  19 +
  20 + def updatePAInfo(cidOfInterest: RDD[(Long, (Int, Long))], rawPA: RDD[(String, (Long, Long, Long, Long, Long))]): RDD[(String, (Long, Long, Long, Long, Long))] = {
  21 + val reduced = rawPA.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5)).map(x=>(x._1.toLong,x))
  22 + cidOfInterest.join(reduced).map(_._2._2)
  23 + }
  24 +
  25 + def findNormalStat(raw: RDD[(String, (Long, Long, Long, Long, Long))]): Map[String, Array[Long]] = {
  26