Commit d00b05c6f35fdc3e116683546c34acf98057b6ce
1 parent: 5ee0858f89
Exists in: master
Commit message: editor score range
Showing 1 changed file with 20 additions and 12 deletions
app/com/piki_ds/ver1/EditorScore.scala
 package com.piki_ds.ver1

 //import breeze.linalg.min
 import org.apache.spark.rdd.RDD
 import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
 import org.jblas.MatrixFunctions

 import com.piki_ds.utils.DateTimeEtc.getDateKey
 import com.piki_ds.utils.GetTextFile.getDashTable
 import com.piki_ds.utils.SqlContextConf.readPartialTable
 import com.piki_ds.utils.TempScoreSaveLoad
 import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key,make_0to1}
 import com.piki_ds.ver1.DashRelatedScore.previousDash
 //import com.piki_ds.utils.hbase.HbaseInserter

 /**
  * Created by jungwon on 5/2/16.
  */

 object EditorScore {

-  def getSparkConf= {
+  def getSparkConf1= {
     val conf = new SparkConf().setAppName("EditorScore")
     conf.setMaster("local[3]")
     conf.set("master", "local[3]")
     conf.set("spark.app.name", "EditorScore")
     conf.set("spark.driver.allowMultipleContexts", "true")
   }

-  val sc = new SparkContext(getSparkConf)
-  val sqlContext = SQLContext.getOrCreate(sc)
-  val hadoopConf = sc.hadoopConfiguration
-  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
+  def getSparkConf2= {
+    val conf = new SparkConf().setAppName("EditorScore")
+    conf.setMaster("local[3]")
+    conf.set("master", "local[3]")
+    conf.set("spark.app.name", "EditorScore")
+    conf.set("spark.driver.allowMultipleContexts", "t")
+  }

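Note on this hunk: the driver-side objects (sc, sqlContext, hadoopConf, fs) move from object initialization into main() later in the diff, and getSparkConf is split into getSparkConf1 plus a near-identical getSparkConf2 (which sets "t" instead of "true" and is not referenced anywhere visible in this file). A hypothetical consolidation, not part of this commit and relying on the SparkConf import already present, might look like:

  // Sketch only (hypothetical helper, not in the commit): one parameterized builder instead of two copies.
  def buildSparkConf(appName: String = "EditorScore", master: String = "local[3]"): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .setMaster(master)
      .set("spark.driver.allowMultipleContexts", "true")
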
   def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
     val list = hdfs.listStatus(new Path(path))
     list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
   }

-  def followGetter(sQLContext: SQLContext, dateKey:String, fileSave:Boolean = true) = {
+  def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean = true) = {
     val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",false,fs).getPath.toString)
     val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
     val unionFrom = fromExisting.unionAll(fromUpdated)
     val intoMap = unionFrom.map(x=>{
       Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
     })
     val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>{
       val cuml = x._2.map(a=>a("follow_cnt").toLong).sum
       val ts: Array[Long] = x._2.toArray.sortBy(_("cdate")*(-1)).map(_("follow_cnt").toLong)
       val followScore = 0.1*cuml+ts.take(10).sum
       (x._1, followScore.toLong)
     })
     if (fileSave) {
       unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
     }
     follow_info
   }
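For reference, the follow score computed in followGetter is 0.1 × cumulative follow count + the sum of the 10 most recent snapshots (the sortBy(_("cdate")*(-1)) on the way appears to repeat a String rather than negate a number, which yields an empty sort key for a negative count instead of reversing the order, so the sketch below uses an explicit descending Ordering). A minimal local sketch with made-up numbers:

  object FollowScoreSketch extends App {
    // Hypothetical (cdate, follow_cnt) snapshots for a single editor.
    val history = Seq(("20160425", 120L), ("20160426", 95L), ("20160427", 130L), ("20160428", 140L))
    val cuml = history.map(_._2).sum                                   // cumulative follows: 485
    val ts = history.sortBy(_._1)(Ordering[String].reverse).map(_._2)  // newest snapshot first
    val followScore = (0.1 * cuml + ts.take(10).sum).toLong            // same formula as followGetter
    println(followScore)                                               // 0.1*485 + 485 = 533.5 -> 533
  }
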

-  def editorDB(sQLContext: SQLContext, dateKey:String): RDD[(String, String, Long, Long)] = {
+  def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String): RDD[(String, String, Long, Long)] = {
     // Parse the USER table from the DB and pull the editors (not strictly a required step....-_-)
     val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
     val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
     val fetchVar = Array("uid","level","name")
     val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
     val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))

     // Parse the FOLLOW table from the DB and get follower counts
-    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
+    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey,fs)

-    val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))
+    val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.fullOuterJoin(follow_info).map(x=>(x._1,(x._2._1.getOrElse(""),x._2._2.getOrElse(10L))))

     // Parse the MG_CONTENTS table from the DB and get each editor's debut date
     val filterS_mgcont = " where uid is not null group by uid"
     val fetchVar_mgcont = Array("uid", "min(udate)")
     val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
     import org.apache.spark.sql.functions._
     val mgcont_table= mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
     val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

     // uid, name, follow, debut
-    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
-      (x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
+    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x=>{
+      (x._1,(x._2._1.getOrElse(("",10L)) ,x._2._2.getOrElse(10L)))
     }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
     rawOut.map(x=>(x._1,x._2, math.min(20000,x._3),x._4))
   }
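The switch from leftOuterJoin to fullOuterJoin above changes which editors survive the joins: uids present only on the right side (follow or debut data) are now kept, with the missing name defaulted to "" and missing counts/timestamps defaulted to 10L. A minimal local-mode sketch of that difference (toy uids and values, not real data):

  object JoinSketch extends App {
    import org.apache.spark.{SparkConf, SparkContext}
    val sc = new SparkContext(new SparkConf().setAppName("JoinSketch").setMaster("local[1]"))
    val userInfo   = sc.parallelize(Seq(("1", "editorA"), ("2", "editorB")))
    val followInfo = sc.parallelize(Seq(("2", 500L), ("3", 70L)))
    // leftOuterJoin would drop uid "3" (no USER row); fullOuterJoin keeps it with defaults.
    val joined = userInfo.fullOuterJoin(followInfo)
      .map { case (uid, (name, cnt)) => (uid, (name.getOrElse(""), cnt.getOrElse(10L))) }
    joined.collect().foreach(println)   // uid "3" appears with ("",70); uid "1" gets the default count 10
    sc.stop()
  }
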

   def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS:Long) = {
     // age unit: days
     dBTable.map(x=>((x._1, x._2), ((1000*60*60*24).toLong*x._3).toDouble/(currentTS-x._4)))
   }

   def tr_Weibull(raw:Double, lambd:Int, k:Double) = {
     import scala.math.{exp, pow}
     val intoD = raw/(1000*60*60*24)
     val transformed = (k/lambd)*pow(k/lambd,k-lambd)*exp(-pow(intoD/lambd,k))
     transformed
   }
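For reference, writing d for the raw value converted from milliseconds to days, tr_Weibull computes

  \mathrm{tr\_Weibull}(\mathrm{raw}, \lambda, k) = \frac{k}{\lambda}\left(\frac{k}{\lambda}\right)^{k-\lambda} e^{-(d/\lambda)^{k}}, \qquad d = \frac{\mathrm{raw}}{1000 \cdot 60 \cdot 60 \cdot 24}

The textbook Weibull density would instead be \frac{k}{\lambda}\left(\frac{d}{\lambda}\right)^{k-1} e^{-(d/\lambda)^{k}}; the middle factor here uses k/\lambda raised to k-\lambda, which may be intentional but is worth flagging, since the function is only exercised by the commented-out scoring branch at the end of this file.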

   def main(args:Array[String]) {
+    val sc = new SparkContext(getSparkConf1)
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val hadoopConf = sc.hadoopConfiguration
+    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
+
     val nowTS: Long = System.currentTimeMillis
     val dateKey = getDateKey(nowTS)

     // compute editor popularity score
-    val fromDB = editorDB(sqlContext, dateKey)
+    val fromDB = editorDB(sqlContext, fs, dateKey)

     val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))

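A small aside on the ePopularity expression above: 1 - MatrixFunctions.tanh(math.max(1, x))*(-1) - 1 reduces algebraically to tanh(math.max(1, x)), so the expression could be simplified without changing the score. A quick check (assuming jblas on the classpath, as already imported in this file):

  object TanhCheck extends App {
    import org.jblas.MatrixFunctions.tanh
    val x = 3.7   // arbitrary sample value
    val original   = 1 - tanh(math.max(1.0, x)) * (-1) - 1
    val simplified = tanh(math.max(1.0, x))
    println(math.abs(original - simplified) < 1e-12)   // true
  }
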
     // compute editor performance score
     val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x))

     val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")

     val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1,x._2.size))

     val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

     val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> {
       val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
         (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
           a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
           a._2("comment").replaceAll("^$", "0").toDouble.toLong)
       }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
       (x._1, innerSum)
     }).join(uidAndCids)


     val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> {
       val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
         x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
         x._2._1._5.toDouble / x._2._2)
       (x._1, ap)
     })

     val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{
       import math.{min,max}
       val s1 = min(max(500, x._2._1),25000)
       val s2 = min(max(100, x._2._1),2000)
       val s3 = min(max(10, x._2._1),2000)
       val s4 = min(max(10, x._2._1),2000)
       val s5 = min(max(100, x._2._1),3000)
       (x._1, (s1,s2,s3,s4,s5))
     })
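One observation on the block above (pre-existing code, not touched by this commit): all five clamps read x._2._1, i.e. the averaged view count. If the intent was to clamp each metric against its own bounds, a per-field version would look like the sketch below (bounds copied from the existing code; field order taken from performanceRaw: view, likes, share, bookmark, comment):

  object ClampSketch extends App {
    import math.{min, max}
    // Hypothetical helper, not part of the commit: clamp each averaged metric against its own bounds.
    def clampMetrics(m: (Double, Double, Double, Double, Double)) =
      (min(max(500, m._1), 25000),   // view
       min(max(100, m._2), 2000),    // likes
       min(max(10,  m._3), 2000),    // share
       min(max(10,  m._4), 2000),    // bookmark
       min(max(100, m._5), 3000))    // comment
    println(clampMetrics((40000.0, 5.0, 1.0, 1.0, 10000.0)))  // (25000.0,100.0,10.0,10.0,3000.0)
  }
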

     val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x=>{
       val score = x._2._1+x._2._2+x._2._3+x._2._4+x._2._5
       (x._1, score/5)
     })

     val performance = make_0to1(sumPerformance)
     /*
     val performance: RDD[(String, Double)] = minMaxPerformance.map(x=>{
       val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
         tr_Weibull(x._2._2.toDouble, 1, 0.5)+
         tr_Weibull(x._2._3.toDouble, 1, 0.5)+
         tr_Weibull(x._2._4.toDouble, 1, 0.5)+
         tr_Weibull(x._2._5.toDouble, 1, 0.5)