Blame view

app/com/piki_ds/ver2ggh/expConTime.scala 13.7 KB
856dd1fc5   evan   ggh ver2 first co...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
  /**
    * Created by Evan on 2016. 5. 10..
    */
  package com.piki_ds.ver2ggh
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types.{StructField, StructType, DataTypes, TimestampType}
  
  import scala.collection.immutable.Iterable
  
  object expConTime {
  
    // Spark entry points shared by every method of this object; both are obtained through
    // getOrCreate, so an already running context is reused instead of starting a second one.
    val sc = SparkContext.getOrCreate()
    val sqlContext = SQLContext.getOrCreate(sc)
    // Handle on the "hdfs://pikinn" file system. It is not referenced by the methods visible in
    // this object.
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new
        java.net.URI("hdfs://pikinn"), hadoopConf)
    // Brings the implicit conversions behind `.toDF` (used on the RDDs below) into scope.
    import sqlContext.implicits._
  
    /**
      * Loads one day of app logs and extracts the exposure, open and consume events per uuid.
      *
      * Every line is split on '|'. A record is kept only when its time, uuid and cid fields are
      * numeric, the cid is 1 to 6 characters long, and it is a CONTENT/EXPOSURE, CONTENT/OPEN or
      * CARD/CONSUME event with the expected number of fields. Records that cannot be parsed are
      * dropped instead of failing the job.
      *
      * @param date day to load, formatted as yyyyMMdd
      * @return DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, timeStamp)
      * @throws java.text.ParseException if `date` cannot be parsed as yyyyMMdd
      */
    def getLogParse(date: String): DataFrame = {
      // The parsed value is not needed; the call is kept so that a malformed date fails fast.
      new java.text.SimpleDateFormat("yyyyMMdd").parse(date)

      val (year, month, day) = (date.slice(0, 4), date.slice(4, 6), date.slice(6, 8))
      val log = sc.textFile(s"hdfs://pikinn/log/dev=app/y=$year/mo=$month/d=$day/h=*/mi=*/*.gz")
      val currentday = year + "-" + month + "-" + day

      val action = log.map { line =>
        try {
          val s = line.split("\\|", -1)
          // Helpers live inside the closure so that it does not reference the enclosing object.
          def digits(v: String) = v.forall(Character.isDigit(_))

          val wellFormed = s.size > 4 && digits(s(2)) && digits(s(3)) && digits(s(4)) &&
            s(4).size > 0 && s(4).size <= 6

          if (!wellFormed) {
            None
          } else {
            // 10-digit times are padded with "000" so that every time has the same scale.
            val time = (if (s(2).size == 10) s(2) + "000" else s(2)).toLong
            val uuid = s(3).toLong
            val cid = s(4).toLong

            def record(fromKey: String, position: String, dwell: Int, consume: Int) =
              Some((s(1), time, uuid, cid, fromKey, position, dwell, consume))
            // Dwell is s(5) - s(6), clamped to [0, 10000].
            def dwell = math.max(math.min(s(5).toLong - s(6).toLong, 10000L), 0L).toInt
            // Consume time is read from s(7), capped at 30000; 0 when `guard` is not numeric.
            def consume(guard: String) =
              (if (digits(guard)) math.min(s(7).toLong, 1000L * 30) else 0L).toInt

            // NOTE(review): events are matched on the raw s(0); a record whose first field still
            // carries a "version=2&logArray=" prefix never matches - confirm this is intended.
            (s(0), s(1)) match {
              case ("CONTENT", "EXPOSURE") if s.size > 9 && digits(s(9)) && s(9).size < 4 =>
                record(s(7), s(9), dwell, 0)
              case ("CONTENT", "EXPOSURE") if s.size > 9 && s(9).forall(Character.isLetter(_)) && s(8).size < 4 =>
                record(s(7), s(8), dwell, 0)
              case ("CONTENT", "OPEN") if s.size > 7 && s(7).size < 4 =>
                record(s(5), s(7), 0, 0)
              case ("CARD", "CONSUME") if s.size > 8 =>
                record(null, null, 0, consume(s(7)))
              case ("CARD", "CONSUME") if s.size == 8 =>
                // NOTE(review): 8-field records validate s(6) but read s(7); kept as-is -
                // confirm which field carries the consume time.
                record(null, null, 0, consume(s(6)))
              case _ =>
                None
            }
          }
        } catch {
          // Any malformed field (non-numeric value, numeric overflow, ...) drops the record.
          case e: Exception =>
            None
        }
      }.flatMap(x => x)

      // Logs arrive late: in the log of 5/16, 90.9% of the events belong to 5/16, 6.5% to 5/15 and
      // 214 other days (including errors) are present, so only the events of the reference day
      // are kept (about 91%).
      // NOTE(review): `time` is cast from a Long to a timestamp; confirm that the Spark version in
      // use interprets that Long with the scale produced above.
      val df1 = action.toDF("event","time","uuid","cid","fromKey","position","dwell","consume").where("uuid != 0").
        where("event != 'EXPOSURE' or dwell != 0").where("event != 'CONSUME' or consume != 0").withColumn("timeStamp", column("time").cast(DataTypes.TimestampType).as("time")).
        where(s"timeStamp >= '$currentday'")

      df1
    }
  
    /**
      * Attaches, per uuid, the information of a content open to the card-consume events that
      * belong to it.
      *
      * Events are expected in log order per uuid. Each OPEN gets its own time as openTime and its
      * consume value reset to 0. A CONSUME is kept only when its cid equals the cid of the most
      * recent OPEN (or of the first OPEN when none has been seen yet); it then receives that
      * OPEN's fromKey, position and time. Every other event is dropped, and a uuid without any
      * OPEN yields no rows.
      *
      * (uses open and consume events)
      *
      * @param base per uuid, the array of
      *             (event, time, uuid, cid, fromKey, position, dwell, consume, openTime) tuples
      * @return DataFrame(event, time, uuid, cid, fromKey, position, dwell, consume, openTime)
      *         with position converted to Int
      */
    def consumeIndexing(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
      val out = base.map { case (_, events) =>
        // (openTime, fromKey, position, cid) of every OPEN, in the order they occur.
        val opens = events.filter(e => e._1 == "OPEN").map(e => (e._2, e._5, e._6, e._4))

        // Index into `opens` of the most recent OPEN; -1 until the first OPEN is reached.
        var openIdx = -1
        events.map { e =>
          if (e._1 == "OPEN") {
            openIdx += 1
            Some(e.copy(_9 = opens(openIdx)._1, _8 = 0))
          } else {
            val pos = if (openIdx == -1) 0 else openIdx
            // The bounds check keeps a uuid without any OPEN from failing the task.
            if (e._1 == "CONSUME" && pos < opens.length && e._4 == opens(pos)._4) {
              val open = opens(pos)
              Some(e.copy(_9 = open._1, _5 = open._2, _6 = open._3))
            } else None
          }
        }.collect { case Some(e) => e }
      }.flatMap(x => x).toDF("event","time","uuid","cid","fromKey","position", "dwell", "consume","openTime")

      val StringToInt = udf[Int, String]( _.toInt)

      out.withColumn("position", StringToInt(out("position")))
    }
  
    /**
      * Computes, per uuid, how long each content was exposed at a given open path and position.
      *
      * Open question kept from the original notes: is the exposure time of a content that is shown
      * again after the uuid consumed it still meaningful? Because of that, two figures are
      * computed for every (cid, fromKey, position) group:
      *  - expTime1 / expSize1: total dwell and count of all EXPOSURE events with dwell > 1000
      *  - expTime2 / expSize2: the same, restricted to events up to and including the last OPEN
      *
      * A uuid for which any position cannot be converted to Int is dropped entirely, and rows
      * where either pair is zero are filtered out.
      *
      * (uses exposure and open events)
      *
      * @param base per uuid, the array of
      *             (event, time, uuid, cid, fromKey, position, dwell, consume, openTime) tuples
      * @return DataFrame(uuid, cid, fromKey, position, expTime1, expSize1, expTime2, expSize2)
      */
    def exposureTime(base: RDD[(Long, Array[(String, Long, Long, Long, String, String, Int, Int, Long)])]): DataFrame = {
      val expTime = base.map { case (uuid, events) =>
        try {
          events.groupBy(e => (e._4, e._5, e._6)).toSeq.map { case ((cid, fromKey, position), group) =>
            // Order by time here so the "last OPEN" cut does not depend on the caller's ordering.
            val ordered = group.sortWith(_._2 < _._2)

            val lastOpen = ordered.lastIndexWhere(e => e._1 == "OPEN")
            val untilLastOpen = if (lastOpen == -1) ordered else ordered.take(lastOpen + 1)

            val allDwell = ordered.filter(e => e._1 == "EXPOSURE" && e._7 > 1000).map(e => e._7)
            val openDwell = untilLastOpen.filter(e => e._1 == "EXPOSURE" && e._7 > 1000).map(e => e._7)

            (uuid, cid, fromKey, position.toInt, allDwell.sum, allDwell.size, openDwell.sum, openDwell.size)
          }
        } catch {
          // Best effort: a failure anywhere in a uuid's events drops that uuid.
          case e: Exception => Seq.empty
        }
      }.flatMap(rows => rows).toDF("uuid","cid","fromKey","position","expTime1","expSize1","expTime2","expSize2").
        where("expTime1 !=0 and expSize1 != 0").where("expTime2 != 0 and expSize2 != 0")

      expTime
    }
  
    /**
      * Builds the per-uuid exposure / consume table for one day and writes it as parquet.
      *
      * @param args when exactly one argument is given it is the day to process (yyyyMMdd);
      *             otherwise "20160518" is used
      */
    def main(args: Array[String]): Unit = {

      // Parse the log
      val saveDay = if(args.length == 1) args(0) else "20160518"
      val df1 = getLogParse(saveDay)

      // Collect the events of each uuid, ordered by time
      val base1 = df1.rdd.map { x =>
        (x.getAs[String]("event"), x.getAs[Long]("time"), x.getAs[Long]("uuid"), x.getAs[Long]("cid"),
          x.getAs[String]("fromKey"), x.getAs[String]("position"), x.getAs[Int]("dwell"), x.getAs[Int]("consume"), 0L)
      }.groupBy(x => x._3).map(x => (x._1, x._2.toArray.sortWith(_._2 < _._2)))

      // Split the log in two: (open, consume) events and (exposure, open) events.
      // Only uuids with at least one OPEN are kept on the (open, consume) side.
      val base2 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("OPEN") || y._1.equals("CONSUME")))).
        filter { x => x._2.map(y => y._1).contains("OPEN") }

      val base3 = base1.map(x => (x._1, x._2.filter(y => y._1.equals("EXPOSURE") || y._1.equals("OPEN"))))

      // (open, consume): using the per-uuid log order, attach position, open path and open time
      // to the consumption of the same content that follows an open
      val openConsumeIdxed = consumeIndexing(base2)
      val openConsume = openConsumeIdxed.where("fromKey in ('h','m') and position != -1 and event = 'CONSUME'").
        groupBy("uuid", "cid", "fromKey", "position").
        agg(expr("sum(consume) as consume"))

      // (exposure, open): exposure time per uuid by content, open path and position
      val expTime = exposureTime(base3)
      val exposureInfo = expTime.where("fromKey in ('h','m') and position != -1")

      // Left outer join: exposure rows without a matching consumption are kept
      val expCon = exposureInfo.join(openConsume, exposureInfo("uuid") === openConsume("uuid") && exposureInfo("cid") === openConsume("cid") &&
        exposureInfo("fromKey") === openConsume("fromKey") && exposureInfo("position") === openConsume("position"), "leftouter").
        drop(openConsume("uuid")).drop(openConsume("cid")).drop(openConsume("fromKey")).drop(openConsume("position"))

      expCon.write.mode(SaveMode.Overwrite).parquet(s"hdfs://pikinn/user/evan/Features/table=expConTime/dt=$saveDay")

    }
  }
  
    ///////////////////////ㅣ
  
     /* 로그 변환하여 종합
     val tst3 = base3.flatMap { x =>
        val ar2: Array[(String, Long, Long, Long, String, String, Int, Int, Long)] = x._2.groupBy(x => (x._4, x._5, x._6)).map(x => (x._1, x._2)).map { x =>
          val lastIdx = x._2.lastIndexWhere(x => x._1 == "OPEN")
          val out = if (lastIdx == -1) {
            x._2
          } else {
            x._2.take(lastIdx + 1)
          }
          (x._1, out)
        }.flatMap(x => x._2).toArray.sortWith(_._2 < _._2)
        ar2
      }.toDF("event","time","uuid","cid","fromKey","position","dwell","consume","openTime")
      val tst5 = tst3.where("fromKey in ('h','m') and position != -1 and position < 1000 and event = 'EXPOSURE'").
        withColumn("position", StringToInt(tst3("position"))).
        selectExpr("*", "case when fromKey = 'h' then 0 when fromKey = 'm' then position + 1 else null end as ranking").
        groupBy("uuid","cid","ranking").
        agg(expr("sum(dwell) as dwell"))
  
      val ar = Array( ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
        ("2",21,"m",2, 0), ("1",20,"m",1, 10), ("1",21,"m",2, 10), ("1",22,"m",3, 10),
        ("1",23,"m",4, 10), ("2",23,"m",4, 0), ("1",21,"m",2, 10), ("1",23,"m",4, 10) )
  
      sqlContext.createDataFrame(ar).show
  
      val gg = ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map { x =>
        val lastIdx = x._2.lastIndexWhere(x => x._1 == "2")
        val out = if( lastIdx == -1) {
          x._2
        } else x._2.take(lastIdx+1)
        (x._1, out)
      }.flatMap(x => x._2).toArray
  
      sqlContext.createDataFrame(gg).show
  
      ar.groupBy(x=> (x._2,x._3,x._4)).map(x => (x._1, x._2)).map(x => x)
  
      val tt = Array(("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10), ("2",23,"m",4,0), ("1",23,"m",4,10))
  
      val tt1 = tt.take(lastIdx+1)
  
      val tt = consumeIndexing(df2)
      val dff = base2.take
  
      val tmp2 = df2._2.map { x =>
        val (event,time,key,position, cid) = if (x._1 == "OPEN") (x._1, x._2, x._5, x._6, x._4) else ("CONSUME", 0L, "snull", "snull", 0L)
        (event, time, key, position, cid)
      }.filter(x => x._1 == "OPEN")
  
      var idx = -1
      val gg = df2._2.map { x =>
        println(x)
        println(idx)
        def idxExc(idx: Int) = if (idx == -1) 0 else idx
        if (x._1 == "OPEN") {
          idx += 1
          Some(x.copy(_8 = tmp2(idx)._2))
        } else if (x._1 == "CONSUME" && x._4 == tmp2(idxExc(idx))._5) {
          Some(x.copy(_8 = tmp2(idxExc(idx))._2, _5 = tmp2(idxExc(idx))._3, _6 = tmp2(idxExc(idx))._4))
        } else None
      }
  
      val a = gg.filter(x => x != None).map(x => x.get)
      sqlContext.createDataFrame(a).show(100)
  
      val ar = Array( ("1",20,"h",0), ("2",21,null,0), ("2",22,null,0), ("2",23,null,0),
        ("1",30,"h",0), ("2",31,null,0), ("2",32,null,0), ("2",33,null,0),
        ("1",40,"h",0), ("2",41,null,0), ("2",42,null,0), ("2",43,null,0) )
  
      //val tst = sc.parallelize(ar)
  
  
      tst.map {x=>
        val (opentime, fromkey, position) = if(x._1 == "1") {
          val opentime = x._2
          val fromkey = x._3
          val position = x._4
          (opentime, fromkey, position)
        }
  
        if(x._1 == "1") {
          x.copy(_4 = opentime)
        } else {
          x.copy(_2 = fromkey, _3 = position, _4 = opentime)
        }
      }
  
      val t1 = ar.map { x =>
        val (event,time,key) = if (x._1 == "1") (x._1, x._2, x._3) else ("2", 0, "0")
        (event, time, key)
      }.filter(x => x._1 == "1")
  
      var idx = -1
      ar.map{ x=>
        println(x)
        println(idx)
        if (x._1 == "1") {
          idx += 1
          x.copy(_4 = t1(idx)._2)
        } else {
          x.copy(_4 = t1(idx)._2, _3=t1(idx)._3)
        }
      }
    */