package com.piki_ds.ver1

//import breeze.linalg.min
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions

import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key, make_0to1}
import com.piki_ds.ver1.DashRelatedScore.previousDash
//import com.piki_ds.utils.hbase.HbaseInserter

/**
  * Created by jungwon on 5/2/16.
  */

object EditorScore {

  def getSparkConf1: SparkConf = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  def getSparkConf2: SparkConf = {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    // must be the literal "true"; SparkConf.getBoolean cannot parse "t"
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  /**
    * Pick the most recently modified subdirectory under `path`,
    * optionally restricted to directories that carry a _SUCCESS marker.
    */
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(_.getModificationTime)
  }

  /**
    * Merge the previously saved follow snapshot with today's EDITOR_FOLLOW dump and
    * compute a follow score per uid: 0.1 * cumulative follow count + sum of the 10 most recent counts.
    */
  def followGetter(sQLContext: SQLContext, dateKey: String, fs: FileSystem, fileSave: Boolean = true) = {
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo", false, fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x => {
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x => x("uid")).map(x => {
      val cuml = x._2.map(a => a("follow_cnt").toLong).sum
      // sort by cdate descending so take(10) picks the most recent entries
      val ts: Array[Long] = x._2.toArray.sortBy(_("cdate")).reverse.map(_("follow_cnt").toLong)
      val followScore = 0.1 * cuml + ts.take(10).sum
      (x._1, followScore.toLong)
    })
    if (fileSave) {
      unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    }
    follow_info
  }

  /**
    * Build the editor base table (uid, name, follow score, debut timestamp) from the DB dumps.
    */
  def editorDB(sQLContext: SQLContext, fs: FileSystem, dateKey: String): RDD[(String, String, Long, Long)] = {
    // Parse the USER table from the DB to pick up editors (not a strictly required step)
    val levels = Array("ADMIN_O", "EDITOR_O", "PARTNER_O", "PRESS_O", "STAFF_O")
    val filterS_user = s"where level in (${levels.map(x => "'" + x + "'").mkString(",")})"
    val fetchVar = Array("uid", "level", "name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "USER", fetchVar, filterS_user)
    val user_info: RDD[(String, String)] = user_tableGet.map(x => (x.getAs[Long]("uid").toString, x.getAs[String]("name")))

    // Parse the FOLLOW table from the DB to get follower counts
    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey, fs)

    val joinedFollowInfo: RDD[(String, (String, Long))] =
      user_info.fullOuterJoin(follow_info).map(x => (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(10L))))

    // Parse the MG_CONTENTS table from the DB to get each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common", "MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table =
      mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")).alias("debut_ts"))
    // the alias avoids relying on the auto-generated column name of unix_timestamp
    val debut_info: RDD[(String, Long)] = mgcont_table.map(x => (x.getAs[Long]("uid").toString, x.getAs[Long]("debut_ts")))

    // uid, name, follow, debut
    val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x => {
      (x._1, (x._2._1.getOrElse(("", 10L)), x._2._2.getOrElse(10L)))
    }).map(x => (x._1, x._2._1._1, x._2._1._2, x._2._2))
    // cap the follow score at 20000
    rawOut.map(x => (x._1, x._2, math.min(20000, x._3), x._4))
  }

  /**
    * Popularity proxy: follow score divided by the editor's age, where age is measured in days
    * (currentTS and the debut timestamp are in milliseconds).
    */
  def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long) = {
    dBTable.map(x => ((x._1, x._2), ((1000 * 60 * 60 * 24).toLong * x._3).toDouble / (currentTS - x._4)))
  }

  /**
    * Weibull-style transform of a raw millisecond value converted to days,
    * using the standard Weibull density (k/lambda) * (x/lambda)^(k-1) * exp(-(x/lambda)^k).
    */
  def tr_Weibull(raw: Double, lambd: Int, k: Double) = {
    import scala.math.{exp, pow}
    val intoD = raw / (1000 * 60 * 60 * 24)
    val transformed = (k / lambd) * pow(intoD / lambd, k - 1) * exp(-pow(intoD / lambd, k))
    transformed
  }

  def main(args: Array[String]) {
    val sc = new SparkContext(getSparkConf1)
    val sqlContext = SQLContext.getOrCreate(sc)
    val hadoopConf = sc.hadoopConfiguration
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    val nowTS: Long = System.currentTimeMillis
    val dateKey = getDateKey(nowTS)

    // editor popularity score
    val fromDB = editorDB(sqlContext, fs, dateKey)

    // 1 - tanh(max(1, x)) * (-1) - 1 reduces to tanh(max(1, x)); scaled to [0, 1] afterwards
    val ePopularity: RDD[((String, String), Double)] =
      make_0to1_2Key(popularity(fromDB, nowTS).map(x => (x._1, 1 - MatrixFunctions.tanh(math.max(1, x._2)) * (-1) - 1)))

    // editor performance score
    val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x => (x("cid"), x))

    val cidAndUid: RDD[(String, String)] = sc.objectFile[(String, String)](s"/preprocess/cidWithUid/$dateKey/")

    val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x => (x._1, x._2.size))

    val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

    // per-editor totals of view/likes/share/bookmark/comment, joined with the editor's content count
    val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x => {
      val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => {
        (a._2("view").replaceAll("^$", "0").toDouble.toLong,
          a._2("likes").replaceAll("^$", "0").toDouble.toLong,
          a._2("share").replaceAll("^$", "0").toDouble.toLong,
          a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
          a._2("comment").replaceAll("^$", "0").toDouble.toLong)
      }).reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5))
      (x._1, innerSum)
    }).join(uidAndCids)

    // per-content averages
    val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x => {
      val ap: (Double, Double, Double, Double, Double) = (
        x._2._1._1.toDouble / x._2._2,
        x._2._1._2.toDouble / x._2._2,
        x._2._1._3.toDouble / x._2._2,
        x._2._1._4.toDouble / x._2._2,
        x._2._1._5.toDouble / x._2._2)
      (x._1, ap)
    })

    // clamp each metric to its own range
    val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x => {
      import math.{min, max}
      val s1 = min(max(500, x._2._1), 25000)
      val s2 = min(max(100, x._2._2), 2000)
      val s3 = min(max(10, x._2._3), 2000)
      val s4 = min(max(10, x._2._4), 2000)
      val s5 = min(max(100, x._2._5), 3000)
      (x._1, (s1, s2, s3, s4, s5))
    })

    val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x => {
      val score = x._2._1 + x._2._2 + x._2._3 + x._2._4 + x._2._5
      (x._1, score / 5)
    })

    val performance = make_0to1(sumPerformance)
    /*
    val performance: RDD[(String, Double)] = minMaxPerformance.map(x=>{
      val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
        tr_Weibull(x._2._2.toDouble, 1, 0.5)+
        tr_Weibull(x._2._3.toDouble, 1, 0.5)+
        tr_Weibull(x._2._4.toDouble, 1, 0.5)+
        tr_Weibull(x._2._5.toDouble, 1, 0.5)
      (x._1, score/5)
    })
    */

    // final score: 0.15 * popularity + 0.85 * performance, falling back to 0.2 when performance is missing
    val finalScore = ePopularity.map(x => {
      (x._1._1, x._2)
    }).leftOuterJoin(performance).map(x => (x._1, 0.15 * x._2._1 + 0.85 * x._2._2.getOrElse(0.2D))).filter(x => x._1.nonEmpty)

    // scale to an integer score in [0, 1000]
    val formatOutput: RDD[(String, Long)] = finalScore.map(x => (x._1, (x._2 * 1000).toLong))

    TempScoreSaveLoad.scoreSave(dateKey, "editor", "", formatOutput, 1)
    // trigger the HBase insert API for the newly saved score file
    scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$dateKey")
    /*
    val insertArray: Array[(String, String)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong.toString)).collect
    val test = new HbaseInserter("editor_score")
    test.insert(insertArray)
    */
  }
}
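/*
 * A minimal, self-contained sketch (not part of the pipeline) that mirrors the score
 * combination used in main() above: 0.15 * popularity + 0.85 * performance, with 0.2 used
 * when an editor has no performance score, and the result truncated to a Long in [0, 1000].
 * The object name and the toy values are hypothetical and exist only for illustration.
 */
object EditorScoreFormulaSketch {

  // same arithmetic as the finalScore / formatOutput steps in EditorScore.main
  def combine(popularity: Double, performance: Option[Double]): Long = {
    val raw = 0.15 * popularity + 0.85 * performance.getOrElse(0.2)
    (raw * 1000).toLong
  }

  def main(args: Array[String]): Unit = {
    // editor with both component scores available: 0.15*0.9 + 0.85*0.6 ≈ 0.645, so roughly 645
    println(combine(0.9, Some(0.6)))
    // editor missing a performance score falls back to 0.2: 0.15*0.9 + 0.85*0.2 ≈ 0.305, so roughly 305
    println(combine(0.9, None))
  }
}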