package com.piki_ds.ver1

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD

import com.piki_ds.utils.hbase.HbaseInserter
import com.piki_ds.utils.GeneralTransformers.make_0to1
import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.TempScoreSaveLoad._

/**
 * Combines the per-date content, comment, and editor scores into a single
 * 0-to-1000 quality score per content, saved via scoreSave and mirrored to HBase.
 *
 * Created by jungwon on 5/2/16.
 */

object QualityScore {

  def getSparkConf: SparkConf = {
    val conf = new SparkConf().setAppName("QualityScore")
    conf.setMaster("yarn-client")
    conf.set("spark.akka.frameSize", "1024") // frame size in MB
    conf
  }

  val sc = new SparkContext(getSparkConf)
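
  // The job is typically launched with spark-submit in yarn-client mode; a sketch
  // (the jar name is assumed, and --master is redundant with setMaster above):
  //   spark-submit --class com.piki_ds.ver1.QualityScore --master yarn-client pikids.jar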

  /** Three-score variant (editor, content, comment); currently unused, kept for reference. */
  def infoJoined_(cuInfo: RDD[(Int, Int)], doi: String): RDD[((Int, Int), Map[String, String])] = {
    val content: RDD[(Int, Int)] = scoreLoad(sc, doi, "content", "all")
    val comment: RDD[(Int, Int)] = scoreLoad(sc, doi, "comment", "")
    val editor: collection.Map[Int, Int] = scoreLoad(sc, doi, "editor", "").collectAsMap()
    val concom = content.fullOuterJoin(comment).map { case (cid, (cs, ms)) =>
      (cid, (cs.getOrElse(9999), ms.getOrElse(9999))) // 9999 marks a missing sub-score
    }
    cuInfo.join(concom).map { case (cid, (uid, (cs, ms))) =>
      ((cid, uid), Map(doi -> s"${editor.getOrElse(uid, 9999)},$cs,$ms"))
    }
  }
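
  // Sketch of one infoJoined_ output row (ids and scores are made up):
  //   ((1234, 56), Map("20160502" -> "55,80,12"))
  // i.e. editor, content, and comment scores for that content on that date.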

  /** Joins the content score and its author's editor score onto each (cid, uid) pair, keyed by date. */
  def infoJoined(cuInfo: RDD[(Int, Int)], doi: String): RDD[((Int, Int), Map[String, String])] = {
    val content: RDD[(Int, Int)] = scoreLoad(sc, doi, "content", "all")
    val editor: collection.Map[Int, Int] = scoreLoad(sc, doi, "editor", "").collectAsMap()
    cuInfo.join(content).map { case (cid, (uid, cs)) =>
      ((cid, uid), Map(doi -> s"${editor.getOrElse(uid, 9999)},$cs"))
    }
  }
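
  // Same sketch as above, but for this two-field variant (hypothetical values):
  //   ((1234, 56), Map("20160502" -> "55,80"))
  // i.e. editor and content scores only; the comment score is dropped here.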

  /** Emits one CSV line per (cid, uid): "cid,uid," followed by the per-date score fields. */
  def getScoresByDates(cuInfo: RDD[(Int, Int)], dateList: Array[String]) = {
    val scores = dateList.map(d => infoJoined(cuInfo, d))
    val groupedScore: RDD[((Int, Int), Map[String, String])] = sc.union(scores).reduceByKey(_ ++ _)
    groupedScore.map { case ((cid, uid), byDate) =>
      // infoJoined emits two fields per date, so a missing date pads with "," (two empty
      // fields); the original ",," default was sized for the three-field infoJoined_ format.
      s"$cid,$uid,${dateList.map(byDate.getOrElse(_, ",")).mkString(",")}"
    }
  }
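
  // For dateList = Array("20160501", "20160502"), a row might look like (values made up):
  //   "1234,56,55,80,,"
  // i.e. editor/content scores for the first date, then empty padding for the missing second date.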

  /** Re-keys editor scores via editorInfo, which (per the name) maps editor id to content id. */
  def joinEditorCid(editorScore: RDD[(String, Long)], editorInfo: RDD[(String, String)]) = {
    editorInfo.join(editorScore).map(_._2)
  }
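
  // e.g. editorInfo ("ed01" -> "1234") joined with editorScore ("ed01" -> 750L)
  // yields ("1234", 750L) — assuming editorInfo values are content ids.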

  /**
   * Weighted sum of the three sub-scores, rescaled to [0, 1000].
   * Contents missing a sub-score fall back to the sentinel defaults below.
   */
  def combineScores(content: RDD[(Int, Int)], comment: RDD[(Int, Int)], editor: RDD[(Int, Int)],
                    param: Map[String, Double]): RDD[(String, Long)] = {
    val combine = content.fullOuterJoin(comment).map { case (cid, (cs, ms)) =>
      (cid, (cs.getOrElse(120), ms.getOrElse(1)))
    }.fullOuterJoin(editor).map { case (cid, (cm, es)) =>
      val (cs, ms) = cm.getOrElse((110, 112))
      (cid, (cs.toDouble, ms.toDouble, es.getOrElse(121).toDouble))
    }.map { case (cid, (cs, ms, es)) =>
      (cid.toString, param("content") * cs + param("comment") * ms + param("editor") * es)
    }
    // make_0to1 rescales scores to [0, 1]; multiply by 1000 for an integer score.
    make_0to1(combine).map(x => (x._1, (x._2 * 1000).toLong))
  }
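
  // Worked example of the weighted sum (made-up inputs), before the make_0to1 rescaling:
  //   content = 80, comment = 2, editor = 60 with weights (0.5, 0.3, 0.2)
  //   score = 0.5 * 80 + 0.3 * 2 + 0.2 * 60 = 40.0 + 0.6 + 12.0 = 52.6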

  def main(args: Array[String]): Unit = {
    val nowTS: Long = System.currentTimeMillis
    val doi = getDateKey(nowTS)

    val content: RDD[(Int, Int)] = scoreLoad(sc, doi, "content", "all")
    val comment: RDD[(Int, Int)] = scoreLoad(sc, doi, "comment", "")
    val editor: RDD[(Int, Int)] = scoreLoad(sc, doi, "editor", "")

    // Fixed blending weights; they sum to 1.0.
    val param = Map("content" -> 0.5D, "comment" -> 0.3D, "editor" -> 0.2D)

    val finalScore: RDD[(Int, Long)] =
      combineScores(content, comment, editor, param).map(x => (x._1.toInt, x._2)).filter(_._1 != 0)
    scoreSave(doi, "quality", "", finalScore.map(x => (x._1.toString, x._2)), 1)

    // Mirror the final scores into HBase for serving.
    val insertArray = finalScore.map(x => (x._1.toString, x._2.toString)).collect()
    val inserter = new HbaseInserter("cid_quality")
    inserter.insert(insertArray)
  }
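
  /*
   * A minimal local sanity check (a sketch, not part of the production job;
   * assumes a local SparkContext and that make_0to1 is a min-max rescale):
   *
   *   val content = sc.parallelize(Seq((1, 90), (2, 40)))
   *   val comment = sc.parallelize(Seq((1, 5)))            // cid 2 falls back to sentinel 1
   *   val editor  = sc.parallelize(Seq((1, 70), (2, 30)))
   *   val param   = Map("content" -> 0.5, "comment" -> 0.3, "editor" -> 0.2)
   *   combineScores(content, comment, editor, param).collect()
   *     // cid 1 should score highest, since it leads on the dominant content component
   */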

}