// MapWithRank.scala
package com.piki_ds.preprocess

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.json4s.jackson.JsonMethods.parse

import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN
import com.piki_ds.utils.GetTextFile.{getLapse,getLog}

/**
 * Preprocessing for ggh.
 * Created by jungwon on 8/25/15.
 */

object MapWithRank {

  def getSparkConf: SparkConf = {
    new SparkConf()
      .setAppName("MapWithRank")
      .setMaster("yarn-client")
  }

  val sc = SparkContext.getOrCreate(getSparkConf)

  /**
   * Filter users whose log volume exceeds a limit.
   *
   * @param log : the log to scan
   * @param limit : after grouping by uuid, keep a uuid only if it has more than `limit` log lines
   * @return : those (uuid, count) pairs as a Map[String, Long]
   */
  def countUser(log: RDD[Array[String]], limit: Int) = {
    log.filter(x => x.length > 3).map(x => (x(3), "-")).countByKey().filter(x => x._2 > limit)
  }


  /**
   * Join the log with the precomputed lapse.
   *
   * @param log : log for the same doi
   * @param lapse : lapse for the same doi
   * @return : the log, with the duration field replaced for CONSUME entries
   */
  def joinLogWithLapse(log: RDD[Array[String]], lapse: RDD[((String, String, String, String), String)]) = {
    val tempJoined = log.filter(_.length > 5).map(x => ((x(1), x(2), x(3), x(5)), x)).leftOuterJoin(lapse)
    tempJoined.map(x => {
      val l: Array[String] = x._2._1
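      // when no lapse entry was joined for this line, pass the log through unchanged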
      if (x._2._2.isEmpty) {
        Some(l)
      }
      else {
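        // the log schema varies by version (6, 7-8, or 9 fields): append the lapse as a
        // new duration field, or cap an existing numeric duration with it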
        if (l.length == 6) {
          Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), x._2._2.get))
        } else if (l.length < 9 && l.length > 6 && l(6).forall(_.isDigit)) {
          Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), scala.math.min(x._2._2.get.toLong, l(6).toLong).toString))
        } else if (l.length == 9 && l(7).forall(_.isDigit)) {
          Some(Array(l(0), l(1), l(2), l(3), l(4), l(5), scala.math.min(x._2._2.get.toLong, l(7).toLong).toString))
        }
        else None
      }
    }).flatMap(x => x)
  }


  /**
   * Refine the processed log: formatting, min-max clamping, and filtering out heavy users.
   *
   * @param log : the processed log
   * @param bigUser : heavy users from countUser, excluded from the mapping
   * @return : (
   *          mapping: Map[String, String],
   *          loadInfo: List[(cid: BigInt, rank: Int)] or null,
   *          (cid: String, (expTime: Long, conTime: Long), rank: Option[Int]) or null
   *          )
   */
  def mapActions(log: RDD[Array[String]], bigUser: scala.collection.Map[String, Long]) = {
    log.map(s => {
      if (s.length > 5 && !bigUser.contains(s(3))) {
        try {
          val format_date = new java.text.SimpleDateFormat("yyyyMMdd HHmm")
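          // timestamps arrive as 10-digit seconds or 13-digit milliseconds; pad to ms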
          val genTime = if (s(2).length == 10) s(2) + "000" else s(2)
          val mapping = Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1),
            "time" -> genTime, "uuid" -> s(3),
            "dt" -> format_date.format(new java.util.Date(genTime.toLong)).take(11))
          if (s(0).equals("COMMON") && s(1).equals("LOAD") && s(4).equals("m")) {
            val jsonString = s(5).replace("\"content\"", "\"contents\"")
            val parsed = parse(jsonString).values.asInstanceOf[Map[String, AnyRef]]("contents")
            val inner: List[(BigInt, Int)] = parsed.asInstanceOf[List[Map[String, List[BigInt]]]].flatMap(x => x.filter(x => !x._1.startsWith("r")).values).flatten.zipWithIndex
            Some((mapping, inner.map(a => (a._1, scala.math.min(a._2, 50))), null)) // exposure ranks above 50 are considered meaningless
          }
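          // non-LOAD events (EXPOSURE/CONSUME) carry a numeric cid in s(4)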
          else if (s(4).forall(_.isDigit)) {
            import scala.math.{min, max}
            // handle each log schema version separately
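            // exposure time is clamped to [0, 10*1000] and consume time capped at 30*1000 (presumably ms)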
            if (s.length == 10 && s(1).equals("EXPOSURE")) {
              Some((mapping, null, (s(4), (min(max(0L, s(5).toLong - s(6).toLong), 1000 * 10), 0L), Some(s(9).toInt))))
            }
            else if (s.length > 6 && s(1).equals("EXPOSURE")) {
              Some((mapping, null, (s(4), (min(max(0L, s(5).toLong - s(6).toLong), 1000 * 10), 0L), None)))
            }
            else if (s.length > 6 && s(1).equals("CONSUME")) {
              Some((mapping, null, (s(4), (0L, min(1000L * 30, if (s(6).isEmpty) 0L else s(6).toLong)), None)))
            }
            else None
          }
          else None
        } catch {
          case e: Exception => None
        }
      }
      else None
    }).flatMap(x => x)
  }


  /**
   * Map logs according to the exposure-position info carried by LOAD events.
   *
   * @param mappedLog :
   *                  RDD[
   *                  (mapping: Map[String, String]
   *                  , loadInfo: List[(cid: BigInt, rank: Int)]
   *                  , (cid: String, (expTime: Long, conTime: Long), rank: Option[Int]))]
   * @return :
   *         RDD[
   *         (uuid: String
   *         , (rank: Int, cid: String)
   *         , (exposure time: Long, consume time: Long))]
   */
  def mapRanges(mappedLog: RDD[(Map[String, String], List[(BigInt, Int)], (String, (Long, Long), Option[Int]))]) = {
    mappedLog.groupBy(x => x._1("uuid")).map(x => {
      try {
        // take only the COMMON|LOAD logs and map each one per rank
        val loads = x._2.filter(a => a._3 == null)
        if (loads.nonEmpty) {
          val loadList = loads.map(a => (a._1("time").toLong, a))
          val rangeLoads = loadList.foldLeft((
            scala.collection.mutable.ArrayBuffer.empty[(Long, Long, List[(BigInt, Int)])], 0L))(
            (a, b) => {
              a._1 += ((a._2, b._1, b._2._2))
              (a._1, b._1)
            })._1
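          // the foldLeft pairs each LOAD timestamp with the previous one, building
          // (rangeStart, rangeEnd, contentList) triples that later events are bucketed into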
          // find the latest LOAD and append a final range (maxTime, Long.MaxValue) for its contents
          val max = loads.maxBy(a => a._1("time").toLong)
          rangeLoads += ((max._1("time").toLong, Long.MaxValue, max._2))

          // map the remaining non-LOAD logs (currently EXPOSURE & CONSUME) onto rangeLoads
          val expcon = x._2.filter(a => a._2 == null)
          val mapWithLoad = expcon.map(a => {
            val cid = a._3._1
            if (a._3._3.isEmpty) {
              val location: List[(BigInt, Int)] = rangeLoads.filter(b => {
                b._1 <= a._1("time").toLong && b._2 > a._1("time").toLong
              }).head._3.filter(c => c._1.toString().equals(cid))
              (location, (a._1, a._3))
            }
            else (List((BigInt(0), a._3._3.get)), (a._1, a._3))
          }).filter(_._1.nonEmpty).map(a => (a._1.head._2, (a._2._2._1, a._2._2._2))) // expcon's ._2 is null and unneeded, so drop it here

          // accumulate exposure time per position (rank) for each cid
          val rankedMapping = mapWithLoad.groupBy(m => (m._1, m._2._1)).map(m => {
            val cuml = m._2.map(a => a._2._2).reduce((q, w) => (q._1 + w._1, q._2 + w._2))
            val consumeTime = scala.math.min(cuml._2, 4000000L)
            (x._1, m._1, (cuml._1, consumeTime))
          })
          Some(rankedMapping)
        }
        else None
      }
      catch {
        case e: Exception =>
          None
      }
    }).flatMap(x => x).flatMap(x => x)
    // final shape: (uuid, (rank, cid), (exposure time, consume time))
  }

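  /**
   * Entry point: builds yesterday's exposure/consume-by-rank CSV and writes it to HDFS.
   * A typical launch (jar name assumed for illustration) might look like:
   *   spark-submit --class com.piki_ds.preprocess.MapWithRank --master yarn-client piki-ds.jar
   */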
  def main(args: Array[String]) {
    val nowTS: Long = System.currentTimeMillis
    val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)

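    // doi: yesterday's date collapsed to a yyyyMMdd string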
    val doi = yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)

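    // pipeline: raw log -> join with CONSUME lapse -> map per event (heavy users excluded) -> map onto LOAD ranks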
    val log = getLog(sc, doi)
    val joinedLog = joinLogWithLapse(log, getLapse(sc, doi, "CONSUME"))
    val mapped: RDD[(Map[String, String], List[(BigInt, Int)], (String, (Long, Long), Option[Int]))] = mapActions(joinedLog, countUser(log, 10000))
    val ecWithRank: RDD[(String, (Int, String), (Long, Long))] = mapRanges(mapped).filter(x => x._3._1 != 0L && x._3._2 != 0L)

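    // CSV columns: uuid, rank, cid, exposure time, consume time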
    val csvMapRank = ecWithRank.map(x => s"${x._1},${x._2._1},${x._2._2},${x._3._1},${x._3._2}")
    csvMapRank.saveAsTextFile(s"hdfs://pikinn/preprocess/ecWithRank/$doi/")
  }

}