/**
* Created by Evan on 2016. 5. 10..
*/
package com.piki_ds.ver2ggh
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType, DataTypes, TimestampType}
import scala.collection.immutable.Iterable
object expConTime {
val sc = SparkContext.getOrCreate()
val sqlContext = SQLContext.getOrCreate(sc)
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://pikinn"), hadoopConf)
import sqlContext.implicits._
/**
* Loads per-uuid exposure (EXPOSURE), open (OPEN), and card consume (CONSUME) events for one day
* of raw app logs. Dwell and consume times are validated and clamped here
* (see the illustrative clampingSketch after this method).
*
* @param date : target day in yyyyMMdd format
* @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume)
*/
def getLogParse(date: String): DataFrame = {
val current = date
val (ymd, year, month, day) = (current, current.slice(0, 4), current.slice(4, 6), current.slice(6, 8))
val log = sc.textFile(s"hdfs://pikinn/log/dev=app/y=$year/mo=$month/d=$day/h=*/mi=*/*.gz")
val format = new java.text.SimpleDateFormat("yyyyMMdd")
val yester = format.format(format.parse(current).getTime - (24 * 1000 * 60 * 60))
val yesterday = yester.slice(0, 4) + "-" + yester.slice(4,6) + "-" + yester.slice(6,8)
val currentday = year + "-" + month + "-" + day
val action = log.map(x => {
try {
val s = x.split("\\|", -1)
var mapping = scala.collection.mutable.Map.empty[String, String]
if (s.size > 4 && (
s(2).forall(Character.isDigit(_)) &&
s(3).forall(Character.isDigit(_)) &&
s(4).forall(Character.isDigit(_)))) {
val s2 = if (s(2).size == 10) s(2) + "000" else s(2)
mapping = mapping ++ Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1), "time" -> s2, "uuid" -> s(3), "cid" -> s(4))
if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
&& s(9).forall(Character.isDigit(_)) && s(9).size < 4) {
def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(9), "dwell" -> dwell())
Some(mapping)
} else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
&& s(9).forall(Character.isLetter(_)) && s(8).size < 4) {
def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(8), "dwell" -> dwell())
Some(mapping)
} else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("OPEN") && s(7).size < 4) {
mapping = mapping ++ Map("fromKey" -> s(5), "position" -> s(7))
Some(mapping)
} else if (s.size > 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
// consume time comes from s(7) and is capped at i seconds
def consumeTime(i: Long) = if (s(7).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 0L
mapping = mapping ++ Map("consume" -> consumeTime(30).toString)
Some(mapping)
} else if (s.size == 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
// shorter CONSUME layout: the time field is s(6), so validate and parse the same field
def consumeTime2(i: Long) = if (s(6).forall(Character.isDigit(_))) math.min(s(6).toLong, 1000L * i) else 0L
mapping = mapping ++ Map("consume" -> consumeTime2(30).toString)
Some(mapping)
} else {
None
}
}
else {
None
}
} catch {
case e: Exception =>
None
}
}).flatMap(x => x)
val df = action.filter(x => x != None).filter(x => x.getOrElse("ERROR", null) == null).map { x =>
try {
Some(x("event"), x("time").toLong, x("uuid").toLong, x("cid").toLong,
x.getOrElse("fromKey", null), x.getOrElse("position", null), x.getOrElse("dwell", "0").toInt, x.getOrElse("consume", "0").toInt)
} catch {
case e: Exception =>
None
}
}.filter(x => x != None).map(x => x.get)
// Log delivery lags: in the logs collected for 5/16, 90.9% are from 5/16, 6.5% are from 5/15, and 214 other days (including errors) appear, so only data for the reference day t (~91%) is kept
val df1 = df.toDF("event","time","uuid","cid","fromKey","position","dwell","consume").where("uuid != 0").
where("event != 'EXPOSURE' or dwell != 0").where("event != 'CONSUME' or consume != 0").withColumn("timeStamp", column("time").cast(DataTypes.TimestampType).as("time")).
where(s"timeStamp >= '$currentday'")
df1
}
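/**
* Illustrative sketch only, not called by the job: reproduces the clamping rules that getLogParse
* applies to EXPOSURE dwell and CARD/CONSUME time, on plain values, so they can be sanity-checked
* without Spark. All sample numbers below are hypothetical.
*/
def clampingSketch(): Unit = {
// dwell = exposure end minus start, clamped to [0, 10000] ms, as in the EXPOSURE branches
def dwell(endMs: Long, startMs: Long): Long = math.max(math.min(endMs - startMs, 10000L), 0L)
// consume time is capped at i seconds and falls back to 0 for non-numeric input,
// as in the CARD/CONSUME branches
def consume(raw: String, i: Long): Long =
if (raw.nonEmpty && raw.forall(Character.isDigit(_))) math.min(raw.toLong, 1000L * i) else 0L
assert(dwell(1463356805000L, 1463356802000L) == 3000L)  // 3s exposure
assert(dwell(1463356900000L, 1463356802000L) == 10000L) // capped at 10s
assert(dwell(1463356802000L, 1463356805000L) == 0L)     // clock skew clamps to 0
assert(consume("45000", 30) == 30000L)                  // capped at 30s
assert(consume("abc", 30) == 0L)                        // non-numeric falls back to 0
}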
/**
* Maps card consume events to the content open that preceded them, per uuid.
* Events are laid out in per-uuid log order; since card consumption normally happens after a
* content open, each OPEN stores its open path (fromKey), position, cid, and open time, which are
* then attached to the following CONSUME events of that cid
* (see the illustrative consumeIndexingSketch after this method).
*
* (uses OPEN and CONSUME events)
*
* @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
*/
def consumeIndexing(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
val out = base.map { y =>
val info = y._2.map { x =>
val (event, time, key, position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
(event, time, key, position, cid)
}.filter(x => x._1 == "OPEN")
var idx = -1
val result1 = y._2.map { x =>
def idxExc(idx: Int) = if (idx == -1) 0 else idx
if (x._1 == "OPEN") {
idx += 1
Some(x.copy(_9 = info(idx)._2, _8 = 0))
} else if (x._1 == "CONSUME" && x._4 == info(idxExc(idx))._5) {
Some(x.copy(_9 = info(idxExc(idx))._2, _5 = info(idxExc(idx))._3, _6 = info(idxExc(idx))._4))
} else None
}
val result2 = result1.filter(x => x != None).map(x => x.get)
(y._1, result2)
}.flatMap(x => x._2).toDF("event","time","uuid","cid","fromKey","position", "dwell", "consume","openTime")
val StringToInt = udf[Int, String]( _.toInt)
val out1 = out.withColumn("position", StringToInt(out("position")))
out1
}
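/**
* Illustrative sketch only, not called by the job: the open-to-consume attachment trick of
* consumeIndexing, shown on a plain, already time-sorted array of (event, time, cid, fromKey)
* tuples. The sample events are hypothetical.
*/
def consumeIndexingSketch(): Unit = {
// (event, time, cid, fromKey); CONSUME rows carry no fromKey of their own
val events = Array(
("OPEN", 100L, 11L, "h"),
("CONSUME", 110L, 11L, null),
("CONSUME", 120L, 11L, null),
("OPEN", 200L, 22L, "m"),
("CONSUME", 210L, 22L, null))
val opens = events.filter(_._1 == "OPEN")
var idx = -1 // index of the most recent OPEN seen so far
val attached = events.flatMap { e =>
def safeIdx = if (idx == -1) 0 else idx
if (e._1 == "OPEN") { idx += 1; Some((e._1, e._2, e._3, e._4, e._2)) } // openTime = its own time
else if (e._1 == "CONSUME" && e._3 == opens(safeIdx)._3)
Some((e._1, e._2, e._3, opens(safeIdx)._4, opens(safeIdx)._2)) // inherit fromKey and openTime
else None // a consume that does not match the current open is dropped
}
assert(attached.count(_._1 == "CONSUME") == 3)
assert(attached.filter(e => e._1 == "CONSUME" && e._3 == 22L).forall(e => e._4 == "m" && e._5 == 200L))
}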
/**
* Computes exposure time per uuid and content.
* Open question: is the exposure time of a content that is shown again after the uuid has already
* consumed it still meaningful? So two measures are computed per uuid, content, and position:
* the exposure time over all exposures, and the exposure time accumulated only up to the last
* open of that content (see the illustrative exposureTimeSketch after this method).
*
* (uses EXPOSURE and OPEN events)
*
* @return : DataFrame(uuid, cid, fromKey, position, expTime1, expSize1, expTime2, expSize2)
*/
def exposureTime(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]) = {
val expTime = base.flatMap { x =>
try {
val ar = x._2.groupBy(y => (y._4, y._5, y._6)).map(y => (y._1, y._2)).map { y =>
// order the events of this (cid, fromKey, position) group by time before locating the last OPEN
val sortY = y._2.sortWith(_._2 < _._2)
val lastIdx = sortY.lastIndexWhere(y => y._1 == "OPEN")
val out = if (lastIdx == -1) {
sortY
} else {
sortY.take(lastIdx + 1)
}
val result1 = sortY.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
val result2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
val size1 = sortY.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
val size2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
(y._1._1, y._1._2, y._1._3, result1, size1, result2, size2)
}.map(y => (x._1, y._1, y._2, y._3.toInt, y._4, y._5, y._6, y._7))
ar
} catch {
case e: Exception => Iterable.empty[(Long, Long, String, Int, Int, Int, Int, Int)]
}
}.toDF("uuid","cid","fromKey","position","expTime1","expSize1","expTime2","expSize2").
where("expTime1 != 0 and expSize1 != 0").where("expTime2 != 0 and expSize2 != 0")
expTime
}
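/**
* Illustrative sketch only, not called by the job: the two exposure-time measures of exposureTime,
* shown on a plain, time-sorted array of (event, time, dwellMs) tuples for a single
* (uuid, cid, fromKey, position) group. The sample events are hypothetical.
*/
def exposureTimeSketch(): Unit = {
val group = Array(
("EXPOSURE", 100L, 3000),
("EXPOSURE", 200L, 500),  // below the 1000 ms threshold, ignored by both measures
("OPEN", 300L, 0),
("EXPOSURE", 400L, 4000)) // re-exposure after the last open, only counted by measure 1
val lastOpen = group.lastIndexWhere(_._1 == "OPEN")
val upToLastOpen = if (lastOpen == -1) group else group.take(lastOpen + 1)
val expTime1 = group.filter(e => e._1 == "EXPOSURE" && e._3 > 1000).map(_._3).sum
val expTime2 = upToLastOpen.filter(e => e._1 == "EXPOSURE" && e._3 > 1000).map(_._3).sum
assert(expTime1 == 7000) // all qualifying exposures
assert(expTime2 == 3000) // only exposures before the last open
}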
def main(args: Array[String]): Unit = {
// Parse the logs
val saveDay = if(args.length == 1) args(0) else "20160518"
val df1 = getLogParse(saveDay)
// Split the logs into (open, consume) events and (exposure, open) events
val base1 = df1.map { x =>
(x.getAs[String]("event"), x.getAs[Long]("time"), x.getAs[Long]("uuid"), x.getAs[Long]("cid"),
x.getAs[String]("fromKey"), x.getAs[String]("position"), x.getAs[Int]("dwell"), x.getAs[Int]("consume"), 0L)
}.groupBy(x => x._3).map(x => (x._1, x._2.toArray.sortWith(_._2 < _._2)))
val base2 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("OPEN") || y._1.equals("CONSUME")))).
filter { x => x._2.map(y => y._1).contains("OPEN") }
val base3 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("EXPOSURE") || y._1.equals("OPEN"))))
// From the (open, consume) events, use per-uuid log time to attach position, open path, and open time to consume events of the same content that follow its open
val openConsumeIdxed = consumeIndexing(base2)
val openConsume = openConsumeIdxed.where("fromKey in ('h','m') and position != -1 and event = 'CONSUME'").
groupBy("uuid", "cid", "fromKey", "position").
agg(expr("sum(consume) as consume"))
// From the (exposure, open) events, compute exposure time per uuid by content open position and path
val expTime = exposureTime(base3)
val exposureInfo = expTime.where("fromKey in ('h','m') and position != -1")
val expCon = exposureInfo.join(openConsume, exposureInfo("uuid") === openConsume("uuid") && exposureInfo("cid") === openConsume("cid") &&
exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))
expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")
}
}
///////////////////////
/* Scratch: transform the logs and aggregate them
val tst3 = base3.flatMap { x =>
val ar2: Array[(String, Long, Long, Long, String, String, Int, Int, Long)] = x._2.groupBy(x => (x._4, x._5, x._6)).map(x => (x._1, x._2)).map { x =>
val lastIdx = x._2.lastIndexWhere(x => x._1 == "OPEN")
val out = if (lastIdx == -1) {
x._2
} else {
x._2.take(lastIdx + 1)
}
(x._1, out)
}.flatMap(x => x._2).toArray.sortWith(_._2 < _._2)
ar2
}.toDF("event","time","uuid","cid","fromKey","position","dwell","consume","openTime")
val tst5 = tst3.where("fromKey in ('h','m') and position != -1 and position < 1000 and event = 'EXPOSURE'").
withColumn("position", StringToInt(tst3("position"))).
selectExpr("*", "case when fromKey = 'h' then 0 when fromKey = 'm' then position + 1 else null end as ranking").
groupBy("uuid","cid","ranking").
agg(expr("sum(dwell) as dwell"))
val ar = Array( ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
("2",21,"m",2, 0), ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
("1",23,"m",4, 10), ("2",23,"m",4, 0), ("1",21,"m",2, 10), ("1",23,"m",4, 10) )
sqlContext.createDataFrame(ar).show
val gg = ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map { x =>
val lastIdx = x._2.lastIndexWhere(x => x._1 == "2")
val out = if( lastIdx == -1) {
x._2
} else x._2.take(lastIdx+1)
(x._1, out)
}.flatMap(x => x._2).toArray
sqlContext.createDataFrame(gg).show
ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map(x => x)
val tt = Array(("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10))
val tt1 = tt.take(lastIdx+1)
val tt = consumeIndexing(df2)
val dff = base2.take
val tmp2 = df2._2.map { x =>
val (event,time,key,position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
(event, time, key, position, cid)
}.filter(x => x._1 == "OPEN")
var idx = -1
val gg = df2._2.map { x =>
println(x)
println(idx)
def idxExc(idx: Int) = if (idx == -1) 0 else idx
if (x._1 == "OPEN") {
idx += 1
Some(x.copy(_8 = tmp2(idx)._2))
} else if (x._1 == "CONSUME" && x._4 == tmp2(idxExc(idx))._5) {
Some(x.copy(_8 = tmp2(idxExc(idx))._2, _5 = tmp2(idxExc(idx))._3, _6 = tmp2(idxExc(idx))._4))
} else None
}
val a = gg.filter(x => x != None).map(x => x.get)
sqlContext.createDataFrame(a).show(100)
val ar = Array( ("1",20,"h",0), ("2",21,null,0), ("2",22,null,0), ("2",23,null,0),
("1",30,"h",0), ("2",31,null,0), ("2",32,null,0), ("2",33,null,0),
("1",40,"h",0), ("2",41,null,0), ("2",42,null,0), ("2",43,null,0) )
//val tst = sc.parallelize(ar)
tst.map {x=>
val (opentime, fromkey, position) = if(x._1 == "1") {
val opentime = x._2
val fromkey = x._3
val position = x._4
(opentime, fromkey, position)
}
if(x._1 == "1") {
x.copy(_4 = opentime)
} else {
x.copy(_2 = fromkey, _3 = position, _4 = opentime)
}
}
val t1 = ar.map { x =>
val (event,time,key) = if (x._1 == "1") (x._1, x._2, x._3) else ("2", 0, "0")
(event, time, key)
}.filter(x => x._1 == "1")
var idx = -1
ar.map{ x=>
println(x)
println(idx)
if (x._1 == "1") {
idx += 1
x.copy(_4 = t1(idx)._2)
} else {
x.copy(_4 = t1(idx)._2, _3=t1(idx)._3)
}
}
*/