Commit 332b71d3c5adb1a46f251bd4f5e7f1a51e52b536
1 parent
0e8d5024de
Exists in
master
error fix in weeklyct
Showing 1 changed file with 3 additions and 17 deletions Side-by-side Diff
app/com/piki_ds/preprocess/WeeklyECTbyGroup.scala
View file @
332b71d
... | ... | @@ -9,7 +9,6 @@ |
9 | 9 | import org.apache.hadoop.fs.{Path, FileStatus, FileSystem} |
10 | 10 | |
11 | 11 | import com.piki_ds.utils.DateTimeEtc.{intoYesterdayMN,makeDateList} |
12 | -import com.piki_ds.utils.SqlContextConf._ | |
13 | 12 | import com.piki_ds.utils.GetTextFile.getDashTable |
14 | 13 | |
15 | 14 | /** |
16 | 15 | |
... | ... | @@ -48,27 +47,15 @@ |
48 | 47 | val infoWeek: IndexedSeq[RDD[Map[String, String]]] = oneWeek.map(d=>{ |
49 | 48 | try { |
50 | 49 | val updatesDF = getDashTable(sqlContext,"CONTENTS_REPORT",d) |
51 | - val fetchVar = Array("cid", | |
52 | - "expCnt", | |
53 | - "expTime", | |
54 | - "consumeTime", | |
55 | - "ctr", | |
56 | - "view", | |
57 | - "uview", | |
58 | - "bookmark", | |
59 | - "share", | |
60 | - "comment", | |
61 | - "like") | |
62 | - val selectedDF= updatesDF.select(fetchVar(0),fetchVar(1),fetchVar(2),fetchVar(3),fetchVar(4),fetchVar(5), | |
63 | - fetchVar(6),fetchVar(7),fetchVar(8),fetchVar(9),fetchVar(10)) | |
64 | - val updatesRaw: RDD[Map[String, String]] = dFToRDDMap(selectedDF,fetchVar) | |
50 | + val selectedDF= updatesDF.select("cid","view","consumeTime").na.drop() | |
51 | + val updatesRaw: RDD[Map[String, String]] = selectedDF.map(x=>Map("cid"->x.getAs[String](0),"consumeTime"->x.getAs[Double](2).toString)) | |
65 | 52 | updatesRaw |
66 | 53 | } catch { |
67 | 54 | case e:Exception => sc.emptyRDD[Map[String, String]] |
68 | 55 | } |
69 | 56 | }) |
70 | 57 | val weekInfo = sc.union(infoWeek) |
71 | - weekInfo.map(x=>(x("cid"),x("consumeTime").toLong)).reduceByKey((a,b) => a+b).map(x=>(x._1, math.min(x._2,4000000L))) | |
58 | + weekInfo.map(x=>(x("cid"),x("consumeTime").toDouble.toLong)).reduceByKey((a,b) => a+b).map(x=>(x._1, math.min(x._2,4000000L))) | |
72 | 59 | } |
73 | 60 | |
74 | 61 | /** |
... | ... | @@ -110,7 +97,6 @@ |
110 | 97 | val yesterdayTuple = intoYesterdayMN(nowTS) |
111 | 98 | val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8) |
112 | 99 | |
113 | - val maxCid: Long = getMaxId(sqlContext,"content") | |
114 | 100 | val weeklyCT = getWeeklyCT(dateKey) |
115 | 101 | |
116 | 102 | val cardSize = getCardSize() |