Overview
Recently I needed to pull some Sensors Analytics (神策) tracking data into a Hive database, so I tried writing a simple PySpark script for it: it requests the data through the query API and writes it into Hive with Spark. The full code follows.
sensor_data_fetch.py:

```python
""" File: sensor_data_fetch.py Date: 2020/08/24 submit command: users表全量获取数据最后的时间参数随便: spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 3 sensor_data_fetch.py users 2020-08-02 events表增量获取需要填写正确的时间参数: spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 3 sensor_data_fetch.py events 2020-08-02 """ from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import sys import datetime import requests reload(sys) sys.setdefaultencoding('utf8')
def get_sensor_data(table,start_date): url = "http://xxxxxx.com/api/sql/query" token = "xxxxxxxxxxxxxxxxxxxxx" project = "xxxxxxx"
sql = "SELECT * FROM " + table + " where date = " + "'" + start_date + "'"
if table == 'users': sql = "SELECT * FROM " + table print('sql:'+ sql) payload = { "q": sql, "format": "csv", "token": token, "project": project } res = requests.get(url=url, params=payload) return res.content.decode()
def main(): today = datetime.date.today() one_day = datetime.timedelta(days=1) yesterday = today - one_day yesterday_str = yesterday.strftime("%Y%m%d")
table = sys.argv[1] start_date = sys.argv[2] print(table) print(start_date) result_data = get_sensor_data(table,start_date)
res = result_data.split("\n") shc= res[0].replace("$","_").split("\t") res.pop(0)
data_list = [] for item in res: if(len(item) > 1): data_list.append(tuple(item.split("\t"))) spark = SparkSession.builder.appName( "sensordata").enableHiveSupport().getOrCreate()
hiveTable = 'xxdb.sensor_' + table tableName = 'sensor_' + table
df = spark.createDataFrame(data_list, shc)
tableCount = spark.sql('show tables in xxdb').filter("tableName='" + tableName + "'").count() if tableCount >=1: print('############表:' + hiveTable + '已存在') tableColumns = spark.table(hiveTable).columns if len(shc) != len(tableColumns): print('############表:' + hiveTable + '已存在但是表结构发生了变化') hiveTableHis = 'xxdb.sensor_' + table + yesterday_str spark.sql("select * from " + hiveTable).write.format('orc').mode('overwrite').saveAsTable(hiveTableHis) spark.sql("drop table " + hiveTable) if table == 'users': df.write.format('orc').mode('overwrite').saveAsTable(hiveTable) else: df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable) else: print('############表结构一致') dfTablePCount=spark.sql("select * from " + hiveTable + " where date = " + "'" + start_date + "' limit 2").count() if dfTablePCount >1: dropPartionSql = "alter table " + hiveTable + " drop partition (date= " + "'" + start_date + "'" + ")" spark.sql(dropPartionSql) df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable) else: print('############表:' + hiveTable + '还未创建') if table == 'users': df.write.format('orc').mode('overwrite').saveAsTable(hiveTable) else: df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable)
if __name__ == '__main__':
    main()
```
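As an aside: on Spark 2.3 and later, the manual drop-partition-then-append dance can also be replaced with dynamic partition overwrite, which rewrites only the partitions present in the incoming DataFrame. The following is just a sketch under that assumption, reusing the hypothetical xxdb.sensor_events table name from the script:

```python
# Sketch only, assuming Spark 2.3+ and that the partitioned table already
# exists. In dynamic mode, mode('overwrite') replaces only the partitions
# that actually appear in df (here: the single date being loaded), so reruns
# are idempotent without ALTER TABLE ... DROP PARTITION.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# insertInto() matches columns by position and uses the table's own format and
# partitioning, so the partition column ('date') must be the last column of df.
df.write.mode('overwrite').insertInto('xxdb.sensor_events')
```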
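After a run, a quick sanity check from a pyspark shell might look like this (the table name follows the same hypothetical xxdb.sensor_<table> convention used above):

```python
# Row count per date partition; rerunning the job for the same date should
# leave the count unchanged if the drop-partition logic did its job.
spark.sql("""
    SELECT date, count(*) AS cnt
    FROM xxdb.sensor_events
    GROUP BY date
    ORDER BY date
""").show()
```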