Commit 7393595ae89530e18e8b51ee9a08b3f34aa0ff22
1 parent bec43ca4ca
Exists in master
error in sql
Showing 2 changed files with 6 additions and 9 deletions
app/com/piki_ds/preprocess/CidValidation.scala
package com.piki_ds.preprocess

import org.apache.spark.sql.SQLContext

import com.piki_ds.utils.GetTextFile.getDBDump


/**
 * Created by jungwon on 4/21/16.
 */

object CidValidation {

  def getCidByStatuses(sQLContext: SQLContext, filterStatus:Array[String], filterUdate:(String,String)) = {
    import org.apache.spark.sql.functions._
    val whereStr = s"udate between ${filterUdate._1} and ${filterUdate._2} and title is not null and" +
      s" contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
      s"status in (${filterStatus.map(x=>s"'$x'").mkString(",")})"
    val mgc = getDBDump(sQLContext,"MG_CONTENTS").where(whereStr)
    val mgContents = mgc.select(mgc("contents_id"),mgc("status"), unix_timestamp(mgc("udate")))
    mgContents.map(x=>{
      val ts = x.getAs[Long]("unixtimestamp(udate,yyyy-MM-dd HH:mm:ss)")
      val status = if (x.getAs[String]("status").equals("ACTV")) 1 else 0
      (x.getAs[Long]("contents_id"), (status, ts))
    }).reduceByKey((a,b) => {
      import math.{min,max}
      (max(a._1,b._1), min(a._2,b._2))

    })
  }

  def getCidByStatus(sQLContext: SQLContext, filterStatus:Array[String]) = {
    import org.apache.spark.sql.functions._
-    val whereStr = s"udate is not null and id is not null and title is not null and" +
+    val whereStr = s"udate is not null and contents_id is not null and title is not null and" +
      s" contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
      s"status in (${filterStatus.map(x=>s"'$x'").mkString(",")})"
    val mgc = getDBDump(sQLContext,"MG_CONTENTS").where(whereStr)
    val mgContents = mgc.select(mgc("contents_id"),mgc("status"), unix_timestamp(mgc("udate")))
    mgContents.map(x=>{
      val ts = x.getAs[Long]("unixtimestamp(udate,yyyy-MM-dd HH:mm:ss)")
      val status = if (x.getAs[String]("status").equals("ACTV")) 1 else 0
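The functional change in this file is only the column name in the generated WHERE clause: MG_CONTENTS is now filtered on contents_id rather than id, presumably because the table has no id column (the commit message is "error in sql"). As a quick sanity check, here is a minimal standalone sketch of the string getCidByStatus now builds; only "ACTV" is taken from the code above, "HIDN" is an invented example value.

object WhereStrCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical status filter for illustration only.
    val filterStatus = Array("ACTV", "HIDN")
    // Same interpolation as in getCidByStatus after the fix.
    val whereStr = s"udate is not null and contents_id is not null and title is not null and" +
      s" contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
      s"status in (${filterStatus.map(x => s"'$x'").mkString(",")})"
    println(whereStr)
    // ... and status in ('ACTV','HIDN')
  }
}
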
app/com/piki_ds/ver1/EditorScore.scala
package com.piki_ds.ver1

-import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
-import org.jblas.MatrixFunctions
-
import org.apache.spark.rdd.RDD
+import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
+import org.jblas.MatrixFunctions

-import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
+import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.GetTextFile.getDashTable
import com.piki_ds.utils.SqlContextConf.readPartialTable
import com.piki_ds.utils.TempScoreSaveLoad
import com.piki_ds.utils.GeneralTransformers.make_0to1_2Key
import com.piki_ds.ver1.DashRelatedScore.previousDash
import com.piki_ds.utils.hbase.HbaseInserter

/**
 * Created by jungwon on 5/2/16.
 */

object EditorScore {

  def getSparkConf= {
    val conf = new SparkConf().setAppName("EditorScore")
    conf.setMaster("local[3]")
    conf.set("master", "local[3]")
    conf.set("spark.app.name", "EditorScore")
    conf.set("spark.driver.allowMultipleContexts", "true")
  }

  val sc = new SparkContext(getSparkConf)
  val sqlContext = SQLContext.getOrCreate(sc)
  val hadoopConf = sc.hadoopConfiguration
  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

  def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  }

  def followGetter(sQLContext: SQLContext, dateKey:String) = {
-
    val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",true,fs).getPath.toString)
    val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
    val unionFrom = fromExisting.unionAll(fromUpdated)
    val intoMap = unionFrom.map(x=>{
      Map("cdate" -> x.getAs[String](0).toString, "follow_cnt" -> x.getAs[Long](1).toString, "uid" -> x.getAs[Long](3).toString)
    })
    val follow_info = intoMap.groupBy(x=>x("uid")).map(x=>(x._1, x._2.map(a=>a("follow_cnt").toLong).sum))
    unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey")
    follow_info
  }

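followGetter unions the most recent saved follow snapshot with the day's EDITOR_FOLLOW dump, then sums follow_cnt per uid. A minimal sketch of that aggregation step on plain Scala collections; the sample rows are invented, and the real code does this on an RDD.

object FollowSumSketch {
  def main(args: Array[String]): Unit = {
    // Rows shaped like the Maps built in followGetter; values are illustrative only.
    val rows = Seq(
      Map("cdate" -> "20160424", "follow_cnt" -> "120", "uid" -> "1001"),
      Map("cdate" -> "20160425", "follow_cnt" -> "15",  "uid" -> "1001"),
      Map("cdate" -> "20160425", "follow_cnt" -> "7",   "uid" -> "1002"))
    // Group by uid and sum follow_cnt, mirroring the groupBy/map above.
    val followInfo = rows.groupBy(_("uid")).map { case (uid, rs) => (uid, rs.map(_("follow_cnt").toLong).sum) }
    println(followInfo)  // e.g. Map(1001 -> 135, 1002 -> 7)
  }
}
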
  def editorDB(sQLContext: SQLContext, dateKey:String): RDD[(String, String, Long, Long)] = {
    // Parse the USER table from the DB and pull out the editors (not strictly a required step... -_-)
    val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
    val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
    val fetchVar = Array("uid","level","name")
    val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user)
    val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))

    // Parse the FOLLOW table from the DB and get follower counts
    val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey)

    val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.leftOuterJoin(follow_info).map(x=>(x._1,(x._2._1,x._2._2.getOrElse(10L))))

    // Parse the MG_CONTENTS table from the DB and get each editor's debut date
    val filterS_mgcont = " where uid is not null group by uid"
    val fetchVar_mgcont = Array("uid", "min(udate)")
    val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont)
    import org.apache.spark.sql.functions._
    val mgcont_table= mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)")))
    val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)")))

    // uid, name, follow, debut
    joinedFollowInfo.leftOuterJoin(debut_info).map(x=>{
      (x._1,(x._2._1 ,x._2._2.getOrElse(10L)))
    }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2))
  }

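Both leftOuterJoin calls in editorDB substitute 10L when the right-hand side has no entry (no follow record, or no debut timestamp). A toy illustration of that fallback on plain Scala collections, with invented values:

object JoinDefaultSketch {
  def main(args: Array[String]): Unit = {
    // user_info-like pairs and a follow_info-like lookup; both are made-up samples.
    val userInfo = Seq(("1001", "editorA"), ("1002", "editorB"))
    val followInfo = Map("1001" -> 135L)
    // Editors missing from followInfo get the same default (10L) used in editorDB.
    val joined = userInfo.map { case (uid, name) => (uid, (name, followInfo.getOrElse(uid, 10L))) }
    println(joined)  // List((1001,(editorA,135)), (1002,(editorB,10)))
  }
}
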
  def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS:Long) = {
    // age is measured in days
    dBTable.map(x=>((x._1, x._2), ((1000*60*60*24).toLong*x._3).toDouble/(currentTS-x._4)))
  }

  def tr_Weibull(raw:Double, lambd:Int, k:Double) = {
    import scala.math.{exp, pow}
    val intoD = raw/(1000*60*60*24)
    val transformed = (k/lambd)*pow(k/lambd,k-lambd)*exp(-pow(intoD/lambd,k))
    transformed
  }

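tr_Weibull converts a raw duration in milliseconds into days and applies a Weibull-style decay. A standalone numeric check of the same expression, copied here so it runs without the object's SparkContext; lambd = 45 and k = 0.5 are arbitrary example parameters, not values used by the pipeline.

object WeibullSketch {
  import scala.math.{exp, pow}

  // Same expression as tr_Weibull above, duplicated so it can run on its own.
  def trWeibull(raw: Double, lambd: Int, k: Double): Double = {
    val intoD = raw / (1000 * 60 * 60 * 24)  // milliseconds -> days
    (k / lambd) * pow(k / lambd, k - lambd) * exp(-pow(intoD / lambd, k))
  }

  def main(args: Array[String]): Unit = {
    // 30 days expressed in milliseconds; lambd and k are made-up example values.
    val thirtyDaysMs = 30.0 * 24 * 60 * 60 * 1000
    println(trWeibull(thirtyDaysMs, lambd = 45, k = 0.5))
  }
}
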
  def main(args:Array[String]) {
-    //pin the current timestamp [calculation time] Long = 1435261946000
    val nowTS: Long = System.currentTimeMillis
-    val yesterdayTuple = intoYesterdayMN(nowTS)
-    val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
+    val dateKey = getDateKey(nowTS)
+
    //compute the editor popularity score
    val fromDB = editorDB(sqlContext, dateKey)

    val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))

    //compute the editor performance score
    val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x))

    val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/")

    val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2)

    val performance: RDD[(String, Double)] = cPDB.groupBy(_._1).map(x=> {
      val innerSum = x._2.map(a => {
        (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong,
          a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong,
          a._2("comment").replaceAll("^$", "0").toDouble.toLong)