package com.piki_ds.ver1

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions

import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
import com.piki_ds.ver1.DashRelatedScore.previousDash
import com.piki_ds.utils.hbase.HbaseInserter

/**
 * Created by jungwon on 5/2/16.
 */

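/**
 * Computes a daily editor score as the average of two sub-scores:
 *  - popularity: follower count scaled by editor age, squashed with tanh and normalized via make_0to1_2Key
 *  - performance: Weibull-transformed view/like/share/bookmark/comment counts over the editor's contents
 * The combined score is saved through TempScoreSaveLoad and inserted into the "editor_score" HBase table.
 */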
object EditorScore {

  def getSparkConf: SparkConf = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  val sc = new SparkContext(getSparkConf)
  val sqlContext = SQLContext.getOrCreate(sc)
  val hadoopConf = sc.hadoopConfiguration
  val fs = FileSystem.get(hadoopConf)

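  /**
   * Returns the most recently modified subdirectory under `path`.
   * When `isParted` is true, only directories containing a _SUCCESS marker are considered.
   */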
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS"))))
      .maxBy(_.getModificationTime)
  }

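  /**
   * Merges the previously saved follow snapshot with today's EDITOR_FOLLOW dashboard table,
   * writes the union back to HDFS for the next run, and returns total follow counts keyed by uid.
   */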
  def followGetter(sQLContext: SQLContext, dateKey: String): RDD[(String, Long)] = {
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", true, fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x => {
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x => x("uid")).map(x => (x._1, x._2.map(a => a("follow_cnt").toLong).sum))
    unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    follow_info
  }

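  /**
   * Builds the editor base table from the DB: (uid, name, follow count, debut timestamp).
   * Missing follow counts and debut timestamps fall back to a default of 10.
   */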
  def editorDB(sQLContext: SQLContext, dateKey: String): RDD[(String, String, Long, Long)] = {
    // Parse the USER table from the DB to fetch editors (not strictly a required step)
    val levels = Array("ADMIN_O", "EDITOR_O", "PARTNER_O", "PRESS_O", "STAFF_O")
    val filterS_user = s"where level in (${levels.map(x => "'" + x + "'").mkString(",")})"
    val fetchVar = Array("uid", "level", "name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "USER", fetchVar, filterS_user)
    val user_info: RDD[(String, String)] = user_tableGet.map(x => (x.getAs[Long]("uid").toString, x.getAs[String]("name")))

    // Parse the FOLLOW table from the DB to fetch follower counts
    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)

    val joinedFollowInfo: RDD[(String, (String, Long))] =
      user_info.leftOuterJoin(follow_info).map(x => (x._1, (x._2._1, x._2._2.getOrElse(10L))))

    // Parse the MG_CONTENTS table from the DB to fetch each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
    val debut_info: RDD[(String, Long)] = mgcont_table.map(x => (x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

    // (uid, name, follow count, debut timestamp)
    joinedFollowInfo.leftOuterJoin(debut_info).map(x => {
      (x._1, (x._2._1, x._2._2.getOrElse(10L)))
    }).map(x => (x._1, x._2._1._1, x._2._1._2, x._2._2))
  }

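  /**
   * Popularity per (uid, name): follow count divided by the editor's age in
   * milliseconds since debut, with the count scaled to a per-day rate.
   */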
  def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long): RDD[((String, String), Double)] = {
    // age unit: days
    dBTable.map(x => ((x._1, x._2), ((1000 * 60 * 60 * 24).toLong * x._3).toDouble / (currentTS - x._4)))
  }

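  /**
   * Weibull-style decay transform used to squash raw engagement counts.
   * `raw` is first divided by the number of milliseconds in a day.
   * With lambd = 1 and k = 0.5 (the values used in main), the expression
   * reduces to roughly 0.707 * exp(-sqrt(intoD)).
   * Note: the pow(k/lambd, k-lambd) factor differs from the standard Weibull
   * density, which uses pow(x/lambd, k-1).
   */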
  def tr_Weibull(raw: Double, lambd: Int, k: Double): Double = {
    import scala.math.{exp, pow}
    val intoD = raw / (1000 * 60 * 60 * 24)
    val transformed = (k / lambd) * pow(k / lambd, k - lambd) * exp(-pow(intoD / lambd, k))
    transformed
  }

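  /**
   * Daily driver: computes popularity and performance scores for the current dateKey,
   * averages them into a final editor score (scaled by 1000), saves it through
   * TempScoreSaveLoad, and bulk-inserts the result into the "editor_score" HBase table.
   */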
  def main(args: Array[String]): Unit = {
    val nowTS: Long = System.currentTimeMillis
    val dateKey = getDateKey(nowTS)

    // compute editor popularity score
    val fromDB = editorDB(sqlContext, dateKey)

    val ePopularity: RDD[((String, String), Double)] =
      make_0to1_2Key(popularity(fromDB, nowTS).map(x => (x._1, 1 - MatrixFunctions.tanh(math.max(1, x._2)) * (-1) - 1)))

    // compute editor performance score
    val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x => (x("cid"), x))

    val cidAndUid: RDD[(String, String)] = sc.objectFile[(String, String)](s"/preprocess/cidWithUid/$dateKey/")

    val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

    val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x => {
      val innerSum = x._2.map(a => {
        (a._2("view").replaceAll("^$", "0").toDouble.toLong,
          a._2("likes").replaceAll("^$", "0").toDouble.toLong,
          a._2("share").replaceAll("^$", "0").toDouble.toLong,
          a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
          a._2("comment").replaceAll("^$", "0").toDouble.toLong)
      }).reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5))
      (x._1, innerSum)
    }).map(x => {
      val score = tr_Weibull(x._2._1.toDouble, 1, 0.5) +
        tr_Weibull(x._2._2.toDouble, 1, 0.5) +
        tr_Weibull(x._2._3.toDouble, 1, 0.5) +
        tr_Weibull(x._2._4.toDouble, 1, 0.5) +
        tr_Weibull(x._2._5.toDouble, 1, 0.5)
      (x._1, score / 5)
    })

    // final score: average of popularity and performance (missing performance defaults to 0.5)
    val finalScore = ePopularity.map(x => {
      (x._1._1, x._2)
    }).leftOuterJoin(performance).map(x => (x._1, (x._2._1 + x._2._2.getOrElse(0.5D)) / 2)).filter(x => x._1.nonEmpty && !x._2.isNaN)

    val formatOutput: RDD[(String, Long)] = finalScore.map(x => (x._1, (x._2 * 1000).toLong))

    TempScoreSaveLoad.scoreSave(dateKey, "editor", "", formatOutput, 1)

    val insertArray: Array[(String, String)] = finalScore.map(x => (x._1.toString, (x._2 * 1000).toLong.toString)).collect
    val test = new HbaseInserter("editor_score")
    test.insert(insertArray)

  }

}