Commit 001a8f7f4ba9ab99f5ee5d4534dd1c573da04b91

Authored by Joanne
1 parent c9e19f40ee
Exists in master

finalizing ver1 score

Showing 4 changed files with 299 additions and 0 deletions

app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala
  1 +package com.piki_ds.preprocess
  2 +
  3 +
  4 +import scala.collection.immutable.IndexedSeq
  5 +
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.sql._
  8 +import org.apache.spark.{SparkConf, SparkContext}
  9 +import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
  10 +
  11 +import com.piki_ds.utils.DateTimeEtc.{intoYesterdayMN,makeDateList}
  12 +import com.piki_ds.utils.SqlContextConf._
  13 +import com.piki_ds.utils.GetTextFile.getDashTable
  14 +
  15 +/**
  16 + * Created by jungwon on 5/2/16.
  17 + */
  18 +
  19 +object WeeklyECTbyGroup {
  20 +
  21 + def getSparkConf= {
  22 + //System.setProperty("SPARK_YARN_MODE", "true")
  23 + val conf = new SparkConf().setAppName("WeeklyECTbyGroup")
  24 + conf.setMaster("yarn-client")
  25 + conf.set("master", "yarn-client")
  26 + conf.set("spark.app.name", "WeeklyECTbyGroup")
  27 + conf.set("spark.akka.frameSize", "1024")
  28 + }
  29 +
  30 + val sc = new SparkContext(getSparkConf)
  31 + val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  32 + val hadoopConf = sc.hadoopConfiguration
  33 + val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  34 +
  35 + def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
  36 + val list = hdfs.listStatus(new Path(path))
  37 + list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  38 + }
  39 + /**
  40 + * Compute consume time (dwell time) based on the dashboard
  41 + *
  42 + * @param whichDate: the date to compute for (yesterday)
  43 + * @return : RDD[
  44 + * (cid, consumeTime)]
  45 + */
  46 + def getWeeklyCT (whichDate:String) = {
  47 + val oneWeek = makeDateList(whichDate,7,false,"yyyyMMdd")
  48 + val infoWeek: IndexedSeq[RDD[Map[String, String]]] = oneWeek.map(d=>{
  49 + try {
  50 + val updatesDF = getDashTable(sqlContext,"CONTENTS_REPORT",d)
  51 + val fetchVar = Array("cid",
  52 + "expCnt",
  53 + "expTime",
  54 + "consumeTime",
  55 + "ctr",
  56 + "view",
  57 + "uview",
  58 + "bookmark",
  59 + "share",
  60 + "comment",
  61 + "like")
  62 + val selectedDF= updatesDF.select(fetchVar(0),fetchVar(1),fetchVar(2),fetchVar(3),fetchVar(4),fetchVar(5),
  63 + fetchVar(6),fetchVar(7),fetchVar(8),fetchVar(9),fetchVar(10))
  64 + val updatesRaw: RDD[Map[String, String]] = dFToRDDMap(selectedDF,fetchVar)
  65 + updatesRaw
  66 + } catch {
  67 + case e:Exception => sc.emptyRDD[Map[String, String]]
  68 + }
  69 + })
  70 + val weekInfo = sc.union(infoWeek)
  71 + weekInfo.map(x=>(x("cid"),x("consumeTime").toLong)).reduceByKey((a,b) => a+b).map(x=>(x._1, math.min(x._2,4000000L)))
  72 + }
  73 +
  74 + /**
  75 + * Compute the card size (number of cards) for each cid
  76 + *
  77 + * @return : RDD[
  78 + * (cid, cardSize)]
  79 + */
  80 + def getCardSize () = {
  81 +
  82 + val fp = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/db/table=MG_CARD",true,fs).getPath.toString
  83 + val totalTable = sqlContext.read.format("parquet").load(fp).where("status='ACTV'")
  84 + import org.apache.spark.sql.functions._
  85 + val selectedTable = totalTable.groupBy("contents_id").agg(count("ordering"))
  86 + val selectedRDD = selectedTable.map(x=>(x.getAs[Long](0).toString,x.getAs[Long](1).toInt))
  87 + selectedRDD
  88 + }
  89 +
  90 + /**
  91 + * Map each cid to its uid
  92 + *
  93 + * @return : RDD[
  94 + * (cid, uid)
  95 + * ]
  96 + */
  97 + def cidWithUid () = {
  98 + /*
  99 + val filterString = "where uid is not null"
  100 + val fetchCol = Array("contents_id", "uid")
  101 + val totalTable = readPartialTableBig(sqlContext, "REPL","new_pikicast_common","MG_CONTENTS",fetchCol,filterString,"contents_id",maxCid,1000)
  102 + */
  103 + val fp = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/db/table=MG_CONTENTS",true,fs).getPath.toString
  104 + val totalTable = sqlContext.read.format("parquet").load(fp).where("uid is not null").select("contents_id", "uid")
  105 + totalTable.map(x=>(x.getAs[Long](0).toString, x.getAs[Long](1).toString))
  106 + }
  107 +
  108 + def main (args: Array[String]) {
  109 + val nowTS: Long = System.currentTimeMillis
  110 + val yesterdayTuple = intoYesterdayMN(nowTS)
  111 + val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  112 +
  113 + val maxCid: Long = getMaxId(sqlContext,"content")
  114 + val weeklyCT = getWeeklyCT(dateKey)
  115 +
  116 + val cardSize = getCardSize()
  117 + cardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$dateKey")
  118 +
  119 + val ctbyCardSize = weeklyCT.join(cardSize).groupBy(x=>x._2._2).map(x=>{
  120 + val consumeTime = x._2.map(_._2._1).sum
  121 + (x._1,consumeTime)
  122 + })
  123 + ctbyCardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/ctByCardSize/$dateKey")
  124 +
  125 + val cidwithUid = cidWithUid()
  126 + cidwithUid.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidWithUid/$dateKey")
  127 + }
  128 +
  129 +
  130 +
  131 +}
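getWeeklyCT unions seven daily CONTENTS_REPORT extracts and reduces them to a single weekly consume time per cid, capped at 4,000,000. A minimal, self-contained sketch of just that aggregation step follows; the local master and the inline sample rows are assumptions for illustration, not part of the job.

import org.apache.spark.{SparkConf, SparkContext}

object WeeklyCTSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WeeklyCTSketch").setMaster("local[2]"))
    // one week of (cid, consumeTime) rows, already flattened the way sc.union(infoWeek) flattens them above
    val week = sc.parallelize(Seq(("c1", 120000L), ("c1", 90000L), ("c2", 5000000L), ("c3", 300L)))
    // sum consume time per cid, then cap the weekly total at 4,000,000 as getWeeklyCT does
    val weeklyCT = week.reduceByKey(_ + _).mapValues(t => math.min(t, 4000000L))
    weeklyCT.collect().foreach(println)
    sc.stop()
  }
}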
app/com/piki_ds/ver1/EditorScore.scala
  1 +package com.piki_ds.ver1
  2 +
  3 +import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
  4 +import org.jblas.MatrixFunctions
  5 +
  6 +import org.apache.spark.rdd.RDD
  7 +import org.apache.spark.sql.SQLContext
  8 +import org.apache.spark.{SparkConf, SparkContext}
  9 +
  10 +import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
  11 +import com.piki_ds.utils.GetTextFile.getDashTable
  12 +import com.piki_ds.utils.SqlContextConf.readPartialTable
  13 +import com.piki_ds.utils.TempScoreSaveLoad
  14 +import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
  15 +import com.piki_ds.ver1.DashRelatedScore.previousDash
  16 +import com.piki_ds.utils.hbase.HbaseInserter
  17 +
  18 +/**
  19 + * Created by jungwon on 5/2/16.
  20 + */
  21 +
  22 +object EditorScore {
  23 +
  24 + def getSparkConf= {
  25 + val conf = new SparkConf().setAppName("EditorScore")
  26 + conf.setMaster("local[3]")
  27 + conf.set("master", "local[3]")
  28 + conf.set("spark.app.name", "EditorScore")
  29 + conf.set("spark.driver.allowMultipleContexts", "true")
  30 + }
  31 +
  32 + val sc = new SparkContext(getSparkConf)
  33 + val sqlContext = SQLContext.getOrCreate(sc)
  34 + val hadoopConf = sc.hadoopConfiguration
  35 + val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
  36 +
  37 + def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
  38 + val list = hdfs.listStatus(new Path(path))
  39 + list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  40 + }
  41 +
  42 + def followGetter(sQLContext: SQLContext, dateKey:String) = {
  43 +
  44 + val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",true,fs).getPath.toString)
  45 + val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
  46 + val unionFrom = fromExisting.unionAll(fromUpdated)
  47 + val intoMap = unionFrom.map(x=>{
  48 + Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
  49 + })
  50 + val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>(x._1, x._2.map(a=>a("follow_cnt").toLong).sum))
  51 + unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
  52 + follow_info
  53 + }
  54 +
  55 + def editorDB(sQLContext: SQLContext, dateKey:String): RDD[(String, String, Long, Long)] = {
  56 + // Parse the USER table from the DB to get the editors (not a strictly required step)
  57 + val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
  58 + val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
  59 + val fetchVar = Array("uid","level","name")
  60 + val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
  61 + val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
  62 +
  63 + // Parse the FOLLOW table from the DB to get follower counts
  64 + val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
  65 +
  66 + val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))
  67 +
  68 + // Parse the MG_CONTENTS table from the DB to get each editor's debut date
  69 + val filterS_mgcont = " where uid is not null group by uid"
  70 + val fetchVar_mgcont = Array("uid", "min(udate)")
  71 + val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
  72 + import org.apache.spark.sql.functions._
  73 + val mgcont_table= mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
  74 + val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))
  75 +
  76 + // uid, name, follow, debut
  77 + joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
  78 + (x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
  79 + }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
  80 + }
  81 +
  82 + def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS:Long) = {
  83 + // age is measured in days
  84 + dBTable.map(x=>((x._1, x._2), ((1000*60*60*24).toLong*x._3).toDouble/(currentTS-x._4)))
  85 + }
  86 +
  87 + def tr_Weibull(raw:Double, lambd:Int, k:Double) = {
  88 + import scala.math.{exp, pow}
  89 + val intoD = raw/(1000*60*60*24)
  90 + val transformed = (k/lambd)*pow(k/lambd,k-lambd)*exp(-pow(intoD/lambd,k))
  91 + transformed
  92 + }
  93 +
  94 + def main(args:Array[String]) {
  95 + // pin the current timestamp (computation time), e.g. 1435261946000L
  96 + val nowTS: Long = System.currentTimeMillis
  97 + val yesterdayTuple = intoYesterdayMN(nowTS)
  98 + val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
  99 + // compute the editor popularity score
  100 + val fromDB = editorDB(sqlContext, dateKey)
  101 +
  102 + val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))
  103 +
  104 + // compute the editor performance score
  105 + val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x))
  106 +
  107 + val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")
  108 +
  109 + val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)
  110 +
  111 + val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x=> {
  112 + val innerSum = x._2.map(a => {
  113 + (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
  114 + a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
  115 + a._2("comment").replaceAll("^$", "0").toDouble.toLong)
  116 + }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
  117 + (x._1, innerSum)
  118 + }).map(x=>{
  119 + val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
  120 + tr_Weibull(x._2._2.toDouble, 1, 0.5)+
  121 + tr_Weibull(x._2._3.toDouble, 1, 0.5)+
  122 + tr_Weibull(x._2._4.toDouble, 1, 0.5)+
  123 + tr_Weibull(x._2._5.toDouble, 1, 0.5)
  124 + (x._1, score/5)
  125 + })
  126 +
  127 + val finalScore = ePopularity.map(x=>{
  128 + (x._1._1,x._2)
  129 + }).leftOuterJoin(performance).map(x=>(x._1,(x._2._1+x._2._2.getOrElse(0.5D))/2)).filter(x=>x._1.nonEmpty && ! x._2.isNaN)
  130 +
  131 + val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1,(x._2*1000).toLong))
  132 +
  133 + TempScoreSaveLoad.scoreSave(dateKey,"editor","",formatOutput,1)
  134 +
  135 + val insertArray: Array[(String, String)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong.toString)).collect
  136 + val test = new HbaseInserter("editor_score")
  137 + test.insert(insertArray)
  138 +
  139 + }
  140 +
  141 +}
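EditorScore blends two per-uid signals: a popularity score derived from follower counts and editor age (passed through tanh and then 0-1 normalized), and a performance score obtained by averaging Weibull-transformed sums of view, like, share, bookmark and comment counts. The final score is the mean of the two, with 0.5 substituted when an editor has no performance data, and is scaled to a Long in [0, 1000] before saving and the HBase insert. A minimal sketch of that final blend follows; the local master and the inline sample scores are assumptions for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object EditorScoreBlendSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("EditorScoreBlendSketch").setMaster("local[2]"))
    val popularity  = sc.parallelize(Seq(("uid1", 0.8), ("uid2", 0.3)))  // 0-1 normalized popularity per uid
    val performance = sc.parallelize(Seq(("uid1", 0.6)))                 // uid2 has no performance data
    val blended = popularity
      .leftOuterJoin(performance)
      .map { case (uid, (pop, perf)) => (uid, (pop + perf.getOrElse(0.5)) / 2) } // missing performance defaults to 0.5
      .filter { case (uid, s) => uid.nonEmpty && !s.isNaN }
    val formatted = blended.mapValues(s => (s * 1000).toLong)            // the Long form that is saved and pushed to HBase
    formatted.collect().foreach(println)
    sc.stop()
  }
}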
combinedRun.sh
  1 +#!/bin/bash
  2 +
  3 +BASEDIR=$(dirname $0)
  4 +
  5 +if [ "$1" = 'time' ]
  6 +then
  7 + sh $BASEDIR/score.sh com.piki_ds.ver1.TimeScore
  8 + sh $BASEDIR/score.sh com.piki_ds.ver1.TimeHyperParam
  9 +fi
  10 +
  11 +if [ "$1" = 'content' ]
  12 +then
  13 + sh $BASEDIR/score.sh com.piki_ds.ver1.DashRelatedScore
  14 +
  15 + sh $BASEDIR/score.sh com.piki_ds.ver1.ConRejectScore
  16 + sh $BASEDIR/score.sh com.piki_ds.ver1.ConEfficiencyScore
  17 + sh $BASEDIR/score.sh com.piki_ds.ver1.ContentScore
  18 +fi
ggh_related.sh
  1 +#!/bin/bash
  2 +
  3 +BASEDIR=$(dirname $0)
  4 +sh $BASEDIR/score.sh com.piki_ds.preprocess.MakingLapse
  5 +sh $BASEDIR/score.sh com.piki_ds.preprocess.MapWithRank
  6 +
  7 +sh $BASEDIR/combinedRun.sh content
  8 +sh $BASEDIR/score.sh com.piki_ds.ver1.EditorScore
  9 +sh $BASEDIR/score.sh com.piki_ds.ver1.QualityScore
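Both scripts delegate to score.sh with a fully qualified main class. Based on the argument checks in combinedRun.sh and the call made from ggh_related.sh, the intended invocations look like the following; running them from the directory that contains the scripts is an assumption.

# time-related ver1 scores only
sh combinedRun.sh time

# dashboard/content-related ver1 scores (this is what ggh_related.sh calls)
sh combinedRun.sh content

# full chain: lapse/rank preprocessing, content scores, editor score, quality score
sh ggh_related.sh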