package com.piki_ds.ver1
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.MatrixFunctions
import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
import com.piki_ds.ver1.DashRelatedScore.previousDash
import com.piki_ds.utils.hbase.HbaseInserter
/**
* Created by jungwon on 5/2/16.
*/
object EditorScore {
def getSparkConf: SparkConf = {
val conf = new SparkConf().setAppName("EditorScore").setMaster("local[3]")
conf.set("spark.driver.allowMultipleContexts", "true")
conf
}
val sc = new SparkContext(getSparkConf)
val sqlContext = SQLContext.getOrCreate(sc)
val hadoopConf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
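/**
* Returns the most recently modified subdirectory under `path`; when `isParted` is true,
* only directories containing a _SUCCESS marker are considered.
*/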
def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
val list = hdfs.listStatus(new Path(path))
list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(_.getModificationTime)
}
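/**
* Merges the most recent follow snapshot with the day's EDITOR_FOLLOW dashboard table,
* sums follow counts per uid, and writes the merged snapshot back for the next run.
*/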
def followGetter(sQLContext: SQLContext, dateKey: String): RDD[(String, Long)] = {
val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",true,fs).getPath.toString)
val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
val unionFrom = fromExisting.unionAll(fromUpdated)
val intoMap = unionFrom.map(x=>{
Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
})
val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>(x._1, x._2.map(a=>a("follow_cnt").toLong).sum))
unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
follow_info
}
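/**
* Builds the editor table as (uid, name, follow count, debut timestamp);
* missing follow counts and debut timestamps both default to 10L.
*/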
def editorDB(sQLContext: SQLContext, dateKey: String): RDD[(String, String, Long, Long)] = {
// Parse the USER table from the DB to fetch editor accounts (not a strictly required step)
val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
val fetchVar = Array("uid","level","name")
val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
// Parse the FOLLOW table from the DB to fetch follower counts
val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)
val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))
// Parse the MG_CONTENTS table from the DB to fetch each editor's debut date
val filterS_mgcont = " where uid is not null group by uid"
val fetchVar_mgcont = Array("uid", "min(udate)")
val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
import org.apache.spark.sql.functions._
// alias the derived column so we do not rely on the auto-generated column name
val mgcont_table = mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")).alias("debut_ts"))
val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("debut_ts")))
// uid, name, follow, debut
joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
(x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
}).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
}
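/** Popularity of an editor: follower count per day of activity since debut. */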
def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS: Long): RDD[((String, String), Double)] = {
// Age unit: days. Popularity = follower count divided by the editor's age in days (now minus debut).
dBTable.map(x=>((x._1, x._2), ((1000*60*60*24).toLong*x._3).toDouble/(currentTS-x._4)))
}
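/**
* Weibull-style squashing of a raw engagement count: the count is divided by the number of
* milliseconds in a day and mapped to (k/lambd) * (k/lambd)^(k-lambd) * exp(-(x/lambd)^k).
*/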
def tr_Weibull(raw: Double, lambd: Int, k: Double): Double = {
import scala.math.{exp, pow}
val intoD = raw/(1000*60*60*24)
val transformed = (k/lambd)*pow(k/lambd,k-lambd)*exp(-pow(intoD/lambd,k))
transformed
}
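/**
* Entry point: builds the editor table from the DB, derives a popularity score from followers
* per day since debut, derives a performance score from dashboard engagement metrics, averages
* the two, then saves the result with TempScoreSaveLoad and pushes it to the editor_score HBase table.
*/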
def main(args:Array[String]) {
val nowTS: Long = System.currentTimeMillis
val dateKey = getDateKey(nowTS)
// Compute the editor popularity score
val fromDB = editorDB(sqlContext, dateKey)
val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS)
.map(x=>(x._1, 1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1))) // the transform reduces to tanh(max(1, popularity)) before make_0to1_2Key normalization
// Compute the editor performance score
val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x))
val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")
val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)
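// For each editor, sum view/likes/share/bookmark/comment over their contents,
// squash each total with tr_Weibull and average the five.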
val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x=> {
val innerSum = x._2.map(a => {
(a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
a._2("comment").replaceAll("^$", "0").toDouble.toLong)
}).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
(x._1, innerSum)
}).map(x=>{
val score = tr_Weibull(x._2._1.toDouble, 1, 0.5)+
tr_Weibull(x._2._2.toDouble, 1, 0.5)+
tr_Weibull(x._2._3.toDouble, 1, 0.5)+
tr_Weibull(x._2._4.toDouble, 1, 0.5)+
tr_Weibull(x._2._5.toDouble, 1, 0.5)
(x._1, score/5)
})
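// Final score: average of popularity and performance (0.5 when performance is missing),
// dropping empty uids and NaN scores.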
val finalScore = ePopularity.map(x=>{
(x._1._1,x._2)
}).leftOuterJoin(performance).map(x=>(x._1,(x._2._1+x._2._2.getOrElse(0.5D))/2)).filter(x=>x._1.nonEmpty && ! x._2.isNaN)
val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1,(x._2*1000).toLong))
TempScoreSaveLoad.scoreSave(dateKey,"editor","",formatOutput,1)
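// Push the same scores (scaled by 1000) to the editor_score HBase table.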
val insertArray: Array[(String, String)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong.toString)).collect
val inserter = new HbaseInserter("editor_score")
inserter.insert(insertArray)
}
}