Commit e4f0b08233df63f091060adfc379574536de48d1

Authored by evan
1 parent 60c7dbd340
Exists in master

getLog def error filter

Showing 1 changed file with 9 additions and 3 deletions
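The change hardens the row-building step in getLogParse: each tuple construction is wrapped in try/catch and yields an Option, so rows that fail to parse (a missing key, a non-numeric time) are dropped instead of failing the whole stage. A minimal sketch of the same pattern, under assumed input (the records RDD and its field names are hypothetical, not from this commit):

    import org.apache.spark.rdd.RDD

    // Hypothetical input: per-record maps that may miss keys or hold non-numeric values.
    def safeRows(records: RDD[Map[String, String]]): RDD[(String, Long)] =
      records.map { m =>
        try {
          Some((m("event"), m("time").toLong)) // may throw NoSuchElementException or NumberFormatException
        } catch {
          case e: Exception => None            // drop the malformed record, keep the job alive
        }
      }.filter(_.isDefined).map(_.get)         // same shape as the commit's .filter(x => x != None).map(x => x.get)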

app/com/piki_ds/ver2ggh/expConTime.scala
/**
 * Created by Evan on 2016. 5. 10..
 */
package com.piki_ds.ver2ggh
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType, DataTypes, TimestampType}

import scala.collection.immutable.Iterable

object expConTime {

  val sc = SparkContext.getOrCreate()
  val sqlContext = SQLContext.getOrCreate(sc)
  val hadoopConf = new org.apache.hadoop.conf.Configuration()
  val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://pikinn"), hadoopConf)
  import sqlContext.implicits._

  /**
   * Loads the exposure, open, and consume events for each uuid.
   *
   * @param date
   * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume)
   */
  def getLogParse(date: String): DataFrame = {
    val current = date
    val (ymd, year, month, day) = (current, current.slice(0, 4), current.slice(4, 6), current.slice(6, 8))
    val log = sc.textFile(s"hdfs://pikinn/log/dev=app/y=$year/mo=$month/d=$day/h=*/mi=*/*.gz")

    val format = new java.text.SimpleDateFormat("yyyyMMdd")
    val yester = format.format(format.parse(current).getTime - (24 * 1000 * 60 * 60))
    val yesterday = yester.slice(0, 4) + "-" + yester.slice(4, 6) + "-" + yester.slice(6, 8)
    val currentday = year + "-" + month + "-" + day

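    // Assumed raw line layout (inferred from the indexing below; illustrative, not from the commit):
    // pipe-delimited fields "category|event|time|uuid|cid|...", with the trailing fields varying by
    // event type; any out-of-range access falls through to the catch below and the line is dropped.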
    val action = log.map(x => {
      try {
        val s = x.split("\\|", -1)
        var mapping = scala.collection.mutable.Map.empty[String, String]
        if (s.size > 4 && (
          s(2).forall(Character.isDigit(_)) &&
          s(3).forall(Character.isDigit(_)) &&
          s(4).forall(Character.isDigit(_)))) {
          // normalize 10-digit second timestamps to 13-digit milliseconds
          val s2 = if (s(2).size == 10) s(2) + "000" else s(2)
          mapping = mapping ++ Map("category" -> s(0).replace("version=2&logArray=", ""), "event" -> s(1), "time" -> s2, "uuid" -> s(3), "cid" -> s(4))
          if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
            && s(9).forall(Character.isDigit(_)) && s(9).size < 4) {
            // dwell clamped to [0, 10000] ms
            def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
            mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(9), "dwell" -> dwell())
            Some(mapping)
          } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("EXPOSURE")
            && s(9).forall(Character.isLetter(_)) && s(8).size < 4) {
            def dwell() = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toString
            mapping = mapping ++ Map("fromKey" -> s(7), "position" -> s(8), "dwell" -> dwell())
            Some(mapping)
          } else if (s.size > 7 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CONTENT") && s(1).equals("OPEN") && s(7).size < 4) {
            mapping = mapping ++ Map("fromKey" -> s(5), "position" -> s(7))
            Some(mapping)
          } else if (s.size > 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
            // consume time capped at i seconds
            def consumeTime(i: Long) = if (s(7).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 1000L * 0
            mapping = mapping ++ Map("consume" -> consumeTime(30).toString)
            Some(mapping)
          } else if (s.size == 8 && s(4).size <= 6 && s(4).size > 0 && s(0).equals("CARD") && s(1).equals("CONSUME")) {
            // note: the digit check is on s(6) while the value read is s(7)
            def consumeTime2(i: Long) = if (s(6).forall(Character.isDigit(_))) math.min(s(7).toLong, 1000L * i) else 1000L * 0
            mapping = mapping ++ Map("consume" -> consumeTime2(30).toString)
            Some(mapping)
          } else {
            None
          }
        }
        else {
          None
        }
      } catch {
        case e: Exception =>
          None
      }
    }).flatMap(x => x)

-    val df = action.filter(x => x.getOrElse("ERROR", null) == null).map { x =>
-      (x("event"), x("time").toLong, x("uuid").toLong, x("cid").toLong,
-        x.getOrElse("fromKey", null), x.getOrElse("position", null), x.getOrElse("dwell", "0").toInt, x.getOrElse("consume", "0").toInt)}
+    val df = action.filter(x => x != None).filter(x => x.getOrElse("ERROR", null) == null).map { x =>
+      try {
+        Some(x("event"), x("time").toLong, x("uuid").toLong, x("cid").toLong,
+          x.getOrElse("fromKey", null), x.getOrElse("position", null), x.getOrElse("dwell", "0").toInt, x.getOrElse("consume", "0").toInt)
+      } catch {
+        case e: Exception =>
+          None
+      }
+    }.filter(x => x != None).map(x => x.get)

    // Logs arrive late: in the log for 5/16, 90.9% of records are from 5/16 and 6.5% from 5/15, with 214 other days present (errors included), so keep only the data for the target day t (91%)
    val df1 = df.toDF("event", "time", "uuid", "cid", "fromKey", "position", "dwell", "consume").where("uuid != 0").
      where("event != 'EXPOSURE' or dwell != 0").where("event != 'CONSUME' or consume != 0").withColumn("timeStamp", column("time").cast(DataTypes.TimestampType).as("time")).
      where(s"timeStamp >= '$currentday'")

    df1
  }

  /**
   * Maps card-consume events to the corresponding content open for each uuid.
   * Events are ordered by time per uuid, and card consumption normally follows a content open,
   * so when a content is opened its open path, position, cid, and open time are stored and then mapped onto the consumes that follow for that cid.
   *
   * (uses open and consume events)
   *
   * @return : DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
   */
  def consumeIndexing(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
    val out = base.map { y =>
      // keep one (event, time, fromKey, position, cid) record per OPEN, in log order
      val info = y._2.map { x =>
        val (event, time, key, position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
        (event, time, key, position, cid)
      }.filter(x => x._1 == "OPEN")

      // idx tracks the most recent OPEN; CONSUMEs of the same cid inherit its openTime, fromKey, and position
      var idx = -1
      val result1 = y._2.map { x =>
        def idxExc(idx: Int) = if (idx == -1) 0 else idx
        if (x._1 == "OPEN") {
          idx += 1
          Some(x.copy(_9 = info(idx)._2, _8 = 0))
        } else if (x._1 == "CONSUME" && x._4 == info(idxExc(idx))._5) {
          Some(x.copy(_9 = info(idxExc(idx))._2, _5 = info(idxExc(idx))._3, _6 = info(idxExc(idx))._4))
        } else None
      }
      val result2 = result1.filter(x => x != None).map(x => x.get)
      (y._1, result2)
    }.flatMap(x => x._2).toDF("event", "time", "uuid", "cid", "fromKey", "position", "dwell", "consume", "openTime")

    val StringToInt = udf[Int, String](_.toInt)

    val out1 = out.withColumn("position", StringToInt(out("position")))

    out1
  }

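  // Illustrative trace (hypothetical events, not from the commit): for one uuid, ordered by time,
  //   OPEN(cid=1, t=100) -> CONSUME(cid=1) -> CONSUME(cid=1) -> OPEN(cid=2, t=200) -> CONSUME(cid=2)
  // both cid=1 consumes get openTime=100 plus that open's fromKey/position, and the cid=2 consume gets openTime=200.
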
  /**
   * Computes the exposure time of each exposed content per uuid.
   * Is the exposure time of a content that is re-exposed after the uuid has already consumed it still valid?
   * So both are computed: the exposure time per exposed content and position for each uuid, and the exposure time counting only exposures up to the uuid's last open of that content.
   *
   * (uses exposure and open events)
   *
   * @return : DataFrame(uuid, cid, fromKey, position, expTime1, expSize1, expTime2, expSize2)
   */
  def exposureTime(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]) = {
    val expTime = base.flatMap { x =>
      try {
        // group a uuid's events by (cid, fromKey, position)
        val ar = x._2.groupBy(y => (y._4, y._5, y._6)).map(y => (y._1, y._2)).map { y =>
          val sortY = y._2.sortWith(_._2 < _._2)

          // events up to and including the last OPEN of this content
          val lastIdx = y._2.lastIndexWhere(y => y._1 == "OPEN")
          val out = if (lastIdx == -1) {
            y._2
          } else {
            y._2.take(lastIdx + 1)
          }

          // expTime1/expSize1: all exposures over 1s; expTime2/expSize2: only those up to the last open
          val result1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
          val result2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).map(y => y._7).sum
          val size1 = y._2.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size
          val size2 = out.filter(y => y._1 == "EXPOSURE" && y._7 > 1000).size

          (y._1._1, y._1._2, y._1._3, result1, size1, result2, size2)
        }.map(y => (x._1, y._1, y._2, y._3.toInt, y._4, y._5, y._6, y._7))
        ar
      } catch {
        case e: Exception => None
      }
    }.filter(x => x != None).toDF("uuid", "cid", "fromKey", "position", "expTime1", "expSize1", "expTime2", "expSize2").
      where("expTime1 != 0 and expSize1 != 0").where("expTime2 != 0 and expSize2 != 0")

    expTime
  }

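  // Illustrative trace (hypothetical events, not from the commit): for one (uuid, cid, fromKey, position),
  // exposures of 1500ms and 800ms, then an OPEN, then another 2000ms exposure give
  // expTime1 = 1500 + 2000 (all exposures over 1s) but expTime2 = 1500 (only exposures up to the last OPEN).
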
  def main(args: Array[String]): Unit = {

    // parse the logs
    val saveDay = if (args.length == 1) args(0) else "20160518"
    val df1 = getLogParse(saveDay)

    // split the logs into (open, consume) events and (exposure, open) events
    val base1 = df1.map { x =>
      (x.getAs[String]("event"), x.getAs[Long]("time"), x.getAs[Long]("uuid"), x.getAs[Long]("cid"),
        x.getAs[String]("fromKey"), x.getAs[String]("position"), x.getAs[Int]("dwell"), x.getAs[Int]("consume"), 0L)
    }.groupBy(x => x._3).map(x => (x._1, x._2.toArray.sortWith(_._2 < _._2)))

    val base2 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("OPEN") || y._1.equals("CONSUME")))).
      filter { x => x._2.map(y => y._1).contains("OPEN") }

    val base3 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("EXPOSURE") || y._1.equals("OPEN"))))

    // from the (open, consume) events, use the per-uuid log times to map position, open path, and open time onto consumes of the same content that follow an open
    val openConsumeIdxed = consumeIndexing(base2)

    val openConsume = openConsumeIdxed.where("fromKey in ('h','m') and position != -1 and event = 'CONSUME'").
      groupBy("uuid", "cid", "fromKey", "position").
      agg(expr("sum(consume) as consume"))

    // from the (exposure, open) events, compute the per-uuid exposure time by content open position and path
    val expTime = exposureTime(base3)

    val exposureInfo = expTime.where("fromKey in ('h','m') and position != -1")

    val expCon = exposureInfo.join(openConsume, exposureInfo("uuid") === openConsume("uuid") && exposureInfo("cid") === openConsume("cid") &&
      exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
      drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))

    expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")

  }
}
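
// Hypothetical submission (jar name illustrative, not from the commit):
//   spark-submit --class com.piki_ds.ver2ggh.expConTime pikids-app.jar 20160516
// main takes one yyyyMMdd argument (default "20160518") and overwrites table=expConTime/dt=<day>.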

///////////////////////

/* Scratch: transform the logs and aggregate them
val tst3 = base3.flatMap { x =>
  val ar2: Array[(String, Long, Long, Long, String, String, Int, Int, Long)] = x._2.groupBy(x => (x._4, x._5, x._6)).map(x => (x._1, x._2)).map { x =>
    val lastIdx = x._2.lastIndexWhere(x => x._1 == "OPEN")
    val out = if (lastIdx == -1) {
      x._2
    } else {
      x._2.take(lastIdx + 1)
    }
    (x._1, out)
  }.flatMap(x => x._2).toArray.sortWith(_._2 < _._2)
  ar2
}.toDF("event","time","uuid","cid","fromKey","position","dwell","consume","openTime")

val tst5 = tst3.where("fromKey in ('h','m') and position != -1 and position < 1000 and event = 'EXPOSURE'").
  withColumn("position", StringToInt(tst3("position"))).
  selectExpr("*", "case when fromKey = 'h' then 0 when fromKey = 'm' then position + 1 else null end as ranking").
  groupBy("uuid","cid","ranking").
  agg(expr("sum(dwell) as dwell"))

val ar = Array( ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  ("2",21,"m",2, 0), ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
  ("1",23,"m",4, 10), ("2",23,"m",4, 0), ("1",21,"m",2, 10), ("1",23,"m",4, 10) )

sqlContext.createDataFrame(ar).show

val gg = ar.groupBy(x => (x._2,x._3,x._4)).map(x => (x._1, x._2)).map { x =>
  val lastIdx = x._2.lastIndexWhere(x => x._1 == "2")
  val out = if (lastIdx == -1) {
    x._2
  } else x._2.take(lastIdx + 1)
  (x._1, out)
}.flatMap(x => x._2).toArray

sqlContext.createDataFrame(gg).show

ar.groupBy(x => (x._2,x._3,x._4)).map(x => (x._1, x._2)).map(x => x)

val tt = Array(("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10))

val tt1 = tt.take(lastIdx+1)

val tt = consumeIndexing(df2)
val dff = base2.take

val tmp2 = df2._2.map { x =>
  val (event, time, key, position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
  (event, time, key, position, cid)
}.filter(x => x._1 == "OPEN")

var idx = -1
val gg = df2._2.map { x =>
  println(x)
  println(idx)
  def idxExc(idx: Int) = if (idx == -1) 0 else idx
  if (x._1 == "OPEN") {
    idx += 1