Commit ad90805d895cd19d2dc98fa2c18124e3955fc887

Authored by Joanne
Exists in master

Merge branch 'master' of ssh://gitlab.pikicast.com:2222/data-science-team/dsquality

Showing 3 changed files (view: Side-by-side | Diff)

app/com/piki_ds/ver2ggh/expConTime.scala View file @ ad90805
... ... @@ -2,6 +2,9 @@
2 2 * Created by Evan on 2016. 5. 10..
3 3 */
4 4 package com.piki_ds.ver2ggh
  5 +
  6 +import java.util.Date
  7 +
5 8 import org.apache.spark.SparkContext
6 9 import org.apache.spark.rdd.RDD
7 10 import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
8 11  
... ... @@ -174,9 +177,11 @@
174 177 }
175 178  
176 179 def main(args: Array[String]): Unit = {
177   -
  180 + val format = new java.text.SimpleDateFormat("yyyyMMdd")
  181 + val currentTime = new Date()
  182 + val day_delm = 24 * 1000 * 60 * 60L
178 183 // 로그 파스
179   - val saveDay = if(args.length == 1) args(0) else "20160518"
  184 + val saveDay = if(args.length == 1) args(0) else format.format(currentTime.getTime - day_delm)
180 185 val df1 = getLogParse(saveDay)
181 186  
182 187 // 로그를 (오픈, 소비) 정보와 (소비, 오픈) 정보 둘로 나눔
app/com/piki_ds/ver2ggh/gghScore.scala View file @ ad90805
... ... @@ -44,7 +44,11 @@
44 44  
45 45 dateSet.map{x =>
46 46 val addrs = s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=${x}"
47   - val out = if(!fs.exists((new Path(addrs)))) {x} else "null"
  47 + val out = if(!fs.exists(new Path(addrs))) {
  48 + x
  49 + } else if (fs.listStatus((new Path(addrs))).size < 100) {
  50 + x
  51 + } else "null"
48 52 out
49 53 }.filter(x => x != "null").map(x => ver2ggh.expConTime.main(Array(s"${x}")))
50 54  
51 55  
52 56  
... ... @@ -155,15 +159,16 @@
155 159 val gghStd = gghVer3.describe().where("summary = 'stddev'").drop("summary").take(1)(0)(4).toString.toDouble
156 160  
157 161 val gghScaled = gghVer3.withColumn("gghScaled", (column("ggh") - gghMean) / gghStd).selectExpr("*", "1000 / (1 + exp(-gghScaled)) as scaledGgh").drop("gghScaled")
158   -/*
  162 +
159 163 gghScaled.map{x =>
160 164 s"${x(0)},${x.getAs[Double]("scaledGgh").toInt}"
161 165 }.saveAsTextFile(s"hdfs://pikinn/preprocess/timelineScore/content/ggh/$saveDay")
162   -*/
  166 +
163 167 val finalScore = gghScaled.map(x=>(x(0),x.getAs[Double]("scaledGgh").toInt))
164 168 val insertArray = finalScore.map(x=>(x._1.toString, x._2.toString)).collect()
165 169 val test = new HbaseInserter("cid_ggh2")
166 170 test.insert(insertArray)
  171 +
167 172 /*//////// CTR and CTR Time 계산
168 173 val ctr = expConsume.groupBy("cid").
169 174 agg(expr("sum(expTime1) as expTime1"), expr("sum(expSize1) as expSize1"), expr("sum(expTime2) as expTime2"),
... ... @@ -23,6 +23,13 @@
23 23 /data/spark/bin/spark-submit \
24 24 --class $1 \
25 25 --master yarn-client \
  26 +--executor-memory 3000m \
  27 +--conf "spark.dynamicAllocation.minExecutors=90" \
  28 +--conf "spark.dynamicAllocation.maxExecutors=170" \
  29 +--conf "spark.akka.frameSize=2047" \
  30 +--conf "spark.network.timeout=120000" \
  31 +--conf "spark.shuffle.memoryFraction=0.5" \
  32 +--conf "spark.shuffle.manager=hash" \
26 33 --conf "spark.default.parallelism=250" \
27 34 $BASEDIR/target/scala-2.11/dsquality-assembly-0.1.0-SNAPSHOT.jar $2 >> $LOG 2>&1
28 35 #target/scala-2.11/dsmakingscore-assembly-0.1.0-SNAPSHOT.jar >> $LOG 2>&1