需求如题:从 Splunk 获取数据。为防止数据量过大导致导出时超时,按时间分批导出,并保存到本地文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from datetime import datetime, timedelta
import sys,io
import splunklib.client as client
import splunklib.results as results
import os

# _create_unverified_https_context = ssl._create_unverified_context
# ssl._create_default_https_context = _create_unverified_https_context
# Python 2 only: force the implicit str <-> unicode conversions to use UTF-8.
# `reload` is not a builtin on Python 3 and `sys.setdefaultencoding` does not
# exist there, so the hack is skipped on newer interpreters instead of
# failing at import time with a NameError.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 - builtin on Python 2; restores setdefaultencoding
    sys.setdefaultencoding('utf8')

# Splunk client.
# option: identifier of the data category (selects the search to run)
class ConnectPhoenix(object):
    """Thin wrapper around a Splunk connection used to export search results.

    ``option`` selects which SPL query is run.  The connection settings
    default to the values that used to be hard-coded and can be overridden
    through the keyword arguments.
    """

    def __init__(self, option, host="192.6.66.6", port=8089,
                 username="admin", password="666666"):
        """Store the connection settings and the task identifier.

        :param option: data category identifier, e.g. ``'original'``
        :param host: Splunk management host
        :param port: Splunk management port
        :param username: Splunk user name
        :param password: Splunk password
        """
        # NOTE(review): the default credentials live in source code; prefer
        # passing them in from configuration or the environment.
        self.HOST = host
        self.PORT = port
        self.USERNAME = username
        self.PASSWORD = password
        self.option = option

    def phoenixService(self):
        """Open and return a connection to the Splunk ``search`` app."""
        # NOTE(review): verify=False is passed through unchanged from the
        # original code - confirm that skipping verification is intended.
        return client.connect(
            host=self.HOST,
            port=self.PORT,
            username=self.USERNAME,
            password=self.PASSWORD,
            verify=False,
            app="search")

    # Build the SPL query.
    def get_query(self):
        """Return the SPL query for ``self.option``.

        :raises ValueError: if ``self.option`` has no query defined.  The
            original code fell through and returned ``None``, which was then
            handed to the export call.
        """
        if self.option == 'original':
            return 'search index=* | table _time,_raw'
        raise ValueError('unsupported option: %r' % (self.option,))

    # Fetch the query results.
    # period: start of the time range (datetime)
    # delay:  end of the time range (datetime)
    def get_results(self, period, delay):
        """Run the export search over ``[period, delay]`` and return a reader.

        :param period: ``datetime`` sent as ``earliest_time``
        :param delay: ``datetime`` sent as ``latest_time``
        :returns: a ``results.ResultsReader`` over the export stream
        :raises ValueError: if ``self.option`` has no query defined
        """
        time_format = '%Y-%m-%dT%H:%M:%S'
        query = self.get_query()
        time_range = {
            'earliest_time': period.strftime(time_format),
            'latest_time': delay.strftime(time_format),
        }
        service = self.phoenixService()
        stream = service.jobs.export(query, **time_range)
        # Buffer the response stream before handing it to the reader.
        return results.ResultsReader(io.BufferedReader(stream))

# ETL step applied to one query result.
# log:    a single query result record
# option: identifier of the data category
class FormatLog(object):
    """Turn one query result record into the text line written to disk."""

    def __init__(self, log, option):
        """Store the record and the task identifier.

        :param log: one result record from the query
        :param option: data category identifier, e.g. ``'original'``
        """
        self.log = log
        self.option = option

    def format_log(self):
        """Return ``str()`` of the whole record.

        No option-specific transformation exists yet, so every option yields
        the same output.  The original ``'original'`` branch only looked up
        ``'_raw'`` into an unused local; that lookup is gone because its sole
        effect was a ``KeyError`` for records without a ``'_raw'`` field.
        """
        return str(self.log)


# Saves query data to local files, one batch per month, day or hour.
# option:    task type
# period:    query start time
# delay:     query end time
# date_type: batching granularity - 'month', 'day' or 'hour'
# file_path: directory the files are written under
class Savefile(object):
    """Export query results window by window and append them to log files.

    ``segment_date`` walks backwards from ``delay`` to ``period``, cutting
    the range at month/day/hour boundaries, and calls ``save_file`` once per
    window.  ``self.delay`` always holds the end of the window that is
    currently being exported.
    """

    # Text format of the ``period`` / ``delay`` constructor arguments.
    _TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'

    # date_type -> (strftime pattern of the sub-directory,
    #               strftime pattern of the file name without '.log')
    _LAYOUTS = {
        'day': ('%Y-%m', '%Y-%m-%d'),
        'month': ('%Y', '%Y-%m'),
        'hour': ('%Y-%m-%d', '%Y-%m-%d %H%M%S'),
    }

    def __init__(self, option, period, delay, date_type, file_path):
        """Store the export settings.

        :param option: task type, forwarded to the client and the formatter
        :param period: range start as text, ``YYYY-MM-DDTHH:MM:SS``
        :param delay: range end as text, ``YYYY-MM-DDTHH:MM:SS``
        :param date_type: ``'month'``, ``'day'`` or ``'hour'``
        :param file_path: root directory for the exported files
        """
        self.option = option
        self.period = period
        self.delay = delay
        self.date_type = date_type
        self.file_path = file_path

    def _layout(self):
        """Return the (directory, file) patterns for ``self.date_type``."""
        if self.date_type not in self._LAYOUTS:
            raise ValueError(
                "date_type must be one of 'month', 'day', 'hour', got %r"
                % (self.date_type,))
        return self._LAYOUTS[self.date_type]

    def _floor(self, moment):
        """Round ``moment`` down to the start of its month/day/hour."""
        if self.date_type == 'month':
            return moment.replace(day=1, hour=0, minute=0, second=0,
                                  microsecond=0)
        if self.date_type == 'day':
            return moment.replace(hour=0, minute=0, second=0, microsecond=0)
        return moment.replace(minute=0, second=0, microsecond=0)

    def _previous(self, time_mark):
        """Return the boundary one month/day/hour before ``time_mark``."""
        if self.date_type == 'month':
            # One day before the first of a month is always inside the
            # previous month; flooring it yields that month's first day.
            return self._floor(time_mark - timedelta(days=1))
        if self.date_type == 'day':
            return time_mark - timedelta(days=1)
        return time_mark - timedelta(hours=1)

    def _export_window(self, start):
        """Export ``[start, self.delay]`` unless the window is empty."""
        # A boundary that coincides with period/delay produces a zero-length
        # window; skip it instead of issuing a query for it.
        if start >= self.delay:
            return
        self.save_file(start)
        print(str(start) + '|' + str(self.delay))

    # Save one batch to disk.
    # time_mark: start of this batch's query window; also names the file
    def save_file(self, time_mark):
        """Query ``[time_mark, self.delay]`` and append the rows to a file.

        The directory and file name are derived from ``time_mark`` according
        to ``self.date_type``.  Message entries in the result stream are
        skipped; every other entry is formatted and written as one line.

        :param time_mark: ``datetime`` start of the window
        :raises ValueError: if ``self.date_type`` is not supported (checked
            before any query is sent)
        """
        dir_pattern, file_pattern = self._layout()
        phoenix_server = ConnectPhoenix(self.option)
        query_results = phoenix_server.get_results(time_mark, self.delay)

        directory = os.path.join(self.file_path,
                                 time_mark.strftime(dir_pattern))
        if not os.path.isdir(directory):
            os.makedirs(directory)
        target = os.path.join(directory,
                              time_mark.strftime(file_pattern) + '.log')

        # `with` closes the file even if reading the stream fails part-way.
        with io.open(target, 'a', encoding='utf-8') as out:
            for result in query_results:
                if isinstance(result, results.Message):
                    continue
                line = FormatLog(result, self.option).format_log()
                # io.open needs text: str() gives bytes on Python 2 and
                # text on Python 3, so decode only when necessary.
                if isinstance(line, bytes):
                    line = line.decode('utf-8')
                out.write(line + u'\n')

    # Split the date range and export it batch by batch.
    def segment_date(self):
        """Split ``[period, delay]`` into batches and export each one.

        ``self.period`` and ``self.delay`` are parsed from text into
        ``datetime`` objects.  Windows are exported newest first, and
        ``self.delay`` is moved down to each boundary after its window has
        been exported.

        :raises ValueError: if ``date_type`` is not supported, if either
            time does not match ``YYYY-MM-DDTHH:MM:SS``, or if ``period`` is
            later than ``delay``
        """
        self._layout()  # validate date_type before doing any work
        start = datetime.strptime(self.period, self._TIME_FORMAT)
        end = datetime.strptime(self.delay, self._TIME_FORMAT)
        if start > end:
            raise ValueError('period %s is later than delay %s' % (start, end))
        self.period = start
        self.delay = end

        time_mark = self._floor(self.delay)
        while time_mark >= self.period:
            self._export_window(time_mark)
            self.delay = time_mark
            time_mark = self._previous(time_mark)
        # Remaining partial window between period and the oldest boundary.
        self._export_window(self.period)


if __name__ == '__main__':
    # Positional arguments, e.g. from a cron command line; each is optional.
    # Task name, default 'original'.
    option = sys.argv[1] if len(sys.argv) > 1 else 'original'
    # Query start time, format: 2023-06-01T12:00:00
    period = sys.argv[2] if len(sys.argv) > 2 else '2023-06-01T12:00:00'
    # Query end time, format: 2023-06-01T13:00:00
    delay = sys.argv[3] if len(sys.argv) > 3 else '2023-06-01T13:00:00'
    # Batching granularity, default 'day'; one of: month, day, hour.
    date_type = sys.argv[4] if len(sys.argv) > 4 else 'day'
    # Directory the logs are stored in, default /data.  This is a path, so
    # it must stay a string: the previous int() conversion raised ValueError
    # for any real directory name.
    file_path = sys.argv[5] if len(sys.argv) > 5 else '/data'

    savefile = Savefile(option, period, delay, date_type, file_path)
    savefile.segment_date()