app/com/piki_ds/ver1/EditorScore.scala

  package com.piki_ds.ver1

  //import breeze.linalg.min
  import com.piki_ds.utils.FileDelete._
  import org.apache.spark.rdd.RDD
  import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.{SparkConf, SparkContext}
  import org.jblas.MatrixFunctions

  import com.piki_ds.utils.DateTimeEtc.getDateKey
  import com.piki_ds.utils.GetTextFile.getDashTable
  import com.piki_ds.utils.SqlContextConf.readPartialTable
  import com.piki_ds.utils.TempScoreSaveLoad
  import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Keys, make_0to1}
  import com.piki_ds.ver1.DashRelatedScore.previousDash
  //import com.piki_ds.utils.hbase.HbaseInserter
  
  /**
    * Computes a daily score for each editor: a popularity score (follower count scaled by editor age)
    * and a performance score (average view/like/share/bookmark/comment counts per content) are each
    * normalized with the make_0to1 helpers and combined as 0.15 * popularity + 0.85 * performance.
    * The result is saved per date key and pushed to the editor_score HBase table via an HTTP endpoint.
    *
    * Created by jungwon on 5/2/16.
    */
  
  object EditorScore {
    // Spark configuration for local execution (3 threads)
    def getSparkConf1: SparkConf = {
      val conf = new SparkConf().setAppName("EditorScore")
      conf.setMaster("local[3]")
      conf.set("master", "local[3]")
      conf.set("spark.app.name", "EditorScore")
      conf.set("spark.driver.allowMultipleContexts", "true")
    }

    def getSparkConf2: SparkConf = {
      val conf = new SparkConf().setAppName("EditorScore")
      conf.setMaster("local[3]")
      conf.set("master", "local[3]")
      conf.set("spark.app.name", "EditorScore")
      conf.set("spark.driver.allowMultipleContexts", "true")
    }
  
    // Latest subdirectory under `path`; when isParted, only partitions with a _SUCCESS marker are considered.
    def recentlyUpdatedPath(path:String, isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
      val list = hdfs.listStatus(new Path(path))
      list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(_.getModificationTime)
    }
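
    // Follow score per editor: 0.1 * cumulative follow count + the sum of the 10 most recent counts,
    // built from the previously saved follow snapshot unioned with the EDITOR_FOLLOW table for dateKey.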
    def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean) = {
      val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", false, fs).getPath.toString)
      val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
      val unionFrom = fromExisting.unionAll(fromUpdated)
      val intoMap = unionFrom.map(x=>{
        Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
      })
      val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>{
        val cuml = x._2.map(a=>a("follow_cnt").toLong).sum
        // most recent snapshots first
        val ts: Array[Long] = x._2.toArray.sortBy(_("cdate"))(Ordering[String].reverse).map(_("follow_cnt").toLong)
        val followScore = 0.1*cuml + ts.take(10).sum
        (x._1.toLong, followScore.toLong)
      })
      if (fileSave) {
        unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
      }
      follow_info
    }
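
    // editorDB: joins editor accounts (USER), debut dates (MG_CONTENTS) and follow scores into
    // (uid, ((name, debut timestamp), follow score)), with the follow component capped at 20,000.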
    def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String, fileSave:Boolean): RDD[(Long, ((String, Long), Long))] = {
      // Parse the USER table from the DB and pull the editor accounts (not an essential step)
      val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
      val filterS_user = s"where level in (${levels.map(x=>"'"+x+"'").mkString(",")})"
      val fetchVar = Array("uid","level","name")
      val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "USER", fetchVar, filterS_user)
      // userInfo: (uid, account name)
      val userInfo: RDD[(Long, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid"), x.getAs[String]("name")))

      // Parse the MG_CONTENTS table from the DB and pull each editor's debut date
      val filterS_mgcont = " where uid is not null group by uid"
      val fetchVar_mgcont = Array("uid", "min(udate)")
      val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
      import org.apache.spark.sql.functions._
      val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))

      // debutInfo: (uid, debut date as unix timestamp)
      val debutInfo: RDD[(Long, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid"), x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

      val uidOfInterest: RDD[(Long, (String, Long))] = userInfo.fullOuterJoin(debutInfo).map(x=>{
        (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(0L)))
      })

      // Parse the FOLLOW table from the DB and pull follower counts
      val follow_info: RDD[(Long, Long)] = followGetter(sQLContext, dateKey, fs, fileSave)
      // (uid, ((name, debut), follow)), with the follow count capped at 20,000
      val joinedFollowInfo: RDD[(Long, ((String, Long), Long))] = uidOfInterest.leftOuterJoin(follow_info).map(x=>{
        (x._1, (x._2._1, math.min(20000, x._2._2.getOrElse(0L))))
      })
      joinedFollowInfo
    }

    /**
      * Popularity: follower count scaled by editor age.
      *
      * @param dBTable   uid, ((name, debut timestamp), follow)
      * @param currentTS current time in milliseconds
      * @return          ((uid, name), follow count scaled by the editor's age)
      */
    def popularity(dBTable: RDD[(Long, ((String, Long), Long))], currentTS:Long) = {
      // editor age is measured in days
      dBTable.map(x=>((x._1, x._2._1._1), ((1000*60*60*24).toLong*x._2._2).toDouble/(currentTS-x._2._1._2)))
    }
  
    // Weibull-style decay transform: raw is a duration in milliseconds, converted to days;
    // lambd is the scale parameter and k the shape parameter.
    def tr_Weibull(raw:Double, lambd:Int, k:Double) = {
      import scala.math.{exp, pow}
      val intoD = raw/(1000*60*60*24)
      val transformed = (k/lambd)*pow(k/lambd, k-lambd)*exp(-pow(intoD/lambd, k))
      transformed
    }
  
    def main(args:Array[String]) {
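      // Pipeline: build editor info from the DB, turn it into a popularity score, compute a
      // performance score from dashboard metrics, blend the two, and persist / publish the result.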
      val sc = new SparkContext(getSparkConf1)
      val sqlContext = SQLContext.getOrCreate(sc)
      val hadoopConf = sc.hadoopConfiguration
      val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
      // pass 0 as the first argument to skip saving the follow snapshot
      val followFileSave = !(args.nonEmpty && args.head.toInt == 0)
      val nowTS: Long = System.currentTimeMillis
      val doi = getDateKey(nowTS)

      // Editor popularity score
      val fromDB: RDD[(Long, ((String, Long), Long))] = editorDB(sqlContext, fs, doi, followFileSave)

      // 1 - tanh(max(1, x))*(-1) - 1 reduces to tanh(max(1, x)); squashes raw popularity before normalizing
      val ePopularity: RDD[((Long, String), Double)] = make_0to1_2Keys(popularity(fromDB, nowTS).map(x=>(x._1, 1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))
  
      // Editor performance score
      val dashInfo: RDD[(Long, Map[String, String])] = previousDash(doi).map(x=>(x("cid").toLong, x))

      // (cid, uid) pairs produced by the preprocessing step
      val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$doi/")

      // number of contents per editor
      val uidAndContents: RDD[(Long, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1.toLong, x._2.size))

      // (uid, dashboard metrics) for each content
      val cPDB: RDD[(Long, Map[String, String])] = cidAndUid.map(x=>(x._1.toLong, x._2.toLong)).join(dashInfo).map(_._2)

      // Per-editor totals of (view, likes, share, bookmark, comment), joined with that editor's content count
      val performanceRaw: RDD[(Long, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> {
        val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
          (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
            a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
            a._2("comment").replaceAll("^$", "0").toDouble.toLong)
        }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
        (x._1, innerSum)
      }).join(uidAndContents)
      // Average metrics per content
      val avgPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> {
        val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
          x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
          x._2._1._5.toDouble / x._2._2)
        (x._1, ap)
      })
      // Clamp each average to a per-metric range before summing
      val minMaxPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{
        import math.{min,max}
        val s1 = min(max(500, x._2._1), 25000)
        val s2 = min(max(100, x._2._2), 2000)
        val s3 = min(max(10, x._2._3), 2000)
        val s4 = min(max(10, x._2._4), 2000)
        val s5 = min(max(100, x._2._5), 3000)
        (x._1, (s1,s2,s3,s4,s5))
      })
      // Average of the five clamped metrics, then normalized with make_0to1
      val sumPerformance: RDD[(Long, Double)] = minMaxPerformance.map(x=>{
        val score = x._2._1 + x._2._2 + x._2._3 + x._2._4 + x._2._5
        (x._1, score/5)
      })

      val performance = make_0to1(sumPerformance)
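
      // Final score: 0.15 * popularity + 0.85 * performance; editors with no dashboard performance
      // fall back to a performance of 0.2.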
      val finalScore = ePopularity.map(x=>{
        (x._1._1, x._2)
      }).leftOuterJoin(performance).map(x=>(x._1, 0.15*x._2._1 + 0.85*x._2._2.getOrElse(0.2D)))

      // Scale scores for storage (x1000, truncated to Long)
      val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong))
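
      // Save today's editor score, drop snapshots older than 30 days, and notify the HBase ingest endpoint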
      TempScoreSaveLoad.scoreSave(doi, "editor", "", formatOutput, 1)
      deleteNDaysAgo(sc, TempScoreSaveLoad.scorePathMaker("editor", ""), doi, 30)
      scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$doi")
      /*
      val insertArray: Array[(String, String)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong.toString)).collect
      val test = new HbaseInserter("editor_score")
      test.insert(insertArray)
      */
    }
  
  }