GetTextFile.scala
package com.piki_ds.utils

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext,DataFrame}

/**
 * Loads text files from HDFS and turns them into RDDs. Used to fetch log and lapse data.
 *
 * Created by jungwon on 7/14/15.
 */

object GetTextFile {

  /** Reads the parquet dump of the given DB table from HDFS as a DataFrame. */
  def getDBDump(sqlContext: SQLContext, tableName: String): DataFrame = {
    sqlContext.read.parquet(s"hdfs://pikinn/preprocess/db/table=$tableName/")
  }

  /**
   * Fetches the log for the date of interest.
   *
   * @param dateOfInterest : yyyyMMdd
   * @return : log lines split on the "|" delimiter
   */
  def getLog(sc: SparkContext, dateOfInterest: String): RDD[Array[String]] = {
    println("DEBUG getLog : " + dateOfInterest)
    val yr = dateOfInterest.take(4)
    val mo = dateOfInterest.slice(4, 6)
    val day = dateOfInterest.slice(6, 8)
    sc.textFile(s"hdfs://pikinn/log/dev=app/y=$yr/mo=$mo/d=$day/h=*/mi=*/*.gz")
      .map(_.split("\\|", -1))
      .filter(x => x.size > 4 &&
        x(2).nonEmpty && x(2).forall(_.isDigit) &&
        x(3).nonEmpty && x(3).forall(_.isDigit))
      .map(x => {
        // Normalize 10-digit (second-resolution) timestamps to 13-digit milliseconds.
        val genTime = if (x(2).length == 10) x(2) + "000" else x(2)
        x(2) = genTime
        x
      })
  }
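
  // Usage sketch for getLog, assuming a live SparkContext `sc`; the date is a placeholder.
  //   val log = GetTextFile.getLog(sc, "20150714")
  //   log.take(5).foreach(fields => println(fields.mkString("|")))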

  /**
   * Fetches the precomputed lapse data.
   *
   * @param dateKey : yyyyMMdd
   * @return : (key, lapse) pairs, where the key is built from columns 1 to 4 of the
   *           parsed line (EVENT, actionTime, uuid, ...) and the lapse is column 6
   */
  def getLapse(sc: SparkContext, dateKey: String, action: String) = {
    val parseFilter = sc.textFile(s"hdfs://pikinn/preprocess/lapse/$dateKey/*.gz")
      .map(_.split("\\|", -1))
      .filter(x => x.size == 7 && x(2).forall(_.isDigit) && x(3).forall(_.isDigit) && x(4).forall(_.isDigit))
    parseFilter.map(x => ((x(1), x(2), x(3), x(4)), x(6)))
  }
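
  // Usage sketch for getLapse, assuming a live SparkContext `sc`; the date and action
  // label are placeholders (the action argument is not used by the current implementation).
  //   val lapse = GetTextFile.getLapse(sc, "20150714", "CONSUME")
  //   lapse.take(3).foreach { case (key, lapseValue) => println(s"$key -> $lapseValue") }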

  /** Loads the per-user read-amount RDD computed for the day before `dateOfInterest`. */
  def getUserInfo(sc: SparkContext, dateOfInterest: String): RDD[(String, Long)] = {
    val c = java.util.Calendar.getInstance
    val formatDate = new java.text.SimpleDateFormat("yyyyMMdd")
    val startingPoint = formatDate.parse(dateOfInterest)
    c.setTime(startingPoint)
    c.add(java.util.Calendar.DATE, -1)
    val fetchDate = formatDate.format(c.getTime)
    val textF: RDD[(String, Long)] = sc.objectFile(s"hdfs://pikinn/user/joanne/userReadAmt/$fetchDate/part*")
    textF
  }

  /** Loads the precomputed (cid, cardSize) pairs for the given date. */
  def getCardSize(sc: SparkContext, doi: String): RDD[(String, Int)] = {
    val cidAndCS: RDD[(String, Int)] = sc.objectFile(s"hdfs://pikinn/preprocess/cidAndCardSize/$doi/part*")
    cidAndCS
  }

  /** Parses a CSV dump of a DB table from HDFS and zips each row with the given column labels. */
  def parseHueDB(sc: SparkContext, dbDate: String, tableName: String, dbLabel: Array[String]) = {
    val dbDir = sc.textFile(s"/data/db/$dbDate/$tableName/*.gz")
    val dbRaw = dbDir.map(x => {
      import au.com.bytecode.opencsv.CSVParser
      // Comma-separated values quoted with single quotes.
      val parser = new CSVParser(',', '\'')
      val s = parser.parseLine(x)
      // Drop rows whose column count does not match the label list.
      if (s.size == dbLabel.length) Some(dbLabel.zip(s).toMap) else None
    }).flatMap(x => x)
    dbRaw
  }
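
  // Usage sketch for parseHueDB with hypothetical column labels and table name; the
  // labels must match the table's real schema or every row fails the size check above.
  //   val columns = Array("id", "uid", "title")
  //   val rows = GetTextFile.parseHueDB(sc, "20150714", "SOME_TABLE", columns)
  //   rows.take(3).foreach(row => println(row.get("id")))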

  /** Reads every JSON part under the y/mo/d partition of a dashboard table. */
  def getContentsViewTable(sqlContext: SQLContext, table: String, saveDate: String): DataFrame = {
    val pathStr = s"/user/dashboard/db/${table.toLowerCase}/y=${saveDate.take(4)}/mo=${saveDate.slice(4, 6)}/d=${saveDate.slice(6, 8)}/*.json"
    sqlContext.read.json(pathStr)
  }

  /** Reads the single JSON file named after the save date under the y/mo partition. */
  def getDashTable(sqlContext: SQLContext, table: String, saveDate: String): DataFrame = {
    val pathStr = s"/user/dashboard/db/${table.toLowerCase}/y=${saveDate.take(4)}/mo=${saveDate.slice(4, 6)}/$saveDate.json"
    sqlContext.read.json(pathStr)
  }
}
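
/**
 * Minimal example driver, a sketch only: the app name, date, and table name below are
 * placeholder values, and the calls return data only if the corresponding HDFS paths exist.
 */
object GetTextFileExample {
  def main(args: Array[String]): Unit = {
    val conf = new org.apache.spark.SparkConf().setAppName("GetTextFileExample")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // One day of app logs, already split into fields on "|".
    val log = GetTextFile.getLog(sc, "20150714")
    println(s"log lines: ${log.count()}")

    // Parquet dump of a DB table as a DataFrame (table name is a placeholder).
    val contents = GetTextFile.getDBDump(sqlContext, "CONTENTS")
    contents.printSchema()

    sc.stop()
  }
}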