CidValidation.scala 1 KB
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
package com.piki_ds.preprocess

import org.apache.spark.sql.SQLContext

import com.piki_ds.utils.GetTextFile.getDBDump


/**
* Created by jungwon on 4/21/16.
*/

object CidValidation {

  /**
    * Collects, per content id, an "active" flag and the earliest update time from the
    * MG_CONTENTS dump.
    *
    * Rows are kept only when `udate` and `title` are non-null, `contents_type` is one of
    * ALBUM, ALBUM.A, CHST, CHST.A, TOON, TOON.A, and `status` is one of `filterStatus`.
    * Each remaining row becomes `(contents_id, (flag, ts))` where `flag` is 1 when the
    * status is "ACTV" and 0 otherwise, and `ts` is `unix_timestamp(udate)`.
    * Rows sharing a content id are merged by taking the largest flag and the smallest
    * timestamp.
    *
    * @param sQLContext   SQL context handed to `getDBDump` to load the dump
    * @param filterStatus status values to keep; values are matched as data (not spliced
    *                     into SQL text), so quotes inside a value are harmless
    * @return pair RDD keyed by content id with value `(activeFlag, earliestUnixTimestamp)`
    */
  def getCidByStatus(sQLContext: SQLContext,
                     filterStatus: Array[String]): org.apache.spark.rdd.RDD[(Long, (Int, Long))] = {
    import org.apache.spark.sql.functions._

    // Stable name for the computed timestamp column. Reading the column back through
    // Spark's auto-generated name is brittle because that name is not a stable contract.
    val tsCol = "udate_ts"

    // Constant part of the row filter; contains no caller-supplied text.
    val baseFilter = "udate is not null and title is not null and" +
      " contents_type in ('ALBUM', 'ALBUM.A', 'CHST', 'CHST.A','TOON','TOON.A')"

    // The status filter goes through the Column API so caller-supplied values are never
    // interpolated into the SQL string.
    // NOTE(review): an empty filterStatus is expected to match no rows — TODO confirm on
    // the deployed Spark version.
    val mgc = getDBDump(sQLContext, "MG_CONTENTS")
      .where(baseFilter)
      .where(col("status").isin(filterStatus: _*))

    val mgContents = mgc.select(
      mgc("contents_id"),
      mgc("status"),
      unix_timestamp(mgc("udate")).as(tsCol)
    )

    // Go through `.rdd` explicitly so the pair-RDD operation below is available regardless
    // of what `map` on the DataFrame itself returns.
    val flaggedByCid = mgContents.rdd.map { row =>
      val ts = row.getAs[Long](tsCol)
      val activeFlag = if (row.getAs[String]("status") == "ACTV") 1 else 0
      (row.getAs[Long]("contents_id"), (activeFlag, ts))
    }

    // A content id counts as active if any of its rows is active; keep its earliest timestamp.
    flaggedByCid.reduceByKey { (a, b) =>
      (math.max(a._1, b._1), math.min(a._2, b._2))
    }
  }
}