Commit 7ea56555757131d4b1a5cb0ff6e2b323d885fc42

Authored by hadoop
0 parents
Exists in master

first commit

Showing 8 changed files with 663 additions and 0 deletions

DBConnector.py View file @ 7ea5655
... ... @@ -0,0 +1,28 @@
  1 +import MySQLdb
  2 +
  3 +# RecSys DB connection information (AWS RDS)
  4 +HOST = "recsys.chpskqjvla14.us-east-1.rds.amazonaws.com"
  5 +USER = "recsys"
  6 +PASS = "youlike123"
  7 +DB_NAME = "PIKI_RECSYS"
  8 +
  9 +# Piki contents DB connection information
  10 +PHOST = "127.0.0.1"
  11 +PUSER = "root"
  12 +PPASS = "zmfflr2715"
  13 +PDB_NAME = "piki_contents_recommend"
  14 +PORT = 43306
  15 +
  16 +class RecSYS_DBConnector:
  17 + def __init__(self):
  18 + self.conn = MySQLdb.connect(host=HOST, user=USER, passwd=PASS, port=3306, db=DB_NAME)
  19 + self.conn.set_character_set("utf8")
  20 + self.cursor = self.conn.cursor()
  21 +
  22 +
  23 +
  24 +class PIKI_DBConnector:
  25 + def __init__(self):
  26 + self.conn = MySQLdb.connect(host=PHOST, user=PUSER, passwd=PPASS, port=PORT, db=PDB_NAME)
  27 + self.conn.set_character_set("utf8")
  28 + self.cursor = self.conn.cursor()
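
For context, both classes expose a live connection and cursor but never close them; a minimal usage sketch (hypothetical; the query is illustrative, not part of this commit):

    import DBConnector

    db = DBConnector.PIKI_DBConnector()
    db.cursor.execute("SELECT uid, recommended_items FROM reco_by_user LIMIT 10")
    for uid, items in db.cursor.fetchall():
        print uid, items
    db.conn.commit()  # writes need an explicit commit
    db.conn.close()   # neither class closes its own connection
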
MRRawToConsumeLog.py View file @ 7ea5655
... ... @@ -0,0 +1,38 @@
  1 +#!/usr/bin/python
  2 +from mrjob.job import MRJob
  3 +
  4 +class MRRawToConsumeLog(MRJob):
  5 + def mapper(self, _, line):
  6 + try:
  7 + if line.startswith('CARD|CONSUME'):
  8 + tmp = line.rstrip('\n').split('|')
  9 + card_no_int = 0
  10 + timestamp, uid, cid, card_no = ("","","","")
  11 + try:
  12 + timestamp, uid, cid, card_no = tmp[2:6]
  13 + card_no_int = int(card_no)
  14 + except ValueError:
  15 + print "Error Value: %s" % line
  16 + pass
  17 +
  18 + yield (uid, (cid, card_no_int))
  19 + except Exception:
  20 + print "Unexpected error in mapper: %s" % line
  21 + pass
  22 + def reducer(self, uid, consume_info):
  23 + try:
  24 + consume_map = {}
  25 + for cid, card_no in consume_info:
  26 + consume_map.setdefault(cid, 0)
  27 + if consume_map[cid] < card_no:
  28 + consume_map[cid] = card_no
  29 +
  30 + clicked_cids = ["%s|%d" % (cid, card_no) for cid, card_no in consume_map.iteritems()]
  31 + if len(clicked_cids) >= 5:
  32 + print '\t'.join([uid, ' '.join(clicked_cids)])
  33 + except Exception:
  34 + print "Unexpected error in reducer for uid: %s" % uid
  35 + pass
  36 +
  37 +if __name__ == '__main__':
  38 + MRRawToConsumeLog.run()
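
To make the job's contract concrete: assuming raw lines shaped like CARD|CONSUME|<timestamp>|<uid>|<cid>|<card_no>|... (fields 2..5 after the split), the mapper emits (uid, (cid, card_no)) pairs and the reducer keeps the highest card number per content, printing only users with at least five clicked contents. Illustrative input lines (values made up):

    CARD|CONSUME|1417405200|u123|c77|3
    CARD|CONSUME|1417405260|u123|c77|5
    CARD|CONSUME|1417405320|u123|c42|1

If u123 ends up with at least five distinct contents, the reducer prints one tab-separated line:

    u123    c77|5 c42|1 c10|2 c55|4 c90|7
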
MRRecSys.py View file @ 7ea5655
... ... @@ -0,0 +1,203 @@
  1 +# -*- coding: utf-8 -*-
  2 +from datetime import datetime, timedelta
  3 +from mrjob.job import MRJob
  4 +from math import sqrt, log
  5 +import mmh3
  6 +import fileinput as f
  7 +import time
  8 +from subprocess import call
  9 +
  10 +################################################################
  11 +## User-Based Collaborative Filtering (MapReduce)
  12 +## Coded by Trey (2014.12.)
  13 +#################################################################
  14 +
  15 +class MRRecSys(MRJob):
  16 + def cluster_mapper_init(self):
  17 + """MinHash Clustering을 위한 파라미터 및 Seed를 미리 정의"""
  18 + p, q = 3, 15
  19 + # Pre-stored randomly drawn seed values; drawing them at runtime would give a different value on each Hadoop node.
  20 + seeds = [45, 32, 77, 29, 11, 85, 3, 0, 39, 41, 39, 28, 27, 60, 35, 91, 22, 23, 3, 70, 56, 54, 36, 71, 22, 48, 62, 19, 7, 44, 63, 91, 24, 42, 20, 8, 64, 34, 12, 94, 68, 43, 45, 48, 95, 17, 89, 93, 48, 20, 59, 83, 84, 57, 20, 45, 46, 58, 22, 71, 16, 79, 73, 88, 20, 20, 24, 85, 45, 72, 87, 82, 86, 18, 77, 73, 29, 47, 94, 18, 70, 79, 78, 82, 60, 13, 70, 44, 89, 79, 65, 91, 73, 66, 60, 90, 42, 18, 48, 80]
  21 + self.random_seeds = [seeds[i*p:i*p+p] for i in range(q)] # stored as a nested list: q groups of p seeds
  22 +
  23 + def cluster_mapper(self, _, line):
  24 + """p개의 MinHash값을 q번 추출한다."""
  25 + uid, cids_raw = line.rstrip('\n').split('\t')
  26 + cids = cids_raw.split(' ')
  27 + for seeds in self.random_seeds:
  28 + signature = []
  29 + for seed in seeds:
  30 + minhash = float("Inf")
  31 + for cid in cids:
  32 + h = abs(mmh3.hash(cid, seed))
  33 + if h < minhash:
  34 + minhash = h
  35 + signature.append(str(minhash))
  36 + yield ('|'.join(signature), uid) # join with a separator so distinct signatures cannot concatenate to the same key
  37 +
  38 + def cluster_reducer(self, signature, uids):
  39 + """Signiture 값에 속한 사용자를 출력한다."""
  40 + uids_list = [uid for uid in uids]
  41 + # Skip signatures shared by too many users.
  42 + if len(uids_list) > 1 and len(uids_list) < 100000:
  43 + yield (signature, uids_list)
  44 +
  45 + def recommend_mapper(self, signature, uids_list):
  46 + """해당 클러스터에 대해 각 사용자 uid와 나머지 사용자 리스트를 Mapping"""
  47 + for i in range(len(uids_list)):
  48 + yield (uids_list[i], uids_list[:i] + uids_list[i+1:])
  49 +
  50 + def recommend_reducer_init(self):
  51 + """Click Log와 IDF정보를 메모리에 올려둔다."""
  52 + self.click_logs_map = {}
  53 + self.idf_map = {}
  54 + self.exp_log = {}
  55 + self.filtered_contents_id = {}
  56 +
  57 + for line in f.input('click_logs.txt'):
  58 + uid, cids = line.rstrip('\n').split('\t')
  59 + cids_list = [x.split('(')[0] for x in cids.split(' ')]
  60 + self.click_logs_map[uid] = cids_list
  61 + for line in f.input('idf_map.txt'):
  62 + cid, cnt, idf = line.rstrip('\n').split('\t')
  63 + self.idf_map[cid] = float(idf)
  64 + for line in f.input('contents_exp.txt'):
  65 + cid, exp_date = line.rstrip('\n').split('\t')
  66 + self.exp_log[cid] = time.mktime(datetime.strptime(exp_date, '%Y-%m-%d %H:%M:%S').timetuple())
  67 + for line in f.input('filtered_contents.txt'):
  68 + cid = line.rstrip('\n')
  69 + self.filtered_contents_id[cid] = 0
  70 +
  71 + # Set Parameters for BM25 (Similarity Measure)
  72 + self.BMK = 0.3
  73 + self.BMB = 0.75
  74 + self.BML = 5.0
  75 + self.BMT = 2.2
  76 +
  77 + # Number of similar users to consider and the similarity threshold
  78 + self.N = 1000
  79 + self.sim_th = 0.2
  80 +
  81 + # Parameters used for ranking
  82 + self.max_idf = 1.5
  83 + self.max_cnt = 5.0
  84 + self.w_cnt = 0.2
  85 + self.RDS = 0.2
  86 + self.RDR = pow(100, self.RDS)
  87 + self.w_idf = 0.8
  88 + self.w_rcc = 0.35
  89 +
  90 + self.num_recommend = 100 # maximum number of recommendations
  91 + self.freq_threshold = 1 # only consider users that co-occur in clusters at least this many times
  92 +
  93 + def recommend_reducer_count(self, uid, t_uids_lists):
  94 + """p, q 값의 조정에 따라 한 클러스터에 얼마나 묶이는지 비교 목적의 reducer"""
  95 + t_uids_map = {}
  96 + for t_uids_list in t_uids_lists:
  97 + for t_uid in t_uids_list:
  98 + t_uids_map.setdefault(t_uid, 0)
  99 + t_uids_map[t_uid] += 1
  100 +
  101 + print '\t'.join([uid, str(len(t_uids_map))])
  102 +
  103 + def recommend_reducer(self, uid, t_uids_lists):
  104 + """클러스터 내에서 유사도를 계산하고, top n명을 활용하여 추천 컨텐츠를 구한다."""
  105 + # 기준이 되는 유저에 대한 정보를 로드한다.
  106 + #if not uid in self.click_logs_map:
  107 + # print "KEYERROR!!!!!!!"
  108 + # print "error key: %s" % uid
  109 + # pass
  110 + try:
  111 + q_user_click_log = self.click_logs_map[uid]
  112 + idf_sum = sum([self.idf_map[ucid] for ucid in q_user_click_log])
  113 +
  114 + t_uids_map = {}
  115 + for t_uids_list in t_uids_lists:
  116 + for t_uid in t_uids_list:
  117 + # Count how many times each user co-occurs with uid.
  118 + t_uids_map.setdefault(t_uid, 0)
  119 + t_uids_map[t_uid] += 1
  120 +
  121 + t_user_cosine_list = []
  122 + for t_uid, cnt in t_uids_map.iteritems():
  123 + if cnt < self.freq_threshold:
  124 + continue
  125 + if not t_uid in self.click_logs_map:
  126 + continue
  127 + t_user_click_log = self.click_logs_map[t_uid]
  128 + tf_idf_sum = 0.0
  129 + doc_length = float(len(t_user_click_log))
  130 + ideal_doc_length = float(len(q_user_click_log) + 2) # favor users who viewed about two more items than the query user (relevance-wise)
  131 + tf = min(1.0, (self.BMK + 1.0) / (self.BMK * ((1.0-self.BMB) + self.BMB*(doc_length / ideal_doc_length)) + 1.0)) # TF is fixed
  132 + for tcid in t_user_click_log:
  133 + if tcid in q_user_click_log:
  134 + idf = self.idf_map[tcid]
  135 + tf_idf_sum += tf * idf
  136 +
  137 + cosine_score = tf_idf_sum / max(0.1, idf_sum)
  138 +
  139 + t_user_cosine_list.append((t_uid, cosine_score))
  140 +
  141 + sorted_uids = sorted(t_user_cosine_list, key=lambda x: -x[1])
  142 + #print '\t'.join([uid, ' '.join(["%s(%.2f)" % similar_user for similar_user in sorted_uids[:50]]), 'usr'])
  143 +
  144 + candidates_map = {}
  145 + for t_uid, sim in sorted_uids[:self.N]:
  146 + if sim < self.sim_th:
  147 + break
  148 + for cid in self.click_logs_map[t_uid]:
  149 + # Skip content the target user has already consumed.
  150 + if cid in self.click_logs_map[uid] or cid in self.filtered_contents_id:
  151 + continue
  152 + #candidates_map.setdefault(cid, {"cnt": 0.0, "wgt_cnt": 0.0})
  153 + candidates_map.setdefault(cid, {"cnt": 0.0})
  154 + candidates_map[cid]["cnt"] += 1.0
  155 +
  156 + recommended_items = []
  157 + last_active_time = time.mktime((datetime.today() - timedelta(10)).timetuple())
  158 + for cid in self.click_logs_map[uid]:
  159 + if cid in self.exp_log:
  160 + active_time = self.exp_log[cid]
  161 + if active_time > last_active_time:
  162 + last_active_time = active_time
  163 +
  164 + for cid, info in candidates_map.iteritems():
  165 + if info["cnt"] < 3.0:
  166 + continue
  167 + idf = self.idf_map[cid]
  168 +
  169 + recency = 1.0
  170 + if cid in self.exp_log:
  171 + u_time = self.exp_log[cid]
  172 + hour_diff = max(0.0, (last_active_time - u_time) / (60.0 * 60.0) - 12.0)
  173 + recency = min(pow(hour_diff, self.RDS) / self.RDR, 1.0)
  174 +
  175 + # ranking functions
  176 + cnt_score = min(1.0, log(max(1.0, info['cnt'])) / log(self.max_cnt))
  177 + idf_score = min(self.max_idf, float(idf)) / self.max_idf
  178 +
  179 + final_score = self.w_cnt * cnt_score + self.w_idf * idf_score * recency
  180 + if final_score >= 0.8:
  181 + recommended_items.append((cid, final_score, idf, info['cnt'], recency))
  182 + if len(recommended_items) > 10:
  183 + sorted_recommended_items = sorted(recommended_items, key=lambda x: (-x[1], -x[2]))
  184 + print '\t'.join([uid, ' '.join(["%s|%.2f|%.2f|%.1f|%.2f" % recommended for recommended in sorted_recommended_items[:self.num_recommend]])])
  185 +
  186 + except KeyError:
  187 + #print "KEY ERROR!!!!!!!"
  188 + #print "ERROR KEY: %s" % uid
  189 + pass
  190 +
  191 + #print '\t'.join([uid, ' '.join(["%s(%.2f)" % uinfo for uinfo in sorted_uids])])
  192 +
  193 + def steps(self):
  194 + steps_list = []
  195 + steps_list.append(self.mr(mapper_init=self.cluster_mapper_init, mapper=self.cluster_mapper, reducer=self.cluster_reducer))
  196 + #steps_list.append(self.mr(mapper=self.recommend_mapper, reducer_init=self.recommend_reducer_init, reducer=self.recommend_reducer_count))
  197 +
  198 + steps_list.append(self.mr(mapper=self.recommend_mapper, reducer_init=self.recommend_reducer_init, reducer=self.recommend_reducer))
  199 + return steps_list
  200 +
  201 +
  202 +if __name__ == '__main__':
  203 + MRRecSys.run()
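
The clustering above is standard MinHash LSH banding: users whose click sets have Jaccard similarity s produce the same p-hash signature in one band with probability about s^p, and across q independent bands they collide at least once with probability 1 - (1 - s^p)^q. A standalone sketch of the same banding idea (toy item ids and seed values, not the job's exact code path):

    import mmh3

    def band_signatures(items, seed_bands):
        """One signature per band: the minimum hash under each of the band's seeds."""
        sigs = []
        for seeds in seed_bands:
            sig = [str(min(abs(mmh3.hash(item, seed)) for item in items)) for seed in seeds]
            sigs.append('|'.join(sig))
        return sigs

    # two bands of p=3 seeds each (the job itself uses p=3, q=15)
    seed_bands = [[45, 32, 77], [29, 11, 85]]
    a = band_signatures(['c1', 'c2', 'c3', 'c4'], seed_bands)
    b = band_signatures(['c1', 'c2', 'c3', 'c9'], seed_bands)
    print [sa == sb for sa, sb in zip(a, b)]  # a match in any band puts both users in one cluster key
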
... ... @@ -0,0 +1,16 @@
  1 +#!/usr/bin/env python
  2 +from mrjob.job import MRJob
  3 +
  4 +class MRRawToConsumeLog(MRJob):
  5 + def mapper(self, _, line):
  6 + if line.startswith('CARD|CONSUME'):
  7 + tmp = line.rstrip('\n').split('|')
  8 + timestamp, uid, cid, card_no = tmp[2:6]
  9 + yield (uid, (cid, int(card_no)))
  10 +
  11 + def reducer(self, uid, consume_info):
  12 + for cid, card_no in consume_info:
  13 + print "%s, %s, %s" % (uid, cid, card_no)
  14 +
  15 +if __name__ == '__main__':
  16 + MRRawToConsumeLog.run()
mrjob.conf View file @ 7ea5655
... ... @@ -0,0 +1,3 @@
  1 +runners:
  2 + hadoop:
  3 + hadoop_home: /usr/
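
mrjob picks this file up through --conf, exactly as the batch driver does; a minimal invocation against the on-premise cluster (paths illustrative):

    python MRRawToConsumeLog.py -r hadoop --conf mrjob.conf hdfs:///some/input --output-dir hdfs:///some/output --no-output
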
mrjob_default.conf View file @ 7ea5655
... ... @@ -0,0 +1,11 @@
  1 +runners:
  2 + emr:
  3 + aws_access_key_id: AKIAJZ4KUNKDU52DR2AQ
  4 + aws_region: us-east-1
  5 + aws_secret_access_key: lKEBl2dvMC+sqeQmgRvlfh7HAQA5WCcJLu2A9BN8
  6 + ec2_key_pair: pikicast-verginia
  7 + ec2_key_pair_file: ~/.ssh/pikicast-verginia.pem # ~/ and $ENV_VARS allowed here
  8 + ssh_tunnel_to_job_tracker: true
  9 + ec2_instance_type: m1.xlarge
  10 + num_ec2_instances: 7
  11 + enable_emr_debugging: true
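
The emr runner lets the same jobs target Amazon EMR instead of the local cluster; a hypothetical invocation (bucket names illustrative, not part of this commit):

    python MRRecSys.py -r emr --conf mrjob_default.conf s3://some-bucket/input/ --output-dir s3://some-bucket/output/ --no-output
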
run_daily_batch.sh View file @ 7ea5655
... ... @@ -0,0 +1,3 @@
  1 +cd /home/hadoop/piki_mrrecsys
  2 +#python clean_log_buckets.py
  3 +python run_recsys.py batch
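
The wrapper assumes a scheduler fires it once a day; a typical (assumed, not part of this commit) crontab entry:

    # run the daily recommendation batch at 05:00 and keep a log
    0 5 * * * sh /home/hadoop/piki_mrrecsys/run_daily_batch.sh >> /home/hadoop/piki_mrrecsys/batch.log 2>&1
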
run_recsys.py View file @ 7ea5655
... ... @@ -0,0 +1,361 @@
  1 +# -*- coding: utf-8 -*-
  2 +import subprocess
  3 +from subprocess import call
  4 +from datetime import datetime, timedelta
  5 +from math import log
  6 +from glob import glob
  7 +from email.mime.multipart import MIMEMultipart
  8 +from email.mime.base import MIMEBase
  9 +from email.mime.text import MIMEText
  10 +from email import Encoders
  11 +from email import Utils
  12 +from email.header import Header
  13 +import smtplib
  14 +import sys
  15 +import fileinput as f
  16 +import os
  17 +import time
  18 +import DBConnector
  19 +import traceback
  20 +
  21 +#call("hdfs dfs -rm -r hdfs://kr-data-h1:9000//pikidata/rec_output/%s" % target_date, shell=True)
  22 +
  23 +def remove_zero_size_file(target_date):
  24 + file_list = subprocess.Popen(["hadoop", "fs", "-ls", "/log/dev=app/y=%s/mo=%s/d=%s/h=*/mi=*/*" % (target_date[0:4], target_date[4:6], target_date[6:])], stdout=subprocess.PIPE)
  25 + line_number = 0
  26 + zero_size_file_list = {}
  27 + for line in file_list.stdout:
  28 + if line_number == 0:
  29 + line_number += 1
  30 + continue
  31 + file_size = line.split()[4]
  32 + file_path = line.split()[7]
  33 + if file_size.strip() == "0":
  34 + print datetime.today(),"remove zero-size file : hdfs://kr-data-h1:9000/%s" % file_path
  35 + call("hadoop dfs -rm hdfs://kr-data-h1:9000/%s" % file_path, shell=True)
  36 + print datetime.today(),"removed"
  37 + line_number += 1
  38 +
  39 +def run_log_converter(target_date):
  40 + st = time.time()
  41 + print datetime.today(),"Converting raw logs to consume and click logs..."
  42 + input_path = "hdfs://KR-DATA-H1:9000/log/dev=app/y=%s/mo=%s/d=%s/h=*/mi=*/idx=*" % (target_date[0:4], target_date[4:6], target_date[6:])
  43 + output_path = "hdfs://KR-DATA-H1:9000/pikidata/rec_input_raw/%s" % target_date
  44 +
  45 + call("/usr/bin/hadoop dfs -rm -r %s" % output_path, shell=True)
  46 + call("export HADOOP_HOME=/usr/", shell=True)
  47 + call("python MRRawToConsumeLog.py -r hadoop --hadoop-bin /usr/bin/hadoop --conf mrjob.conf %s --output-dir=%s --no-output --jobconf mapred.reduce.tasks=9" % (input_path, output_path), shell=True)
  48 +
  49 + print datetime.today(),"Converting logs completed. Elapsed time: %.2f" % (time.time() - st)
  50 +
  51 + return True
  52 +
  53 +
  54 +def run_postprocessing(target_date):
  55 + """IDF 계산 / 클릭로그 따로 저장(추천시스템 마지막 Reducer용) / 컨슘로그분리(카드 절반이상 소비)"""
  56 + st = time.time()
  57 + print datetime.today(),"Post-processing..."
  58 +
  59 + # Calculate IDF and split data into consume and click logs
  60 +
  61 + df_map = {}
  62 + user_map = {}
  63 + text = subprocess.Popen(["/usr/bin/hadoop", "fs", "-text", "hdfs://KR-DATA-H1:9000/pikidata/rec_input_raw/%s/part-*" % target_date], stdout=subprocess.PIPE)
  64 + for line in text.stdout:
  65 + uid, cids = line.rstrip('\n').split('\t')
  66 + for cid_info in cids.split():
  67 + cid = cid_info.split('|')[0]
  68 + df_map.setdefault(cid, 0)
  69 + df_map[cid] += 1
  70 + user_map.setdefault(uid, 0)
  71 + user_map[uid] += 1
  72 +
  73 + N = len(user_map)
  74 + print datetime.today(),"# Users: %d" % N
  75 +
  76 + # Build the IDF map
  77 + fout = open('/home/hadoop/piki_mrrecsys/idf/idf_map.txt', 'w')
  78 + idf_map = {}
  79 + for cid, cnt in df_map.iteritems():
  80 + idf = max(0.0, log(N / float(cnt), 10))
  81 + fout.write('\t'.join([cid, str(cnt), str(idf)]) + '\n')
  82 + idf_map[cid] = idf
  83 + fout.close()
  84 + # Load the number of cards per content
  85 + contents_length = {}
  86 + for line in f.input('/home/hadoop/piki_mrrecsys/metadata/contents_length.txt'):
  87 + cid, length = line.rstrip('\n').split('\t')
  88 + contents_length[cid] = float(length)
  89 +
  90 + # Write the click and consume log files
  91 + fout_click = open('/home/hadoop/piki_mrrecsys/click_logs/click_logs.txt', 'w')
  92 + fout_consume = open('/home/hadoop/piki_mrrecsys/consume_logs/consume_logs_%s.txt' % target_date, 'w')
  93 + text = subprocess.Popen(["hdfs", "dfs", "-text", "hdfs://KR-DATA-H1:9000/pikidata/rec_input_raw/%s/part-*" % target_date], stdout=subprocess.PIPE)
  94 + for line in text.stdout:
  95 + uid, cids = line.rstrip('\n').split('\t')
  96 + clicked_list = []
  97 + consumed_list = []
  98 + max_idf = 0.0
  99 + for cid_info in cids.split():
  100 + cid, max_card = cid_info.split('|')
  101 + if cid in idf_map:
  102 + clicked_list.append((cid, idf_map[cid]))
  103 + if idf_map[cid] > max_idf:
  104 + max_idf = idf_map[cid]
  105 + consume_ratio = 1.0
  106 + if cid in contents_length:
  107 + consume_ratio = (float(max_card) + 1.0) / contents_length[cid]
  108 + if consume_ratio >= 0.5:
  109 + consumed_list.append(cid)
  110 + #if len(clicked_list) > 5 and max_idf > 0.8:
  111 + if len(clicked_list) > 5 and max_idf >= 1.0:
  112 + sorted_clicked_list = sorted(clicked_list, key=lambda x: -x[1])
  113 + cids_str = ' '.join(["%s(%.2f)" % x for x in sorted_clicked_list])
  114 + fout_click.write('\t'.join([uid, cids_str]) + '\n')
  115 + fout_consume.write('\t'.join([uid, ' '.join(consumed_list)]) + '\n')
  116 +
  117 + fout_click.close()
  118 + fout_consume.close()
  119 +
  120 + print datetime.today(),"Post-processing completed. Elapsed time: %.2f" % (time.time() - st)
  121 +
  122 +
  123 +def run_recsys(target_date):
  124 + st = time.time()
  125 + print datetime.today(),"Running User-Based Recommender System..."
  126 +
  127 + # Run MRRecsys
  128 + input_path = '/home/hadoop/piki_mrrecsys/consume_logs/consume_logs_%s.txt' % target_date
  129 + output_path = "hdfs://KR-DATA-H1:9000/test/pikidata/rec_output/%s" % target_date
  130 + call('/usr/bin/hadoop dfs -rm -r %s' % output_path, shell=True)
  131 + call('python MRRecSys.py -r hadoop --conf mrjob.conf --jobconf mapred.reduce.tasks=27 %s --output-dir=%s --file /home/hadoop/piki_mrrecsys/click_logs/click_logs.txt --file /home/hadoop/piki_mrrecsys/idf/idf_map.txt --file /home/hadoop/piki_mrrecsys/metadata/contents_exp.txt --file /home/hadoop/piki_mrrecsys/metadata/filtered_contents.txt --no-output' % (input_path, output_path), shell=True)
  132 +
  133 + print datetime.today(),"RecSYS Completed. Elapsed time: %.2f" % (time.time() - st)
  134 +
  135 + return True
  136 +
  137 +def run_db_uploader(target_date):
  138 + st = time.time()
  139 + print "Sending results to DB..."
  140 + db_conn = DBConnector.PIKI_DBConnector()
  141 + table_name = "reco_by_user"
  142 + # Create Table
  143 + sql = """DROP TABLE reco_by_user"""
  144 + db_conn.cursor.execute(sql)
  145 + sql = """CREATE TABLE IF NOT EXISTS %s (
  146 + uid INT NOT NULL,
  147 + recommended_items TEXT NULL,
  148 + clicked_items TEXT NULL,
  149 + similar_users TEXT NULL,
  150 + PRIMARY KEY (uid));""" % table_name
  151 + db_conn.cursor.execute(sql)
  152 + print datetime.today(),"Create table %s" % table_name
  153 +
  154 + # Send Data to DB
  155 + #results = glob('recsys_result/part*')
  156 + result = subprocess.Popen(["/usr/bin/hdfs", "dfs", "-text", "hdfs://KR-DATA-H1:9000/pikidata/rec_output/%s/part-*" % target_date], stdout=subprocess.PIPE)
  157 + #sql_format = """INSERT IGNORE INTO reco_by_user_%s_new (uid, recommended_items, clicked_items, similar_users) VALUES %s"""
  158 + sql_format = """INSERT IGNORE INTO %s (uid, recommended_items, clicked_items) VALUES %s"""
  159 + values_format = """(%s, "%s", "%s")"""
  160 + #sql_format = """INSERT IGNORE INTO %s (uid, recommended_items) VALUES %s"""
  161 + #values_format = """(%s, "%s")"""
  162 + cnt = 0
  163 + clicked_map = {}
  164 + print datetime.today(),"Loading user click history"
  165 + for line in f.input('click_logs/click_logs.txt'):
  166 + uid, clicked_items = line.rstrip('\n').split('\t')
  167 + clicked_map[uid] = clicked_items
  168 +
  169 + last_uid = None
  170 + user_info = {}
  171 + values = []
  172 + #for result in results:
  173 + for line in result.stdout:
  174 + uid, items = line.rstrip('\n').split('\t')
  175 + values.append(values_format % (uid, items, clicked_map[uid]))
  176 + cnt += 1
  177 + if cnt % 100 == 0:
  178 + sql = sql_format % (table_name, ', '.join(values))
  179 + db_conn.cursor.execute(sql)
  180 + values = []
  181 + if cnt % 1000 == 0:
  182 + print cnt
  183 + db_conn.conn.commit()
  184 + if values: # flush the final partial batch
  185 + db_conn.cursor.execute(sql_format % (table_name, ', '.join(values)))
  186 + db_conn.conn.commit()
  187 +
  185 + print datetime.today(),"Recommendations for %d users inserted to DB. Elapsed time: %.2f" % (cnt, time.time() - st)
  186 +
  187 +def run_piki_db_uploader(target_date):
  188 + st = time.time()
  189 + print datetime.today(),"Sending results to DB..."
  190 + db_conn = DBConnector.PIKI_DBConnector()
  191 + # Send Data to DB
  192 + result = subprocess.Popen(["/usr/bin/hadoop", "fs", "-text", "hdfs://KR-DATA-H1:9000/pikidata/rec_output/%s/part-*" % target_date], stdout=subprocess.PIPE)
  193 + sql_format = """REPLACE INTO recsys_by_user_test (uid, recommended_items) VALUES %s"""
  194 + values_format = """(%s, "%s")"""
  195 + cnt = 0
  196 +
  197 + values = []
  198 + for line in result.stdout:
  199 + uid, items = line.rstrip('\n').split('\t')
  200 + items = ' '.join([x.split('|')[0] for x in items.split()])
  201 + values.append(values_format % (uid, items))
  202 + cnt += 1
  203 + if cnt % 100 == 0:
  204 + sql = sql_format % ', '.join(values)
  205 + db_conn.cursor.execute(sql)
  206 + values = []
  207 + if cnt % 10000 == 0:
  208 + print cnt
  209 + db_conn.conn.commit()
  210 + if values: # flush the final partial batch
  211 + db_conn.cursor.execute(sql_format % ', '.join(values))
  212 + db_conn.conn.commit()
  213 +
  211 + print datetime.today(),"Recommendations for %d users inserted to DB. Elapsed time: %.2f" % (cnt, time.time() - st)
  212 +
  213 +def run_dev_batch(target_date=(datetime.today() - timedelta(1)).strftime('%Y%m%d')):
  214 + st_batch = time.time()
  215 + st = time.time()
  216 + print datetime.today(),"Remove zero size .gz files"
  217 + remove_zero_size_file(target_date)
  218 + print datetime.today(),"Converting Logs for %s..." % target_date
  219 + is_log = run_log_converter(target_date)
  220 + if not is_log:
  221 + raise SystemExit("Error! MRRawtoConsumeLog is not properly completed!")
  222 + print datetime.today(),"Converting Logs Completed. (%.2f)" % (time.time() - st)
  223 + print datetime.today(),"Postprocessing..."
  224 + run_postprocessing(target_date)
  225 + print datetime.today(),"Postprocessing Completed!"
  226 + print datetime.today(),"RecSYS for %s Started" % target_date
  227 + st = time.time()
  228 + is_recsys = run_recsys(target_date)
  229 + if not is_recsys:
  230 + raise SystemExit("Error! MRRecSYS is not properly completed!")
  231 + print datetime.today(),"RecSYS completed. (%.2f)" % (time.time() - st)
  232 + print datetime.today(),"Upload to DB"
  233 + run_db_uploader(target_date)
  234 + print datetime.today(),"Done! Elapsed Time: %.2f" % (time.time() - st_batch)
  235 +
  236 +def sendmail():
  237 + COMMASPACE = ", "
  238 + fromaddr = 'noreply2@pikicast.com'
  239 + toaddr = 'data_team@pikicast.com'
  240 + cc = 'trey@pikicast.com'
  241 + text = """
  242 +
  243 + 안녕하세요, JOEL 입니다.
  244 +
  245 + 금일 MRRECSYS 수행중 문제가 발생하였습니다.
  246 +
  247 + (본 메일은 문제발생시 메일이 보내지도록 자동화되어 있음을 참고해주시기 바랍니다. )
  248 +
  249 + 감사합니다.
  250 +
  251 + """
  252 + msg = MIMEMultipart()
  253 + msg['From'] = fromaddr
  254 + msg['To'] = toaddr
  255 + #msg["Cc"] = cc
  256 + msg['Subject'] = Header("[Datateam] Error during today's MRRECSYS run", 'utf-8')
  257 + toaddrs=[toaddr,cc]
  258 +
  259 + msg.attach(MIMEText(text, "plain", _charset="utf-8"))
  260 +
  261 + try:
  262 + smtp_server = 'smtp.gmail.com'
  263 + smtp_username = 'noreply2@pikicast.com'
  264 + smtp_password = 'zmfflr2715'
  265 + smtp_port = 587
  266 + smtp_do_tls = True
  267 +
  268 + server = smtplib.SMTP(
  269 + host = smtp_server,
  270 + port = smtp_port,
  271 + timeout = 1000
  272 + )
  273 + server.set_debuglevel(10)
  274 + server.starttls()
  275 + server.ehlo()
  276 + server.login(smtp_username, smtp_password)
  277 + result = server.sendmail(fromaddr, toaddrs , msg.as_string())
  278 + except Exception:
  279 + return (404,'email failed')
  280 + return server.quit()
  281 +
  282 +def run_batch(target_date=(datetime.today() - timedelta(1)).strftime('%Y%m%d')):
  283 + st_batch = time.time()
  284 + st = time.time()
  285 + print datetime.today(),"Remove zero size .gz files"
  286 + remove_zero_size_file(target_date)
  287 + print datetime.today(),"Converting Logs for %s..." % target_date
  288 + is_log = run_log_converter(target_date)
  289 + if not is_log:
  290 + raise SystemExit("Error! MRRawtoConsumeLog is not properly completed!")
  291 + print datetime.today(),"Converting Logs Completed. (%.2f)" % (time.time() - st)
  292 + print datetime.today(),"Postprocessing..."
  293 + run_postprocessing(target_date)
  294 + print datetime.today(),"Postprocessing Completed!"
  295 + print datetime.today(),"RecSYS for %s Started" % target_date
  296 + try:
  297 + is_recsys = run_recsys(target_date)
  298 + if not is_recsys:
  299 + raise SystemExit("Error! MRRecSYS is not properly completed!")
  300 + print datetime.today(),"RecSYS completed. (%.2f)" % (time.time() - st)
  301 + print datetime.today(),"Upload to DB"
  302 + run_piki_db_uploader(target_date)
  303 + #run_db_uploader()
  304 + except Exception:
  305 + traceback.print_exc()
  306 + sendmail()
  307 +
  308 +
  309 + print datetime.today(),"Done! Elapsed Time: %.2f" % (time.time() - st_batch)
  310 +
  311 +if __name__ == '__main__':
  312 + if len(sys.argv) >= 2 and sys.argv[1] == "batch":
  313 + if len(sys.argv) == 3:
  314 + target_date = sys.argv[2].strip()
  315 + run_batch(target_date=target_date)
  316 + else:
  317 + run_batch()
  318 + elif len(sys.argv) >= 2 and sys.argv[1] == "dev_batch":
  319 + if len(sys.argv) == 3:
  320 + target_date = sys.argv[2].strip()
  321 + run_dev_batch(target_date=target_date)
  322 + else:
  323 + run_dev_batch()
  324 + elif len(sys.argv) == 3:
  325 + target_date = sys.argv[1].strip()
  326 + action = sys.argv[2].strip()
  327 + if action == "log":
  328 + run_log_converter(target_date)
  329 + elif action == "postprocess":
  330 + run_postprocessing(target_date)
  331 + elif action == "recsys":
  332 + run_recsys(target_date)
  333 + elif action == "recsys_test":
  334 + run_test(target_date)
  335 + elif action == "db":
  336 + run_db_uploader(target_date)
  337 + elif action == "pikidb":
  338 + run_piki_db_uploader(target_date)
  339 + else:
  340 + print "Invalid action name. Please check the manual."
  341 + else:
  342 + print "usage for individual jobs: python %s <target date - YYYYMMDD> <action - log | postprocess | recsys | db | pikidb>" % sys.argv[0]
  343 + print "usage for batch: python %s batch" % sys.argv[0]
  344 +#
  345 +#print "remove zero size files!!!!!"
  346 +#remove_zero_size_file(target_date)
  347 +
  348 +#print "convert raw to log!!!!!"
  349 +#run_log_converter(target_date)
  350 +
  351 +#print "run post processing!!!!!"
  352 +#run_postprocessing(target_date)
  353 +
  354 +#print "run recommend!!!!!!"
  355 +#run_recsys('20150601')
  356 +
  357 +#print "run_db_uploader()"
  358 +#run_db_uploader(target_date)
  359 +
  360 +#print "run_piki_db_uploader()"
  361 +#run_piki_db_uploader()
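
For reference, the entry point supports either a full batch run or an individual stage; example invocations (the date is illustrative):

    python run_recsys.py batch                 # run yesterday's batch
    python run_recsys.py batch 20150601        # run the batch for a specific date
    python run_recsys.py 20150601 recsys       # run one stage: log | postprocess | recsys | db | pikidb
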