前言

  最近客户方提出了要将日志另外备份一份并进行加密压缩,今天用这篇文章来总结一下。原本的架构是logstash通过udp端口将数据发送到我们的系统,现在客户想在中间将数据取出来做一份备份,并把每天的数据加密压缩。
功能分两个脚本来实现,话不多说,直接进入正题。

脚本一:接收数据并按天保存在本地

  先把数据取出来,在logstash中添加output插件,发送到本地888端口,数据为json格式

1
2
3
4
5
udp {
host => "10.7.2.20"
port => "888"
codec => json
}

  可以在本地用nc监听888端口查看网络是否阻塞,这里就不做演示了。
  之后先编写接收udp端口数据的代码,一步一步来:

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

#创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#绑定本地信息
localaddr = ("192.168.44.112",8888)
udp_socket.bind(localaddr)
#data为接收到的数据,client是发送方的ip,port
data,client = udp_socket.recvfrom(2048)
#打印到控制台,查看是否有数据过来
print(data.decode("utf-8"))
#退出套接字
udp_socket.close()

  收到数据后,发现来一条数据后就断开连接,就在前面加一个循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket

#创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#绑定本地信息
localaddr = ("192.168.44.112",8888)
udp_socket.bind(localaddr)
while True:
#data为接收到的数据,client是发送方的ip,port
data,client = udp_socket.recvfrom(2048)
#打印到控制台,查看是否有数据过来
print(data.decode("utf-8"))
#退出套接字
udp_socket.close()

  这样就能一直收到发送方的数据了,然后把数据存在本地文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import socket,os

#创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#绑定本地信息
localaddr = ("192.168.44.112",8888)
udp_socket.bind(localaddr)
while True:
#data为接收到的数据,client是发送方的ip,port
data,client = udp_socket.recvfrom(2048)
#打印到控制台,查看是否有数据过来
#print(data.decode("utf-8"))
#/data/test.txt为存放日志的文件路径,a+表示追加内容
f = open("/data/test.txt","a+")
#收到的数据没有换行,在每一条数据后面添加换行符
f.write(data.decode("utf-8")+ "\n")
f.close()
#退出套接字
udp_socket.close()

  测试之后,完美运行,下面就是完善代码了,前面说过日志要按天来存储,并且日志还分为不同的类型,所以存放路径就分为每种类型日志下面有各自每天的日志备份,日志样例如下

1
{"@timestamp":"2021-03-11T09:24:03.668Z","hostipv6":"fe80::c360:92ba:aede:5c36","hostipv4":"192.168.44.111","ecs":{"version":"1.0.0"},"@version":"1","message":"Mar 11 17:24:01 localhost CROND[104204]: (root) CMD (/usr/local/bin/php /data/rdiweb/public/di_admin.php Autotask >> /tmp/111.log 2>&1 &)","host":{"name":"localhost.localdomain","containerized":false,"os":{"kernel":"3.10.0-693.el7.x86_64","family":"redhat","name":"CentOS Linux","codename":"Core","version":"7 (Core)","platform":"centos"},"ip":["192.168.44.111","fe80::c360:92ba:aede:5c36"],"id":"251f075008fe4214825ef8910e29bf29","mac":["00:0c:29:64:b2:af"],"hostname":"localhost.localdomain","architecture":"x86_64"},"input":{"type":"log"},"agent":{"ephemeral_id":"25d88d8c-994c-49c9-afa3-78867740c45a","version":"7.2.0","type":"filebeat","hostname":"localhost.localdomain","id":"7936b07b-99db-458a-b7ed-3df7e348ef96"},"log":{"file":{"path":"/var/log/cron"},"offset":111834},"appname":"cron","tags":["cron","beats_input_codec_plain_applied"]}

  可以看到从logstash收到的json日志是由tags字段来分日志类型的,比如bashhistory、cron、secure等等日志,而message为原始日志,所以我们只需要取得tags字段来分类型,并把message存在文件里即可,这里我是用正则来提取的值,因为数据的格式不规则,用json解析可能会报错,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import socket,os,re

def main(path):
#创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#绑定本地信息
localaddr = ("192.168.44.112",8888)
udp_socket.bind(localaddr)

#获取原始日志
def getmessage(data):
rule = "\"message\":\"(.*?)\",\""
message = re.findall(rule, data)
return message[0]

#获取日志类型
def gettype(data):
rule = "\"tags\":\[\"(.*?)\","
message = re.findall(rule, data)
return message[0]

while True:
#data为接收到的数据,client是发送方的ip,port
data,client = udp_socket.recvfrom(2048)
#打印到控制台,查看是否有数据过来
#print(data.decode("utf-8"))
#获取当前日期
import time
time = time.strftime('%Y-%m-%d')
#提取日志类型
type = gettype(data)
#提取日志
message = getmessage(data)
#path为存放日志的文件路径,type为日志类型,time为当天日期,a+表示追加内容
f = open("%s/%s/%s.txt"%(path,type,time),"a+")
#收到的数据没有换行,在每一条数据后面添加换行符
f.write(data.decode("utf-8") + "\n")
f.close()
#退出套接字
udp_socket.close()
if __name__ == "__main__":
# 日志存放目录
path = "/data/data"
main(path)

  运行效果如图:

  可以看到日志按照类型放在不同文件夹下,并按日期分为不同的文件,代码到这里就差不多了,但是还差一些需要完善的地方,比如来的是空数据,列表会下标越界,包括后续如果客户方要接入新的日志,找不到对应的路径也会报错,总不能每次都能手动创建日志类型的文件夹吧,话不多说,完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import socket,os,re

def main(path):
#创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#绑定本地信息
localaddr = ("192.168.44.112",8888)
udp_socket.bind(localaddr)

#获取原始日志
def getmessage(data):
rule = "\"message\":\"(.*?)\",\""
message = re.findall(rule, data)
return message[0]

#获取日志类型
def gettype(data):
rule = "\"tags\":\[\"(.*?)\","
message = re.findall(rule, data)
return message[0]

while True:
#data为接收到的数据,client是发送方的ip,port
data,client = udp_socket.recvfrom(2048)
# 判断数据是否为空,为空跳过当前循环,防止下标越界
if data.isspace():
continue
#打印到控制台,查看是否有数据过来
#print(data.decode("utf-8"))
#获取当前日期
import time
time = time.strftime('%Y-%m-%d')
#提取日志类型
type = gettype(data)
#提取日志
message = getmessage(data)
# 判断子目录是否存在,如果不存在即创建
if os.path.exists("%s/%s"%(path,type)) == False:
os.makedirs("%s/%s"%(path,type))
#path为存放日志的文件路径,type为日志类型,time为当天日期,a+表示追加内容
f = open("%s/%s/%s.txt"%(path,type,time),"a+")
#收到的数据没有换行,在每一条数据后面添加换行符
f.write(data.decode("utf-8") + "\n")
f.close()
#退出套接字
udp_socket.close()
if __name__ == "__main__":
# 日志存放目录
path = "/data/data"
main(path)

脚本二:将本地文件进行加密压缩

  上一个脚本已经完美的把日志备份在了本地服务器,那么接下来就是将日志进行加密压缩,这个脚本相对上一个来说还是很简单的。
  用python压缩文件还是很简单的,比较费事的是如何加密压缩,因为pyhon自带的zipfile库不支持加密,所以这里只能使用linux压缩文件的命令行来进行处理了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os
from datetime import date, timedelta

def zipDir(path,outpath,password):
import os
# 一定要有返回值,否则压缩失败;
status = os.popen("zip -jP %s %s %s"%(password,outpath,path))
# 等待子进程完成
os.wait()
if __name__ == "__main__":
# 日志存放路径
path = "/data/secure"
# 压缩密码
password = "123456"
# 获取昨天日期
yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
zipDir("%s/%s.txt" %(path[0],yesterday), "%s/%s.zip" %(path,yesterday), password)

  到这里文件就完美加密压缩了,可是现在的代码只能压缩一种日志类型下的文件,不能将所有日志类型全部压缩,并且压缩后的日志文件没有删除,浪费了服务器的磁盘空间,于是又花费了一些时间将代码完善,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from datetime import date, timedelta

def zipDir(path,outpath,password):
import os
# 一定要有返回值,否则压缩失败;
status = os.popen("zip -jP %s %s %s"%(password,outpath,path))
# 等待子进程完成
os.wait()
# 删除压缩后的文件
os.remove(path)

if __name__ == "__main__":
# 日志存放路径
paths = "/data/data"
# 压缩密码
password = "123456"
num = True
# 循环不同类型日志目录
for path in os.walk(paths):
# 跳过主目录
if num:
num = False
continue
# 获取昨天日期
yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
zipDir("%s/%s.txt" %(path[0],yesterday), "%s/%s.zip" %(path[0],yesterday), password)

  可以看到上面的代码遍历了目录,并且删除了压缩后的日志文件,和上一个脚本一样,哪怕后续加入了新的日志类型,脚本也不需要再改动了,运行效果如下:

  之后把脚本加入到linux任务执行计划里,每天晚上2点压缩前一天的文件即可。
  root用户下执行ctontab -e命令,将下面执行脚本的命令添加在文件末尾即可:

1
00 2 * * * /data/python3/bin/python3 /data/file2zip.py

  /data/python3/bin/python3是python安装的路径,注意这里一定要写绝对路径,否则可能会执行失败,/data/file2zip.py是脚本的路径和文件名,00 2 * * *是指每天2:00运行脚本
  同样上一个脚本也要加入到任务执行计划中,这样服务器挂掉或者重启之后脚本也能重启:

1
*/5 * * * * /data/python3/bin/python3 /data/logstash2file.py

  这里每五分钟执行一次即可。

总结

  这个需求相对来说没到很难的程度,只是要全面考虑到怎么应对不同的生产环境。