Blame view

app/com/piki_ds/preprocess/CidValidation.scala 1.92 KB
282422a45   Joanne   first commit of p...
1
2
3
  package com.piki_ds.preprocess
  
  import org.apache.spark.sql.SQLContext
3f5548ee2   Joanne   temp change for i...
4
  import com.piki_ds.utils.GetTextFile.getDBDump
282422a45   Joanne   first commit of p...
5
6
7
8
9
10
11
  
  
  /**
    * Lookups of content ids (`contents_id`) and their status in the `MG_CONTENTS` dump.
    *
    * Created by jungwon on 4/21/16.
    */
  object CidValidation {

    /**
      * Content ids whose `status` is in `filterStatus` and whose `udate` lies in a given range.
      *
      * @param sQLContext   SQL context handed to `getDBDump` to load `MG_CONTENTS`
      * @param filterStatus statuses to keep; each one is single-quoted into a SQL `in (...)` list
      * @param filterUdate  (lower, upper) bounds spliced verbatim into `udate between .. and ..`.
      *                     NOTE(review): the bounds are not quoted here, so callers presumably
      *                     pass already-quoted literals or SQL expressions -- confirm.
      * @return per `contents_id`: (1 if any matching row has status "ACTV", else 0;
      *         smallest `unix_timestamp(udate)` among the matching rows)
      */
    def getCidByStatuses(sQLContext: SQLContext, filterStatus:Array[String], filterUdate:(String,String)): org.apache.spark.rdd.RDD[(Long, (Int, Long))] =
      cidStatusWhere(
        sQLContext,
        s"udate between ${filterUdate._1} and ${filterUdate._2}",
        filterStatus)

    /**
      * Content ids whose `status` is in `filterStatus`, with non-null `udate` and `contents_id`.
      *
      * @param sQLContext   SQL context handed to `getDBDump` to load `MG_CONTENTS`
      * @param filterStatus statuses to keep; each one is single-quoted into a SQL `in (...)` list
      * @return per `contents_id`: (1 if any matching row has status "ACTV", else 0;
      *         smallest `unix_timestamp(udate)` among the matching rows)
      */
    def getCidByStatus(sQLContext: SQLContext, filterStatus:Array[String]): org.apache.spark.rdd.RDD[(Long, (Int, Long))] =
      cidStatusWhere(
        sQLContext,
        "udate is not null and contents_id is not null",
        filterStatus)

    /**
      * Shared pipeline behind both public lookups: filter `MG_CONTENTS`, project
      * (contents_id, status, unix timestamp of udate) and collapse to one pair per content id.
      *
      * @param leadingPredicate SQL predicate placed first in the where-clause; the title,
      *                         contents_type and status conditions are and-ed after it
      * @param filterStatus     statuses to keep.
      *                         NOTE(review): an empty array renders `status in ()`, which the
      *                         SQL parser presumably rejects -- confirm callers never pass one.
      */
    private def cidStatusWhere(sQLContext: SQLContext,
                               leadingPredicate: String,
                               filterStatus: Array[String]): org.apache.spark.rdd.RDD[(Long, (Int, Long))] = {
      import org.apache.spark.sql.functions.unix_timestamp

      // Explicit alias for the derived column, so reading it back does not depend on the
      // column name Spark auto-generates for unix_timestamp(...).
      val tsColumn = "udate_ts"
      val activeStatus = "ACTV"

      val quotedStatuses = filterStatus.map(s => s"'$s'").mkString(",")
      val whereStr = s"$leadingPredicate and title is not null and" +
        " contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A') and " +
        s"status in ($quotedStatuses)"

      val mgc = getDBDump(sQLContext,"MG_CONTENTS").where(whereStr)
      val mgContents = mgc.select(
        mgc("contents_id"),
        mgc("status"),
        unix_timestamp(mgc("udate")).as(tsColumn))

      mgContents.map { row =>
        val ts = row.getAs[Long](tsColumn)
        // `==` is null-safe, unlike `.equals` on a possibly-null value.
        val activeFlag = if (row.getAs[String]("status") == activeStatus) 1 else 0
        (row.getAs[Long]("contents_id"), (activeFlag, ts))
      }.reduceByKey { (a, b) =>
        // Keep the "active" flag if any row had it, and the smallest timestamp seen.
        (math.max(a._1, b._1), math.min(a._2, b._2))
      }
    }
  }