Commit 2917ca52b2a8206b70766e080f2a1437b2453994

Authored by Joanne
1 parent b9b5ce9223
Exists in master

editor related: make followGetter's fileSave flag an explicit parameter, thread it through editorDB from the first CLI argument, and hardcode the spark-submit main class

Showing 2 changed files with 6 additions and 7 deletions

app/com/piki_ds/ver1/EditorScore.scala
... ... @@ -42,7 +42,7 @@
42 42 list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
43 43 }
44 44  
45   - def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean = true) = {
  45 + def followGetter(sQLContext: SQLContext, dateKey:String, fs:FileSystem, fileSave:Boolean) = {
46 46 val fromExisting = sQLContext.read.format("json").load(recentlyUpdatedPath("/preprocess/followInfo",false,fs).getPath.toString)
47 47 val fromUpdated = getDashTable(sQLContext, "EDITOR_FOLLOW", dateKey)
48 48 val unionFrom = fromExisting.unionAll(fromUpdated)
... ... @@ -61,7 +61,7 @@
61 61 follow_info
62 62 }
63 63  
64   - def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String): RDD[(String, String, Long, Long)] = {
  64 + def editorDB(sQLContext: SQLContext, fs:FileSystem, dateKey:String, fileSave:Boolean): RDD[(String, String, Long, Long)] = {
65 65 // Parse the USER table from the DB and fetch the editors (not an essential step....-_-)
66 66 val levels = Array("ADMIN_O","EDITOR_O","PARTNER_O", "PRESS_O","STAFF_O")
67 67 val filterS_user = s"where level in (${levels.map(x=>"'" + x+ "'").mkString(",")})"
... ... @@ -70,7 +70,7 @@
70 70 val user_info: RDD[(String, String)] = user_tableGet.map(x=>(x.getAs[Long]("uid").toString, x.getAs[String]("name")))
71 71  
72 72 // Parse the FOLLOW table from the DB and get follower counts
73   - val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey,fs)
  73 + val follow_info: RDD[(String, Long)] = followGetter(sQLContext, dateKey,fs,fileSave)
74 74  
75 75 val joinedFollowInfo: RDD[(String, (String, Long))] = user_info.fullOuterJoin(follow_info).map(x=>(x._1,(x._2._1.getOrElse(""),x._2._2.getOrElse(10L))))
76 76  
77 77  
... ... @@ -107,11 +107,12 @@
107 107 val hadoopConf = sc.hadoopConfiguration
108 108 val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
109 109  
  110 + val followFileSave = if (args.nonEmpty && args.head.toInt.equals(0)) false else true
110 111 val nowTS: Long = System.currentTimeMillis
111 112 val dateKey = getDateKey(nowTS)
112 113  
113 114 // Compute editor popularity scores
114   - val fromDB = editorDB(sqlContext, fs, dateKey)
  115 + val fromDB = editorDB(sqlContext, fs, dateKey,followFileSave)
115 116  
116 117 val ePopularity: RDD[((String, String), Double)] = make_0to1_2Key(popularity(fromDB, nowTS).map(x=>(x._1,1-MatrixFunctions.tanh(math.max(1,x._2))*(-1)-1)))
117 118  
... ... @@ -132,8 +133,6 @@
132 133 }).reduce((a,b)=>(a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5))
133 134 (x._1, innerSum)
134 135 }).join(uidAndCids)
135   -
136   -
137 136 val avgPerformance: RDD[(String, (Double, Double, Double, Double, Double))] = performanceRaw.map(x=> {
138 137 val ap: (Double, Double, Double, Double, Double) = (x._2._1._1.toDouble / x._2._2, x._2._1._2.toDouble / x._2._2,
139 138 x._2._1._3.toDouble / x._2._2, x._2._1._4.toDouble / x._2._2,
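
Taken together, the Scala changes remove followGetter's old default of fileSave = true and thread an explicit flag from main through editorDB. Below is a minimal sketch of the resulting call chain, assuming the Spark 1.x-era APIs used above (SQLContext, RDD); the object wrapper and the followFileSave helper are illustrative names, not part of the commit, and the bodies are elided:

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Sketch only: signatures and flag threading are taken from the diff above,
// bodies are elided with ???.
object EditorScoreSketch {

  // fileSave is now a required parameter instead of defaulting to true.
  def followGetter(sQLContext: SQLContext, dateKey: String,
                   fs: FileSystem, fileSave: Boolean): RDD[(String, Long)] = ???

  // editorDB gains a matching parameter and forwards it unchanged.
  def editorDB(sQLContext: SQLContext, fs: FileSystem, dateKey: String,
               fileSave: Boolean): RDD[(String, String, Long, Long)] = {
    val follow_info: RDD[(String, Long)] =
      followGetter(sQLContext, dateKey, fs, fileSave)
    ???
  }

  // main derives the flag from the first CLI argument: "0" disables saving,
  // anything else (or no argument at all) keeps the previous default of true.
  // The committed if/else is behaviorally equivalent to this negation:
  def followFileSave(args: Array[String]): Boolean =
    !(args.nonEmpty && args.head.toInt == 0)
}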
editor_score.sh
... ... @@ -14,7 +14,7 @@
14 14 #HADOOP_CONF_DIR=/etc/hadoop/conf
15 15  
16 16 /data/spark/bin/spark-submit \
17   ---class com.piki_ds.ver1.$1 \
  17 +--class com.piki_ds.ver1.EditorScore \
18 18 --master yarn-client \
19 19 --conf "spark.default.parallelism=250" \
20 20 --conf "spark.driver.memory=2g" \
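
With the class name now hardcoded as EditorScore, the script's first positional argument no longer selects which class to run. Assuming the unseen remainder of the script forwards its arguments to the application after the jar (not visible in this hunk), invoking the script with a first argument of 0 would set followFileSave to false and skip saving the merged follow info, while any other argument, or none at all, keeps the default of true.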