Commit 34d9c2c35a21056e9451b66dbcad08c8ca9ff073

Authored by Joanne
1 parent ad90805d89
Exists in master

editor score fix

Showing 1 changed file with 44 additions and 9 deletions

app/com/piki_ds/ver1/EditorScore.scala
1 1 package com.piki_ds.ver1
2 2  
  3 +import breeze.linalg.min
3 4 import org.apache.spark.rdd.RDD
4 5 import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
5 6 import org.apache.spark.sql.SQLContext
... ... @@ -10,7 +11,7 @@
10 11 import com.piki_ds.utils.GetTextFile.getDashTable
11 12 import com.piki_ds.utils.SqlContextConf.readPartialTable
12 13 import com.piki_ds.utils.TempScoreSaveLoad
13   -import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
  14 +import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key,make_0to1}
14 15 import com.piki_ds.ver1.DashRelatedScore.previousDash
15 16 import com.piki_ds.utils.hbase.HbaseInserter
16 17  
17 18  
... ... @@ -39,13 +40,18 @@
39 40 }
40 41  
41 42 def followGetter(sQLContext: SQLContext, dateKey:String, fileSave:Boolean = true) = {
42   - val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",true,fs).getPath.toString)
  43 + val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",false,fs).getPath.toString)
43 44 val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
44 45 val unionFrom = fromExisting.unionAll(fromUpdated)
45 46 val intoMap = unionFrom.map(x=>{
46 47 Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
47 48 })
48   - val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>(x._1, x._2.map(a=>a("follow_cnt").toLong).sum))
  49 + val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>{
  50 + val cuml = x._2.map(a=>a("follow_cnt").toLong).sum
  51 + val ts: Array[Long] = x._2.toArray.sortBy(_("cdate"))(Ordering[String].reverse).map(_("follow_cnt").toLong)
  52 + val followScore = 0.1*cuml+ts.take(10).sum
  53 + (x._1, followScore.toLong)
  54 + })
49 55 if (fileSave) {
50 56 unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
51 57 }
... ... @@ -61,7 +67,7 @@
61 67 val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
62 68  
63 69 // Parse the FOLLOW table from the DB and fetch follower counts
64   - val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
  70 + val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey, false)
65 71  
66 72 val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))
67 73  
68 74  
69 75  
... ... @@ -105,16 +111,45 @@
105 111  
106 112 val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")
107 113  
  114 + val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1,x._2.size))
  115 +
108 116 val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)
109 117  
110   - val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x=> {
111   - val innerSum = x._2.map(a => {
  118 + val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> {
  119 + val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
112 120 (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
113 121 a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
114 122 a._2("comment").replaceAll("^$", "0").toDouble.toLong)
115 123 }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
116 124 (x._1, innerSum)
117   - }).map(x=>{
  125 + }).join(uidAndCids)
  126 +
  127 +
  128 + val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> {
  129 + val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
  130 + x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
  131 + x._2._1._5.toDouble / x._2._2)
  132 + (x._1, ap)
  133 + })
  134 +
  135 + val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{
  136 + import math.{min,max}
  137 + val s1 = min(max(500, x._2._1),25000)
  138 + val s2 = min(max(100, x._2._2),2000)
  139 + val s3 = min(max(10, x._2._3),2000)
  140 + val s4 = min(max(10, x._2._4),2000)
  141 + val s5 = min(max(100, x._2._5),3000)
  142 + (x._1, (s1,s2,s3,s4,s5))
  143 + })
  144 +
  145 + val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x=>{
  146 + val score = x._2._1+x._2._2+x._2._3+x._2._4+x._2._5
  147 + (x._1, score/5)
  148 + })
  149 +
  150 + val performance = make_0to1(sumPerformance)
  151 + /*
  152 + val performance: RDD[(String, Double)] = minMaxPerformance.map(x=>{
118 153 val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
119 154 tr_Weibull(x._2._2.toDouble, 1, 0.5)+
120 155 tr_Weibull(x._2._3.toDouble, 1, 0.5)+
121 156  
... ... @@ -122,10 +157,10 @@
122 157 tr_Weibull(x._2._5.toDouble, 1, 0.5)
123 158 (x._1, score/5)
124 159 })
125   -
  160 +*/
126 161 val finalScore = ePopularity.map(x=>{
127 162 (x._1._1,x._2)
128   - }).leftOuterJoin(performance).map(x=>(x._1,(x._2._1+x._2._2.getOrElse(0.5D))/2)).filter(x=>x._1.nonEmpty && ! x._2.isNaN)
  163 + }).leftOuterJoin(performance).map(x=>(x._1,0.3*x._2._1+0.7*x._2._2.getOrElse(0.3D))).filter(x=>x._1.nonEmpty)
129 164  
130 165 val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1,(x._2*1000).toLong))
131 166
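
Notes on the scoring changes in this commit:

The reworked followGetter no longer scores an editor by the plain cumulative follow count; it blends the lifetime total with recent activity. A minimal plain-Scala sketch of that formula, assuming each editor's rows are (cdate, follow_cnt) pairs; the function name is illustrative, not from the codebase:

    // Follow score per the diff: 10% of the lifetime follow total plus the
    // raw counts from the 10 most recent days (sorted newest-first by cdate).
    def followScore(rows: Seq[(String, Long)]): Long = {
      val cuml = rows.map(_._2).sum
      val recent = rows.sortBy(_._1)(Ordering[String].reverse).take(10).map(_._2).sum
      (0.1 * cuml + recent).toLong
    }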
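The old Weibull-based transform is commented out in favor of hard-clamping each averaged metric into a fixed range before averaging. A standalone sketch of that clamp-and-average step as it operates per editor, with each bound applied to its own metric (function name illustrative):

    import math.{min, max}

    // Clamp each per-content average into its range, then average the five
    // clamped metrics (view, likes, share, bookmark, comment).
    def clampedPerformance(view: Double, likes: Double, share: Double,
                           bookmark: Double, comment: Double): Double = {
      val s1 = min(max(500.0, view), 25000.0)
      val s2 = min(max(100.0, likes), 2000.0)
      val s3 = min(max(10.0, share), 2000.0)
      val s4 = min(max(10.0, bookmark), 2000.0)
      val s5 = min(max(100.0, comment), 3000.0)
      (s1 + s2 + s3 + s4 + s5) / 5
    }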
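The summed score is then passed through make_0to1. Its source is not shown in this diff; assuming it is a min-max scaler over the RDD's values (an assumption based on the name), its behavior on a plain collection would look like:

    // Assumed behavior of make_0to1: min-max scale all scores into [0, 1].
    // Assumes a non-empty input; equal scores collapse to 0.0.
    def make0to1(scores: Map[String, Double]): Map[String, Double] = {
      val lo = scores.values.min
      val hi = scores.values.max
      scores.map { case (k, v) =>
        k -> (if (hi == lo) 0.0 else (v - lo) / (hi - lo))
      }
    }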
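Finally, the blend of popularity and performance changes from an equal-weight average with a 0.5 fallback to a 0.3/0.7 weighting with a 0.3 fallback, so performance now dominates the editor score. A sketch of the new per-editor combination (function name illustrative):

    // New final blend: 0.3 * popularity + 0.7 * performance, with editors
    // lacking a performance row falling back to 0.3 instead of the old 0.5.
    def finalScore(popularity: Double, performance: Option[Double]): Double =
      0.3 * popularity + 0.7 * performance.getOrElse(0.3)

As before, formatOutput multiplies the blended value by 1000 and truncates to Long for storage.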