// app/com/piki_ds/ver1/EditorScore.scala

  package com.piki_ds.ver1
  import org.apache.spark.rdd.RDD
  import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.{SparkConf, SparkContext}
  import org.jblas.MatrixFunctions

  import com.piki_ds.utils.DateTimeEtc.getDateKey
  import com.piki_ds.utils.GetTextFile.getDashTable
  import com.piki_ds.utils.SqlContextConf.readPartialTable
  import com.piki_ds.utils.TempScoreSaveLoad
  import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
  import com.piki_ds.ver1.DashRelatedScore.previousDash
  import com.piki_ds.utils.hbase.HbaseInserter
  
  /**
    * Created by jungwon on 5/2/16.
    */
  
  object EditorScore {
  
    def getSparkConf: SparkConf = {
      // setAppName/setMaster already populate spark.app.name and spark.master
      val conf = new SparkConf().setAppName("EditorScore")
      conf.setMaster("local[3]")
      conf.set("spark.driver.allowMultipleContexts", "true")
    }
  
    val sc = new SparkContext(getSparkConf)
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = sc.hadoopConfiguration
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  
    /** Most recently modified subdirectory of `path`; if `isParted`, only completed
      * outputs (directories containing a _SUCCESS marker) are considered. */
    def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
      val list = hdfs.listStatus(new Path(path))
      list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(_.getModificationTime)
    }
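    // Usage sketch (path illustrative): with daily dumps under /preprocess/followInfo,
    //   recentlyUpdatedPath("/preprocess/followInfo", isParted = true, fs)
    // returns the FileStatus of the newest dated subdirectory containing _SUCCESS.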
  
    def followGetter(sQLContext: SQLContext, dateKey: String): RDD[(String, Long)] = {
      // merge the previously accumulated follow snapshot with today's EDITOR_FOLLOW dump
      val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", true, fs).getPath.toString)
      val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
      val unionFrom = fromExisting.unionAll(fromUpdated)
      val intoMap = unionFrom.map(x => {
        Map("cdate" -> x.getAs[String](0), "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
      })
      // sum follow counts per editor uid
      val follow_info = intoMap.groupBy(x => x("uid")).map(x => (x._1, x._2.map(a => a("follow_cnt").toLong).sum))
      // persist the merged snapshot so the next run can pick it up via recentlyUpdatedPath
      unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
      follow_info
    }
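    // Example: if the merged snapshot holds rows (uid=7, follow_cnt=3) and (uid=7, follow_cnt=5),
    // followGetter emits ("7", 8L): one summed follow count per editor.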
  
    def editorDB(sQLContext: SQLContext, dateKey:String): RDD[(String, String, Long, Long)] = {
      // Fetch editors from the USER table in the DB (not strictly a required step)
      val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
      val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
      val fetchVar = Array("uid","level","name")
      val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
      val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
  
      // Fetch follower counts from the FOLLOW table in the DB
      val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
  
      // editors absent from the follow table default to a follow count of 10
      val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x => (x._1, (x._2._1, x._2._2.getOrElse(10L))))
  
      // Fetch each editor's debut date (earliest udate) from the MG_CONTENTS table
      val filterS_mgcont = " where uid is not null group by uid"
      val fetchVar_mgcont = Array("uid", "min(udate)")
      val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
      import org.apache.spark.sql.functions._
      val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")).as("debut_ts"))
      val debut_info: RDD[(String, Long)] = mgcont_table.map(x => (x.getAs[Long]("uid").toString, x.getAs[Long]("debut_ts")))
  
      // assemble (uid, name, follow_cnt, debut_ts); editors with no contents get debut 10L (near epoch)
      joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
        (x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
      }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
    }
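    // Example output element (values illustrative): ("1234", "Jane", 250L, 1451606400000L),
    // i.e. uid, name, follow count, debut timestamp in ms.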
  
    def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long) = {
      // age is measured in days: follower count times ms-per-day, divided by account age in ms
      dBTable.map(x => ((x._1, x._2), ((1000 * 60 * 60 * 24).toLong * x._3).toDouble / (currentTS - x._4)))
    }
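    // Worked example: an editor with 100 followers and a 50-day-old account scores
    // (86400000 * 100) / (50 * 86400000) = 2.0, i.e. followers per day of account age.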
  
    def tr_Weibull(raw: Double, lambd: Int, k: Double) = {
      import scala.math.{exp, pow}
      // rescale the raw count to a "days" scale, then apply a Weibull-style decay;
      // note pow(k/lambd, k-lambd) differs from the textbook pdf factor pow(x/lambd, k-1)
      val intoD = raw / (1000 * 60 * 60 * 24)
      (k / lambd) * pow(k / lambd, k - lambd) * exp(-pow(intoD / lambd, k))
    }
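    // With lambd = 1 and k = 0.5 (the values used in main), this reduces to
    //   0.5 * pow(0.5, -0.5) * exp(-sqrt(raw / 86400000)) ≈ 0.707 * exp(-sqrt(raw / 86400000)),
    // a monotone decay from ~0.707 toward 0 as the raw count grows.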
  
    def main(args: Array[String]): Unit = {
      val nowTS: Long = System.currentTimeMillis
      val dateKey = getDateKey(nowTS)
      // compute each editor's popularity score
      val fromDB = editorDB(sqlContext, dateKey)
  
      // squash the followers-per-day ratio with tanh (floored at tanh(1) ≈ 0.76), then rescale per key
      val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x => (x._1, MatrixFunctions.tanh(math.max(1, x._2)))))
  
      // compute each editor's performance score from per-content dashboard stats
      val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x))
  
      val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")
  
      val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)
  
      // sum view/likes/share/bookmark/comment per editor; empty-string fields count as 0
      val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x => {
        val innerSum = x._2.map(a => {
          (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
            a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
            a._2("comment").replaceAll("^$", "0").toDouble.toLong)
        }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
        (x._1, innerSum)
      }).map(x => {
        // average the five Weibull-transformed engagement stats into one performance score
        val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
          tr_Weibull(x._2._2.toDouble, 1, 0.5)+
          tr_Weibull(x._2._3.toDouble, 1, 0.5)+
          tr_Weibull(x._2._4.toDouble, 1, 0.5)+
          tr_Weibull(x._2._5.toDouble, 1, 0.5)
        (x._1, score/5)
      })
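      // each Weibull-transformed stat lies in (0, ~0.707], so performance stays on that scale too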
  
      // final score: mean of popularity and performance (0.5 when performance is missing),
      // dropping empty uids and NaN scores
      val finalScore = ePopularity.map(x => {
        (x._1._1, x._2)
      }).leftOuterJoin(performance).map(x => (x._1, (x._2._1 + x._2._2.getOrElse(0.5D)) / 2)).filter(x => x._1.nonEmpty && !x._2.isNaN)
  
      val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1,(x._2*1000).toLong))
  
      TempScoreSaveLoad.scoreSave(dateKey,"editor","",formatOutput,1)
  
      val insertArray: Array[(String, String)] = finalScore.map(x => (x._1, (x._2 * 1000).toLong.toString)).collect
      val inserter = new HbaseInserter("editor_score")
      inserter.insert(insertArray)
  
    }
  
  }
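
// Run sketch (jar name illustrative; master is hardcoded to local[3] in getSparkConf):
//   spark-submit --class com.piki_ds.ver1.EditorScore pikids-assembly-1.0.jar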