Commit 3f5548ee2360aa9042266fc3528d2f4a64980d7e

Authored by Joanne
1 parent 8230469489
Exists in master

temp change for insertcidList

Showing 4 changed files with 51 additions and 4 deletions

app/com/piki_ds/preprocess/CidValidation.scala
package com.piki_ds.preprocess

import org.apache.spark.sql.SQLContext

-import com.piki_ds.utils.GetTextFile.getDashDump
+import com.piki_ds.utils.GetTextFile.getDBDump


/**
 * Created by jungwon on 4/21/16.
 */

object CidValidation {

  def getCidByStatus(sQLContext: SQLContext, filterStatus:Array[String]) = {
    import org.apache.spark.sql.functions._
    val whereStr = s"udate is not null and title is not null and" +
      s" contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
      s"status in (${filterStatus.map(x=>s"'$x'").mkString(",")})"
-   val mgc = getDashDump(sQLContext,"MG_CONTENTS").where(whereStr)
+   val mgc = getDBDump(sQLContext,"MG_CONTENTS").where(whereStr)
    val mgContents = mgc.select(mgc("contents_id"),mgc("status"), unix_timestamp(mgc("udate")))
    mgContents.map(x=>{
      val ts = x.getAs[Long]("unixtimestamp(udate,yyyy-MM-dd HH:mm:ss)")
      val status = if (x.getAs[String]("status").equals("ACTV")) 1 else 0
      (x.getAs[Long]("contents_id"), (status, ts))
    }).reduceByKey((a,b) => {
      import math.{min,max}
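
The only change in this file is the rename from getDashDump to getDBDump at the import and at the call site in getCidByStatus. A minimal call-site sketch, assuming a live SparkContext and SQLContext; the status filter values below are illustrative, not taken from this commit:

    // Hypothetical usage: keys are contents_id, values are (status flag, udate timestamp)
    val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(sc)
    val cidStatus = CidValidation.getCidByStatus(sqlContext, Array("ACTV", "HOLD"))
    cidStatus.take(5).foreach(println)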
app/com/piki_ds/utils/GetTextFile.scala
package com.piki_ds.utils

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext,DataFrame}

/**
 * Module that loads a text file and converts it to an RDD. Used when fetching log and lapse data.
 *
 * Created by jungwon on 7/14/15.
 */

object GetTextFile {

-  def getDashDump(sQLContext: SQLContext, tableName:String) = {
+  def getDBDump(sQLContext: SQLContext, tableName:String) = {
    sQLContext.read.parquet(s"hdfs://pikinn/preprocess/db/table=$tableName/")
  }

  /**
   * Fetches the log for the date of interest
   *
   * @param dateOfInterest : yyyyMMdd
   * @return : the log, split by the delimiter
   */
  def getLog(sc:SparkContext, dateOfInterest:String) = {
    println("DEBUG getLog : " + dateOfInterest )
    val yr = dateOfInterest.take(4)
    val mo = dateOfInterest.slice(4,6)
    val day = dateOfInterest.slice(6,8)
    sc.textFile(s"hdfs://pikinn/log/dev=app/y=$yr/mo=$mo/d=$day/h=*/mi=*/*.gz").map(x=>x.split("\\|", -1)).filter(x=>{
      x.size>4 && x(2).forall(_.isDigit) && x(2).nonEmpty &&
        x(3).forall(_.isDigit) && x(3).forall(_.isDigit)}).map(x=>{
      val genTime = if(x(2).length == 10) x(2)+"000" else x(2)
      x(2) = genTime
      x
    })
  }

  /**
   * Fetches the precomputed lapse
   *
   * @param dateKey : yyyyMMdd
   * @return : ((EVENT, actionTime, uuid), lapse)
   */

  def getLapse(sc:SparkContext, dateKey:String, action:String)= {
    val parseFilter = sc.textFile(s"hdfs://pikinn/preprocess/lapse/$dateKey/*.gz").map(x=>x.split("\\|", -1)).filter(x=>{
      x.size==7 && x(2).forall(_.isDigit) && x(3).forall(_.isDigit) && x(4).forall(_.isDigit)
    })
    parseFilter.map(x=>{
      ((x(1),x(2),x(3),x(4)),x(6))
    })
  }

  def getUserInfo(sc:SparkContext, dateOfInterest:String) = {
    val c = java.util.Calendar.getInstance
    val format_date = new java.text.SimpleDateFormat("yyyyMMdd")
    val startingPoint = format_date.parse(dateOfInterest)
    c.setTime(startingPoint)
    c.add(java.util.Calendar.DATE, -1)
    val fetchDate = format_date.format(c.getTime)
    val textF:RDD[(String,Long)] = sc.objectFile(s"hdfs://pikinn/user/joanne/userReadAmt/$fetchDate/part*")
    textF
  }

  def getCardSize(sc:SparkContext, doi: String) = {
    val cidAndCS: RDD[(String, Int)] = sc.objectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$doi/part*")
    cidAndCS
  }

  def parseHueDB(sc: SparkContext, dbDate:String, tableName:String, dbLabel:Array[String]) = {
    val dbDir = sc.textFile(s"/data/db/$dbDate/$tableName/\\*.gz")
    val dbRaw = dbDir.map(x=>{
      import au.com.bytecode.opencsv.CSVParser
      val parser = new CSVParser(',',''')
      val s = parser.parseLine(x)
      if (s.size==dbLabel.length) Some(dbLabel.zip(s).toMap) else None
    }).flatMap(x=>x)
    dbRaw
  }

  def getContentsViewTable(sqlContext:SQLContext, table:String, saveDate:String) = {
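
getDBDump (formerly getDashDump) simply reads the Parquet dump of a DB table from HDFS into a DataFrame. A minimal usage sketch, assuming the MG_CONTENTS dump exists under hdfs://pikinn/preprocess/db/; the calls below are illustrative only:

    // Hypothetical usage of the renamed helper
    val sqlContext = SQLContext.getOrCreate(sc)
    val mgContents: DataFrame = GetTextFile.getDBDump(sqlContext, "MG_CONTENTS")
    mgContents.printSchema()
    println(mgContents.count())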
app/com/piki_ds/ver1/InsertCidList.scala
File was created
+package com.piki_ds.ver1
+
+import com.piki_ds.utils.hbase.HbaseInserter
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+
+/**
+ * Created by jungwon on 5/12/16.
+ */
+
+object InsertCidList {
+
+  var sc: SparkContext = SparkContext.getOrCreate()
+  var sqlContext: SQLContext = SQLContext.getOrCreate(sc)
+
+  val modelName: Map[String, Seq[String]] = Map(
+    "uniform" ->
+      Seq("quality"),
+    "single"->
+      Seq(
+        "cf",
+        "topic",
+        "age",
+        "sex",
+        "w2v"
+      ),
+    "ensemble" ->
+      Seq(
+        "ensemble1",
+        "ensemble2",
+        "ensemble3",
+        "ensemble4"
+      )
+  )
+
+  modelName("single").foreach(model=> {
+    val doi = "20160508"
+    val getMax30 = sc.objectFile[(Int, String)](s"/user/joanne/clols/$doi/$model")
+    getMax30.groupBy(x=>x._1%1000).foreach(x=>{
+      val tableName = s"uuid-cidlist_$model"
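
The new object is cut off just after the per-model HBase table name is built; the HbaseInserter call that presumably follows is not shown, so it is not reproduced here. A small sketch of the x._1 % 1000 bucketing step in isolation, with made-up (uuid, cidList) pairs; the sample data and printing are assumptions:

    // Illustrative only: shard (uuid, cidList) pairs into 1000 buckets, as getMax30.groupBy(x=>x._1%1000) does
    val sample = sc.parallelize(Seq((1001, "c1,c2"), (2001, "c3"), (1003, "c4,c5")))
    sample.groupBy(_._1 % 1000).collect().foreach { case (bucket, rows) =>
      println(s"bucket $bucket -> ${rows.map(_._2).mkString(";")}")
    }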
app/com/piki_ds/ver1/QualityScore.scala
package com.piki_ds.ver1

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD

import com.piki_ds.utils.hbase.HbaseInserter
import com.piki_ds.utils.GeneralTransformers.make_0to1
import com.piki_ds.utils.DateTimeEtc.getDateKey
import com.piki_ds.utils.TempScoreSaveLoad._

/**
 * Created by jungwon on 5/2/16.
 */

object QualityScore {

  def getSparkConf= {
    val conf = new SparkConf().setAppName("QualityScore")
    conf.setMaster("yarn-client")
    conf.set("master", "yarn-client")
    conf.set("spark.app.name", "QualityScore")
    conf.set("spark.akka.frameSize", "1024")
  }

  val sc = new SparkContext(getSparkConf)

  def infoJoined_(cuInfo:RDD[(Int,Int)], doi:String): RDD[((Int, Int), Map[String, String])] = {
    val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
    val comment: RDD[(Int, Int)] = scoreLoad(sc,doi,"comment","")
    val editor: collection.Map[Int, Int] = scoreLoad(sc,doi,"editor","").collectAsMap()
    val concom = content.fullOuterJoin(comment).map(x=>(x._1,(x._2._1.getOrElse(9999),x._2._2.getOrElse(9999))))
    cuInfo.join(concom).map(x=>((x._1,x._2._1), Map(doi -> s"${editor.getOrElse(x._2._1,9999)},${x._2._2._1},${x._2._2._2}")))
  }

  def infoJoined(cuInfo:RDD[(Int,Int)], doi:String): RDD[((Int, Int), Map[String, String])] = {
    val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
    val editor: collection.Map[Int, Int] = scoreLoad(sc,doi,"editor","").collectAsMap()
    cuInfo.join(content).map(x=>((x._1,x._2._1), Map(doi -> s"${editor.getOrElse(x._2._1,9999)},${x._2._2}")))
  }

  def getScoresByDates(cuInfo: RDD[(Int,Int)], dateList:Array[String]) = {
    val scores = dateList.map(d=> infoJoined(cuInfo, d))
    val groupedScore: RDD[((Int, Int), Map[String, String])] = sc.union(scores).reduceByKey(_++_)
    groupedScore.map(x=>s"${x._1._1},${x._1._2},${dateList.map(x._2.getOrElse(_,",,")).mkString(",")}")
  }

  def joinEditorCid(editorScore: RDD[(String, Long)], editorInfo:RDD[(String, String)]) = {
    editorInfo.join(editorScore).map(_._2)
  }

  def combineScores(content:RDD[(Int,Int)], comment:RDD[(Int,Int)], editor:RDD[(Int,Int)],
                    param:Map[String,Double]): RDD[(String, Long)] = {
    val combine = content.fullOuterJoin(comment).map(x=>{
      (x._1, (x._2._1.getOrElse(120),x._2._2.getOrElse(1)))
    }).fullOuterJoin(editor).map(x=>{
      val b: (Int, (Option[(Int, Int)], Option[Int])) = x
      (x._1, (x._2._1.getOrElse((110,112)),x._2._2.getOrElse(121)))
    }).map(x=> {
      (x._1, (x._2._1._1.toDouble, x._2._1._2.toDouble, x._2._2.toDouble))
    }).map(x=>{
      val score:Double = param("content")*x._2._1+param("comment")*x._2._2+param("editor")*x._2._3
      (x._1, score)
    }).map(x=>(x._1.toString,x._2))
    make_0to1(combine).map(x=>(x._1,(x._2*1000).toLong))
  }

  def main(args: Array[String]) {
    val nowTS: Long = System.currentTimeMillis
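
For reference, combineScores above fills missing component scores with fixed defaults, takes a weighted sum using the param map, and rescales the result to the 0..1000 range via make_0to1. A worked example of the weighted-sum step only, with made-up weights and scores (the 0.5/0.3/0.2 split is an assumption, not from this commit):

    // Hypothetical weights; combineScores expects the keys "content", "comment", "editor"
    val param = Map("content" -> 0.5, "comment" -> 0.3, "editor" -> 0.2)
    val (content, comment, editor) = (80.0, 60.0, 90.0)
    val score = param("content")*content + param("comment")*comment + param("editor")*editor
    // 0.5*80 + 0.3*60 + 0.2*90 = 40.0 + 18.0 + 18.0 = 76.0
    println(score)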