WeeklyECTbyGroup.scala

package com.piki_ds.preprocess


import scala.collection.immutable.IndexedSeq

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}

import com.piki_ds.utils.DateTimeEtc.{intoYesterdayMN,makeDateList}
import com.piki_ds.utils.GetTextFile.getDashTable

/**
 * Created by jungwon on 5/2/16.
 */

object WeeklyECTbyGroup {

  def getSparkConf = {
    //System.setProperty("SPARK_YARN_MODE", "true")
    val conf = new SparkConf().setAppName("WeeklyECTbyGroup")
    conf.setMaster("yarn-client")
    conf.set("master", "yarn-client")
    conf.set("spark.app.name", "WeeklyECTbyGroup")
    conf.set("spark.akka.frameSize", "1024")
  }

  val sc = new SparkContext(getSparkConf)
  val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  val hadoopConf = sc.hadoopConfiguration
  val fs = FileSystem.get(hadoopConf)

  /** Most recently modified directory under `path`; when `isParted`, it must contain a _SUCCESS marker. */
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS"))))
      .maxBy(_.getModificationTime)
  }
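
  /* Typical use (a sketch; it mirrors the calls made in getCardSize and cidWithUid below):
   *   val latest = recentlyUpdatedPath("hdfs://pikinn/preprocess/db/table=MG_CARD", true, fs)
   *   latest.getPath.toString   // newest partition directory that has a _SUCCESS marker
   */
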
  /**
   * Compute consume (dwell) time per content from the dashboard table, over one week.
   *
   * @param whichDate date the job runs for (yesterday), formatted yyyyMMdd
   * @return RDD of (cid, consumeTime)
   */
  def getWeeklyCT(whichDate: String) = {
    val oneWeek = makeDateList(whichDate, 7, false, "yyyyMMdd")
    val infoWeek: IndexedSeq[RDD[Map[String, String]]] = oneWeek.map(d => {
      try {
        val updatesDF = getDashTable(sqlContext, "CONTENTS_REPORT", d)
        val selectedDF = updatesDF.select("cid", "view", "consumeTime").na.drop()
        val updatesRaw: RDD[Map[String, String]] =
          selectedDF.map(x => Map("cid" -> x.getAs[String](0), "consumeTime" -> x.getAs[Double](2).toString))
        updatesRaw
      } catch {
        case e: Exception => sc.emptyRDD[Map[String, String]]
      }
    })
    val weekInfo = sc.union(infoWeek)
    // Sum consume time per cid across the week, capping each total at 4,000,000.
    weekInfo.map(x => (x("cid"), x("consumeTime").toDouble.toLong))
      .reduceByKey(_ + _)
      .map(x => (x._1, math.min(x._2, 4000000L)))
  }
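
  /* A minimal local sanity check of the aggregation step above (a sketch: the cids and
   * values are made up; it assumes the SparkContext `sc` defined in this object):
   *
   *   val sample = sc.parallelize(Seq(Map("cid" -> "100", "consumeTime" -> "1200.0"),
   *                                   Map("cid" -> "100", "consumeTime" -> "800.5")))
   *   sample.map(x => (x("cid"), x("consumeTime").toDouble.toLong))
   *         .reduceByKey(_ + _)
   *         .collect()   // Array(("100", 2000L)), before the 4,000,000 per-cid cap
   */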

  /**
   * Compute the card count (card size) for each cid.
   *
   * @return RDD of (cid, cardSize)
   */
  def getCardSize() = {
    val fp = recentlyUpdatedPath("hdfs://pikinn/preprocess/db/table=MG_CARD", true, fs).getPath.toString
    val totalTable = sqlContext.read.format("parquet").load(fp).where("status='ACTV'")
    import org.apache.spark.sql.functions._
    val selectedTable = totalTable.groupBy("contents_id").agg(count("ordering")).na.drop()
    val selectedRDD = selectedTable.map(x => (x.getAs[Long](0).toString, x.getAs[Long](1).toInt))
    selectedRDD
  }
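
  /* Expected shape of the result above (values are illustrative only, not real data):
   *   getCardSize().take(2)   // e.g. Array(("123456", 12), ("123457", 8))
   * i.e. contents_id as a String paired with the number of ACTV cards it contains.
   */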

  /**
   * Map cid to uid.
   *
   * @return RDD of (cid, uid)
   */
  def cidWithUid() = {
    /*
    val filterString = "where uid is not null"
    val fetchCol = Array("contents_id", "uid")
    val totalTable = readPartialTableBig(sqlContext, "REPL","new_pikicast_common","MG_CONTENTS",fetchCol,filterString,"contents_id",maxCid,1000)
    */
    val fp = recentlyUpdatedPath("hdfs://pikinn/preprocess/db/table=MG_CONTENTS", true, fs).getPath.toString
    val totalTable = sqlContext.read.format("parquet").load(fp).where("uid is not null").select("contents_id", "uid")
    totalTable.map(x => (x.getAs[Long](0).toString, x.getAs[Long](1).toString))
  }
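
  /* How this job is typically submitted (a sketch; the assembly jar name is an assumption,
   * not taken from this file, and the master is already set to yarn-client in getSparkConf):
   *
   *   spark-submit --class com.piki_ds.preprocess.WeeklyECTbyGroup path/to/piki-ds-assembly.jar
   *
   * A run writes three object files under hdfs://pikinn/preprocess/:
   *   cidAndCardSize/<dateKey>, ctByCardSize/<dateKey> and cidWithUid/<dateKey>,
   * where <dateKey> is yesterday's date in yyyyMMdd form.
   */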

  def main(args: Array[String]) {
    val nowTS: Long = System.currentTimeMillis
    val yesterdayTuple = intoYesterdayMN(nowTS)
    val dateKey = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)

    // Weekly consume time per cid, ending at yesterday.
    val weeklyCT: RDD[(String, Long)] = getWeeklyCT(dateKey)

    // Card counts per cid, saved for downstream jobs.
    val cardSize: RDD[(String, Int)] = getCardSize()
    cardSize.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$dateKey")

    // Total weekly consume time grouped by card size; drop zero totals before saving.
    val ctByCardSize: RDD[(Int, Long)] = weeklyCT.join(cardSize).groupBy(x => x._2._2).map(x => {
      val consumeTime = x._2.map(_._2._1).sum
      (x._1, consumeTime)
    })
    ctByCardSize.filter(_._2 != 0L).saveAsObjectFile(s"hdfs://pikinn/preprocess/ctByCardSize/$dateKey")

    // cid -> uid mapping, also persisted by date.
    val cidUid = cidWithUid()
    cidUid.saveAsObjectFile(s"hdfs://pikinn/preprocess/cidWithUid/$dateKey")
  }
}