MakingLapse.scala

package com.piki_ds.preprocess

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import com.piki_ds.utils.GetTextFile.getLog
import com.piki_ds.utils.DateTimeEtc.intoYesterdayMN

/**
 * Computes lapse: the time taken to move from each action to the next,
 * per uuid, from the daily log.
 *
 * Created by jungwon on 6/16/15.
 */

object MakingLapse {

  def getSparkConf: SparkConf = {
    //System.setProperty("SPARK_YARN_MODE", "true")
    val conf = new SparkConf().setAppName("MakingLapse")
    conf.setMaster("yarn-client")
  }

  val sc = new SparkContext(getSparkConf)

  def main(args: Array[String]) {
    val nowTS: Long = System.currentTimeMillis
    val yesterdayTuple: (Long, String) = intoYesterdayMN(nowTS)
    println(sc.getConf.toDebugString)
    val a1 = if (args.length > 0) args(0) else ""
    val yesterday = if (a1.length == 8) a1 else yesterdayTuple._2.replaceAll("[^0-9]", "").take(8)

    // optional: weed out oversized logs (e.g. users with 15,000+ lines);
    // the log is read once and cached, since countUser triggers an action
    // (countByKey) and filterBig then re-reads the same data
    val rawLog = getLog(sc, yesterday).cache()
    val log = filterBig(rawLog, countUser(rawLog, 15000))
    val groupedAndSorted = groupAndSort(log)
    val lapse = addLapse(groupedAndSorted)
    // map -> pipe-delimited csv
    val csvLapse = lapse.map(x => {
      s"${x("category")}|${x("eventType")}|${x("actionTime")}|${x("uuid")}|${x("field0")}|${x("field1")}|${x("lapse")}"
    })
    csvLapse.coalesce(100).saveAsTextFile(s"hdfs://pikinn/preprocess/lapse/$yesterday", classOf[GzipCodec])
  }
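
  // A minimal sketch of the expected output: one pipe-delimited line per
  // action, ending with the lapse in milliseconds. The field values below are
  // hypothetical; the real ones come from the parsed log.
  //
  //   CONTENT|OPEN|1434380400000|some-uuid|123||0
  //   CONTENT|EXPOSURE|1434380404000|some-uuid|123|456|4000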


  /**
   * Count log lines per uuid, to find excessively heavy users worth filtering out
   *
   * @param log : parsed log lines, one Array[String] per line
   * @param dungchi : size threshold (how big is "too big", in lines per uuid)
   * @return map of uuid -> line count, for uuids above the threshold
   */
  def countUser(log: RDD[Array[String]], dungchi: Long): scala.collection.Map[String, Long] = {
    log.filter(x => x.length > 3).map(x => (x(3), "-")).countByKey().filter(x => x._2 > dungchi)
  }

  /**
   * Filter the log against the computed size map
   *
   * @param log : parsed log lines
   * @param bigUserMap : uuid -> line count map produced by countUser
   * @return log with heavy users' lines removed
   */
  def filterBig(log: RDD[Array[String]], bigUserMap: scala.collection.Map[String, Long]) = {
    // guard on length: lines with fewer than 4 fields have no uuid to look up
    log.filter(x => x.length > 3 && !bigUserMap.contains(x(3)))
  }
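
  // A small driver-side sketch of how countUser and filterBig compose, using
  // made-up log lines (fields: category, eventType, actionTime, uuid):
  //
  //   val demo = sc.parallelize(Seq(
  //     Array("CONTENT", "OPEN", "1000", "heavy"),
  //     Array("CONTENT", "OPEN", "2000", "heavy"),
  //     Array("CONTENT", "OPEN", "3000", "light")))
  //   val heavy = countUser(demo, 1L)    // Map("heavy" -> 2)
  //   filterBig(demo, heavy).count()     // 1: only the "light" line survives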

  /**
   * groupBy uuid, then sortBy action time within each group
   *
   * @param log : parsed log lines (at least 4 fields each)
   * @return (uuid, that uuid's actions sorted by time)
   */
  def groupAndSort(log: RDD[Array[String]]) = {
    val groupedRanker = log.groupBy(x => x(3))
    // note: actionTime is compared as a string, which is only correct for
    // fixed-width timestamps (e.g. epoch millis of equal length)
    groupedRanker.map(x => (x._1, x._2.toSeq.sortBy(a => a(2))))
  }
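
  // For example (hypothetical lines), two actions of the same uuid arriving
  // out of order come back time-sorted:
  //
  //   groupAndSort(sc.parallelize(Seq(
  //     Array("C", "E", "2000", "u1"),
  //     Array("C", "E", "1000", "u1"))))
  //   // => ("u1", Seq(Array(..., "1000", ...), Array(..., "2000", ...)))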

  /** Time taken to move from each action to the next one
   *
   * @param gsLog : grouped and sorted log
   * @return one Map per action, with "lapse" clamped to [0, 1 minute]
   */
  def addLapse(gsLog: RDD[(String, Seq[Array[String]])]) = {
    gsLog.map(x => {
      val c = x._2
      val withLapse = c.indices.map(i => {
        val l = c(i)
        // lapse = this action's time minus the previous action's time;
        // the first action of a session gets 0
        val lapse = if (i > 0) {
          try {
            l(2).toLong - c(i - 1)(2).toLong
          } catch {
            case e: Exception => 0L
          }
        } else 0L

        val category = l(0)
        val eventType = l(1)
        val actionTime = l(2)
        val uuid = l(3)
        // guard both optional fields: short lines get empty strings
        val field0 = if (l.length > 4) l(4) else ""
        val field1 = if (l.length > 5) l(5) else ""

        if (category.nonEmpty && eventType.nonEmpty && actionTime.nonEmpty && uuid.nonEmpty) {
          Some(
            Map("category" -> category,
              "eventType" -> eventType,
              "actionTime" -> actionTime,
              "uuid" -> uuid,
              // clamp to [0, one minute] to suppress clock skew and idle gaps
              "lapse" -> math.max(0, math.min(lapse, 1000L * 60L * 1L)).toString,
              "field0" -> field0,
              "field1" -> field1))
        } else None
      }).flatMap(x => x)
      withLapse
    }).flatMap(x => x)
  }
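
  // Worked example (hypothetical session): actions at t = 1000, 5000, 90000 ms
  // yield lapses 0, 4000, and 60000; the 85000 ms gap is clamped to the
  // one-minute cap before being written out.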

}