Commit ddba312d2607abb2cc6dbd4f35c76d9a4ad77757

Authored by Joanne
1 parent e34ddac2d1
Exists in master

coneffic error

Showing 4 changed files with 25 additions and 42 deletions

app/com/piki_ds/preprocess/CidValidation.scala
@@ -11,9 +11,9 @@
 
 object CidValidation {
 
-  def getCidByStatus(sQLContext: SQLContext, filterStatus:Array[String]) = {
+  def getCidByStatuses(sQLContext: SQLContext, filterStatus:Array[String], filterUdate:(String,String)) = {
     import org.apache.spark.sql.functions._
-    val whereStr = s"udate is not null and title is not null and" +
+    val whereStr = s"udate between ${filterUdate._1} and ${filterUdate._2} and title is not null and" +
       s" contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
       s"status in (${filterStatus.map(x=>s"'$x'").mkString(",")})"
     val mgc = getDBDump(sQLContext,"MG_CONTENTS").where(whereStr)
@@ -25,6 +25,7 @@
     }).reduceByKey((a,b) => {
       import math.{min,max}
       (max(a._1,b._1), min(a._2,b._2))
+
     })
   }
 }
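
For reference, here is how the new signature is exercised; the updated call in ConEfficiencyScore.scala below passes the bounds pre-quoted, because whereStr splices them verbatim into the udate between clause. A minimal usage sketch, assuming a live SQLContext from the surrounding job:

    // Hypothetical call site. Note the date bounds carry their own SQL quotes:
    // whereStr interpolates them directly into "udate between ... and ...".
    val cidStatus = CidValidation.getCidByStatuses(
      sqlContext,
      filterStatus = Array("ACTV", "HOLD"),
      filterUdate = ("'2016-05-12 00:00:00'", "'2016-05-19 00:00:00'")
    )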
app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala
@@ -69,7 +69,7 @@
     val fp = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/db/table=MG_CARD",true,fs).getPath.toString
     val totalTable = sqlContext.read.format("parquet").load(fp).where("status='ACTV'")
     import org.apache.spark.sql.functions._
-    val selectedTable = totalTable.groupBy("contents_id").agg(count("ordering"))
+    val selectedTable = totalTable.groupBy("contents_id").agg(count("ordering")).na.drop()
     val selectedRDD = selectedTable.map(x=>(x.getAs[Long](0).toString,x.getAs[Long](1).toInt))
     selectedRDD
   }
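
The only functional change is the appended na.drop(), which guards the getAs conversions on the next line. A minimal sketch of the pattern with hypothetical sample rows (Spark 1.x API, as in the surrounding code; sqlContext is assumed in scope):

    import org.apache.spark.sql.functions.count

    // A null contents_id survives groupBy/agg as a null group key;
    // na.drop() removes that row before getAs[Long](0) ever sees it.
    val cards = sqlContext.createDataFrame(Seq(
      (Option(1L), "c1"), (Option(1L), "c2"), (Option.empty[Long], "c3")
    )).toDF("contents_id", "ordering")
    val counted = cards.groupBy("contents_id").agg(count("ordering")).na.drop()
    val pairs = counted.map(x => (x.getAs[Long](0).toString, x.getAs[Long](1).toInt))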
app/com/piki_ds/ver1/ConEfficiencyScore.scala
@@ -50,10 +50,10 @@
    */
 
   def getRankMean(mappedRanges: RDD[(Int, (Int, Int), (Long, Long))]) = {
-    val preCal = mappedRanges.filter(x => x._3._1 != 0).groupBy(x => x._2).map(x => (x._1, (x._2.map(a => a._3._1).sum, x._2.size)))
-    preCal.groupBy(x=>x._1._1).map(x=>(x._1, (x._2.map(a=>a._2._1).sum,x._2.map(a=>a._2._2).sum))).map(x=>{
-      val mean = x._2._1.toDouble/x._2._2
-      (x._1, mean.toLong)
+    mappedRanges.filter(x => x._3._1 != 0).groupBy(_._2._1).map(x=>{
+      val etSum = x._2.map(_._3._1).sum
+      val etCnt = x._2.size
+      (x._1, etSum.toDouble/etCnt)
     })
   }
 
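
The rewrite collapses the old two-pass grouping into one pass: filter out zero exposure, group by exposure rank alone (_._2._1), and keep the mean as a Double instead of truncating it to Long. The same aggregation on a plain Scala collection, with hypothetical tuples of (uuid, (rank, cid), (exposure time, consume time)):

    val rows = Seq(
      (1, (0, 100), (40L, 12L)),
      (2, (0, 101), (60L, 30L)),
      (3, (1, 100), (20L, 5L))
    )
    // Mirrors getRankMean: drop zero-exposure rows, group by rank, average.
    val etByRank: Map[Int, Double] =
      rows.filter(_._3._1 != 0L).groupBy(_._2._1).map { case (rank, rs) =>
        rank -> rs.map(_._3._1).sum.toDouble / rs.size
      }
    // etByRank == Map(0 -> 50.0, 1 -> 20.0)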
@@ -68,53 +68,36 @@
   }
 
   def joinCardSize(mappedRanges: RDD[(Int, (Int, Int), (Long, Long))], cidAndCardSize: RDD[(Int, Int)]) = {
-    mappedRanges.map(x => (x._2._2, x)).join(cidAndCardSize).map(x => (x._2._1._1, x._2._1._2, x._2._1._3, x._2._2))
+    mappedRanges.map(x => (x._2._2, x)).join(cidAndCardSize).map(x => {
+      // (uuid, (exposure rank, cid), (exposure time, consume time), card size)
+      (x._2._1._1, x._2._1._2, x._2._1._3, x._2._2)
+    })
   }
 
-
   /**
    *
    * @param withCardSize
    * @param cardSizeMean
    */
   def consumeTimeByCid(withCardSize:RDD[(Int, (Int, Int), (Long, Long), Int)],
-                       etByRank: Map[Int, Long],
+                       etByRank: Map[Int, Double],
                        cardSizeMean: Map[Int, Long]) = {
-    val getRankM: RDD[((Int, (Int, Int), (Long, Long), Int), Long)] = withCardSize.map(x=>{
+    val getRankM: RDD[((Int, (Int, Int), (Long, Long), Int), Double)] = withCardSize.map(x=>{
       try{
-        Some((x,etByRank.getOrElse(x._2._1,10L)))
+        // (uuid, (exposure rank, cid), (exposure time, consume time), card size)
+        Some((x,etByRank.getOrElse(x._2._1,10D)))
       } catch {
         case e:Exception => None
       }
     }).flatMap(x=>x)
-    val result = getRankM.groupBy(x=>(x._1._2._2,x._1._4)).map(x=>{
+    val result = getRankM.groupBy(x=>{
+      // (cid, card size)
+      (x._1._2._2,x._1._4)
+    }).map(x=>{
       try {
         val denom = x._2.map(a => a._1._3._1.toDouble / a._2).sum
         val numer = x._2.map(a => a._1._3._2.toDouble).sum
         val ggh = numer / (cardSizeMean.getOrElse(x._1._2, 10L) * denom)
-        Some(x._1._1, denom, numer, cardSizeMean.getOrElse(x._1._2, 10L))
-      } catch {
-        case e:Exception => None}
-    }).flatMap(x=>x)
-    result
-  }
-
-
-  def consumeTimeByCid_(withCardSize:RDD[(String, (Int, String), (Long, Long), Int)],
-                        etByRank: Map[Int, Long],
-                        cardSizeMean: Map[Int, Long]) = {
-    val getRankM: RDD[((String, (Int, String), (Long, Long), Int), Long)] = withCardSize.map(x=>{
-      try{
-        Some((x,etByRank.getOrElse(x._2._1,10L)))
-      } catch {
-        case e:Exception => None
-      }
-    }).flatMap(x=>x)
-    val result = getRankM.groupBy(x=>(x._1._2._2,x._1._4)).map(x=>{
-      try {
-        val denom = x._2.map(a => a._1._3._1.toDouble / a._2).sum
-        val numer = x._2.map(a => a._1._3._2.toDouble).sum
-        val ggh = numer / (cardSizeMean.getOrElse(x._1._2, 10L) * denom)
         Some(x._1._1, ggh)
       } catch {
         case e:Exception => None}
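
In effect ggh is total consume time per rank-normalized exposure, further divided by the mean card size. A toy computation with hypothetical numbers:

    // Two impressions of cid 7, card size 5; etByRank says rank 0 averages
    // 50ms of exposure and rank 1 averages 25ms (hypothetical values).
    val etByRank = Map(0 -> 50.0, 1 -> 25.0)
    // ((uuid, (rank, cid), (exposureTime, consumeTime), cardSize), expectedEt)
    val group = Seq(
      ((1, (0, 7), (100L, 40L), 5), etByRank(0)),
      ((2, (1, 7), (50L, 30L), 5), etByRank(1))
    )
    val denom = group.map(a => a._1._3._1.toDouble / a._2).sum // 2.0 + 2.0 = 4.0
    val numer = group.map(a => a._1._3._2.toDouble).sum        // 40 + 30 = 70.0
    val ggh = numer / (5L * denom)                             // 70 / 20 = 3.5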
@@ -130,9 +113,9 @@
 
     val ecWithRank = getECWithRank(doi)
 
-    val etByRank: Map[Int, Long] = getRankMean(ecWithRank).collectAsMap()
-    val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatus(sqlContext, Array("ACTV","HOLD"))
-    val ggh: RDD[(Int, Double, Double, Long)] = consumeTimeByCid(joinCardSize(ecWithRank,getCardSize(doi)),etByRank,getCSMean(doi))
+    val etByRank: Map[Int, Double] = getRankMean(ecWithRank).collectAsMap()
+    val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatuses(sqlContext, Array("ACTV","HOLD"), ("'2016-05-12 00:00:00'", "'2016-05-19 00:00:00'"))
+    val ggh = consumeTimeByCid(joinCardSize(ecWithRank,getCardSize(doi)),etByRank,getCSMean(doi))
     val cidAndScoreFormatted = ggh.map(x=>(x._1.toLong, make_logit(x._2))).join(cidOfInterest).map(x=>(x._1, x._2._1)).map(x=>{
       (x._1,math.min(1000L,(x._2*1000).toLong))
     }).map(x=>(x._1.toString, x._2))
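
Downstream, the Double score from consumeTimeByCid is squashed through make_logit and clipped to the 0..1000 range. A sketch of that formatting step, assuming make_logit is a plain logistic; the real helper lives elsewhere in the repo and may differ in scale or offset:

    // Assumption: a standard logistic squashing, stand-in for the repo's make_logit.
    def make_logit(x: Double): Double = 1.0 / (1.0 + math.exp(-x))

    // Mirrors cidAndScoreFormatted: logit, scale to 1000, clip, stringify the cid.
    def formatScore(cid: Long, ggh: Double): (String, Long) =
      (cid.toString, math.min(1000L, (make_logit(ggh) * 1000).toLong))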
app/com/piki_ds/ver1/QualityScore.scala
@@ -66,7 +66,7 @@
 
   def main(args: Array[String]) {
     val nowTS: Long = System.currentTimeMillis
-    /*val doi = getDateKey(nowTS)
+    val doi = getDateKey(nowTS)
 
     val content: RDD[(Int, Int)] = scoreLoad(sc,doi,"content","all")
     val comment: RDD[(Int, Int)] = scoreLoad(sc,doi,"comment","").groupByKey().map(x=>(x._1,0))
@@ -76,8 +76,7 @@
 
     val finalScore: RDD[(Int, Long)] = combineScores(content,comment,editor,param).map(x=>(x._1.toInt,x._2.toLong)).filter(_._1 != 0)
     scoreSave(doi,"quality","",finalScore.map(x=>(x._1.toString,x._2)),1)
-    */
-    val finalScore= scoreLoad(sc,"20160516","content","all")
+
     val insertArray = finalScore.map(x=>(x._1.toString, x._2.toString)).collect()
     val test = new HbaseInserter("cid_quality")
     test.insert(insertArray)