Commit 0e839bb26aabda9d445e2c7c43c6c11a08991265

Authored by Joanne
1 parent cf5d677136
Exists in master

preprocess

Showing 2 changed files with 5 additions and 7 deletions (side-by-side diff)

app/com/piki_ds/preprocess/MapWithRank.scala View file @ 0e839bb
... ... @@ -11,7 +11,9 @@
11 11 * ggh를 위한 preprocessing
12 12 * Created by jungwon on 8/25/15.
13 13 */
  14 +
14 15 object MapWithRank {
  16 +
15 17 def getSparkConf = {
16 18 val conf = new SparkConf().setAppName("MapWithRank")
17 19 conf.setMaster("yarn-client")
... ... @@ -62,10 +64,6 @@
62 64 }
63 65  
64 66  
65   - def splitByVersion(joinedLog: RDD[Array[String]]) = {
66   -
67   - }
68   -
69 67 /**
70 68 * processed log를 다듬는 과정: formatting, min-max처리, 큰덩어리 filtering
71 69 *
... ... @@ -189,7 +187,7 @@
189 187  
190 188 val log = getLog(sc, doi)
191 189 val joinedLog = joinLogWithLapse(log, getLapse(sc, doi, "CONSUME"))
192   - val mapped = mapActions(joinedLog, countUser(log, 10000))
  190 + val mapped: RDD[(Map[String, String], List[(BigInt, Int)], (String, (Long, Long), Option[Int]))] = mapActions(joinedLog, countUser(log, 10000))
193 191 val ecWithRank: RDD[(String, (Int, String), (Long, Long))] = mapRanges(mapped).filter(x=>x._3._1 != 0L && x._3._2 != 0L)
194 192  
195 193 val csvMapRank = ecWithRank.map(x=>s"${x._1},${x._2._1},${x._2._2},${x._3._1},${x._3._2}")
app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala View file @ 0e839bb
... ... @@ -97,9 +97,9 @@
97 97 val yesterdayTuple = intoYesterdayMN(nowTS)
98 98 val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)
99 99  
100   - val weeklyCT = getWeeklyCT(dateKey)
  100 + val weeklyCT: RDD[(String, Long)] = getWeeklyCT(dateKey)
101 101  
102   - val cardSize = getCardSize()
  102 + val cardSize: RDD[(String, Int)] = getCardSize()
103 103 cardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$dateKey")
104 104  
105 105 val ctbyCardSize: RDD[(Int, Long)] = weeklyCT.join(cardSize).groupBy(x=>x._2._2).map(x=>{