Commit c9e19f40ee2fd12ee19c292cebf6e73e32777177

Authored by Joanne
1 parent 9614639165
Exists in master

score version 1 finalize

Showing 5 changed files with 200 additions and 11 deletions

app/com/piki_ds/ver1/CmtScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import com.piki_ds.preprocess.CidValidation._
  4 +import org.apache.spark.sql._
  5 +import org.apache.spark.{SparkContext, SparkConf}
  6 +import org.apache.spark.rdd.RDD
  7 +
  8 +import com.piki_ds.utils.DateTimeEtc._
  9 +import com.piki_ds.utils.GeneralTransformers.make_0to1
  10 +import com.piki_ds.utils.TempScoreSaveLoad
  11 +/**
  12 + * new comment score
  13 + * Created by jungwon on 7/10/15.
  14 + */
  15 +
  16 +object CmtScore {
  17 + def getSparkConf= {
  18 + val conf = new SparkConf().setAppName("CmtScore")
  19 + conf.setMaster("yarn-client")
  20 + conf.set("master", "yarn-client")
  21 + conf.set("spark.app.name", "CmtScore")
  22 + conf.set("spark.akka.frameSize", "1024")
  23 + }
  24 +
  25 + val sc = new SparkContext(getSparkConf)
  26 + val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  27 +
  28 + def main(args:Array[String]) {
  29 + val nowTS: Long = System.currentTimeMillis
  30 + val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
  31 + val dateKey= yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  32 +
  33 + val cumlt= sqlContext.read.load(s"hdfs://pikinn/preprocess/cmtCountByHour/y=${dateKey.take(4)}/mo=${dateKey.slice(4,6)}/d=${dateKey.slice(6,8)}/*")
  34 + val a= cumlt.map(x=>(x.getAs[Long]("contents_id"),
  35 + (x.getAs[Long]("all_comment") + x.getAs[Long]("parent_comment") + x.getAs[Long]("unique_comment")).toDouble/3))
  36 + val cidOfInterest = getCidByStatus(sqlContext, Array("ACTV","HOLD")).map(x=>(x._1.toString,x._2))
  37 + val tempScoring: RDD[(String, Double)] = a.map(x=>(x._1.toString, x._2)).join(cidOfInterest).map(x=>(x._1, x._2._1))
  38 + val formatOutput= make_0to1(tempScoring).map(x=>(x._1,(x._2*1000).toLong))
  39 + TempScoreSaveLoad.scoreSave(dateKey,"comment","",formatOutput,5)
  40 + }
  41 +
  42 +
  43 +
  44 +}
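CmtScore averages the all/parent/unique comment counts per contents_id, keeps only ACTV/HOLD content via getCidByStatus, and scales the make_0to1-normalized value to a Long in [0, 1000]. make_0to1 itself is not part of this diff; below is a minimal sketch of what such a min-max normalizer could look like using plain Spark RDD operations (the real utility in com.piki_ds.utils.GeneralTransformers may differ).

import org.apache.spark.rdd.RDD

// Hypothetical min-max normalizer in the spirit of make_0to1; not the
// actual utility from com.piki_ds.utils.GeneralTransformers.
def minMax0to1(scores: RDD[(String, Double)]): RDD[(String, Double)] = {
  val lo = scores.values.min()
  val hi = scores.values.max()
  val span = if (hi > lo) hi - lo else 1.0  // guard against a constant score column
  scores.mapValues(v => (v - lo) / span)
}

// The diff then scales the normalized value to a 0-1000 Long:
//   make_0to1(tempScoring).map(x => (x._1, (x._2 * 1000).toLong))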
app/com/piki_ds/ver1/ConEfficiencyScore.scala
1 1 package com.piki_ds.ver1
2 2  
3 3 import scala.collection._
4   -import org.apache.spark.sql.SQLContext
5 4  
  5 +import org.apache.spark.sql.SQLContext
6 6 import org.apache.spark.rdd.RDD
7 7 import org.apache.spark.{SparkConf, SparkContext}
8 8  
app/com/piki_ds/ver1/ConRejectScore.scala
1 1 package com.piki_ds.ver1
2 2  
3   -import com.piki_ds.preprocess.CidValidation._
4 3 import org.apache.spark.rdd.RDD
5 4 import org.apache.spark.sql.SQLContext
6 5 import org.apache.spark.{SparkConf, SparkContext}
7 6  
  7 +import com.piki_ds.preprocess.CidValidation._
8 8 import com.piki_ds.ver1.Progress._
9   -import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  9 +import com.piki_ds.utils.DateTimeEtc.getDateKey
10 10 import com.piki_ds.utils.GeneralTransformers._
11 11 import com.piki_ds.utils.GetTextFile.getLog
12 12 import com.piki_ds.utils.TempScoreSaveLoad.scoreSave
13 13  
14 14  
... ... @@ -30,11 +30,15 @@
30 30 val sc = SparkContext.getOrCreate()
31 31 val sqlContext = SQLContext.getOrCreate(sc)
32 32  
  33 + def getAProgress(sQLContext: SQLContext, doi:String, maxId:Long): RDD[(String, Double)] = {
  34 + val aProgress = alternativeProgress(sQLContext,getLog(sc, doi),maxId)
  35 + avgProgress(aProgress)
  36 + }
  37 +
33 38 def main(args:Array[String]) {
34 39 val nowTS: Long = System.currentTimeMillis
35   - val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
36 40 val maxCid = getMaxId(sqlContext,"content")
37   - val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  41 + val dateKey = getDateKey(nowTS)
38 42 val cidAndScore = getAProgress(sqlContext, dateKey,maxCid)
39 43 val cidOfInterest = getCidByStatus(sqlContext, Array("ACTV","HOLD")).map(x=>(x._1.toString,x._2))
40 44 val cidAndScoreFormatted = cidAndScore.join(cidOfInterest).map(x=>(x._1,x._2._1)).map(x=>{
... ... @@ -42,11 +46,5 @@
42 46 })
43 47 scoreSave(dateKey,"content","reject",cidAndScoreFormatted,1)
44 48 }
45   -
46   - def getAProgress(sQLContext: SQLContext, doi:String, maxId:Long): RDD[(String, Double)] = {
47   - val aProgress = alternativeProgress(sQLContext,getLog(sc, doi),maxId)
48   - avgProgress(aProgress)
49   - }
50   -
51 49 }
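The refactor here moves getAProgress above main and replaces the intoYesterdayMN call plus manual digit stripping with a single getDateKey(nowTS). getDateKey is not shown in this commit; judging from the lines it replaces, it presumably yields yesterday's date as an eight-digit yyyyMMdd key, roughly like this hypothetical stand-in.

import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical stand-in for com.piki_ds.utils.DateTimeEtc.getDateKey,
// reconstructed from the removed intoYesterdayMN + replaceAll logic.
def dateKeySketch(nowTS: Long): String = {
  val yesterdayTS = nowTS - 24L * 60 * 60 * 1000  // midnight handling may differ in the real helper
  new SimpleDateFormat("yyyyMMdd").format(new Date(yesterdayTS))
}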
app/com/piki_ds/ver1/ContentScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.sql.SQLContext
  4 +import org.apache.spark.{SparkContext, SparkConf}
  5 +import org.apache.spark.rdd.RDD
  6 +import org.slf4j.LoggerFactory
  7 +
  8 +import com.piki_ds.preprocess.CidValidation._
  9 +import com.piki_ds.utils.DateTimeEtc.getDateKey
  10 +import com.piki_ds.utils.GeneralTransformers.make_0to1
  11 +import com.piki_ds.utils.TempScoreSaveLoad
  12 +
  13 +/**
  14 + * Created by jungwon on 5/2/16.
  15 + */
  16 +
  17 +object ContentScore {
  18 +
  19 + val logger = LoggerFactory.getLogger(getClass)
  20 +
  21 + def getSparkConf= {
  22 + //System.setProperty("SPARK_YARN_MODE", "true")
  23 + val conf = new SparkConf().setAppName("ContentScore")
  24 + conf.setMaster("yarn-client")
  25 + conf.set("master", "yarn-client")
  26 + conf.set("spark.app.name", "ContentScore")
  27 + conf.set("spark.driver.allowMultipleContexts", "true")
  28 + conf.set("spark.akka.frameSize", "1024")
  29 + conf.set("spark.hadoop.validateOutputSpecs","false")
  30 + }
  31 +
  32 + val sc = new SparkContext(getSparkConf)
  33 + val sqlContext = SQLContext.getOrCreate(sc)
  34 +
  35 + def main(args: Array[String]) {
  36 + val nowTS: Long = System.currentTimeMillis
  37 + val doi = getDateKey(nowTS)
  38 +
  39 + val conClick: RDD[(Int, Map[String, Int])] = TempScoreSaveLoad.scoreLoad(sc,doi,"content", "click").repartition(10).map(x=>(x._1,Map("cid"->x._1,"click"->x._2)))
  40 + val conDwell: RDD[(Int, Map[String, Int])] = TempScoreSaveLoad.scoreLoad(sc,doi,"content","dwell").repartition(10).map(x=>(x._1,Map("cid"->x._1,"dwell"->x._2)))
  41 + val conPostAction: RDD[(Int, Map[String, Int])] = TempScoreSaveLoad.scoreLoad(sc,doi,"content","postAction").repartition(10).map(x=>(x._1,Map("cid"->x._1,"postAction"->x._2)))
  42 + val conReject: RDD[(Int, Map[String, Int])] = TempScoreSaveLoad.scoreLoad(sc,doi,"content","reject").repartition(10).map(x=>(x._1,Map("cid"->x._1,"reject"->x._2)))
  43 + val conEfficiency: RDD[(Int, Map[String, Int])] = TempScoreSaveLoad.scoreLoad(sc,doi,"content","efficiency").repartition(10).map(x=>(x._1,Map("cid"->x._1,"efficiency"->x._2)))
  44 +
  45 + val conScores = Array(conClick,conDwell,conPostAction,conReject,conEfficiency)
  46 +
  47 + val contentScore: RDD[(String, Int, Int)] = sc.union(conScores).reduceByKey((a, b) => {
  48 + a++b
  49 + }).map(x=>{
  50 + (x._1.toString,
  51 + x._2.getOrElse("click",0)+
  52 + x._2.getOrElse("dwell",0)+
  53 + x._2.getOrElse("postAction",0)+
  54 + x._2.getOrElse("reject",0),
  55 + x._2.getOrElse("efficiency",0))
  56 + })
  57 + val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatus(sqlContext, Array("ACTV","HOLD"))
  58 +
  59 + val cidAndScoreFormatted = make_0to1(contentScore.map(x=> (x._1,x._2.toDouble))).map(x=>(x._1, (x._2*1000).toLong))
  60 + TempScoreSaveLoad.scoreSave(doi,"content","all",cidAndScoreFormatted,1)
  61 + }
  62 +
  63 +}
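ContentScore wraps each per-metric score row in a single-entry map keyed by the metric name, then merges the five metric RDDs with union + reduceByKey(_ ++ _), so every cid ends up with one map holding whichever metrics were produced for it; click, dwell, postAction and reject are summed while efficiency is carried separately. A self-contained illustration of that merge, assuming an existing SparkContext named sc:

// Map ++ Map keeps the right-hand value on key collisions; here the only
// shared key is "cid", which carries the same value on both sides.
val click = sc.parallelize(Seq((1, Map("cid" -> 1, "click" -> 7))))
val dwell = sc.parallelize(Seq((1, Map("cid" -> 1, "dwell" -> 3))))
val merged = sc.union(Seq(click, dwell)).reduceByKey(_ ++ _)
// merged.collect()  ==>  Array((1, Map("cid" -> 1, "click" -> 7, "dwell" -> 3)))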
app/com/piki_ds/ver1/QualityScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.spark.{SparkContext, SparkConf}
  4 +import org.apache.spark.rdd.RDD
  5 +
  6 +import com.piki_ds.utils.hbase.HbaseInserter
  7 +import com.piki_ds.utils.GeneralTransformers.make_0to1
  8 +import com.piki_ds.utils.DateTimeEtc.getDateKey
  9 +import com.piki_ds.utils.TempScoreSaveLoad._
  10 +
  11 +/**
  12 + * Created by jungwon on 5/2/16.
  13 + */
  14 +
  15 +object QualityScore {
  16 +
  17 + def getSparkConf= {
  18 + val conf = new SparkConf().setAppName("QualityScore")
  19 + conf.setMaster("yarn-client")
  20 + conf.set("master", "yarn-client")
  21 + conf.set("spark.app.name", "QualityScore")
  22 + conf.set("spark.akka.frameSize", "1024")
  23 + }
  24 +
  25 + val sc = new SparkContext(getSparkConf)
  26 +
  27 + def infoJoined_(cuInfo:RDD[(Int,Int)], doi:String): RDD[((Int, Int), Map[String, String])] = {
  28 + val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
  29 + val comment: RDD[(Int, Int)] = scoreLoad(sc,doi,"comment","")
  30 + val editor: collection.Map[Int, Int] = scoreLoad(sc,doi,"editor","").collectAsMap()
  31 + val concom = content.fullOuterJoin(comment).map(x=>(x._1,(x._2._1.getOrElse(9999),x._2._2.getOrElse(9999))))
  32 + cuInfo.join(concom).map(x=>((x._1,x._2._1), Map(doi -> s"${editor.getOrElse(x._2._1,9999)},${x._2._2._1},${x._2._2._2}")))
  33 + }
  34 +
  35 + def infoJoined(cuInfo:RDD[(Int,Int)], doi:String): RDD[((Int, Int), Map[String, String])] = {
  36 + val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
  37 + val editor: collection.Map[Int, Int] = scoreLoad(sc,doi,"editor","").collectAsMap()
  38 + cuInfo.join(content).map(x=>((x._1,x._2._1), Map(doi -> s"${editor.getOrElse(x._2._1,9999)},${x._2._2}")))
  39 + }
  40 +
  41 + def getScoresByDates(cuInfo: RDD[(Int,Int)], dateList:Array[String]) = {
  42 + val scores = dateList.map(d=> infoJoined(cuInfo, d))
  43 + val groupedScore: RDD[((Int, Int), Map[String, String])] = sc.union(scores).reduceByKey(_++_)
  44 + groupedScore.map(x=>s"${x._1._1},${x._1._2},${dateList.map(x._2.getOrElse(_,",,")).mkString(",")}")
  45 + }
  46 +
  47 + def joinEditorCid(editorScore: RDD[(String, Long)], editorInfo:RDD[(String, String)]) = {
  48 + editorInfo.join(editorScore).map(_._2)
  49 + }
  50 +
  51 + def combineScores(content:RDD[(Int,Int)], comment:RDD[(Int,Int)], editor:RDD[(Int,Int)],
  52 + param:Map[String,Double]): RDD[(String, Long)] = {
  53 + val combine = content.fullOuterJoin(comment).map(x=>{
  54 + (x._1, (x._2._1.getOrElse(120),x._2._2.getOrElse(1)))
  55 + }).fullOuterJoin(editor).map(x=>{
  56 + val b: (Int, (Option[(Int, Int)], Option[Int])) = x
  57 + (x._1, (x._2._1.getOrElse((110,112)),x._2._2.getOrElse(121)))
  58 + }).map(x=> {
  59 + (x._1, (x._2._1._1.toDouble, x._2._1._2.toDouble, x._2._2.toDouble))
  60 + }).map(x=>{
  61 + val score:Double = param("content")*x._2._1+param("comment")*x._2._2+param("editor")*x._2._3
  62 + (x._1, score)
  63 + }).map(x=>(x._1.toString,x._2))
  64 + make_0to1(combine).map(x=>(x._1,(x._2*1000).toLong))
  65 + }
  66 +
  67 + def main(args: Array[String]) {
  68 + val nowTS: Long = System.currentTimeMillis
  69 + val doi = getDateKey(nowTS)
  70 +
  71 + val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
  72 + val comment: RDD[(Int, Int)] = scoreLoad(sc,doi,"comment","")
  73 + val editor: RDD[(Int, Int)] = scoreLoad(sc,doi,"editor","")
  74 +
  75 + val param = Map("content"->0.5D,"comment"-> 0.30D,"editor"->0.20D)
  76 +
  77 + val finalScore: RDD[(Int, Long)] = combineScores(content,comment,editor,param).map(x=>(x._1.toInt,x._2.toLong)).filter(_._1 != 0)
  78 +
  79 + val insertArray = finalScore.map(x=>(x._1.toString, x._2.toString)).collect()
  80 + val test = new HbaseInserter("cid_quality")
  81 + test.insert(insertArray)
  82 +}
  83 +
  84 +}
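combineScores full-outer-joins the content, comment and editor scores, substitutes the fallback values visible in the diff (120 and 1 for missing content/comment, (110, 112) when both are absent, 121 for a missing editor score), applies the 0.5/0.3/0.2 weights passed from main, and renormalizes to a 0-1000 Long before the HBase insert. The weighted step in isolation, with illustrative values only:

// Weighted sum exactly as combineScores computes it, with the weights
// passed from main; the inputs below are made up for illustration.
val param = Map("content" -> 0.5, "comment" -> 0.3, "editor" -> 0.2)
def weighted(content: Double, comment: Double, editor: Double): Double =
  param("content") * content + param("comment") * comment + param("editor") * editor

// e.g. content 800, comment 400, editor fallback 121:
//   weighted(800, 400, 121) is 544.2 (400 + 120 + 24.2), before renormalization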