Commit be098b973bdae46e2fab4d4b3187ffc1b84972cb
1 parent 0c319c0d6b
Exists in master
Showing 12 changed files with 168 additions and 66 deletions
- app/com/piki_ds/preprocess/MakingLapse.scala
- app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala
- app/com/piki_ds/utils/DateTimeEtc.scala
- app/com/piki_ds/utils/FileDelete.scala
- app/com/piki_ds/utils/TempScoreSaveLoad.scala
- app/com/piki_ds/ver1/CmtScore.scala
- app/com/piki_ds/ver1/ConEfficiencyScore.scala
- app/com/piki_ds/ver1/ConRejectScore.scala
- app/com/piki_ds/ver1/ContentScore.scala
- app/com/piki_ds/ver1/DashRelatedScore.scala
- app/com/piki_ds/ver1/EditorScore.scala
- app/com/piki_ds/ver1/QualityScore.scala
app/com/piki_ds/preprocess/MakingLapse.scala
... | ... | @@ -6,6 +6,7 @@ |
6 | 6 | |
7 | 7 | import com.piki_ds.utils.GetTextFile.getLog |
8 | 8 | import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN |
9 | +import com.piki_ds.utils.FileDelete.deleteNDaysAgo | |
9 | 10 | |
10 | 11 | /** |
11 | 12 | * making lapse |
... | ... | @@ -40,7 +41,9 @@ |
40 | 41 | val csvLapse = lapse.map(x=>{ |
41 | 42 | s"${x("category")}|${x("eventType")}|${x("actionTime")}|${x("uuid")}|${x("field0")}|${x("field1")}|${x("lapse")}" |
42 | 43 | }) |
43 | - csvLapse.coalesce(150).saveAsTextFile(s"hdfs://pikinn/preprocess/lapse/$yesterday", classOf[GzipCodec]) | |
44 | + val savePath = "hdfs://pikinn/preprocess/lapse/" | |
45 | + csvLapse.coalesce(150).saveAsTextFile(s"$savePath$yesterday", classOf[GzipCodec]) | |
46 | + deleteNDaysAgo(sc,savePath,yesterday,30) | |
44 | 47 | } |
45 | 48 | |
46 | 49 |
app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala
... | ... | @@ -95,21 +95,21 @@ |
95 | 95 | def main (args: Array[String]) { |
96 | 96 | val nowTS: Long = System.currentTimeMillis |
97 | 97 | val yesterdayTuple = intoYesterdayMN(nowTS) |
98 | - val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8) | |
98 | + val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8) | |
99 | 99 | |
100 | - val weeklyCT: RDD[(String, Long)] = getWeeklyCT(dateKey) | |
100 | + val weeklyCT: RDD[(String, Long)] = getWeeklyCT(doi) | |
101 | 101 | |
102 | 102 | val cardSize: RDD[(String, Int)] = getCardSize() |
103 | - cardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$dateKey") | |
103 | + cardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$doi") | |
104 | 104 | |
105 | 105 | val ctbyCardSize: RDD[(Int, Long)] = weeklyCT.join(cardSize).groupBy(x=>x._2._2).map(x=>{ |
106 | 106 | val consumeTime = x._2.map(_._2._1).sum |
107 | 107 | (x._1,consumeTime) |
108 | 108 | }) |
109 | - ctbyCardSize.filter(_._2 != 0L).saveAsObjectFile(s"hdfs://pikinn/preprocess/ctByCardSize/$dateKey") | |
109 | + ctbyCardSize.filter(_._2 != 0L).saveAsObjectFile(s"hdfs://pikinn/preprocess/ctByCardSize/$doi") | |
110 | 110 | |
111 | 111 | val cidwithUid = cidWithUid() |
112 | - cidwithUid.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidWithUid/$dateKey") | |
112 | + cidwithUid.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidWithUid/$doi") | |
113 | 113 | } |
114 | 114 | |
115 | 115 |
app/com/piki_ds/utils/DateTimeEtc.scala
... | ... | @@ -149,5 +149,38 @@ |
149 | 149 | } |
150 | 150 | hrList.toList |
151 | 151 | } |
152 | + | |
153 | + def intoNDaysDT (nowDT:String, n:Int, goUP:Boolean, formatDate:String) = { | |
154 | + val incre = if (goUP) n else n*(-1) | |
155 | + val format_date = new java.text.SimpleDateFormat(formatDate) | |
156 | + val c = java.util.Calendar.getInstance | |
157 | + c.setTimeInMillis(format_date.parse(nowDT).getTime()) | |
158 | + c.add(java.util.Calendar.DAY_OF_WEEK, incre) | |
159 | + val beforeNd = format_date.format(c.getTime) | |
160 | + (format_date.parse(beforeNd).getTime, beforeNd) | |
161 | + } | |
162 | + | |
163 | + def makeDateTimeList(start:(String,String,String), end:(String,String,String)): List[(String, String, String)] = { | |
164 | + val c = java.util.Calendar.getInstance | |
165 | + val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd") | |
166 | + c.setTimeInMillis(format_date.parse(s"${start._1}-${start._2}-${start._3}").getTime) | |
167 | + val endTimestamp = format_date.parse(s"${end._1}-${end._2}-${end._3}").getTime | |
168 | + var hrList = scala.collection.mutable.ListBuffer.empty[String] += s"${start._1}-${start._2}-${start._3}" | |
169 | + while (c.getTimeInMillis < endTimestamp){ | |
170 | + c.add(java.util.Calendar.DAY_OF_MONTH,1) | |
171 | + hrList += format_date.format(c.getTime) | |
172 | + } | |
173 | + hrList.map(_.split("-")).map(x=>(x(0),x(1),x(2))).toList | |
174 | + } | |
175 | + | |
176 | + def makeDateTimeList_(limit:String, range:Int, goUp:Boolean, intoFormat:String) = { | |
177 | + val c = java.util.Calendar.getInstance | |
178 | + val format_date = new java.text.SimpleDateFormat(intoFormat) | |
179 | + c.setTimeInMillis(format_date.parse(limit).getTime) | |
180 | + (1 to range).map(x=>{ | |
181 | + if (goUp) c.add(java.util.Calendar.DAY_OF_MONTH,1) else c.add(java.util.Calendar.DAY_OF_MONTH,-1) | |
182 | + format_date.format(c.getTime) | |
183 | + }) | |
184 | + } | |
152 | 185 | } |
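A minimal usage sketch for the date helpers added above (the sample date key "20161024" is illustrative; the format string matches the yyyyMMdd keys used elsewhere in this commit):

    import com.piki_ds.utils.DateTimeEtc.{intoNDaysDT, makeDateTimeList_}

    // (timestamp, "yyyyMMdd" string) for 30 days before the given date key
    val (cutoffTs, cutoffKey) = intoNDaysDT("20161024", 30, false, "yyyyMMdd")

    // the 7 date keys preceding the given date key, most recent first
    val lastWeek = makeDateTimeList_("20161024", 7, false, "yyyyMMdd")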
app/com/piki_ds/utils/FileDelete.scala
1 | +package com.piki_ds.utils | |
2 | + | |
3 | +/** | |
4 | + * Created by jungwon on 24/10/2016. | |
5 | + */ | |
6 | + | |
7 | +import org.apache.spark.SparkContext | |
8 | + | |
9 | +object FileDelete { | |
10 | + | |
11 | + def deleteNDaysAgo(sc:SparkContext, pathName:String, dateKey:String, days:Int) = { | |
12 | + import DateTimeEtc.intoNDaysDT | |
13 | + val nDays = intoNDaysDT(dateKey,days,false,"yyyyMMdd")._2.toLong | |
14 | + val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration) | |
15 | + val prevPath = fs.listStatus(new org.apache.hadoop.fs.Path(pathName)).filter(x => { | |
16 | + val dKey = x.getPath.toString.takeRight(8).toLong | |
17 | + dKey < nDays | |
18 | + }) | |
19 | + if(prevPath.nonEmpty) { | |
20 | + prevPath.map(x=>{ | |
21 | + val deletePath = x.getPath.toString | |
22 | + fs.delete(new org.apache.hadoop.fs.Path(deletePath), true) | |
23 | + }) | |
24 | + } | |
25 | + } | |
26 | + | |
27 | + def deleteEarliestPath(sc:SparkContext, pathName:String) = { | |
28 | + val fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration) | |
29 | + val prevPath = fs.listStatus(new org.apache.hadoop.fs.Path(pathName)) | |
30 | + if(prevPath.nonEmpty) { | |
31 | + val deletePath = prevPath.minBy(x => x.getModificationTime).getPath.toString | |
32 | + fs.delete(new org.apache.hadoop.fs.Path(deletePath), true) | |
33 | + } | |
34 | + } | |
35 | + | |
36 | + def main(args: Array[String]) { | |
37 | + | |
38 | + } | |
39 | +} |
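Note that deleteNDaysAgo assumes every entry under pathName ends in an 8-digit yyyyMMdd key (it compares takeRight(8) of each child path). A minimal sketch of the calling pattern this commit adds to the batch jobs (sc and the date key doi come from the calling job; the path is illustrative):

    import com.piki_ds.utils.FileDelete.deleteNDaysAgo

    // after writing today's output, drop directories more than 30 days older than doi
    deleteNDaysAgo(sc, "hdfs://pikinn/preprocess/lapse/", doi, 30)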
app/com/piki_ds/utils/TempScoreSaveLoad.scala
... | ... | @@ -32,6 +32,10 @@ |
32 | 32 | }) |
33 | 33 | } |
34 | 34 | |
35 | + def scorePathMaker(scoreType:String, subScore: String)={ | |
36 | + s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/" | |
37 | + } | |
38 | + | |
35 | 39 | def recentScoreLoad(sc:SparkContext, hdfs:FileSystem, scoreType:String, subScore:String) = { |
36 | 40 | val scorePath = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/", true, hdfs).getPath.toString |
37 | 41 | val scoreTxt = sc.textFile(scorePath) |
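scorePathMaker only builds the score directory prefix; the dated subdirectories underneath it are what deleteNDaysAgo prunes. A sketch of how the two are paired in the ver1 jobs below (sc and doi come from the calling job):

    import com.piki_ds.utils.TempScoreSaveLoad
    import com.piki_ds.utils.FileDelete.deleteNDaysAgo

    // "hdfs://pikinn/preprocess/timelineScore/content/efficiency/"
    val prefix = TempScoreSaveLoad.scorePathMaker("content", "efficiency")

    // prune score directories more than 30 days older than doi
    deleteNDaysAgo(sc, prefix, doi, 30)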
app/com/piki_ds/ver1/CmtScore.scala
... | ... | @@ -27,16 +27,15 @@ |
27 | 27 | |
28 | 28 | def main(args:Array[String]) { |
29 | 29 | val nowTS: Long = System.currentTimeMillis |
30 | - val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS) | |
31 | - val dateKey= yesterdayTuple._2.replaceAll("[^0-9]", "").take(8) | |
30 | + val doi= getDateKey(nowTS) | |
32 | 31 | |
33 | - val cumlt= sqlContext.read.load(s"hdfs://pikinn/preprocess/cmtCountByHour/y=${dateKey.take(4)}/mo=${dateKey.slice(4,6)}/d=${dateKey.slice(6,8)}/*") | |
32 | + val cumlt= sqlContext.read.load(s"hdfs://pikinn/preprocess/cmtCountByHour/y=${doi.take(4)}/mo=${doi.slice(4,6)}/d=${doi.slice(6,8)}/*") | |
34 | 33 | val a= cumlt.map(x=>(x.getAs[Long]("contents_id"), |
35 | 34 | (x.getAs[Long]("all_comment") + x.getAs[Long]("parent_comment") + x.getAs[Long]("unique_comment")).toDouble/3)) |
36 | 35 | val cidOfInterest = getCidByStatus(sqlContext, Array("ACTV","HOLD")).map(x=>(x._1.toString,x._2)) |
37 | - val tempScoring: RDD[(String, Double)] = a.map(x=>(x._1.toString, x._2)).join(cidOfInterest).map(x=>(x._1, x._2._1)) | |
38 | - val formatOutput= make_0to1(tempScoring).map(x=>(x._1,(x._2*1000).toLong)) | |
39 | - TempScoreSaveLoad.scoreSave(dateKey,"comment","",formatOutput,5) | |
36 | + val tempScoring: RDD[(Long, Double)] = a.map(x=>(x._1.toString, x._2)).join(cidOfInterest).map(x=>(x._1.toLong, x._2._1)) | |
37 | + val formatOutput= make_0to1(tempScoring).map(x=>(x._1.toString,(x._2*1000).toLong)) | |
38 | + TempScoreSaveLoad.scoreSave(doi,"comment","",formatOutput,5) | |
40 | 39 | } |
41 | 40 | |
42 | 41 |
app/com/piki_ds/ver1/ConEfficiencyScore.scala
... | ... | @@ -10,7 +10,7 @@ |
10 | 10 | import com.piki_ds.utils.GeneralTransformers.make_logit |
11 | 11 | import com.piki_ds.utils.TempScoreSaveLoad |
12 | 12 | import com.piki_ds.preprocess.CidValidation._ |
13 | - | |
13 | +import com.piki_ds.utils.FileDelete.deleteNDaysAgo | |
14 | 14 | /** |
15 | 15 | * temporary ggh generation |
16 | 16 | * Created by jungwon on 7/9/15. |
17 | 17 | |
18 | 18 | |
... | ... | @@ -117,11 +117,10 @@ |
117 | 117 | val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatuses(sqlContext, Array("ACTV","HOLD"), ("'2016-05-12 00:00:00'", "'2016-05-19 00:00:00'")) |
118 | 118 | val ggh = consumeTimeByCid(joinCardSize(ecWithRank,getCardSize(doi)),etByRank,getCSMean(doi)) |
119 | 119 | val cidAndScoreFormatted = ggh.map(x=>(x._1.toLong, make_logit(x._2))).join(cidOfInterest).map(x=>(x._1, x._2._1)).map(x=>{ |
120 | - (x._1,math.min(1000L,(x._2*1000).toLong)) | |
120 | + (x._1, math.min(1000L,(x._2*1000).toLong)) | |
121 | 121 | }).map(x=>(x._1.toString, x._2)) |
122 | - TempScoreSaveLoad.scoreSave(doi,"content", "efficiency",cidAndScoreFormatted,100) | |
123 | - | |
122 | + TempScoreSaveLoad.scoreSave(doi,"content", "efficiency", cidAndScoreFormatted,100) | |
123 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content", "efficiency"),doi,30) | |
124 | 124 | } |
125 | - | |
126 | 125 | } |
app/com/piki_ds/ver1/ConRejectScore.scala
1 | 1 | package com.piki_ds.ver1 |
2 | 2 | |
3 | +import com.piki_ds.utils.FileDelete._ | |
4 | +import com.piki_ds.utils.TempScoreSaveLoad | |
3 | 5 | import org.apache.spark.rdd.RDD |
4 | 6 | import org.apache.spark.sql.SQLContext |
5 | 7 | import org.apache.spark.{SparkConf, SparkContext} |
6 | 8 | |
... | ... | @@ -38,13 +40,14 @@ |
38 | 40 | def main(args:Array[String]) { |
39 | 41 | val nowTS: Long = System.currentTimeMillis |
40 | 42 | val maxCid = getMaxId(sqlContext,"content") |
41 | - val dateKey = getDateKey(nowTS) | |
42 | - val cidAndScore = getAProgress(sqlContext, dateKey,maxCid) | |
43 | + val doi = getDateKey(nowTS) | |
44 | + val cidAndScore = getAProgress(sqlContext, doi,maxCid) | |
43 | 45 | val cidOfInterest = getCidByStatus(sqlContext, Array("ACTV","HOLD")).map(x=>(x._1.toString,x._2)) |
44 | 46 | val cidAndScoreFormatted = cidAndScore.join(cidOfInterest).map(x=>(x._1,x._2._1)).map(x=>{ |
45 | 47 | (x._1, (1000*make_logit(x._2)).toLong) |
46 | 48 | }) |
47 | - scoreSave(dateKey,"content","reject",cidAndScoreFormatted,1) | |
49 | + scoreSave(doi,"content","reject",cidAndScoreFormatted,1) | |
50 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content","reject"),doi,30) | |
48 | 51 | } |
49 | 52 | } |
app/com/piki_ds/ver1/ContentScore.scala
1 | 1 | package com.piki_ds.ver1 |
2 | 2 | |
3 | +import com.piki_ds.utils.FileDelete._ | |
3 | 4 | import org.apache.spark.sql.SQLContext |
4 | 5 | import org.apache.spark.{SparkContext, SparkConf} |
5 | 6 | import org.apache.spark.rdd.RDD |
6 | 7 | |
... | ... | @@ -56,8 +57,9 @@ |
56 | 57 | }) |
57 | 58 | val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatus(sqlContext, Array("ACTV","HOLD")) |
58 | 59 | |
59 | - val cidAndScoreFormatted = make_0to1(contentScore.map(x=> (x._1,x._2.toDouble))).map(x=>(x._1, (x._2*1000).toLong)) | |
60 | + val cidAndScoreFormatted = make_0to1(contentScore.map(x=> (x._1.toLong,x._2.toDouble))).map(x=>(x._1.toString, (x._2*1000).toLong)) | |
60 | 61 | TempScoreSaveLoad.scoreSave(doi,"content","all_ver6",cidAndScoreFormatted,1) |
62 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content","all_ver6"),doi,30) | |
61 | 63 | } |
62 | 64 | |
63 | 65 | } |
app/com/piki_ds/ver1/DashRelatedScore.scala
1 | 1 | package com.piki_ds.ver1 |
2 | 2 | |
3 | +import com.piki_ds.utils.FileDelete._ | |
4 | +import com.piki_ds.utils.TempScoreSaveLoad | |
3 | 5 | import org.apache.spark.rdd.RDD |
4 | 6 | import org.apache.spark.sql.SQLContext |
5 | 7 | import org.apache.spark.sql.functions.udf |
6 | 8 | |
7 | 9 | |
8 | 10 | |
9 | 11 | |
... | ... | @@ -155,16 +157,19 @@ |
155 | 157 | |
156 | 158 | def main(args: Array[String]) { |
157 | 159 | val nowTS: Long = System.currentTimeMillis |
158 | - val dateKey: String = getDateKey(nowTS) | |
160 | + val doi: String = getDateKey(nowTS) | |
159 | 161 | |
160 | - val updatedDash: RDD[Map[String, String]] = dashUpdater(dateKey,true) | |
162 | + val updatedDash: RDD[Map[String, String]] = dashUpdater(doi,true) | |
161 | 163 | val cidOfInterest: RDD[(Long, (Int, Long))] = getCidByStatus(sqlContext, Array("ACTV","HOLD")) |
162 | 164 | val clickS = scoreUpdater(cidOfInterest, updatedDash,"click") |
163 | - scoreSave(dateKey,"content","click",clickS,1) | |
165 | + scoreSave(doi,"content","click",clickS,1) | |
166 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content", "click"),doi,30) | |
164 | 167 | val dwellS = scoreUpdater(cidOfInterest, updatedDash,"dwell") |
165 | - scoreSave(dateKey,"content","dwell",dwellS,1) | |
168 | + scoreSave(doi,"content","dwell",dwellS,1) | |
169 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content", "dwell"),doi,30) | |
166 | 170 | val pAS = scoreUpdater(cidOfInterest, updatedDash,"pa") |
167 | - scoreSave(dateKey,"content","postAction",pAS,1) | |
171 | + scoreSave(doi,"content","postAction",pAS,1) | |
172 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("content", "postAction"),doi,30) | |
168 | 173 | } |
169 | 174 | |
170 | 175 | } |
app/com/piki_ds/ver1/EditorScore.scala
1 | 1 | package com.piki_ds.ver1 |
2 | 2 | |
3 | 3 | //import breeze.linalg.min |
4 | +import com.piki_ds.utils.FileDelete._ | |
4 | 5 | import org.apache.spark.rdd.RDD |
5 | 6 | import org.apache.hadoop.fs.{Path, FileStatus, FileSystem} |
6 | 7 | import org.apache.spark.sql.SQLContext |
... | ... | @@ -11,7 +12,7 @@ |
11 | 12 | import com.piki_ds.utils.GetTextFile.getDashTable |
12 | 13 | import com.piki_ds.utils.SqlContextConf.readPartialTable |
13 | 14 | import com.piki_ds.utils.TempScoreSaveLoad |
14 | -import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Key,make_0to1} | |
15 | +import com.piki_ds.utils.GeneralTransformers.{make_0to1_2Keys,make_0to1} | |
15 | 16 | import com.piki_ds.ver1.DashRelatedScore.previousDash |
16 | 17 | //import com.piki_ds.utils.hbase.HbaseInserter |
17 | 18 | |
... | ... | @@ -53,7 +54,7 @@ |
53 | 54 | val cuml = x._2.map(a=>a("follow_cnt").toLong).sum |
54 | 55 | val ts: Array[Long] = x._2.toArray.sortBy(_("cdate")*(-1)).map(_("follow_cnt").toLong) |
55 | 56 | val followScore = 0.1*cuml+ts.take(10).sum |
56 | - (x._1, followScore.toLong) | |
57 | + (x._1.toLong, followScore.toLong) | |
57 | 58 | }) |
58 | 59 | if (fileSave) { |
59 | 60 | unionFrom.write.format("json").save(s"hdfs://pikinn/preprocess/followInfo/$dateKey") |
60 | 61 | |
61 | 62 | |
62 | 63 | |
63 | 64 | |
64 | 65 | |
65 | 66 | |
66 | 67 | |
... | ... | @@ -61,37 +62,46 @@ |
61 | 62 | follow_info |
62 | 63 | } |
63 | 64 | |
64 | - def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String, fileSave:Boolean): RDD[(String, String, Long, Long)] = { | |
65 | + def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String, fileSave:Boolean): RDD[(Long, ((String, Long), Long))] = { | |
65 | 66 | // parse the USER table from the DB and pull the editor accounts (was this step really necessary....-_-) |
66 | 67 | val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O") |
67 | - val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})" | |
68 | + val filterS_user = s"where level in (${levels.map(x=>"'"+x+"'").mkString(",")})" | |
68 | 69 | val fetchVar = Array("uid","level","name") |
69 | 70 | val user_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","USER", fetchVar, filterS_user) |
70 | - val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name"))) | |
71 | + // userInfo : (uid, account name) | |
72 | + val userInfo: RDD[(Long, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid"), x.getAs[String]("name"))) | |
71 | 73 | |
72 | - // parse the FOLLOW table from the DB and pull follower counts | |
73 | - val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey,fs,fileSave) | |
74 | - | |
75 | - val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.fullOuterJoin(follow_info).map(x=>(x._1,(x._2._1.getOrElse(""),x._2._2.getOrElse(10L)))) | |
76 | - | |
77 | 74 | // parse the MG_CONTENTS table from the DB and pull editor debut dates |
78 | 75 | val filterS_mgcont = " where uid is not null group by uid" |
79 | 76 | val fetchVar_mgcont = Array("uid", "min(udate)") |
80 | 77 | val mgcont_tableGet = readPartialTable(sQLContext, "REPL", "new_pikicast_common","MG_CONTENTS", fetchVar_mgcont, filterS_mgcont) |
81 | 78 | import org.apache.spark.sql.functions._ |
82 | 79 | val mgcont_table= mgcont_tableGet.select(mgcont_tableGet("uid"), unix_timestamp(mgcont_tableGet("min(udate)"))) |
83 | - val debut_info: RDD[(String, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid").toString, x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)"))) | |
80 | + //debutInfo: (uid, debut in timestamp) | |
81 | + val debutInfo: RDD[(Long, Long)] = mgcont_table.map(x=>(x.getAs[Long]("uid"), x.getAs[Long]("unixtimestamp(min(udate),yyyy-MM-dd HH:mm:ss)"))) | |
84 | 82 | |
85 | - // uid, name, follow, debut | |
86 | - val rawOut: RDD[(String, String, Long, Long)] = joinedFollowInfo.fullOuterJoin(debut_info).map(x=>{ | |
87 | - (x._1,(x._2._1.getOrElse(("",10L)) ,x._2._2.getOrElse(10L))) | |
88 | - }).map(x=>(x._1,x._2._1._1, x._2._1._2, x._2._2)) | |
89 | - rawOut.map(x=>(x._1,x._2, math.min(20000,x._3),x._4)) | |
83 | + val uidOfInterest: RDD[(Long, (String, Long))] = userInfo.fullOuterJoin(debutInfo).map(x=>{ | |
84 | + (x._1, (x._2._1.getOrElse(""), x._2._2.getOrElse(0L))) | |
85 | + }) | |
86 | + | |
87 | + // parse the FOLLOW table from the DB and pull follower counts | |
88 | + val follow_info: RDD[(Long, Long)] = followGetter(sQLContext, dateKey,fs,fileSave) | |
89 | + // uid, ((name, debut), follow) | |
90 | + val joinedFollowInfo: RDD[(Long, ((String, Long), Long))] = uidOfInterest.leftOuterJoin(follow_info).map(x=>{ | |
91 | + (x._1, (x._2._1, math.min(20000, x._2._2.getOrElse(0L)))) | |
92 | + }) | |
93 | + joinedFollowInfo | |
90 | 94 | } |
91 | 95 | |
92 | - def popularity(dBTable: RDD[(String, String, Long, Long)], currentTS:Long) = { | |
96 | + /** | |
97 | + * | |
98 | + * @param dBTable uid, ((name, debut), follow) | |
99 | + * @param currentTS | |
100 | + * @return | |
101 | + */ | |
102 | + def popularity(dBTable: RDD[(Long, ((String, Long), Long))], currentTS:Long) = { | |
93 | 103 | // age unit: days |
94 | - dBTable.map(x=>((x._1, x._2), ((1000*60*60*24).toLong*x._3).toDouble/(currentTS-x._4))) | |
104 | + dBTable.map(x=>((x._1, x._2._1._1), ((1000*60*60*24).toLong*x._2._2).toDouble/(currentTS-x._2._1._2))) | |
95 | 105 | } |
96 | 106 | |
97 | 107 | def tr_Weibull(raw:Double, lambd:Int, k:Double) = { |
98 | 108 | |
99 | 109 | |
100 | 110 | |
101 | 111 | |
102 | 112 | |
103 | 113 | |
104 | 114 | |
105 | 115 | |
106 | 116 | |
107 | 117 | |
... | ... | @@ -109,38 +119,38 @@ |
109 | 119 | |
110 | 120 | val followFileSave = if (args.nonEmpty && args.head.toInt.equals(0)) false else true |
111 | 121 | val nowTS: Long = System.currentTimeMillis |
112 | - val dateKey = getDateKey(nowTS) | |
122 | + val doi = getDateKey(nowTS) | |
113 | 123 | |
114 | 124 | // compute the editor popularity score |
115 | - val fromDB = editorDB(sqlContext, fs, dateKey,followFileSave) | |
125 | + val fromDB: RDD[(Long, ((String, Long), Long))] = editorDB(sqlContext, fs, doi,followFileSave) | |
116 | 126 | |
117 | - val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1))) | |
127 | + val ePopularity: RDD[((Long, String), Double)] = make_0to1_2Keys(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1))) | |
118 | 128 | |
119 | 129 | // compute the editor performance score |
120 | - val dashInfo: RDD[(String, Map[String, String])] = previousDash(dateKey).map(x=>(x("cid"),x)) | |
130 | + val dashInfo: RDD[(Long, Map[String, String])] = previousDash(doi).map(x=>(x("cid").toLong,x)) | |
121 | 131 | |
122 | - val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$dateKey/") | |
132 | + val cidAndUid: RDD[(String,String)] = sc.objectFile[(String,String)](s"/preprocess/cidWithUid/$doi/") | |
123 | 133 | |
124 | - val uidAndCids: RDD[(String, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1,x._2.size)) | |
134 | + val uidAndContents: RDD[(Long, Int)] = cidAndUid.groupBy(_._2).map(x=>(x._1.toLong,x._2.size)) | |
125 | 135 | |
126 | - val cPDB: RDD[(String, Map[String, String])] = cidAndUid.join(dashInfo).map(_._2) | |
136 | + val cPDB: RDD[(Long, Map[String, String])] = cidAndUid.map(x=>(x._1.toLong, x._2.toLong)).join(dashInfo).map(_._2) | |
127 | 137 | |
128 | - val performanceRaw: RDD[(String, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> { | |
138 | + val performanceRaw: RDD[(Long, ((Long, Long, Long, Long, Long), Int))] = cPDB.groupBy(_._1).map(x=> { | |
129 | 139 | val innerSum: (Long, Long, Long, Long, Long) = x._2.map(a => { |
130 | - (a._2("view").replaceAll("^$", "0").toDouble.toLong, a._2("likes").replaceAll("^$", "0").toDouble.toLong, | |
131 | - a._2("share").replaceAll("^$", "0").toDouble.toLong, a._2("bookmark").replaceAll("^$", "0").toDouble.toLong, | |
132 | - a._2("comment").replaceAll("^$", "0").toDouble.toLong) | |
140 | + (a._2("view").replaceAll("^$", "0").toLong, a._2("likes").replaceAll("^$", "0").toLong, | |
141 | + a._2("share").replaceAll("^$", "0").toLong, a._2("bookmark").replaceAll("^$", "0").toLong, | |
142 | + a._2("comment").replaceAll("^$", "0").toLong) | |
133 | 143 | }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5)) |
134 | 144 | (x._1, innerSum) |
135 | - }).join(uidAndCids) | |
136 | - val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> { | |
145 | + }).join(uidAndContents) | |
146 | + val avgPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> { | |
137 | 147 | val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2, |
138 | 148 | x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2, |
139 | 149 | x._2._1._5.toDouble / x._2._2) |
140 | 150 | (x._1, ap) |
141 | 151 | }) |
142 | 152 | |
143 | - val minMaxPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{ | |
153 | + val minMaxPerformance: RDD[(Long, (Double, Double, Double, Double, Double))] = avgPerformance.map(x=>{ | |
144 | 154 | import math.{min,max} |
145 | 155 | val s1 = min(max(500, x._2._1),25000) |
146 | 156 | val s2 = min(max(100, x._2._1),2000) |
... | ... | @@ -150,7 +160,7 @@ |
150 | 160 | (x._1, (s1,s2,s3,s4,s5)) |
151 | 161 | }) |
152 | 162 | |
153 | - val sumPerformance: RDD[(String, Double)] = minMaxPerformance.map(x=>{ | |
163 | + val sumPerformance: RDD[(Long, Double)] = minMaxPerformance.map(x=>{ | |
154 | 164 | val score = x._2._1+x._2._2+x._2._3+x._2._4+x._2._5 |
155 | 165 | (x._1, score/5) |
156 | 166 | }) |
157 | 167 | |
158 | 168 | |
159 | 169 | |
... | ... | @@ -166,14 +176,17 @@ |
166 | 176 | (x._1, score/5) |
167 | 177 | }) |
168 | 178 | */ |
179 | + | |
180 | + | |
169 | 181 | val finalScore = ePopularity.map(x=>{ |
170 | 182 | (x._1._1,x._2) |
171 | - }).leftOuterJoin(performance).map(x=>(x._1,0.15*x._2._1+0.85*x._2._2.getOrElse(0.2D))).filter(x=>x._1.nonEmpty) | |
183 | + }).leftOuterJoin(performance).map(x=>(x._1,0.15*x._2._1+0.85*x._2._2.getOrElse(0.2D))) | |
172 | 184 | |
173 | - val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1,(x._2*1000).toLong)) | |
185 | + val formatOutput: RDD[(String, Long)] = finalScore.map(x=>(x._1.toString,(x._2*1000).toLong)) | |
174 | 186 | |
175 | - TempScoreSaveLoad.scoreSave(dateKey,"editor","",formatOutput,1) | |
176 | - scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$dateKey") | |
187 | + TempScoreSaveLoad.scoreSave(doi,"editor","",formatOutput,1) | |
188 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("editor",""),doi,30) | |
189 | + scala.io.Source.fromURL(s"http://hbase.api.piki.work:9081/insert?table=editor_score&path=/preprocess/timelineScore/editor/$doi") | |
177 | 190 | /* |
178 | 191 | val insertArray: Array[(String, String)] = finalScore.map(x=>(x._1.toString, (x._2*1000).toLong.toString)).collect |
179 | 192 | val test = new HbaseInserter("editor_score") |
app/com/piki_ds/ver1/QualityScore.scala
1 | 1 | package com.piki_ds.ver1 |
2 | 2 | |
3 | +import com.piki_ds.utils.FileDelete._ | |
4 | +import com.piki_ds.utils.TempScoreSaveLoad | |
3 | 5 | import org.apache.spark.{SparkContext, SparkConf} |
4 | 6 | import org.apache.spark.rdd.RDD |
5 | 7 | |
... | ... | @@ -49,7 +51,7 @@ |
49 | 51 | } |
50 | 52 | |
51 | 53 | def combineScores(content:RDD[(Int,Int)], comment:RDD[(Int,Int)], editor:RDD[(Int,Int)], |
52 | - param:Map[String,Double]): RDD[(String, Long)] = { | |
54 | + param:Map[String,Double]): RDD[(Long, Long)] = { | |
53 | 55 | val combine = content.leftOuterJoin(comment).map(x=>{ |
54 | 56 | (x._1, (x._2._1,x._2._2.getOrElse(1))) |
55 | 57 | }).leftOuterJoin(editor).map(x=>{ |
... | ... | @@ -60,7 +62,7 @@ |
60 | 62 | }).map(x=>{ |
61 | 63 | val score:Double = param("content")*x._2._1+param("comment")*x._2._2+param("editor")*x._2._3 |
62 | 64 | (x._1, score) |
63 | - }).map(x=>(x._1.toString,x._2)) | |
65 | + }).map(x=>(x._1.toLong,x._2)) | |
64 | 66 | make_0to1(combine).map(x=>(x._1,(x._2*1000).toLong)) |
65 | 67 | } |
66 | 68 | |
... | ... | @@ -76,7 +78,7 @@ |
76 | 78 | |
77 | 79 | val finalScore: RDD[(Int, Long)] = combineScores(content,comment,editor,param).map(x=>(x._1.toInt,x._2.toLong)).filter(_._1 != 0) |
78 | 80 | scoreSave(doi,"quality","",finalScore.map(x=>(x._1.toString,x._2)),1) |
79 | - | |
81 | + deleteNDaysAgo(sc,TempScoreSaveLoad.scorePathMaker("quality",""),doi,30) | |
80 | 82 | val insertArray = finalScore.map(x=>(x._1.toString, x._2.toString)).collect() |
81 | 83 | val test = new HbaseInserter("cid_quality") |
82 | 84 | test.insert(insertArray) |