概述
smokeping 是一个开源免费的监控网络性能的工具,功能例如 ping、dig、echoping、curl 、dns等,绘制图形使用了 RRDtools 。
本文使用 docker 部署,操作比较简单,最主要的是收集全国各运营商的监测 IP ,以及维护这些 IP 的可用性,这里使用 Python 检测测试 IP 的可用性,并自动生成配置文件。
Smokeping 应用部署与配置
-
创建数据目录
# 创建存放smokeping数据的目录mkdir -p /data/smokeping/config mkdir -p /data/smokeping/data
-
启动容器
docker create --name=smokeping -e PUID=1000 -e PGID=1000 -e TZ=Asia/Shanghai -p 80:80 -v /path/to/smokeping/config:/config -v /path/to/smokeping/data:/data --restart unless-stopped linuxserver/smokeping
-
修改检测时间间隔
这部分是修改 RRD 数据库的配置
vim /data/smokeping/config/Database*** Database ***step = 120 pings = 20# 表示每 120 秒执行 20 次ping操作
-
修改 Targets 文件
Targets 是 ping 测试的目标对象,日常修改主要是这个,我们这里通过 Python 检测测试 IP 的可用性,并自动生成配置文件
Python 处理 Targets 文件
-
收集测试 IP
我主要是从 ipip.net收集的
-
收集全国电信、联通、移动的测试 IP
-
收集国外的测试 IP
-
-
将收集好的 IP 放到数据库
我用的是 PostgreSQL ,新建两张表,分别是 ip_test、ip_test_international
表结构如下图所示
-
收集完成后,记得备份 table,可使用如下 SQL 语句
create table ip_test as table backup_ip_test;create table ip_test_international as table backup_ip_test_international;
代码如下
将对代码分段解释其作用
使用到的模块的解释
# 导入所需的模块
import os
import smtplib
import datetime
import psycopg2
from pypinyin import lazy_pinyin
from email.utils import formataddr
from email.mime.text import MIMEText
-
-
OS:执行 ping 操作
-
smtplib、email 相关模块:发送邮件使用
-
psycopg2:操作 PostgreSQL 数据库
-
Pypinyin:将汉字转换为拼音
-
定义一个邮件的列表
mail_down_ip_body = []
操作IP数据库,测试不通的设置为 is down
def ping_ip():
# 用 os.system ping 太慢了,可以优化一下,使用 python 封装 icmp,并使用多线程
conn = psycopg2.connect(dbname="数据库名称", user="数据库用户名", password="数据库密码", host="填写数据库地址", port="5432")
cursor = conn.cursor()
# 为节省时间,这里就不合并到一起了
cursor.execute("select china_unicom from ip_test;")
ip_list_unicom = cursor.fetchall()
for i in ip_list_unicom:
response = os.system('ping -c 8 ' + i[0])
if response == 0:
print(i[0], 'is up')
else:
print(i[0], 'is down')
cursor.execute("select state, city from ip_test where china_unicom = (%s) and china_unicom != 'is down';",
(i[0],))
china_unicom_down_name = cursor.fetchall()
for z in china_unicom_down_name:
mail_down_ip_body.append(('联通', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))
cursor.execute("update ip_test set china_unicom = 'is down' where china_unicom = (%s);",
(i[0],))
cursor.execute("select china_mobile from ip_test;")
ip_list_mobile = cursor.fetchall()
for i in ip_list_mobile:
response = os.system('ping -c 8 ' + i[0])
if response == 0:
print(i[0], 'is up')
else:
print(i[0], 'is down')
cursor.execute("select state, city from ip_test where china_mobile = (%s) and china_mobile != 'is down';",
(i[0],))
china_mobile_down_name = cursor.fetchall()
for z in china_mobile_down_name:
mail_down_ip_body.append(('移动', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))
cursor.execute("update ip_test set china_mobile = 'is down' where china_mobile = (%s);",
(i[0],))
cursor.execute("select china_telecom from ip_test;")
ip_list_telecom = cursor.fetchall()
for i in ip_list_telecom:
response = os.system('ping -c 8 ' + i[0])
if response == 0:
print(i[0], 'is up')
else:
print(i[0], 'is down')
cursor.execute("select state, city from ip_test where china_telecom = (%s) and china_telecom != 'is down';",
(i[0],))
china_telecom_down_name = cursor.fetchall()
for z in china_telecom_down_name:
mail_down_ip_body.append(('电信', z[0].rstrip() + z[1].rstrip(), i[0].rstrip()))
cursor.execute("update ip_test set china_telecom = 'is down' where china_telecom = (%s);",
(i[0],))
cursor.execute("select ip from ip_test_international;")
ip_list_int = cursor.fetchall()
for i in ip_list_int:
response = os.system('ping -c 8 ' + i[0])
if response == 0:
print(i[0], 'is up')
else:
print(i[0], 'is down')
cursor.execute("select name from ip_test_international where ip = (%s) and ip != 'is down';",
(i[0],))
ip_int_down_name = cursor.fetchall()
try:
mail_down_ip_body.append((ip_int_down_name[0][0], i[0]))
except IndexError:
pass
cursor.execute("update ip_test_international set ip = 'is down' where ip = (%s);",
(i[0],))
conn.commit()
cursor.close()
conn.close()
功能解释
-
从数据库取到所有 ip,执行 ping 操作,每个 ip ping 8 个包,如果全部 timeout,将存放 IP 的位置更新为 is down
-
由于原始数据库是从 csv 中导入的,所以有些空格,用了
.rstrip()
处理
从数据库拿到国内和国际的IP测试数据
def get_data():
conn = psycopg2.connect(dbname="数据库名称", user="数据库用户名", password="数据库密码",
host="填写数据库地址", port="5432")
cursor = conn.cursor()
cursor.execute("select state, city, china_telecom, china_unicom, china_mobile, china_edu from ip_test;")
ip_test = cursor.fetchall()
cursor.execute("select name, name_en, ip from ip_test_international;")
ip_test_int = cursor.fetchall()
cursor.close()
conn.close()
return ip_test, ip_test_int
def output_config():
china_telcom = []
china_unicom = []
china_mobile = []
int_ip = []
for i in get_data()[0]:
if 'is down' != (i[2].rstrip()):
china_telcom.append(('+++ dianxin-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '电信', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[2].rstrip(), 'host = ' + i[2].rstrip()))
for i in get_data()[0]:
# print(i[3].rstrip())
if 'is down' != (i[3].rstrip()):
china_unicom.append(('+++ liantong-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '联通', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[3].rstrip(), 'host = ' + i[3].rstrip()))
for i in get_data()[0]:
if 'is down' != (i[4].rstrip()):
china_mobile.append(('+++ yidong-' + ''.join(x[0] for x in lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + str(random.randint(1, 10000)), 'menu = ' + i[0].rstrip().rstrip('市') + i[1].rstrip().rstrip('市') + '移动', 'title = ' + ''.join(lazy_pinyin(i[0].rstrip().rstrip('市'))) + '-' + ''.join(lazy_pinyin(i[1].rstrip().rstrip('市'))) + '-' + i[4].rstrip(), 'host = ' + i[4].rstrip()))
for i in get_data()[1]:
if 'is down' != (i[2]):
int_ip.append(('++ ' + ''.join(i[1]), 'menu = ' + i[0], 'title = ' + i[1] + '-' + i[2], 'host = ' + i[2]))
return china_telcom, china_unicom, china_mobile, int_ip
功能解释
从数据库取到 ping 处理后的数据,处理后输出为如下格式的列表
[(+++ yidong-sd-2637,menu = 山东青岛移动,title = shandong-qingdao-218.201.98.33,host = 218.201.98.33)]
将之前所有获取的信息整理成smokeping所需的”Targets”配置文件
#这里变量设置的比较乱
def finally_target():
with open('ip_target.txt', 'r+') as f:
content = f.read()
f.seek(0, 0)
with open('begin_text', 'r') as file:
f.write(file.read() + 'n' + content)
print(f.read())
with open('ip_target.txt', 'a') as f:
with open('end_text', 'r') as file:
f.write(file.read())
with open('ip_target.txt', 'r+') as f:
aa = f.read()
pos = aa.find('+++ liantong')
cc = aa[:pos] + """
++ liantong #联通
menu = 联通网络监控
title = China Unicom
#host = /Other/liantong/liantong-bj /Other/liantong/liantong-sh /Other/liantong/liantong-gz
""" + 'n' + aa[pos:]
dd = cc.find('+++ yidong')
ee = cc[:dd] + """
++ yidong #移动
menu = 移动网络监控
title = China mobile
""" + 'n' + cc[dd:]
ff = ee.find('++ Tokyo-Japan')
gg = ee[:ff] + """
+ Internet
menu = 国际线路
title = 国际线路
""" + 'n' + ee[ff:]
with open('Targets', 'w') as finally_txt:
finally_txt.write(gg)
功能解释
-
得到之前处理后的数据,将 Target 文件拼装起来
-
Targets 包含了开头和结尾的文件如下
-
开头文件
*** Targets ***
probe = FPing
menu = Top
title = Network Latency Grapher
remark = Welcome to the SmokePing website of WORKS Company.
Here you will learn all about the latency of our network.
Other # 第一层级
menu = 国内线路
title = 国内线路
++ dianxin #电信
menu = 电信网络监控
title = 中国电信
------------
# 结尾文件
+ InternetSites
menu = Internet Sites
title = Internet Sites
++ JupiterBroadcasting
menu = JupiterBroadcasting
title = JupiterBroadcasting
host = jupiterbroadcasting.com
这里省略了一些
def mail():
my_sender = 'houm01@foxmail.com'
my_user = 'houm01@foxmail.com'
my_pass = 'qq邮件授权码'
with open('text.txt', 'w') as f:
for i in mail_down_ip_body:
f.write(str(i).replace('(', '').replace(')', '').replace(''', '').replace(',', ' -- ') + 'n')
with open('text.txt', 'r') as f:
mail_txt_str = f.read()
if len(mail_txt_str) != 0:
mail_text = '''{} 检测到新增 down ip 如下nn{}n已生成Targets文件,请管理员判断是否处理'''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), mail_txt_str)
scan_state = '有新增的down ip,请关注'
else:
mail_text = '''{} 经过检测,没有发现有新增down的IPn
{}
'''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), mail_txt_str)
scan_state = '无新增down ip'
ret = True
try:
msg = MIMEText(mail_text, 'plain', 'utf-8')
msg['From'] = formataddr(["Smokeping 测试", my_sender]) # 括号里的对应发件人邮箱昵称、发件人邮箱账号
msg['To'] = formataddr(["Service", my_user]) # 括号里的对应收件人邮箱昵称、收件人邮箱账号
msg['Subject'] = "Smokeping 测试结果报告 - {} - {}".format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M'), scan_state) # 邮件的主题,也可以说是标题
server = smtplib.SMTP_SSL("smtp.qq.com", 465) # 发件人邮箱中的SMTP服务器,端口是25
server.login(my_sender, my_pass) # 括号中对应的是发件人邮箱账号、邮箱密码
server.sendmail(my_sender, [my_user, ], msg.as_string()) # 括号中对应的是发件人邮箱账号、收件人邮箱账号、发送邮件
server.quit() # 关闭连接
except Exception: # 如果 try 中的语句没有执行,则会执行下面的 ret=False
ret = False
if ret:
print("邮件发送成功")
else:
print("邮件发送失败")
功能解释
将之前处理好的文件,构造成邮件,发出
需要做一个判断,如果检测结果没有新增的不通的 IP,邮件内容要说明无新增 IP
如果有新增 down 的 IP,列出是哪些 IP,并列出 IP 的归属地
if name == 'main':
ping_ip()
with open('ip_target.txt', 'w') as f:
for i in output_config():
for y in i:
for z in y:
f.write(z + 'n')
finally_target()
mail()
功能解释
-
将之前几个函数拼起来
Crontab 调度之前写的脚本
crontab 的调度脚本如下,每天9点45分开始执行脚本
crontab -lSHELL=/bin/bash45 09 * * * cd /data/python_script/smokeping_test/ && python3 ip_test.py
总结
目前脚本还有很多不完善的地方,例如 邮件发送时没有带上附件、排列国际站点时没有对同一国家按顺序排列、ping 检测较慢,没有多进程处理 等问题,还需要持续优化一下.
但基本实现了 Smokeping 最麻烦的步骤,也就是处理维护 Targets 文件的问题.
在一个站点部署后,可以放使用 nginx 将 Target 文件发布出去,其他站点通过 wget 的方式获取.