Commit 20834fca6c3eb7b3db78814fb16db7d09525c67d

Authored by Joanne
1 parent 2fd2d149ed
Exists in master

progress score

Showing 4 changed files with 256 additions and 21 deletions

app/com/piki_ds/preprocess/MapWithRank.scala
1 1 package com.piki_ds.preprocess
2 2  
3   -import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
4   -import com.piki_ds.utils.GetTextFile.{getLapse,getLog}
5   -
6 3 import org.apache.spark.{SparkContext, SparkConf}
7 4 import org.apache.spark.rdd.RDD
8 5 import org.json4s.jackson.JsonMethods.parse
9 6  
  7 +import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  8 +import com.piki_ds.utils.GetTextFile.{getLapse,getLog}
10 9  
11 10 /**
12 11 * preprocessing for ggh
13 12  
14 13  
... ... @@ -14,31 +13,14 @@
14 13 */
15 14 object MapWithRank {
16 15 def getSparkConf = {
17   - //System.setProperty("SPARK_YARN_MODE", "true")
18 16 val conf = new SparkConf().setAppName("MapWithRank")
19 17 conf.setMaster("yarn-client")
20 18 conf.set("master", "yarn-client")
21 19 conf.set("spark.app.name", "MapWithRank")
22 20 }
23 21  
24   - val sc = new SparkContext(getSparkConf)
  22 + val sc = SparkContext.getOrCreate()
25 23  
26   - def main(args: Array[String]) {
27   - val nowTS: Long = System.currentTimeMillis
28   - val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
29   -
30   - val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
31   -
32   - val log = getLog(sc, doi)
33   - val joinedLog = joinLogWithLapse(log, getLapse(sc, doi, "CONSUME"))
34   - val mapped = mapActions(joinedLog, countUser(log, 10000))
35   - val ecWithRank: RDD[(String, (Int, String), (Long, Long))] = mapRanges(mapped)
36   -
37   - val csvMapRank = ecWithRank.map(x=>s"${x._1},${x._2._1},${x._2._2},${x._3._1},${x._3._2}")
38   - csvMapRank.saveAsTextFile(s"hdfs://pikinn/preprocess/ecWithRank/$doi/")
39   -
40   - }
41   -
42 24 /**
43 25 * filter users with log line size larger than limit
44 26 *
... ... @@ -197,6 +179,22 @@
197 179 }
198 180 }).flatMap(x => x).flatMap(x => x)
199 181 // final form: (uuid, (exposure rank, cid), (exposure time, consume time))
  182 + }
  183 +
  184 + def main(args: Array[String]) {
  185 + val nowTS: Long = System.currentTimeMillis
  186 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  187 +
  188 + val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  189 +
  190 + val log = getLog(sc, doi)
  191 + val joinedLog = joinLogWithLapse(log, getLapse(sc, doi, "CONSUME"))
  192 + val mapped = mapActions(joinedLog, countUser(log, 10000))
  193 + val ecWithRank: RDD[(String, (Int, String), (Long, Long))] = mapRanges(mapped)
  194 +
  195 + val csvMapRank = ecWithRank.map(x=>s"${x._1},${x._2._1},${x._2._2},${x._3._1},${x._3._2}")
  196 + csvMapRank.saveAsTextFile(s"hdfs://pikinn/preprocess/ecWithRank/$doi/")
  197 +
200 198 }
201 199  
202 200 }
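
SparkContext.getOrCreate() reuses a context that is already running in the driver JVM, which lets the score jobs in this commit share one context instead of each constructing its own. With the no-arg form, though, the settings built in getSparkConf are only applied if something else wires them in; Spark 1.4+ also provides an overload that takes the conf explicitly. A minimal sketch, not part of the commit:

    // reuse an existing context, or create one with the conf above
    val sc = SparkContext.getOrCreate(getSparkConf)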
app/com/piki_ds/ver1/ConEfficiencyScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import scala.collection._
  4 +import org.apache.spark.sql.SQLContext
  5 +
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.{SparkConf, SparkContext}
  8 +
  9 +import com.piki_ds.utils.DateTimeEtc._
  10 +import com.piki_ds.utils.GeneralTransformers.make_logit
  11 +import com.piki_ds.utils.TempScoreSaveLoad
  12 +import com.piki_ds.preprocess.CidValidation._
  13 +
  14 +/**
  15 + * temporary ggh generation
  16 + * Created by jungwon on 7/9/15.
  17 + */
  18 +
  19 +object ConEfficiencyScore {
  20 + def getSparkConf = {
  21 + val conf = new SparkConf().setAppName("ConEfficiencyScore")
  22 + conf.setMaster("yarn-client")
  23 + conf.set("master", "yarn-client")
  24 + conf.set("spark.app.name", "ConEfficiencyScore")
  25 + conf.set("spark.akka.frameSize", "1024")
  26 + }
  27 +
  28 + val sc = SparkContext.getOrCreate()
  29 + val sqlContext = SQLContext.getOrCreate(sc)
  30 +
  31 +
  32 + def getECWithRank(dateKey:String) = {
  33 + val ecWithRankText = sc.textFile(s"hdfs://pikinn/preprocess/ecWithRank/$dateKey/part*")
  34 + val ecWithRank: RDD[(Int, (Int, Int), (Long, Long))] = ecWithRankText.map(x=> {
  35 + import au.com.bytecode.opencsv.CSVParser
  36 + val parser = new CSVParser(',', '\'')
  37 + val s = parser.parseLine(x)
  38 + if (s.size == 5 &&
  39 + s(0).nonEmpty && s(0).forall(_.isDigit) && s(0).length < 10 && s(0).toInt != 0 &&
  40 + s(2).nonEmpty && s(2).forall(_.isDigit) && s(2).length < 10 && s(2).toInt != 0) {
  41 + Some((s(0).toInt, (s(1).toInt, s(2).toInt), (s(3).toLong, s(4).toLong)))
  42 + } else None
  43 + }).flatMap(x=>x)
  44 + ecWithRank
  45 + }
  46 + /**
  47 + * mean exposure time per rank
  48 + * @param mappedRanges : RDD of (uuid, (exposure rank, cid), (exposure time, consume time))
  49 + * @return : RDD[(Int, Long)] of (rank, mean exposure time)
  50 + */
  51 +
  52 + def getRankMean(mappedRanges: RDD[(Int, (Int, Int), (Long, Long))]) = {
  53 + val preCal = mappedRanges.filter(x => x._3._1 != 0).groupBy(x => x._2).map(x => (x._1, (x._2.map(a => a._3._1).sum, x._2.size)))
  54 + preCal.groupBy(x=>x._1._1).map(x=>(x._1, (x._2.map(a=>a._2._1).sum,x._2.map(a=>a._2._2).sum))).map(x=>{
  55 + val mean = x._2._1.toDouble/x._2._2
  56 + (x._1, mean.toLong)
  57 + })
  58 + }
  59 +
  60 + def getCardSize(doi: String) = {
  61 + val cidAndCS: RDD[(String, Int)] = sc.objectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$doi/part*")
  62 + cidAndCS.filter(x=>x._1.nonEmpty && x._1.forall(_.isDigit) && x._1.length < 10 && x._1.toInt!=0).map(x=>(x._1.toInt, x._2))
  63 + }
  64 +
  65 + def getCSMean(doi: String): Map[Int, Long] = {
  66 + val scMean: RDD[(Int, Long)] = sc.objectFile(s"hdfs://pikinn/preprocess/ctByCardSize/$doi/part*")
  67 + scMean.collectAsMap()
  68 + }
  69 +
  70 + def joinCardSize(mappedRanges: RDD[(Int, (Int, Int), (Long, Long))], cidAndCardSize: RDD[(Int, Int)]) = {
  71 + mappedRanges.map(x => (x._2._2, x)).join(cidAndCardSize).map(x => (x._2._1._1, x._2._1._2, x._2._1._3, x._2._2))
  72 + }
  73 +
  74 +
  75 + /**
  76 + * assembles ggh inputs per (cid, card size) group
  77 + * @param withCardSize : RDD of (uuid, (rank, cid), (exposure time, consume time), card size)
  78 + * @param cardSizeMean : mean consume time keyed by card size (etByRank: mean exposure time keyed by rank)
  79 + */
  80 + def consumeTimeByCid(withCardSize:RDD[(Int, (Int, Int), (Long, Long), Int)],
  81 + etByRank: Map[Int, Long],
  82 + cardSizeMean: Map[Int, Long]) = {
  83 + val getRankM: RDD[((Int, (Int, Int), (Long, Long), Int), Long)] = withCardSize.map(x=>{
  84 + try{
  85 + Some((x,etByRank.getOrElse(x._2._1,10L)))
  86 + } catch {
  87 + case e:Exception => None
  88 + }
  89 + }).flatMap(x=>x)
  90 + val result = getRankM.groupBy(x=>(x._1._2._2,x._1._4)).map(x=>{
  91 + try {
  92 + val denom = x._2.map(a => a._1._3._1.toDouble / a._2).sum
  93 + val numer = x._2.map(a => a._1._3._2.toDouble).sum
  94 + val ggh = numer / (cardSizeMean.getOrElse(x._1._2, 10L) * denom)
  95 + Some(x._1._1, denom, numer, cardSizeMean.getOrElse(x._1._2, 10L))
  96 + } catch {
  97 + case e:Exception => None}
  98 + }).flatMap(x=>x)
  99 + result
  100 + }
  101 +
  102 +
  103 + def consumeTimeByCid_(withCardSize:RDD[(String, (Int, String), (Long, Long), Int)],
  104 + etByRank: Map[Int, Long],
  105 + cardSizeMean: Map[Int, Long]) = {
  106 + val getRankM: RDD[((String, (Int, String), (Long, Long), Int), Long)] = withCardSize.map(x=>{
  107 + try{
  108 + Some((x,etByRank.getOrElse(x._2._1,10L)))
  109 + } catch {
  110 + case e:Exception => None
  111 + }
  112 + }).flatMap(x=>x)
  113 + val result = getRankM.groupBy(x=>(x._1._2._2,x._1._4)).map(x=>{
  114 + try {
  115 + val denom = x._2.map(a => a._1._3._1.toDouble / a._2).sum
  116 + val numer = x._2.map(a => a._1._3._2.toDouble).sum
  117 + val ggh = numer / (cardSizeMean.getOrElse(x._1._2, 10L) * denom)
  118 + Some(x._1._1, ggh)
  119 + } catch {
  120 + case e:Exception => None}
  121 + }).flatMap(x=>x)
  122 + result
  123 + }
  124 +
  125 + def main(args: Array[String]): Unit = {
  126 + val nowTS: Long = System.currentTimeMillis
  127 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  128 +
  129 + val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  130 +
  131 + val ecWithRank = getECWithRank(doi)
  132 +
  133 + val etByRank: Map[Int, Long] = getRankMean(ecWithRank).collectAsMap()
  134 + val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatus(sqlContext, Array("ACTV","HOLD"))
  135 + val ggh: RDD[(Int, Double, Double, Long)] = consumeTimeByCid(joinCardSize(ecWithRank,getCardSize(doi)),etByRank,getCSMean(doi))
  136 + val cidAndScoreFormatted = ggh.map(x=>(x._1.toLong, make_logit(x._2))).join(cidOfInterest).map(x=>(x._1, x._2._1)).map(x=>{
  137 + (x._1,math.min(1000L,(x._2*1000).toLong))
  138 + }).map(x=>(x._1.toString, x._2))
  139 + TempScoreSaveLoad.scoreSave(doi,"content", "efficiency",cidAndScoreFormatted,100)
  140 +
  141 + }
  142 +
  143 +}
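
The ratio assembled in consumeTimeByCid_ compares the time users actually spent on a cid against what its exposure would predict: total consume time divided by (mean consume time for the card size × the sum of each view's exposure time normalized by the mean exposure time at its rank). consumeTimeByCid returns the same components unreduced. A minimal self-contained sketch with made-up numbers (all names and values hypothetical):

    // mean exposure time per rank, and mean consume time for this card size
    val etByRank = Map(1 -> 40L, 2 -> 20L)
    val csMean   = 30L
    // (rank, exposure time, consume time) for each logged view of one cid
    val views = Seq((1, 50L, 90L), (2, 10L, 45L))
    val denom = views.map { case (r, et, _) => et.toDouble / etByRank(r) }.sum // 1.75
    val numer = views.map(_._3.toDouble).sum                                   // 135.0
    val ggh   = numer / (csMean * denom)              // ~2.57: consumed above expectation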
app/com/piki_ds/ver1/ConRejectScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import com.piki_ds.preprocess.CidValidation._
  4 +import org.apache.spark.rdd.RDD
  5 +import org.apache.spark.sql.SQLContext
  6 +import org.apache.spark.{SparkConf, SparkContext}
  7 +
  8 +import com.piki_ds.ver1.Progress._
  9 +import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  10 +import com.piki_ds.utils.GeneralTransformers._
  11 +import com.piki_ds.utils.GetTextFile.getLog
  12 +import com.piki_ds.utils.TempScoreSaveLoad.scoreSave
  13 +import com.piki_ds.utils.SqlContextConf.getMaxId
  14 +
  15 +
  16 +/**
  17 + *
  18 + * Created by jungwon on 8/5/15.
  19 + */
  20 +
  21 +object ConRejectScore {
  22 +
  23 + def getSparkConf = {
  24 + val conf = new SparkConf().setAppName("ConRejectScore")
  25 + conf.setMaster("yarn-client")
  26 + conf.set("master", "yarn-client")
  27 + conf.set("spark.app.name", "ConRejectScore")
  28 + }
  29 +
  30 + val sc = SparkContext.getOrCreate()
  31 + val sqlContext = SQLContext.getOrCreate(sc)
  32 +
  33 + def main(args:Array[String]) {
  34 + val nowTS: Long = System.currentTimeMillis
  35 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  36 + val maxCid = getMaxId(sqlContext,"content")
  37 + val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  38 + val cidAndScore = getAProgress(sqlContext, dateKey,maxCid)
  39 + val cidOfInterest = getCidByStatus(sqlContext, Array("ACTV","HOLD")).map(x=>(x._1.toString,x._2))
  40 + val cidAndScoreFormatted = cidAndScore.join(cidOfInterest).map(x=>(x._1,x._2._1)).map(x=>{
  41 + (x._1, (1000*make_logit(x._2)).toLong)
  42 + })
  43 + scoreSave(dateKey,"content","reject",cidAndScoreFormatted,1)
  44 + }
  45 +
  46 + def getAProgress(sQLContext: SQLContext, doi:String, maxId:Long): RDD[(String, Double)] = {
  47 + val aProgress = alternativeProgress(sQLContext,getLog(sc, doi),maxId)
  48 + avgProgress(aProgress)
  49 + }
  50 +
  51 +}
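
The reject score is the per-cid average progress pushed through make_logit and scaled to an integer in 0..1000. make_logit itself is not part of this commit; assuming it is a standard logistic squash into (0, 1), the formatting step amounts to the following sketch (the hypothetical makeLogit stands in for the real helper):

    def makeLogit(x: Double): Double = 1.0 / (1.0 + math.exp(-x)) // assumed shape of make_logit
    val avg = 0.42                              // a hypothetical avgProgress value
    val score = (1000 * makeLogit(avg)).toLong  // ~603, an integer score in 0..1000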
app/com/piki_ds/ver1/Progress.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.sql.SQLContext
  4 +import org.apache.spark.rdd.RDD
  5 +
  6 +import com.piki_ds.utils.SqlContextConf.readPartialTableBig
  7 +
  8 +/**
  9 + *
  10 + * Created by jungwon on 7/9/15.
  11 + */
  12 +object Progress {
  13 +
  14 + def alternativeProgress(sqlContext:SQLContext, log: RDD[Array[String]], maxCid:Long) ={
  15 + val filterStr = "group by contents_id"
  16 + val fetchVar = Array("contents_id","count(ordering)")
  17 + val cardnumTGet = readPartialTableBig(sqlContext, "REPL", "new_pikicast_common","MG_CARD",
  18 + fetchVar, filterStr,"contents_id",maxCid,1000)
  19 + val cardnum = cardnumTGet.map(x=>{
  20 + (x.getAs[Long]("contents_id").toString,x.getAs[Long]("count(ordering)").toInt)
  21 + }).collectAsMap()
  22 +
  23 + log.filter(x=>x.length>5 && x(1).equals("CONSUME")).map(x=>{
  24 + //uuid,cid,cardnum
  25 + (x(3),x(4),x(5))
  26 + }).groupBy(x=>(x._1,x._2)).map(x=>{
  27 + (x._1,x._2.map(a=>a._3).toSeq.distinct.size)
  28 + }).map(x=>{
  29 + try {
  30 + Some(x._1, x._2.toDouble/(cardnum(x._1._2)+2L))
  31 + } catch {
  32 + case e:Exception => None
  33 + }
  34 + }).flatMap(x=>x).map(x=>(x._1,math.min(x._2,1.0)))
  35 + }
  36 +
  37 + def avgProgress(prgs:RDD[((String, String), Double)]) ={
  38 + prgs.groupBy(x=>x._1._2).map(x=>{
  39 + val avg = x._2.map(a=>a._2).sum/x._2.size
  40 + (x._1, avg)
  41 + })
  42 + }
  43 +}
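
alternativeProgress scores how far users get through a piece of content: for each (uuid, cid) it counts the distinct card numbers seen in CONSUME events, divides by the content's card count from MG_CARD plus a +2 smoothing term, and clips at 1.0; avgProgress then averages these ratios per cid. A worked sketch of one ratio (numbers hypothetical):

    val distinctCardsSeen = 7   // distinct card numbers in a user's CONSUME lines
    val cardCount         = 8   // count(ordering) for the cid in MG_CARD
    val progress = math.min(distinctCardsSeen.toDouble / (cardCount + 2L), 1.0) // 0.7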