Commit 8e31f0b44f2941f75a837347517c2fd343fe64b5

Authored by Joanne ago
1 parent 417d410163
Exists in master

recent score load

Showing 1 changed file with 19 additions and 0 deletions Side-by-side Diff

app/com/piki_ds/utils/TempScoreSaveLoad.scala View file @ 8e31f0b
1 1 package com.piki_ds.utils
2 2  
  3 +import org.apache.hadoop.fs.{Path, FileStatus, FileSystem}
3 4 import org.apache.spark.rdd.RDD
4 5 import org.apache.spark.SparkContext
5 6  
6 7  
7 8  
8 9  
... ... @@ -7,15 +8,33 @@
7 8 * Created by jungwon on 11/18/15.
8 9 */
9 10  
  11 +
  12 +
10 13 object TempScoreSaveLoad {
  14 +
  15 + def recentlyUpdatedPath(path:String , isParted:Boolean = true, hdfs:FileSystem): FileStatus = {
  16 + val list = hdfs.listStatus(new Path(path))
  17 + list.filter(x=>x.isDirectory && (!isParted || (isParted && hdfs.exists(x.getPath.suffix("/_SUCCESS"))))).maxBy(x=>x.getModificationTime)
  18 + }
  19 +
/**
 * Saves `(cid, score)` pairs as text, one `cid,score` line per pair.
 *
 * The output location is built from `scoreType`, `subScore` and `doi` (in that order)
 * under the `timelineScore` root.
 *
 * @param doi       last path component of the output location
 * @param scoreType first path component below the `timelineScore` root
 * @param subScore  second path component below the `timelineScore` root
 * @param input     pairs to write
 * @param coal      partition count passed to `coalesce` before writing
 */
def scoreSave(doi:String, scoreType:String, subScore: String, input:RDD[(String,Long)], coal:Int) = {
  val targetDir = s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/$doi"
  val csvLines = input.map { case (cid, score) => s"$cid,$score" }
  csvLines.coalesce(coal).saveAsTextFile(targetDir)
}
  26 +
/**
 * Loads the text stored for `scoreType`/`subScore`/`doi` and parses every line into a pair
 * made of its first two comma-separated fields, each converted with `toInt`.
 *
 * NOTE(review): `scoreSave` in this file writes `(String, Long)` pairs, while both fields are
 * read back here as `Int` -- confirm that ids are always numeric and that scores fit in an Int.
 *
 * @param sc        Spark context used to read the text
 * @param doi       last path component of the input location
 * @param scoreType first path component below the `timelineScore` root
 * @param subScore  second path component below the `timelineScore` root
 * @return one `(Int, Int)` pair per input line
 */
def scoreLoad(sc:SparkContext, doi:String, scoreType:String, subScore: String) = {
  sc.textFile(s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/$doi").map { line =>
    val fields = line.split(",")
    (fields(0).toInt, fields(1).toInt)
  }
}
  34 +
  35 + def recentScoreLoad(sc:SparkContext, hdfs:FileSystem, scoreType:String, subScore:String) = {
  36 + val scorePath = recentlyUpdatedPath(s"hdfs://pikinn/preprocess/timelineScore/$scoreType/$subScore/", true, hdfs).getPath.toString
  37 + val scoreTxt = sc.textFile(scorePath)
19 38 scoreTxt.map(x=>{
20 39 val s = x.split(",")
21 40 (s(0).toInt, s(1).toInt)