Saving Sensors Analytics Data into a Hive Table


Overview

Recently I needed to pull some Sensors Analytics (神策) event-tracking data into a Hive database, so I tried writing a simple PySpark script for it: the data is requested through the query API and then written to Hive with Spark. The full code is below.
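
Before wiring everything into Spark, the API call itself can be sanity-checked on its own. The sketch below only illustrates the request the full script makes; the endpoint URL, token, and project are placeholders, and it assumes the query API returns a header line followed by tab-separated rows, which is what the parsing code in the script relies on.

import requests

# Placeholders for the real Sensors Analytics endpoint and credentials
url = "http://xxxxxx.com/api/sql/query"
payload = {
    "q": "SELECT * FROM events where date = '2020-08-02'",
    "format": "csv",
    "token": "xxxxxxxxxxxxxxxxxxxxx",
    "project": "xxxxxxx",
}

res = requests.get(url=url, params=payload)
# Expect a header row followed by tab-separated data rows
print(res.content.decode()[:500])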

sensor data fetch

#!/usr/bin/env python
# encoding: utf-8

"""
File: sensor_data_fetch.py
Date: 2020/08/24
submit command:
Full load of the users table; the trailing date argument is ignored, so any value will do:
spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 3 sensor_data_fetch.py users 2020-08-02
Incremental load of the events table; the date argument must be the actual date to load:
spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 3 sensor_data_fetch.py events 2020-08-02
"""
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import sys
import datetime
import requests
# Python 2 only: make UTF-8 the default string encoding (not needed under Python 3)
reload(sys)
sys.setdefaultencoding('utf8')

#from urllib import parse


def get_sensor_data(table, start_date):
    url = "http://xxxxxx.com/api/sql/query"
    token = "xxxxxxxxxxxxxxxxxxxxx"
    project = "xxxxxxx"

    sql = "SELECT * FROM " + table + " where date = " + "'" + start_date + "'"

    if table == 'users':
        sql = "SELECT * FROM " + table
    print('sql:' + sql)
    payload = {
        "q": sql,
        "format": "csv",
        "token": token,
        "project": project
    }
    res = requests.get(url=url, params=payload)
    return res.content.decode()


def main():
    # today = datetime.date.today().strftime("%Y%m%d")
    today = datetime.date.today()
    one_day = datetime.timedelta(days=1)
    yesterday = today - one_day
    yesterday_str = yesterday.strftime("%Y%m%d")

    table = sys.argv[1]
    start_date = sys.argv[2]
    print(table)
    print(start_date)
    # Fetch the data from Sensors Analytics
    result_data = get_sensor_data(table, start_date)

    res = result_data.split("\n")
    # The first line is the header; replace $ so the names are valid Hive column names
    shc = res[0].replace("$", "_").split("\t")
    res.pop(0)

    data_list = []
    for item in res:
        if len(item) > 1:
            data_list.append(tuple(item.split("\t")))
    # Get a SparkSession with Hive support
    spark = SparkSession.builder.appName(
        "sensordata").enableHiveSupport().getOrCreate()

    hiveTable = 'xxdb.sensor_' + table
    tableName = 'sensor_' + table

    df = spark.createDataFrame(data_list, shc)

    # Check whether the target Hive table already exists
    tableCount = spark.sql('show tables in xxdb').filter("tableName='" + tableName + "'").count()
    if tableCount >= 1:
        print('############ table ' + hiveTable + ' already exists')
        tableColumns = spark.table(hiveTable).columns
        # If the schema has changed, archive the current data into a *_his table,
        # then drop the table and recreate it with the new schema
        if len(shc) != len(tableColumns):
            print('############ table ' + hiveTable + ' exists but its schema has changed')
            hiveTableHis = 'xxdb.sensor_' + table + yesterday_str
            spark.sql("select * from " + hiveTable).write.format('orc').mode('overwrite').saveAsTable(hiveTableHis)
            spark.sql("drop table " + hiveTable)
            # Write the data
            if table == 'users':
                df.write.format('orc').mode('overwrite').saveAsTable(hiveTable)
            else:
                df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable)
        else:
            print('############ schema unchanged')
            if table == 'users':
                # users is not partitioned by date, so always do a full overwrite
                df.write.format('orc').mode('overwrite').saveAsTable(hiveTable)
            else:
                # If the partition already holds data, drop it before appending
                dfTablePCount = spark.sql("select * from " + hiveTable + " where date = " + "'" + start_date + "' limit 2").count()
                if dfTablePCount > 0:
                    dropPartionSql = "alter table " + hiveTable + " drop partition (date= " + "'" + start_date + "'" + ")"
                    # spark.sql("msck repair table " + hiveTable)
                    spark.sql(dropPartionSql)
                df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable)
    else:
        print('############ table ' + hiveTable + ' does not exist yet')
        # Write the data
        if table == 'users':
            df.write.format('orc').mode('overwrite').saveAsTable(hiveTable)
        else:
            df.write.format('orc').mode('append').partitionBy('date').saveAsTable(hiveTable)


if __name__ == '__main__':
    main()
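
Once a run finishes, the result can be spot-checked from a PySpark shell (or spark-sql). A minimal check, assuming the events table and the xxdb.sensor_events name used above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sensordata_check").enableHiveSupport().getOrCreate()

# List the partitions that have been loaded and look at a few rows
spark.sql("show partitions xxdb.sensor_events").show(truncate=False)
spark.sql("select * from xxdb.sensor_events where date = '2020-08-02' limit 5").show()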


Author: Callable
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit Callable when reposting.