From d00b05c6f35fdc3e116683546c34acf98057b6ce Mon Sep 17 00:00:00 2001
From: Joanne
Date: Tue, 19 Jul 2016 15:07:10 +0900
Subject: [PATCH] editor score range

---
 app/com/piki_ds/ver1/EditorScore.scala | 32 ++++++++++++++++++++------------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/app/com/piki_ds/ver1/EditorScore.scala b/app/com/piki_ds/ver1/EditorScore.scala
index 828fe77..f9697e4 100644
--- a/app/com/piki_ds/ver1/EditorScore.scala
+++ b/app/com/piki_ds/ver1/EditorScore.scala
@@ -21,7 +21,7 @@ import com.piki_ds.ver1.DashRelatedScore.previousDash
 
 object EditorScore {
 
-  def getSparkConf= {
+  def getSparkConf1= {
     val conf = new SparkConf().setAppName("EditorScore")
     conf.setMaster("local[3]")
     conf.set("master", "local[3]")
@@ -29,17 +29,20 @@ object EditorScore {
     conf.set("spark.driver.allowMultipleContexts", "true")
   }
 
-  val sc = new SparkContext(getSparkConf)
-  val sqlContext = SQLContext.getOrCreate(sc)
-  val hadoopConf = sc.hadoopConfiguration
-  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
+  def getSparkConf2= {
+    val conf = new SparkConf().setAppName("EditorScore")
+    conf.setMaster("local[3]")
+    conf.set("master", "local[3]")
+    conf.set("spark.app.name", "EditorScore")
+    conf.set("spark.driver.allowMultipleContexts", "t")
+  }
 
   def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
     val list = hdfs.listStatus(new Path(path))
     list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
   }
 
-  def followGetter(sQLContext: SQLContext, dateKey:String, fileSave:Boolean = true) = {
+  def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean = true) = {
     val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",false,fs).getPath.toString)
     val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
     val unionFrom = fromExisting.unionAll(fromUpdated)
@@ -58,7 +61,7 @@ object EditorScore {
     follow_info
   }
 
-  def editorDB(sQLContext: SQLContext, dateKey:String): RDD[(String, String, Long, Long)] = {
+  def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String): RDD[(String, String, Long, Long)] = {
     // Parse the USER table from the DB to get the editors (not a strictly required step....-_-)
     val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
     val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
@@ -67,9 +70,9 @@ object EditorScore {
     val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
 
     // Parse the FOLLOW table from the DB to get follower counts
-    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
+    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey,fs)
 
-    val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))
+    val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.fullOuterJoin(follow_info).map(x=>(x._1,(x._2._1.getOrElse(""),x._2._2.getOrElse(10L))))
 
     // Parse the MG_CONTENTS table from the DB to get each editor's debut date
     val filterS_mgcont = " where uid is not null group by uid"
@@ -80,8 +83,8 @@ object EditorScore {
     val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))
 
     // uid, name, follow, debut
-    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
-      (x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
+    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x=>{
+      (x._1,(x._2._1.getOrElse(("",10L)) ,x._2._2.getOrElse(10L)))
     }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
     rawOut.map(x=>(x._1,x._2, math.min(20000,x._3),x._4))
   }
@@ -99,11 +102,16 @@ object EditorScore {
   }
 
   def main(args:Array[String]) {
+    val sc = new SparkContext(getSparkConf1)
+    val sqlContext = SQLContext.getOrCreate(sc)
+    val hadoopConf = sc.hadoopConfiguration
+    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
+
    val nowTS: Long = System.currentTimeMillis
     val dateKey = getDateKey(nowTS)
 
     // Compute the editor popularity score
-    val fromDB = editorDB(sqlContext, dateKey)
+    val fromDB = editorDB(sqlContext, fs, dateKey)
 
     val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))
 
-- 
1.8.5.2
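
Note (appended below the signature, so it is ignored by git am): the main behavioral
change in this commit is the switch from leftOuterJoin to fullOuterJoin when merging
user, follow, and debut info, so that uids present in only one source are kept rather
than dropped, with "" and 10L as fallback values. The sketch below illustrates the
difference on toy data; it is illustrative only, and the object name, demo records,
and local[*] master are made up for the example.

    import org.apache.spark.{SparkConf, SparkContext}

    // Toy illustration of the leftOuterJoin -> fullOuterJoin change in editorDB.
    object JoinSemanticsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("JoinSemanticsDemo").setMaster("local[*]"))

        // user_info analogue: uid -> name
        val users = sc.parallelize(Seq(("1", "alice"), ("2", "bob")))
        // follow_info analogue: uid -> follower count; uid "3" has no USER row
        val follows = sc.parallelize(Seq(("1", 42L), ("3", 7L)))

        // leftOuterJoin keeps only uids present in `users`; uid "3" is dropped.
        val left = users.leftOuterJoin(follows)
          .map { case (uid, (name, cnt)) => (uid, name, cnt.getOrElse(10L)) }

        // fullOuterJoin keeps uids from either side; both values become Options,
        // so each side needs a default, mirroring the patch ("" for a missing
        // name, 10L for a missing follower count).
        val full = users.fullOuterJoin(follows)
          .map { case (uid, (name, cnt)) => (uid, name.getOrElse(""), cnt.getOrElse(10L)) }

        println(left.collect().toSet)  // e.g. Set((1,alice,42), (2,bob,10))
        println(full.collect().toSet)  // additionally contains (3,"",7)
        sc.stop()
      }
    }

The same defaulting appears twice in the patch: missing follower counts fall back to
10L both in joinedFollowInfo and in the debut join, and editorDB then clamps follower
counts with math.min(20000, x._3), which bounds the input to the popularity score.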
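A second note on the "score range" theme of the subject line: in main, the popularity
term 1 - tanh(max(1, x))*(-1) - 1 reduces algebraically to tanh(max(1, x)), since
1 + t - 1 = t; and because the argument is clamped to at least 1, the raw value lands
in [tanh(1), 1) ≈ [0.7616, 1) before make_0to1_2Key (not shown in this diff) rescales
it. A minimal sketch of that identity, assuming MatrixFunctions.tanh agrees with
scala.math.tanh on scalars (an assumption; the jblas call itself is not reproduced):

    // Sketch: the patched expression and its simplified form agree.
    object ScoreRangeDemo extends App {
      // As written in the patch: 1 - tanh(max(1, x)) * (-1) - 1
      def patched(x: Double): Double    = 1 - math.tanh(math.max(1, x)) * (-1) - 1
      // Algebraically identical: tanh(max(1, x))
      def simplified(x: Double): Double = math.tanh(math.max(1, x))

      // tanh saturates, so the clamp at 1 bounds outputs to [tanh(1), 1).
      for (x <- Seq(0.0, 1.0, 3.0, 20000.0))
        println(f"x=$x%.1f  patched=${patched(x)}%.6f  simplified=${simplified(x)}%.6f")
    }

If the double negation was meant to flip the sign of the score, the expression does
not achieve that as written; the two forms above are numerically identical.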