HourlyCommentCnt.scala
package com.piki_ds.preprocess

import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

import com.piki_ds.utils.SqlContextConf._

/**
 * Preprocessing step for comment score calculation.
 *
 * output:
 * RDD[((yyyy-MM-dd HH, cid), (countAComment, countPComment, countUComment))]
 *
 * location:
 * /preprocess/cmtCountByHour/y=$yyyy/mo=$MM/d=$dd/h=$HH
 *
 * Created by jungwon on 8/19/15.
 */

object HourlyCommentCnt {

  def getSparkConf = {
    //System.setProperty("SPARK_YARN_MODE", "true")
    val conf = new SparkConf().setAppName("HourlyCommentCnt")
    conf.setMaster("yarn-client")
    conf.set("master", "yarn-client")
    conf.set("spark.app.name", "HourlyCommentCnt")
    // set() returns the SparkConf, so this last call is the method's result
    conf.set("spark.akka.frameSize", "1024")
  }

  val sc = new SparkContext(getSparkConf)
  val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
  val hadoopConf = sc.hadoopConfiguration
  val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

  /**
   * Pick the most recently modified directory under `path`; when `isParted` is true,
   * only directories that contain a _SUCCESS marker are considered.
   */
  def recentlyUpdatedPath(path: String, isParted: Boolean = true, hdfs: FileSystem): FileStatus = {
    val list = hdfs.listStatus(new Path(path))
    list.filter(x => x.isDirectory && (!isParted || hdfs.exists(x.getPath.suffix("/_SUCCESS")))).maxBy(_.getModificationTime)
  }


  /**
   * Fetch comment records from MG_COMMENT one hour at a time.
   *
   * @param sttDate : yyyy-MM-dd HH
   * @param endDate : yyyy-MM-dd HH
   * @return : RDD[
   *   Map[column -> value]
   * ]
   */

  def getCommentDB(sttDate: String, endDate: String) = {
    val filterStr = s"where (cdate between '$sttDate:00' and '$endDate:00') and status ='ACTV'"
    val fetchCol = Array("contents_id", "comment_id", "parent_comment_id", "uid", "cdate")
    val maxCmtId = getMaxId(sqlContext, "comment")
    val totalTable = readPartialTableBig(sqlContext, "REPL", "new_pikicast_common", "MG_COMMENT",
      fetchCol, filterStr, "comment_id", maxCmtId, 1000)
    // alias the derived column so the lookup below does not depend on the auto-generated name
    val selectedRDD: RDD[Map[String, Long]] = totalTable.select(totalTable("contents_id"), totalTable("parent_comment_id"),
      totalTable("uid"), unix_timestamp(totalTable("cdate")).as("cdate_unix")).map(x => {
      Map(
        "contents_id" -> x.getAs[Long]("contents_id"),
        "parent_comment_id" -> x.getAs[Long]("parent_comment_id"),
        "uid" -> x.getAs[Long]("uid"),
        "cdate" -> x.getAs[Long]("cdate_unix")
      )
    })
    selectedRDD.map(x => {
      val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val pid: String = try {
        x("parent_comment_id").toString
      } catch {
        case e: Exception => "0"
      }
      Map(
        "contents_id" -> x("contents_id").toString,
        "parent_comment_id" -> pid,
        "uid" -> x("uid").toString,
        // unix_timestamp yields seconds; SimpleDateFormat expects milliseconds
        "cdate" -> format_date.format(x("cdate") * 1000)
      )
    })
  }
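
  /**
   * Usage sketch (illustrative only; the hour window below is hypothetical).
   * Both fetchers take "yyyy-MM-dd HH" strings and return the same
   * Map-per-comment record shape, so either can feed dailyComment_DB.
   */
  def fetchExample(): RDD[Map[String, String]] = {
    // from the replica DB:
    //   getCommentDB("2015-12-02 13", "2015-12-02 14")
    // from the parquet dump of MG_COMMENT (same record shape):
    getCommentParquet("2015-12-02 13", "2015-12-02 14")
  }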

  def getCommentParquet(sttDate: String, endDate: String) = {
    val filePath = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/db/table=MG_COMMENT", true, fs).getPath.toString
    val totalTable = sqlContext.read.format("parquet").load(filePath)

    val selectedTable = totalTable.where(s"(cdate between '$sttDate:00' and '$endDate:00') and status ='ACTV'")
      .select(totalTable("contents_id"), totalTable("parent_comment_id"),
        totalTable("uid"), unix_timestamp(totalTable("cdate")).as("cdate_unix")).na.fill(0)

    val selectedRDD = selectedTable.map(x => {
      Map(
        "contents_id" -> x.getAs[Long]("contents_id"),
        "parent_comment_id" -> x.getAs[Long]("parent_comment_id"),
        "uid" -> x.getAs[Long]("uid"),
        "cdate" -> x.getAs[Long]("cdate_unix")
      )
    })

    selectedRDD.map(x => {
      val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val pid = x("parent_comment_id").toString
      Map(
        "contents_id" -> x("contents_id").toString,
        "parent_comment_id" -> pid,
        "uid" -> x("uid").toString,
        // unix_timestamp yields seconds; SimpleDateFormat expects milliseconds
        "cdate" -> format_date.format(x("cdate") * 1000)
      )
    })
  }

  /**
   * Total counts of comments written, per content and per hour.
   *
   * @param parsedDB : records fetched by getCommentDB (or getCommentParquet)
   * @return : RDD[(
   *   (yyyy-MM-dd HH, cid),
   *   (total comment count, parent comment count, distinct commenting users)
   * )]
   */
  def dailyComment_DB(parsedDB: RDD[Map[String, String]]) = {
    parsedDB.groupBy(x => {
      val datehr = x("cdate").take(13)
      (datehr, x("contents_id"))
    }).map(x => {
      val countAComment = x._2.size.toLong
      val countPComment = x._2.count(a => a("parent_comment_id") == "0").toLong
      val countUComment = x._2.map(a => a("uid")).toSet.size.toLong
      (x._1, (countAComment, countPComment, countUComment))
    })
  }
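
  /**
   * Minimal check sketch (hypothetical sample values, not production data):
   * two comments on the same content in the same hour, one of them a reply,
   * should yield (total=2, parent=1, distinct users=2).
   */
  def dailyCommentExample(): Array[((String, String), (Long, Long, Long))] = {
    val sample = sc.parallelize(Seq(
      Map("contents_id" -> "100", "parent_comment_id" -> "0",  "uid" -> "7", "cdate" -> "2015-12-02 13:05:11"),
      Map("contents_id" -> "100", "parent_comment_id" -> "55", "uid" -> "8", "cdate" -> "2015-12-02 13:40:02")
    ))
    // expected single element: (("2015-12-02 13", "100"), (2L, 1L, 2L))
    dailyComment_DB(sample).collect()
  }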

  /**
   * Compute and write the full set of hourly counts for yesterday.
   *
   * @param nowTS: currentTimeMillis
   */
  def updateYesterdays(nowTS: Long) {
    val format_d = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val format_date = new java.text.SimpleDateFormat("yyyy-MM-dd HH")
    val c = java.util.Calendar.getInstance
    val midnight = format_d.format(nowTS) + " 00"
    c.setTime(format_date.parse(midnight))

    // walk back hour by hour from today's midnight: (end, start) pairs covering yesterday
    val timestamps = (1 to 24).map(x => {
      val cur = format_date.format(c.getTime)
      c.add(java.util.Calendar.HOUR_OF_DAY, -1)
      (cur, format_date.format(c.getTime))
    })

    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    timestamps.foreach(s => {
      val stt = s._2
      val edt = s._1
      val byHour: RDD[((String, String), (Long, Long, Long))] = dailyComment_DB(getCommentParquet(stt, edt))
      val stdate = stt.split(" ")(0).split("-")
      val sttime = stt.split(" ")(1)
      val filepath = s"hdfs://pikinn/preprocess/cmtCountByHour/y=${stdate(0)}/mo=${stdate(1)}/d=${stdate(2)}/h=$sttime/"
      val schema = StructType(Array("contents_id", "all_comment", "parent_comment", "unique_comment").map(fieldName => {
        StructField(fieldName, LongType, true)
      }))
      val byHourFmt = byHour.map {
        case ((datehr, cid), (a, p, u)) =>
          Row(cid.toLong, a, p, u)
      }.coalesce(1)
      val byHourDF = sqlContext.createDataFrame(byHourFmt, schema)
      byHourDF.write.parquet(filepath)
    })
  }
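
  /**
   * Read-back sketch (not part of the original job): load one hour of counts
   * written by updateYesterdays. The partition values are supplied by the
   * caller; the path layout mirrors the write above.
   */
  def readHourlyCounts(y: String, mo: String, d: String, h: String): DataFrame = {
    sqlContext.read.parquet(s"hdfs://pikinn/preprocess/cmtCountByHour/y=$y/mo=$mo/d=$d/h=$h/")
  }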

  def main(args: Array[String]) {
    val nowTS: Long = System.currentTimeMillis
    /*
    makeDateList("20151202",13,true,"yyyyMMdd").map(d=>{
      val format_d = new java.text.SimpleDateFormat("yyyyMMdd")
      val ts = format_d.parse(d).getTime
      updateYesterdays(ts)
    })
    */
    updateYesterdays(nowTS)
  }
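
  // Launch sketch (jar name is hypothetical; master and app name mirror getSparkConf):
  //   spark-submit --master yarn-client \
  //     --class com.piki_ds.preprocess.HourlyCommentCnt \
  //     piki-ds-preprocess.jar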

}