app/com/piki_ds/ver1/EditorScore.scala

package com.piki_ds.ver1

//import breeze.linalg.min
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions

import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key, make_0to1}
import com.piki_ds.ver1.DashRelatedScore.previousDash
//import com.piki_ds.utils.hbase.HbaseInserter

/**
  * Created by jungwon on 5/2/16.
  */

object EditorScore {
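
  // Local-mode SparkConf builders (3 cores). spark.driver.allowMultipleContexts
  // permits creating more than one SparkContext in the same JVM.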
  def getSparkConf1: SparkConf = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  def getSparkConf2: SparkConf = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    // "t" is not a valid boolean for SparkConf.getBoolean; it must be "true"
    conf.set("spark.driver.allowMultipleContexts", "true")
  }
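  /**
    * Picks the most recently modified subdirectory under `path`.
    * When `isParted` is true, only directories containing a `_SUCCESS`
    * marker (i.e. completed Spark/Hadoop outputs) are considered.
    */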
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(x => x.getModificationTime)
  }
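
  /**
    * Builds a per-editor follow score. Reads the latest follow snapshot from
    * /preprocess/followInfo, unions it with the day's EDITOR_FOLLOW dump, and
    * scores each uid as 0.1 * cumulative count + the sum of the 10 most recent
    * counts (e.g. daily counts [5, 3, 2] give 0.1 * 10 + 10 = 11).
    * When fileSave is set, the unioned snapshot is written back to HDFS.
    */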
  def followGetter(sQLContext: SQLContext, dateKey: String, fs: FileSystem, fileSave: Boolean = true) = {
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", false, fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x => {
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x => x("uid")).map(x => {
      val cuml = x._2.map(a => a("follow_cnt").toLong).sum
      // sort by cdate descending so take(10) picks the most recent counts;
      // the previous sortBy(_("cdate")*(-1)) repeated the string -1 times,
      // yielding "" for every row and leaving the order unsorted
      val ts: Array[Long] = x._2.toArray.sortBy(_("cdate"))(Ordering[String].reverse).map(_("follow_cnt").toLong)
      val followScore = 0.1 * cuml + ts.take(10).sum
      (x._1, followScore.toLong)
    })
    if (fileSave) {
      unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    }
    follow_info
  }
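
  /**
    * Assembles the editor master table as (uid, name, followScore, debutTS).
    * Editors come from USER, follow scores from followGetter, and debut dates
    * (earliest MG_CONTENTS udate) from MG_CONTENTS. Missing sides of the outer
    * joins default to 10L, and the follow component is capped at 20000.
    */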
  def editorDB(sQLContext: SQLContext, fs: FileSystem, dateKey: String): RDD[(String, String, Long, Long)] = {
    // Parse the USER table from the DB to fetch editors (not an essential step... -_-)
    val levels = Array("ADMIN_O", "EDITOR_O", "PARTNER_O", "PRESS_O", "STAFF_O")
    val filterS_user = s"where level in (${levels.map(x => "'" + x + "'").mkString(",")})"
    val fetchVar = Array("uid", "level", "name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "USER", fetchVar, filterS_user)
    val user_info: RDD[(String, String)] = user_tableGet.map(x => (x.getAs[Long]("uid").toString, x.getAs[String]("name")))

    // Parse the FOLLOW table from the DB to get follower counts
    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey, fs)

    val joinedFollowInfo: RDD[(String, (String, Long))] =
      user_info.fullOuterJoin(follow_info).map(x => (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(10L))))

    // Parse the MG_CONTENTS table from the DB to get each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
    val debut_info: RDD[(String, Long)] = mgcont_table.map(x => (x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

    // uid, name, follow, debut
    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x => {
      (x._1, (x._2._1.getOrElse(("", 10L)), x._2._2.getOrElse(10L)))
    }).map(x => (x._1, x._2._1._1, x._2._1._2, x._2._2))
    // cap the follow component at 20000
    rawOut.map(x => (x._1, x._2, math.min(20000, x._3), x._4))
  }
  
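  /**
    * Popularity as followers gained per day: followScore * msPerDay / (currentTS - debutTS).
    * For example, a follow score of 3000 with a debut 100 days before currentTS
    * yields 30.0 before normalization.
    */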
  def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long) = {
    // age is measured in days
    dBTable.map(x => ((x._1, x._2), ((1000 * 60 * 60 * 24).toLong * x._3).toDouble / (currentTS - x._4)))
  }
  
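  /**
    * Weibull-style decay transform of a millisecond quantity (converted to days).
    * Only referenced from the commented-out performance variant in main. Note that
    * the pow(k/lambd, k-lambd) factor differs from the standard Weibull density's
    * pow(x/lambd, k-1) term, so this reads as an ad-hoc decay curve rather than
    * the textbook pdf.
    */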
  def tr_Weibull(raw: Double, lambd: Int, k: Double) = {
    import scala.math.{exp, pow}
    val intoD = raw / (1000 * 60 * 60 * 24)
    val transformed = (k / lambd) * pow(k / lambd, k - lambd) * exp(-pow(intoD / lambd, k))
    transformed
  }
  
  def main(args: Array[String]) {
    val sc = new SparkContext(getSparkConf1)
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = sc.hadoopConfiguration
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    val nowTS: Long = System.currentTimeMillis
    val dateKey = getDateKey(nowTS)

    // compute the editor popularity score
    val fromDB = editorDB(sqlContext, fs, dateKey)

    // squash popularity through tanh (1 - tanh(v)*(-1) - 1 is just tanh(v)), then scale to [0, 1]
    val ePopularity: RDD[((String, String), Double)] =
      make_0to1_2Key(popularity(fromDB, nowTS).map(x => (x._1, 1 - MatrixFunctions.tanh(math.max(1, x._2)) * (-1) - 1)))

    // compute the editor performance score
    val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x => (x("cid"), x))

    val cidAndUid: RDD[(String, String)] = sc.objectFile[(String, String)](s"/preprocess/cidWithUid/$dateKey/")
    val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x => (x._1, x._2.size))
    val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

    val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x => {
      // per editor, sum view/likes/share/bookmark/comment counts, treating empty strings as 0
      val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
        (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
          a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
          a._2("comment").replaceAll("^$", "0").toDouble.toLong)
      }).reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5))
      (x._1, innerSum)
    }).join(uidAndCids)

    val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x => {
      // divide each summed metric by the editor's content count
      val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
        x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
        x._2._1._5.toDouble / x._2._2)
      (x._1, ap)
    })
  
    val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x => {
      import math.{min, max}
      // clamp each metric to its own range; the original clamped x._2._1 five times
      val s1 = min(max(500, x._2._1), 25000)
      val s2 = min(max(100, x._2._2), 2000)
      val s3 = min(max(10, x._2._3), 2000)
      val s4 = min(max(10, x._2._4), 2000)
      val s5 = min(max(100, x._2._5), 3000)
      (x._1, (s1, s2, s3, s4, s5))
    })
  
    val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x => {
      val score = x._2._1 + x._2._2 + x._2._3 + x._2._4 + x._2._5
      (x._1, score / 5)
    })

    val performance = make_0to1(sumPerformance)
    /*
    val performance: RDD[(String, Double)] = minMaxPerformance.map(x => {
      val score = tr_Weibull(x._2._1.toDouble, 1, 0.5) +
        tr_Weibull(x._2._2.toDouble, 1, 0.5) +
        tr_Weibull(x._2._3.toDouble, 1, 0.5) +
        tr_Weibull(x._2._4.toDouble, 1, 0.5) +
        tr_Weibull(x._2._5.toDouble, 1, 0.5)
      (x._1, score / 5)
    })
    */

    // final score: 0.15 * popularity + 0.85 * performance (0.2 when performance is missing)
    val finalScore = ePopularity.map(x => {
      (x._1._1, x._2)
    }).leftOuterJoin(performance).map(x => (x._1, 0.15 * x._2._1 + 0.85 * x._2._2.getOrElse(0.2D))).filter(x => x._1.nonEmpty)

    val formatOutput: RDD[(String, Long)] = finalScore.map(x => (x._1, (x._2 * 1000).toLong))

    TempScoreSaveLoad.scoreSave(dateKey, "editor", "", formatOutput, 1)
    // notify the HBase API to ingest the saved score file
    scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$dateKey")
    /*
    val insertArray: Array[(String, String)] = finalScore.map(x => (x._1.toString, (x._2 * 1000).toLong.toString)).collect
    val test = new HbaseInserter("editor_score")
    test.insert(insertArray)
    */
  }

}