Commit dca1584aece4e47d38468617b64e47b601167e84

Authored by Joanne
1 parent be098b973b
Exists in master

debug

Showing 1 changed file with 3 additions and 13 deletions
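
The three added lines route the dashboard counter strings ("view", "likes", "share", "bookmark", "comment") through .toDouble.toLong instead of calling .toLong directly, presumably because some counters arrive formatted as decimals (e.g. "123.0"), which String.toLong rejects with a NumberFormatException; the remaining deletions (only the opening /* is visible below) appear to drop a commented-out block at the end of main. A minimal sketch of the parsing difference, using a made-up value rather than real dashboard data:

    val raw = "123.0"                 // hypothetical counter string
    // raw.toLong                     // would throw java.lang.NumberFormatException
    val parsed = raw.toDouble.toLong  // parses as a Double first, then truncates to 123L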

app/com/piki_ds/ver1/EditorScore.scala
package com.piki_ds.ver1

//import breeze.linalg.min
import com.piki_ds.utils.FileDelete._
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions

import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Keys,make_0to1}
import com.piki_ds.ver1.DashRelatedScore.previousDash
//import com.piki_ds.utils.hbase.HbaseInserter

/**
  * Created by jungwon on 5/2/16.
  */

object EditorScore {

  def getSparkConf1= {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  def getSparkConf2= {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "t")
  }

  def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  }

  def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean) = {
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",false,fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x=>{
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>{
      val cuml = x._2.map(a=>a("follow_cnt").toLong).sum
      val ts: Array[Long] = x._2.toArray.sortBy(_("cdate")*(-1)).map(_("follow_cnt").toLong)
      val followScore = 0.1*cuml+ts.take(10).sum
      (x._1.toLong, followScore.toLong)
    })
    if (fileSave) {
      unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    }
    follow_info
  }

  def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String, fileSave:Boolean): RDD[(Long, ((String, Long), Long))] = {
    // Parse the USER table from the DB and pull out the editors (not a strictly necessary step....-_-)
    val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
    val filterS_user = s"where level in (${levels.map(x=>"'"+x+"'").mkString(",")})"
    val fetchVar = Array("uid","level","name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
    // userInfo : (uid, account name)
    val userInfo: RDD[(Long, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid"), x.getAs[String]("name")))

    // Parse the MG_CONTENTS table from the DB and pull out each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table= mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
    //debutInfo: (uid, debut in timestamp)
    val debutInfo: RDD[(Long, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid"), x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

    val uidOfInterest: RDD[(Long, (String, Long))] = userInfo.fullOuterJoin(debutInfo).map(x=>{
      (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(0L)))
    })

    // Parse the FOLLOW table from the DB and pull out follower counts
    val follow_info: RDD[(Long, Long)] = followGetter(sQLContext, dateKey,fs,fileSave)
    // uid, ((name, debut), follow)
    val joinedFollowInfo: RDD[(Long, ((String, Long), Long))] = uidOfInterest.leftOuterJoin(follow_info).map(x=>{
      (x._1, (x._2._1, math.min(20000, x._2._2.getOrElse(0L))))
    })
    joinedFollowInfo
  }

  /**
    *
    * @param dBTable uid, ((name, debut), follow)
    * @param currentTS
    * @return
    */
  def popularity(dBTable: RDD[(Long, ((String, Long), Long))], currentTS:Long) = {
    // age is measured in days
    dBTable.map(x=>((x._1, x._2._1._1), ((1000*60*60*24).toLong*x._2._2).toDouble/(currentTS-x._2._1._2)))
  }

  def tr_Weibull(raw:Double, lambd:Int, k:Double) = {
    import scala.math.{exp, pow}
    val intoD = raw/(1000*60*60*24)
    val transformed = (k/lambd)*pow(k/lambd,k-lambd)*exp(-pow(intoD/lambd,k))
    transformed
  }

  def main(args:Array[String]) {
    val sc = new SparkContext(getSparkConf1)
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = sc.hadoopConfiguration
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    val followFileSave = if (args.nonEmpty && args.head.toInt.equals(0)) false else true
    val nowTS: Long = System.currentTimeMillis
    val doi = getDateKey(nowTS)

    // compute the editor popularity score
    val fromDB: RDD[(Long, ((String, Long), Long))] = editorDB(sqlContext, fs, doi,followFileSave)

    val ePopularity: RDD[((Long, String), Double)] = make_0to1_2Keys(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))

    // compute the editor performance score
    val dashInfo: RDD[(Long, Map[String, String])] = previousDash(doi).map(x=>(x("cid").toLong,x))

    val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$doi/")

    val uidAndContents: RDD[(Long, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1.toLong,x._2.size))

    val cPDB: RDD[(Long, Map[String, String])] = cidAndUid.map(x=>(x._1.toLong, x._2.toLong)).join(dashInfo).map(_._2)

    val performanceRaw: RDD[(Long, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> {
      val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
-       (a._2("view").replaceAll("^$", "0").toLong, a._2("likes").replaceAll("^$", "0").toLong,
-         a._2("share").replaceAll("^$", "0").toLong, a._2("bookmark").replaceAll("^$", "0").toLong,
-         a._2("comment").replaceAll("^$", "0").toLong)
+       (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
+         a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
+         a._2("comment").replaceAll("^$", "0").toDouble.toLong)
      }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
      (x._1, innerSum)
    }).join(uidAndContents)
    val avgPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> {
      val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
        x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
        x._2._1._5.toDouble / x._2._2)
      (x._1, ap)
    })

    val minMaxPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{
      import math.{min,max}
      val s1 = min(max(500, x._2._1),25000)
      val s2 = min(max(100, x._2._1),2000)
      val s3 = min(max(10, x._2._1),2000)
      val s4 = min(max(10, x._2._1),2000)
      val s5 = min(max(100, x._2._1),3000)
      (x._1, (s1,s2,s3,s4,s5))
    })

    val sumPerformance: RDD[(Long, Double)] = minMaxPerformance.map(x=>{
      val score = x._2._1+x._2._2+x._2._3+x._2._4+x._2._5
      (x._1, score/5)
    })

    val performance = make_0to1(sumPerformance)
-   /*