package com.piki_ds.ver1

//import breeze.linalg.min
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions

import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key,make_0to1}
import com.piki_ds.ver1.DashRelatedScore.previousDash
//import com.piki_ds.utils.hbase.HbaseInserter

/**
 * Created by jungwon on 5/2/16.
 */

object EditorScore {

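  /** Local-mode SparkConf (3 threads) for running the editor scorer standalone; getSparkConf2 below is an identical variant. */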
  def getSparkConf1 = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  def getSparkConf2 = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

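  /** Returns the most recently modified directory under `path`; when `isParted` is true, only directories containing a _SUCCESS marker are considered. */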
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS"))))
      .maxBy(x => x.getModificationTime)
  }

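  /**
   * Merges the previously saved follow snapshot with the EDITOR_FOLLOW dump for `dateKey` and
   * computes a follow score per editor: 0.1 * cumulative follow count + sum of the 10 most recent counts.
   * When `fileSave` is true, the merged table is written back to HDFS for the next run.
   */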
  def followGetter(sQLContext: SQLContext, dateKey: String, fs: FileSystem, fileSave: Boolean = true) = {
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", false, fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x => {
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x => x("uid")).map(x => {
      val cuml = x._2.map(a => a("follow_cnt").toLong).sum
      // sort by cdate descending so the ten most recent counts are summed
      val ts: Array[Long] = x._2.toArray.sortBy(_("cdate")).reverse.map(_("follow_cnt").toLong)
      val followScore = 0.1 * cuml + ts.take(10).sum
      (x._1, followScore.toLong)
    })
    if (fileSave) {
      unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    }
    follow_info
  }

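  /**
   * Builds one record per editor as (uid, name, followScore, debutTimestamp) by joining the USER,
   * FOLLOW and MG_CONTENTS tables; missing follow scores and debut timestamps default to 10,
   * and the follow score is capped at 20,000.
   */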
  def editorDB(sQLContext: SQLContext, fs: FileSystem, dateKey: String): RDD[(String, String, Long, Long)] = {
    // Parse the USER table from the DB and fetch editors (not a strictly required step)
    val levels = Array("ADMIN_O", "EDITOR_O", "PARTNER_O", "PRESS_O", "STAFF_O")
    val filterS_user = s"where level in (${levels.map(x => "'" + x + "'").mkString(",")})"
    val fetchVar = Array("uid", "level", "name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "USER", fetchVar, filterS_user)
    val user_info: RDD[(String, String)] = user_tableGet.map(x => (x.getAs[Long]("uid").toString, x.getAs[String]("name")))

    // Parse the FOLLOW table from the DB and fetch follower counts
    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey, fs)

    val joinedFollowInfo: RDD[(String, (String, Long))] =
      user_info.fullOuterJoin(follow_info).map(x => (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(10L))))

    // Parse the MG_CONTENTS table from the DB and fetch each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
    val debut_info: RDD[(String, Long)] = mgcont_table.map(x => (x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

    // uid, name, follow, debut
    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x => {
      (x._1, (x._2._1.getOrElse(("", 10L)), x._2._2.getOrElse(10L)))
    }).map(x => (x._1, x._2._1._1, x._2._1._2, x._2._2))
    rawOut.map(x => (x._1, x._2, math.min(20000, x._3), x._4))
  }

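  /** Popularity proxy keyed by (uid, name): follow score scaled by one day in milliseconds and divided by the editor's age (current time minus debut timestamp). */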
  def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long) = {
    // age unit: days
    dBTable.map(x => ((x._1, x._2), ((1000 * 60 * 60 * 24).toLong * x._3).toDouble / (currentTS - x._4)))
  }

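  /** Weibull-density transform (shape k, scale lambd) of a raw millisecond value converted to days; only referenced by the commented-out performance variant in main. */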
  def tr_Weibull(raw: Double, lambd: Int, k: Double) = {
    import scala.math.{exp, pow}
    val intoD = raw / (1000 * 60 * 60 * 24)
    // Weibull density with shape k and scale lambd: (k/lambd) * (x/lambd)^(k-1) * exp(-(x/lambd)^k)
    val transformed = (k / lambd) * pow(intoD / lambd, k - 1) * exp(-pow(intoD / lambd, k))
    transformed
  }

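  /**
   * Scoring pipeline: build editor info from the DB, derive a popularity score, average per-content
   * dashboard stats (view/likes/share/bookmark/comment) into a performance score, normalize both to [0,1],
   * blend them as 0.15 * popularity + 0.85 * performance, then save the result and trigger the HBase insert.
   */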
  def main(args: Array[String]) {
    val sc = new SparkContext(getSparkConf1)
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = sc.hadoopConfiguration
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    val nowTS: Long = System.currentTimeMillis
    val dateKey = getDateKey(nowTS)

    // compute the editor popularity score
    val fromDB = editorDB(sqlContext, fs, dateKey)

    val ePopularity: RDD[((String, String), Double)] =
      make_0to1_2Key(popularity(fromDB, nowTS).map(x => (x._1, 1 - MatrixFunctions.tanh(math.max(1, x._2)) * (-1) - 1)))

    // compute the editor performance score
    val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x => (x("cid"), x))

    val cidAndUid: RDD[(String, String)] = sc.objectFile[(String, String)](s"/preprocess/cidWithUid/$dateKey/")

    val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x => (x._1, x._2.size))

    val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

    val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x => {
      val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
        (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
          a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
          a._2("comment").replaceAll("^$", "0").toDouble.toLong)
      }).reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5))
      (x._1, innerSum)
    }).join(uidAndCids)


    val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x => {
      val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
        x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
        x._2._1._5.toDouble / x._2._2)
      (x._1, ap)
    })

    // clamp each per-content average (view/likes/share/bookmark/comment) into its own range before summing
    val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x => {
      import math.{min, max}
      val s1 = min(max(500, x._2._1), 25000)
      val s2 = min(max(100, x._2._2), 2000)
      val s3 = min(max(10, x._2._3), 2000)
      val s4 = min(max(10, x._2._4), 2000)
      val s5 = min(max(100, x._2._5), 3000)
      (x._1, (s1, s2, s3, s4, s5))
    })

    val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x => {
      val score = x._2._1 + x._2._2 + x._2._3 + x._2._4 + x._2._5
      (x._1, score / 5)
    })

    val performance = make_0to1(sumPerformance)
    /*
    val performance: RDD[(String, Double)] = minMaxPerformance.map(x => {
      val score = tr_Weibull(x._2._1.toDouble, 1, 0.5) +
        tr_Weibull(x._2._2.toDouble, 1, 0.5) +
        tr_Weibull(x._2._3.toDouble, 1, 0.5) +
        tr_Weibull(x._2._4.toDouble, 1, 0.5) +
        tr_Weibull(x._2._5.toDouble, 1, 0.5)
      (x._1, score / 5)
    })
    */
    val finalScore = ePopularity.map(x => {
      (x._1._1, x._2)
    }).leftOuterJoin(performance).map(x => (x._1, 0.15 * x._2._1 + 0.85 * x._2._2.getOrElse(0.2D))).filter(x => x._1.nonEmpty)

    val formatOutput: RDD[(String, Long)] = finalScore.map(x => (x._1, (x._2 * 1000).toLong))

    TempScoreSaveLoad.scoreSave(dateKey, "editor", "", formatOutput, 1)
    scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$dateKey")
    /*
    val insertArray: Array[(String, String)] = finalScore.map(x => (x._1.toString, (x._2 * 1000).toLong.toString)).collect
    val test = new HbaseInserter("editor_score")
    test.insert(insertArray)
    */
  }

}