项目的目的是根据各影响因子和权重计算出对应用户的信用分值。数据源涉及到库表(Hive&Mysql)的数据、打点的日志数据,数据分别在Hive及Mysql中装载及计算,相对有些复杂。
可以借鉴的点:
打点日志从oss上获取及日志数据的处理、装载;
通过Python在mysql和Hive中进行数据的传输和计算;
日期、月份、x天以前日期的获取;
数据从hdfs上下载到本地;
mysql关联更新;
多表关联性能优化(主要添加索引);
mysql数据装载,中文乱码的处理;
大量数据插入mysql,批量提交的处理;
等等。
1、日期处理的函数脚本
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/DateCalc.py
# -*- coding=utf-8 -*-
"""Date helpers shared by the CreditScore batch scripts.

Every date is exchanged as a 'YYYY-MM-DD' string.  Sibling scripts pull this
module in with ``from DateCalc import *`` and use the ``warnings`` and
``datetime`` modules through that star import, so the imports below must stay.
"""
import warnings
import datetime
import calendar

warnings.filterwarnings("ignore")

_DAY_FORMAT = '%Y-%m-%d'


def _days_before(pt_day, days):
    """Return the date ``days`` days before ``pt_day`` as 'YYYY-MM-DD'."""
    shifted = datetime.datetime.strptime(pt_day, _DAY_FORMAT) - datetime.timedelta(days=days)
    return shifted.strftime(_DAY_FORMAT)


def getNowDay():
    """Return today's local date as 'YYYY-MM-DD'."""
    return datetime.datetime.today().strftime(_DAY_FORMAT)


def getYesterDay():
    """Return yesterday's local date as 'YYYY-MM-DD'."""
    yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
    return yesterday.strftime(_DAY_FORMAT)


def getDaysXAgo(pt_day, days=7):
    """Return the date ``days`` days before ``pt_day`` (default: 7 days).

    ``pt_day`` must be a 'YYYY-MM-DD' string; ValueError is raised otherwise.
    """
    return _days_before(pt_day, days)


def getDays1Ago(pt_day):
    """Return the day before ``pt_day`` as 'YYYY-MM-DD'."""
    return _days_before(pt_day, 1)


def getAllMonthOfYear():
    """Return ['01', '02', ..., '12']."""
    return ['{num:02d}'.format(num=month) for month in range(1, 13)]


def getAllHourOfDay():
    """Return ['00', '01', ..., '23']."""
    return ['{num:02d}'.format(num=hour) for hour in range(24)]


def getDayOfMonth(year, month):
    """Return the zero-padded day numbers of the given month, e.g. ['01', ..., '31']."""
    last_day = calendar.monthrange(year, month)[1]
    return [str(day).zfill(2) for day in range(1, last_day + 1)]


def dateRange(beginDate, endDate):
    """Return every date from ``beginDate`` to ``endDate`` inclusive.

    Both bounds are parsed before comparing, so a bound that is not
    zero-padded (e.g. '2017-3-1') no longer makes the loop overrun the way a
    plain string comparison did.  The result is always zero-padded
    'YYYY-MM-DD' strings; it is empty when ``beginDate`` is after ``endDate``.
    """
    current = datetime.datetime.strptime(beginDate, _DAY_FORMAT)
    last = datetime.datetime.strptime(endDate, _DAY_FORMAT)
    dates = []
    while current <= last:
        dates.append(current.strftime(_DAY_FORMAT))
        current += datetime.timedelta(days=1)
    return dates

# Usage examples:
#   dateRange('2017-02-01', '2017-03-14')  -> one entry per day, inclusive
#   getDayOfMonth(year=2017, month=10)     -> ['01', ..., '31']
#   getDaysXAgo('2017-12-01')              -> '2017-11-24'
# -*- coding=utf-8 -*-
"""Rebuild the per-day activity tables in the funnyai_data MySQL database.

Each public ``Cs*Proc`` function drops and recreates one ``xcs_*`` table,
reads the rows for ``pt_day`` from Hive (or from the jellyfish_server MySQL
database) and inserts them in batches through the mysql command line client.
"""
import time
import datetime
import os
import re  # unused here; kept because this module is star-imported elsewhere
import warnings

warnings.filterwarnings("ignore")

_TARGET_MYSQL = "/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass"
_SOURCE_MYSQL = "/usr/bin/mysql -hMysqlHost -P50506 -uMysqlUser -pMysqlPass -N"
_HIVE = "/usr/lib/hive-current/bin/hive"
_BATCH_SIZE = 1000

# Selects, for one day, the ids whose aggregated metric is above the average
# of that metric over all ids of the same day (flag is 1 for every row kept).
_ABOVE_AVERAGE_QUERY = (
    "with {cte} as( "
    "select pt_day,{id_column},{aggregate} {metric} "
    "from {source} "
    "where {condition} "
    "group by pt_day,{id_column}) "
    "select a1.pt_day,a1.{id_column},case when a1.{metric}>a2.{metric}_per then 1 else 0 end {flag} "
    "from {cte} a1 "
    "left join (select pt_day,sum({metric})/count(distinct {id_column}) {metric}_per "
    "from {cte} "
    "group by pt_day) a2 on a1.pt_day=a2.pt_day "
    "where a1.{metric}>a2.{metric}_per;"
)


def _run_target_sql(sql):
    """Execute ``sql`` in the funnyai_data database via the mysql client."""
    os.system('source /etc/profile; ' + _TARGET_MYSQL + ' -e "use funnyai_data; ' + sql + '"')


def _read_rows(command):
    """Run a shell command and return its tab-separated stdout as field lists."""
    pipe = os.popen(command)
    try:
        lines = pipe.readlines()
    finally:
        pipe.close()
    # Blank lines carry no record and would otherwise fail on field access.
    return [line.replace('\n', '').split('\t') for line in lines if line.strip()]


def _hive_rows(query):
    """Return the result rows of a Hive query."""
    return _read_rows('source /etc/profile; ' + _HIVE + ' -e "' + query + '"')


def _source_mysql_rows(query):
    """Return the result rows of a query on the jellyfish_server database."""
    return _read_rows('source /etc/profile; ' + _SOURCE_MYSQL
                      + ' -e "use jellyfish_server; ' + query + '"')


def _recreate_table(table, id_column, value_column_ddl):
    """Drop and recreate a (pt_day, id, value, etl_time) table indexed on the id."""
    _run_target_sql(
        "drop table if exists {table}; "
        "create table {table}(pt_day varchar(10),{id_column} bigint(20),{value_ddl},"
        "etl_time datetime,KEY idx_uid ({id_column})) "
        "ENGINE=InnoDB DEFAULT CHARSET=utf8;".format(
            table=table, id_column=id_column, value_ddl=value_column_ddl))


def _insert_rows(table, columns, rows):
    """Insert the first three fields of every row plus an etl_time, 1000 rows per statement.

    Nothing is sent when a batch would be empty; the previous code always
    issued a trailing flush, which produced ``insert ... values;`` (invalid
    SQL) whenever the row count was 0 or an exact multiple of the batch size.
    """
    prefix = "insert into %s(%s,etl_time) values " % (table, ",".join(columns))
    for start in range(0, len(rows), _BATCH_SIZE):
        values = []
        for row in rows[start:start + _BATCH_SIZE]:
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            values.append("( '%s','%s','%s','%s')" % (row[0], row[1], row[2], etl_time))
        _run_target_sql(prefix + ",".join(values) + ";")


def CsPayActiveProc(pt_day):
    """Rebuild xcs_pay_active: uids whose paid amount on ``pt_day`` is above the day's average."""
    _recreate_table("xcs_pay_active", "uid", "pay_active_flag int")
    rows = _hive_rows(_ABOVE_AVERAGE_QUERY.format(
        cte="tab_pay_sum", source="data_chushou_pay_info", id_column="uid",
        aggregate="sum(amount)", metric="pay_amount",
        condition="state=0 and pt_day = '%s'" % pt_day,
        flag="pay_active_flag"))
    _insert_rows("xcs_pay_active", ("pt_day", "uid", "pay_active_flag"), rows)


def CsViewActiveProc(pt_day):
    """Rebuild xcs_view_active: uids whose view time on ``pt_day`` is above the day's average."""
    _recreate_table("xcs_view_active", "uid", "view_active_flag int")
    rows = _hive_rows(_ABOVE_AVERAGE_QUERY.format(
        cte="tab_view_sum", source="recommend_data_view", id_column="uid",
        aggregate="sum(view_time)", metric="view_time",
        condition="pt_day = '%s'" % pt_day,
        flag="view_active_flag"))
    _insert_rows("xcs_view_active", ("pt_day", "uid", "view_active_flag"), rows)


def CsMessageActiveProc(pt_day):
    """Rebuild xcs_message_active: uids whose message count on ``pt_day`` is above the day's average."""
    _recreate_table("xcs_message_active", "uid", "message_active_flag int")
    rows = _hive_rows(_ABOVE_AVERAGE_QUERY.format(
        cte="tab_message_sum", source="oss_chushou_message_send", id_column="uid",
        aggregate="count(*)", metric="message_cnt",
        condition="pt_day = '%s'" % pt_day,
        flag="message_active_flag"))
    _insert_rows("xcs_message_active", ("pt_day", "uid", "message_active_flag"), rows)


def CsGiftActiveProc(pt_day):
    """Rebuild xcs_gift_active: room creators whose gift points on ``pt_day`` are above the day's average."""
    _recreate_table("xcs_gift_active", "room_creator_uid", "gift_active_flag int")
    rows = _hive_rows(_ABOVE_AVERAGE_QUERY.format(
        cte="tab_gift_sum", source="honeycomb_all_gift_record", id_column="room_creator_uid",
        aggregate="sum(gift_point)", metric="gift_point",
        condition="pt_day = '%s'" % pt_day,
        flag="gift_active_flag"))
    _insert_rows("xcs_gift_active", ("pt_day", "room_creator_uid", "gift_active_flag"), rows)


def CsLiveActiveProc(pt_day):
    """Rebuild xcs_live_active: room creators whose live minutes on ``pt_day`` are above the day's average.

    Rooms are mapped to creators through the oss_room_v2 partition of
    yesterday (relative to the run date, not to ``pt_day``).
    """
    _recreate_table("xcs_live_active", "room_creator_uid", "live_active_flag int")
    yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    query = (
        "with tab_live_sum as( "
        "select a1.pt_day,a2.creator_uid,a1.anthor_live_time "
        "from (select a1.pt_day,a1.roomid,sum(live_minute) anthor_live_time "
        "from (select pt_day,room_id roomid,hour(time_interval)*60+minute(time_interval)+second(time_interval)/60 live_minute from ( "
        "select pt_day,room_id,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval "
        "from honeycomb_all_live_history_status "
        "where pt_day = '{pt_day}') x) a1 "
        "group by a1.pt_day,a1.roomid) a1 "
        "left join oss_room_v2 a2 on a1.roomid=a2.id "
        "where a2.pt_day='{yesterday}') "
        "select a1.pt_day,a1.creator_uid,case when a1.anthor_live_time>a2.anthor_live_time_per then 1 else 0 end live_active_flag "
        "from tab_live_sum a1 "
        "left join (select pt_day,sum(anthor_live_time)/count(distinct creator_uid) anthor_live_time_per "
        "from tab_live_sum "
        "group by pt_day) a2 on a1.pt_day=a2.pt_day "
        "where a1.anthor_live_time>a2.anthor_live_time_per;"
    ).format(pt_day=pt_day, yesterday=yesterday)
    rows = _hive_rows(query)
    _insert_rows("xcs_live_active", ("pt_day", "room_creator_uid", "live_active_flag"), rows)


def CsUserSilentProc(pt_day):
    """Rebuild xcs_user_silent from silent_user rows created on ``pt_day``."""
    _recreate_table("xcs_user_silent", "uid", "user_silent_flag int")
    rows = _source_mysql_rows(
        "select substr(created_time,1,10) pt_day,uid,1 user_silent_flag "
        "from silent_user "
        "where substr(created_time,1,10)='{pt_day}' "
        "group by substr(created_time,1,10),uid;".format(pt_day=pt_day))
    _insert_rows("xcs_user_silent", ("pt_day", "uid", "user_silent_flag"), rows)


def CsUpperManageProc(pt_day):
    """Rebuild xcs_upper_manage from room_manager rows created on ``pt_day``."""
    _recreate_table("xcs_upper_manage", "uid", "room_id bigint(20)")
    rows = _source_mysql_rows(
        "select substr(created_time,1,10) pt_day,uid,room_id "
        "from room_manager "
        "where substr(created_time,1,10)='{pt_day}' "
        "group by substr(created_time,1,10),uid,room_id;".format(pt_day=pt_day))
    _insert_rows("xcs_upper_manage", ("pt_day", "uid", "room_id"), rows)

# Manual run example:
#   pt_day = '2017-09-19'
#   CsPayActiveProc(pt_day)
#   CsUpperManageProc(pt_day)
# -*- coding=utf-8 -*-
"""Copy the user_credit_operator tracking logs from OSS into MySQL.

Flow for one day: OSS -> HDFS (hadoop distcp) -> local temp directory ->
``xcs_log_record`` table in the funnyai_data database.
"""
import os
import warnings
from DateCalc import *

warnings.filterwarnings("ignore")

_ACCESS_KEY_ID = "AccessKeyId-string"
_ACCESS_KEY_SECRET = "AccessKeySecret-string"
_BUCKET = "TV-hz"
_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"

_SERVER_LOG_HOSTS = [
    "sz-121-210", "sz-129-97", "sz-135-45", "sz-76-100", "sz-147-189", "sz-152-217",
    "sz-153-240", "sz-155-163", "sz-155-212", "sz-155-217", "sz-158-99", "sz-162-5",
    "sz-168-212", "sz-18-122", "sz-191-14", "sz-192-253", "sz-199-103", "sz-201-3",
    "sz-21-92", "sz-73-130", "sz-212-123", "sz-214-62", "sz-218-105", "sz-218-165",
    "sz-233-217", "sz-237-167", "sz-240-45", "sz-25-63", "sz-26-159", "sz-27-218",
    "sz-27-65", "sz-27-91", "sz-42-99", "sz-54-148", "sz-58-102", "sz-62-113",
    "sz-97-233", "sz-64-227", "sz-65-64", "sz-68-199", "sz-130-97", "sz-78-112",
    "sz-135-198", "sz-79-58", "sz-98-72", "sz-159-238", "sz-130-89", "sz-129-70",
    "sz-131-90",
]
_CONSOLE_LOG_HOSTS = ["sz-217-183"]

_HDFS_DIR = "/tmp/nisj/CreditScore/"
_LOCAL_DIR = "/home/hadoop/nisj/automationDemand/CreditScore/tmpLogDir/"


def _build_parameters(server_list, date_start, date_end):
    """Return one (key id, key secret, bucket, endpoint, server, year, month, day) tuple per server and day."""
    parameters = []
    for server_name in server_list:
        for date in dateRange(date_start, date_end):
            parameters.append((_ACCESS_KEY_ID, _ACCESS_KEY_SECRET, _BUCKET, _ENDPOINT,
                               server_name, date[0:4], date[5:7], date[8:10]))
    return parameters


def _distcp_to_hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName):
    """Copy one OSS object to an HDFS path with ``hadoop distcp``."""
    os.system("source /etc/profile; hadoop distcp oss://%s:%s@%s.%s/%s hdfs:%s"
              % (AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName))


def getParameterInfo(date_start, date_end):
    """Return the copy parameters for the jellyfish-server log hosts over the date range."""
    return _build_parameters(_SERVER_LOG_HOSTS, date_start, date_end)


def getParameterInfo2(date_start, date_end):
    """Return the copy parameters for the console log host over the date range."""
    return _build_parameters(_CONSOLE_LOG_HOSTS, date_start, date_end)


def getOssFile2Hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day):
    """Copy one host's jellyfish-server user_credit_operator log of the day from OSS to HDFS."""
    date = year + "-" + month + "-" + day
    ossFileName = ("JellyFishLogs/" + server_name + "/" + year + "/" + month + "/" + day
                   + "/jellyfish-server/user_credit_operator.log_" + date)
    hdfsFileName = _HDFS_DIR + date + "/" + server_name + "#user_credit_operator_server." + date + ".txt"
    _distcp_to_hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName)


def getOssFile2Hdfs2(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day):
    """Copy one host's console user_credit_operator log of the day from OSS to HDFS."""
    date = year + "-" + month + "-" + day
    ossFileName = ("JellyFishConsoleLogs/" + server_name + "/" + year + "/" + month + "/" + day
                   + "/user_credit_operator.log_" + date)
    hdfsFileName = _HDFS_DIR + date + "/" + server_name + "#user_credit_operator_console." + date + ".txt"
    _distcp_to_hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName)


def downloadFileFromHdfs(year, month, day):
    """Copy every file of the day's HDFS directory into the local temp directory."""
    date = year + "-" + month + "-" + day
    listing = os.popen("""source /etc/profile; \
        hadoop dfs -ls /tmp/nisj/CreditScore/%s/ | awk -F ' ' '{print $8}'""" % (date))
    try:
        paths = listing.readlines()
    finally:
        listing.close()
    for dir_path in paths:
        dir_path = dir_path.strip()
        # Listing lines without an 8th column come out of awk as empty lines.
        if dir_path:
            os.system("source /etc/profile; hadoop dfs -copyToLocal " + dir_path + " " + _LOCAL_DIR)


def logLoad2Mysql(year, month, day):
    """Load the day's downloaded logs into xcs_log_record, replacing that day's rows.

    The text after ' INFO : ' of every log line is written to
    log_record_<date>.txt, loaded as '|'-separated fields, and the rows that
    were just loaded (pt_day is null) are stamped with the date.
    """
    date = year + "-" + month + "-" + day
    os.system("""source /etc/profile; \
        awk -F ' INFO : ' '{print $2}' %s*user_credit_operator*.%s.txt > %slog_record_%s.txt """
              % (_LOCAL_DIR, date, _LOCAL_DIR, date))
    sql = (
        "delete from xcs_log_record where pt_day='{date}'; "
        "load data local infile '{local_dir}log_record_{date}.txt' ignore into table xcs_log_record "
        "character set utf8 fields terminated by '|' enclosed by '' lines terminated by '\n' "
        "(uid,type,subtype,expiredtype,reason); "
        "update xcs_log_record set pt_day='{date}' where pt_day is null;"
    ).format(date=date, local_dir=_LOCAL_DIR)
    os.system('source /etc/profile; /usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass '
              '-e "use funnyai_data; ' + sql + '"')


def logFromOss2Local(runDay):
    """Run the whole OSS -> HDFS -> local -> MySQL log load for ``runDay`` ('YYYY-MM-DD')."""
    date_start = runDay
    date_end = runDay
    # Start from an empty temp directory so old downloads are not reloaded.
    os.system("source /etc/profile; rm -rf " + _LOCAL_DIR + "*")
    for parameter in getParameterInfo(date_start, date_end):
        getOssFile2Hdfs(*parameter)
    for parameter in getParameterInfo2(date_start, date_end):
        getOssFile2Hdfs2(*parameter)
    for date in dateRange(date_start, date_end):
        year, month, day = date[0:4], date[5:7], date[8:10]
        downloadFileFromHdfs(year, month, day)
        logLoad2Mysql(year, month, day)

# Manual run example:
#   logFromOss2Local('2017-09-19')
4、用户基础信息数据准备
# -*- coding=utf-8 -*-
"""Load user and room reference data into the funnyai_data MySQL database."""
import time
import os
import re  # unused here; kept because this module is star-imported elsewhere
import warnings
from DateCalc import *

warnings.filterwarnings("ignore")

_MYSQL = "/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8"
_HIVE = "/usr/lib/hive-current/bin/hive"
_BATCH_SIZE = 1000

# Characters removed from nicknames before they are embedded in a
# double-quoted shell argument and a single-quoted SQL literal.  '$' is
# included because the shell still expands $VAR and $(...) inside double quotes.
_UNSAFE_NICKNAME_CHARS = ("`", "'", '"', "\\", "$")


def _run_sql(sql):
    """Execute ``sql`` in the funnyai_data database via the mysql client."""
    os.system('source /etc/profile; ' + _MYSQL + ' -e "use funnyai_data; ' + sql + '"')


def _read_lines(command):
    """Run a shell command and return its non-blank stdout lines without the newline."""
    pipe = os.popen(command)
    try:
        lines = pipe.readlines()
    finally:
        pipe.close()
    return [line.replace('\n', '') for line in lines if line.strip()]


def _hive_rows(query):
    """Return the rows of a Hive query as lists of tab-separated fields."""
    command = 'source /etc/profile; ' + _HIVE + ' -e "' + query + '"'
    return [line.split('\t') for line in _read_lines(command)]


def _clean_nickname(raw):
    """Return ``raw`` as a string with shell/SQL quoting characters removed."""
    cleaned = str(raw)
    for char in _UNSAFE_NICKNAME_CHARS:
        cleaned = cleaned.replace(char, "")
    return cleaned


def _insert_rows(table, columns, rows):
    """Insert value tuples plus an etl_time, 1000 rows per statement.

    Nothing is sent for an empty batch; the previous unconditional final flush
    produced ``insert ... values;`` (invalid SQL) when the row count was 0 or
    an exact multiple of the batch size.
    """
    prefix = "insert into %s(%s,etl_time) values " % (table, ",".join(columns))
    for start in range(0, len(rows), _BATCH_SIZE):
        tuples = []
        for values in rows[start:start + _BATCH_SIZE]:
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            quoted = ",".join("'%s'" % value for value in list(values) + [etl_time])
            tuples.append("( " + quoted + ")")
        _run_sql(prefix + ",".join(tuples) + ";")


def CsUserInfoProc(pt_day):
    """Rebuild xcs_user_info with users whose last login is within the 7 days up to ``pt_day``.

    Rows come from yesterday's oss_chushou_user_profile partition (state=0).
    """
    yesterday = getYesterDay()
    DaysXAgo = getDaysXAgo(pt_day)
    _run_sql(
        "drop table if exists xcs_user_info; "
        "CREATE TABLE xcs_user_info ( "
        "uid bigint(20), "
        "nickname varchar(2000), "
        "last_login_time varchar(19), "
        "etl_time datetime, "
        "KEY idx_uid (uid) "
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8;")
    fetched = _hive_rows(
        "select uid,nickname,last_login_time "
        "from oss_chushou_user_profile "
        "where pt_day='{yesterday}' and state=0 and last_login_time between '{DaysXAgo}' and '{pt_day}';".format(
            pt_day=pt_day, yesterday=yesterday, DaysXAgo=DaysXAgo))
    rows = [(fields[0], _clean_nickname(fields[1]), fields[2]) for fields in fetched]
    _insert_rows("xcs_user_info", ("uid", "nickname", "last_login_time"), rows)


def CsRoomPartInfo(pt_day):
    """Rebuild xcs_room_part_info with the subscriber count of every room in xcs_upper_manage for ``pt_day``.

    Counts are read from yesterday's oss_room_v2 partition.  When
    xcs_upper_manage has no room for the day the table is left empty and Hive
    is not queried (an empty ``in()`` list is a syntax error).
    """
    yesterday = getYesterDay()
    _run_sql(
        "drop table if exists xcs_room_part_info; "
        "CREATE TABLE xcs_room_part_info ( "
        "room_id bigint(20), "
        "subcriber_count bigint(20), "
        "etl_time datetime, "
        "KEY idx_uid (room_id) "
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8;")
    room_ids = _read_lines(
        'source /etc/profile; ' + _MYSQL + ' -N -e "use funnyai_data; '
        "select room_id from xcs_upper_manage where pt_day='{pt_day}' group by room_id;".format(pt_day=pt_day)
        + '"')
    if not room_ids:
        return
    fetched = _hive_rows(
        "select id,subcriber_count "
        "from oss_room_v2 "
        "where pt_day='{yesterday}' and id in({roomIdCon}) ;".format(
            yesterday=yesterday, roomIdCon=",".join(room_ids)))
    rows = [(fields[0], fields[1]) for fields in fetched]
    _insert_rows("xcs_room_part_info", ("room_id", "subcriber_count"), rows)

# Manual run example:
#   pt_day = '2017-09-19'
#   CsUserInfoProc(pt_day)
#   CsRoomPartInfo(pt_day)
# -*- coding=utf-8 -*-
"""Compute the daily credit-score rows in xcs_user_credit_score (funnyai_data)."""
import os
import warnings
from DateCalc import *

warnings.filterwarnings("ignore")

_MYSQL = "/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8"

_CREATE_RESULT_TABLE_SQL = (
    "drop table if exists xcs_user_credit_score; "
    "CREATE TABLE xcs_user_credit_score ( "
    "pt_day varchar(10), "
    "uid bigint(20) DEFAULT NULL, "
    "nickname varchar(2000) DEFAULT NULL, "
    "last_login_time varchar(19) DEFAULT NULL, "
    "seqing_score int(10) NOT NULL DEFAULT '0', "
    "jubao_score int(10) DEFAULT NULL, "
    "chengpaopao_score int(10) NOT NULL DEFAULT '0', "
    "weifan_score int(10) DEFAULT NULL, "
    "pay_active_score int(10) NOT NULL DEFAULT '0', "
    "view_active_score int(10) NOT NULL DEFAULT '0', "
    "message_active_score int(10) NOT NULL DEFAULT '0', "
    "gift_active_score int(10) NOT NULL DEFAULT '0', "
    "live_active_score int(10) NOT NULL DEFAULT '0', "
    "upper_manage_score decimal(48,4) DEFAULT NULL, "
    "user_silent_score int(10) NOT NULL DEFAULT '0', "
    "user_currday_increment_score decimal(48,4) DEFAULT NULL, "
    "user_currday_score decimal(48,4) DEFAULT '0', "
    "etl_time datetime DEFAULT CURRENT_TIMESTAMP, "
    "KEY idx_pt_day_uid (pt_day,uid) "
    ") ENGINE=InnoDB DEFAULT CHARSET=utf8;"
)

# Replaces the rows of {pt_day} with one row per xcs_user_info user holding
# the individual component scores and their sum (user_currday_increment_score).
_INSERT_DAY_SQL = (
    "delete from xcs_user_credit_score where pt_day='{pt_day}'; "
    "insert into xcs_user_credit_score(pt_day,uid,nickname,last_login_time,seqing_score,jubao_score,chengpaopao_score,weifan_score,pay_active_score,view_active_score,message_active_score,gift_active_score,live_active_score,upper_manage_score,user_silent_score,user_currday_increment_score) "
    "select '{pt_day}' pt_day,uid,nickname,last_login_time,seqing_score,jubao_score,chengpaopao_score,weifan_score,pay_active_score,view_active_score,message_active_score,gift_active_score,live_active_score,upper_manage_score,user_silent_score, "
    "seqing_score+jubao_score+chengpaopao_score+weifan_score+pay_active_score+view_active_score+message_active_score+gift_active_score+live_active_score+upper_manage_score+user_silent_score user_currday_increment_score "
    "from (select a1.uid,a1.nickname,a1.last_login_time, "
    "case when a2.uid is not null then -10000 else 0 end seqing_score, "
    "case when a3.uid is not null then -50*a3.jubao_cnt else 0 end jubao_score, "
    "case when a4.uid is not null then -2000 else 0 end chengpaopao_score, "
    "case when a5.uid is not null then a5.weifan_score else 0 end weifan_score, "
    "case when a6.uid is not null then 50 else 0 end pay_active_score, "
    "case when a7.uid is not null then 20 else 0 end view_active_score, "
    "case when a8.uid is not null then 10 else 0 end message_active_score, "
    "case when a9.room_creator_uid is not null then 30 else 0 end gift_active_score, "
    "case when a10.room_creator_uid is not null then 20 else 0 end live_active_score, "
    # ifnull: the sum is NULL when none of the user's rooms has a row in
    # xcs_room_part_info; a NULL here would turn the whole increment into NULL
    # and the accumulate step would then reset the user's total to 0.
    "case when a11.uid is not null then ifnull(a11.upper_manage_score,0) else 0 end upper_manage_score, "
    "case when a12.uid is not null then -300 else 0 end user_silent_score "
    "from xcs_user_info a1 "
    "left join (select uid from xcs_log_record where pt_day='{pt_day}' and type=1 group by uid) a2 on a1.uid=a2.uid "
    "left join (select uid,count(*) jubao_cnt from xcs_log_record where pt_day='{pt_day}' and type=2 group by uid) a3 on a1.uid=a3.uid "
    "left join (select uid from xcs_log_record where pt_day='{pt_day}' and type=3 group by uid) a4 on a1.uid=a4.uid "
    "left join (select uid,sum(case when subtype in(1,2) then -50 when subtype in(5,6) then -500 when subtype=0 then -2000 else 0 end) weifan_score from xcs_log_record where pt_day='{pt_day}' and type=4 group by uid) a5 on a1.uid=a5.uid "
    "left join xcs_pay_active a6 on a1.uid=a6.uid "
    "left join xcs_view_active a7 on a1.uid=a7.uid "
    "left join xcs_message_active a8 on a1.uid=a8.uid "
    "left join xcs_gift_active a9 on a1.uid=a9.room_creator_uid "
    "left join xcs_live_active a10 on a1.uid=a10.room_creator_uid "
    "left join (select a11_1.uid,sum(a11_2.subcriber_count/1000) upper_manage_score "
    "from xcs_upper_manage a11_1 "
    "left join xcs_room_part_info a11_2 on a11_1.room_id=a11_2.room_id "
    "group by a11_1.uid) a11 on a1.uid=a11.uid "
    "left join xcs_user_silent a12 on a1.uid=a12.uid) x ;"
)

# Total of the day = increment of the day + total of the previous day, for
# users that have a row on the previous day.
_ACCUMULATE_SQL = (
    "update xcs_user_credit_score a1,xcs_user_credit_score a2 "
    "set a1.user_currday_score=ifnull(a1.user_currday_increment_score+a2.user_currday_score,0) "
    "where a1.pt_day='{pt_day}' and a2.pt_day='{Days1Ago}' "
    "and a1.uid=a2.uid;"
)

# Users with every component score at 0 today and a positive total yesterday:
# total becomes yesterday's total minus 10, not below 0.
_DECAY_SQL = (
    "update xcs_user_credit_score a1,xcs_user_credit_score a2 "
    "set a1.user_currday_score=ifnull((case when a2.user_currday_score-10>0 then a2.user_currday_score-10 else 0 end),0) "
    "where a1.pt_day='{pt_day}' and a2.pt_day='{Days1Ago}' "
    "and a1.uid=a2.uid "
    "and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0 and a1.upper_manage_score=0 and a1.pay_active_score=0 and a1.view_active_score=0 and a1.message_active_score=0 and a1.gift_active_score=0 and a1.live_active_score=0) "
    "and a2.user_currday_score>0 ;"
)

# Users with no penalty component today and a negative total: total is
# raised by 10, not above 0.
_RECOVER_SQL = (
    "update xcs_user_credit_score a1,xcs_user_credit_score a2 "
    "set a1.user_currday_score=ifnull((case when a1.user_currday_score+10<0 then a1.user_currday_score+10 else 0 end),0) "
    "where a1.pt_day='{pt_day}' and a2.pt_day='{Days1Ago}' "
    "and a1.uid=a2.uid "
    "and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0) "
    "and a1.user_currday_score<0 ;"
)


def _run_sql(sql):
    """Execute ``sql`` in the funnyai_data database via the mysql client."""
    os.system('source /etc/profile; ' + _MYSQL + ' -e "use funnyai_data; ' + sql + '"')


def CsResultTabCreate():
    """Drop and recreate the xcs_user_credit_score result table (all history is lost)."""
    _run_sql(_CREATE_RESULT_TABLE_SQL)


def CsResultDataInsertAndUpdate(pt_day):
    """Compute the credit-score rows of ``pt_day`` ('YYYY-MM-DD').

    Runs four statements in order: rebuild the day's rows with their
    component scores, add the previous day's total, then apply the decay
    and recovery adjustments described next to the SQL constants above.
    """
    Days1Ago = getDays1Ago(pt_day)
    _run_sql(_INSERT_DAY_SQL.format(pt_day=pt_day))
    for statement in (_ACCUMULATE_SQL, _DECAY_SQL, _RECOVER_SQL):
        _run_sql(statement.format(pt_day=pt_day, Days1Ago=Days1Ago))

# Manual run example:
#   CsResultDataInsertAndUpdate('2017-09-19')
# -*- coding=utf-8 -*-
"""Daily entry point of the credit-score batch; runs for yesterday when executed."""
from tabBasicDataProc import *
from logBasicDataProc import *
from userBasicDataProc import *
from resultDataProc import *

warnings.filterwarnings("ignore")


def CreditScore_ctl(pt_day):
    """Run every load and computation step of the credit score for ``pt_day``."""
    # Load the table-sourced activity data.
    table_steps = (
        CsPayActiveProc,
        CsViewActiveProc,
        CsMessageActiveProc,
        CsGiftActiveProc,
        CsLiveActiveProc,
        CsUserSilentProc,
        CsUpperManageProc,
    )
    for step in table_steps:
        step(pt_day)

    # Load the tracking-log data.
    logFromOss2Local(runDay=pt_day)

    # Load the user and room reference data.
    for step in (CsUserInfoProc, CsRoomPartInfo):
        step(pt_day)

    # Compute and update the final scores.
    # CsResultTabCreate() is left disabled here, as in the original script.
    CsResultDataInsertAndUpdate(pt_day)


# Batch code: process yesterday.
pt_day = getYesterDay()
CreditScore_ctl(pt_day)

# Backfill example:
#   for pt_day in ['2017-09-21', '2017-09-22', '2017-09-23']:
#       CreditScore_ctl(pt_day)
[hadoop@emr-worker-9 CreditScore]$ crontab -l 10 13 * * * python /home/hadoop/nisj/automationDemand/CreditScore/CreditScore_Ctl.py >> /home/hadoop/nisj/automationDemand/CreditScore/CreditScore_Ctl.log 2>&1