Jul
19
2022
python 批量ping
文件 ip_List.txt
10.210.40.0/24 10.210.41.0/24 10.210.42.0/24 10.210.43.0/24 10.210.44.0/24 10.210.45.0/24 10.210.46.0/24 10.210.47.0/24 10.210.48.0/24 10.210.49.0/24 10.210.50.0/24 10.210.51.0/24 10.210.52.0/24 10.210.53.0/24 10.210.54.0/24 10.210.55.0/24
python 脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing import freeze_support
from multiprocessing.pool import ThreadPool
from datetime import datetime
import subprocess, ipaddress, openpyxl, threading, logging, re, os, sys
from openpyxl import Workbook
# Path of the text file listing the IPs / networks to ping
# (relative, so it is resolved against the current working directory).
IP_INFO_PATH = 'ip_List.txt'
# Overall time limit, in seconds, handed to the external `timeout` command
# that wraps `ping` on Linux.
Timeout = 5
# Number of worker threads in the pool below.
THREADING_NUM = 50
# Number of echo requests sent per host (ping `-c` on Linux, `-n` on Windows).
Packconut = 5
# Shared worker pool. NOTE: this is a thread pool (multiprocessing.pool.ThreadPool),
# not a process pool.
pool = ThreadPool(THREADING_NUM)
# Lock used by show_info() to serialize writes to stdout across worker threads.
queueLock = threading.Lock()
# Print a status message
def show_info(msg):
    """Write ``msg`` to stdout as an in-place (carriage-return) status line.

    The write is serialized with the module-level ``queueLock`` so that
    messages coming from different worker threads do not interleave.

    Args:
        msg: The message to display. Any object is accepted; it is converted
            with ``str()`` so callers may pass an exception instance directly.
    """
    # Build the text before taking the lock so a conversion error can never
    # happen while the lock is held.
    text = '\r' + str(msg)
    # ``with`` guarantees the lock is released even if the write raises;
    # otherwise every later caller would block forever.
    with queueLock:
        sys.stdout.write(text)
        sys.stdout.flush()
# Read the text file and collect the IP / network entries
def get_ips_info():
    """Load the ping targets from the file named by ``IP_INFO_PATH``.

    Blank lines and lines starting with ``#`` are ignored. A line may hold
    one entry or several whitespace-separated entries; each entry is returned
    as its own list item.

    Returns:
        A list of target strings in file order. The list is empty when the
        file does not exist (a message is shown via ``show_info``).
    """
    ips = []
    try:
        with open(IP_INFO_PATH, 'r') as f:
            for line in f:
                line = line.strip()
                # After strip() a blank line has length 0, so test for
                # emptiness rather than for a specific length.
                if not line or line.startswith('#'):
                    continue
                ips.extend(line.split())
    except FileNotFoundError:
        show_info('Can not find "{}"'.format(IP_INFO_PATH))
    return ips
def SinglePing(target_ip):
    """Ping a single host and display whether it is Online or Offline.

    Runs the system ``ping`` command (Linux and Windows are supported) with
    ``Packconut`` echo requests and reports the outcome through ``show_info``.
    On any other platform a notice is shown and nothing is executed.

    Args:
        target_ip: Address or host name to ping.
    """
    platform = sys.platform.lower()
    if platform == 'linux':
        # ``timeout`` bounds the whole run; ping's own -W is in SECONDS on
        # Linux, so wait 1 s per reply (Windows -w below is milliseconds).
        cmd = ['timeout', '{}s'.format(Timeout), 'ping', '-c', str(Packconut), '-W', '1', target_ip]
    elif platform == 'win32':
        cmd = ['ping', '-n', str(Packconut), '-w', '1000', target_ip]
    else:
        show_info('不支持该平台系统,非常抱歉!')
        # No command was run, so there is no return code to inspect.
        return
    # subprocess.run returns a CompletedProcess on every platform, so
    # ``returncode`` is always available (subprocess.call returns a bare int).
    res = subprocess.run(cmd, stdout=subprocess.PIPE)
    print(f'res状态是: {res}')
    status = 'Online' if res.returncode == 0 else 'Offline'
    show_info('%-20s%-20s' % (target_ip, status))
def MultiPing(target_ip):
    """Ping one host as part of a batch run and record the outcome.

    Intended to be executed by the worker pool. For each host it:

    * runs the system ``ping`` command (Linux and Windows are supported),
    * appends ``[target_ip, 'Online' | 'Offline']`` to the global ``result``,
    * shows the status through ``show_info``,
    * appends a line to ``Online_Ping_result.txt`` or
      ``Offline_Ping_result.txt`` and the raw ping output to
      ``record_Ping.txt`` inside the ``Savelog`` directory.

    Any exception is reported through ``show_info`` instead of being raised,
    because errors raised inside ``apply_async`` tasks would otherwise be
    dropped silently.

    Args:
        target_ip: Address or host name to ping.
    """
    try:
        # Pick the command line for the current platform.
        platform = sys.platform.lower()
        if platform == 'linux':
            # ``timeout`` bounds the whole run; ping's own -W is in SECONDS
            # on Linux (Windows -w below is milliseconds).
            cmd = ['timeout', '{}s'.format(Timeout), 'ping', '-c', str(Packconut), '-W', '1', target_ip]
        elif platform == 'win32':
            cmd = ['ping', '-n', str(Packconut), '-w', '1000', target_ip]
        else:
            show_info('不支持该平台系统,非常抱歉!')
            # No command was run, so there is nothing to record.
            return

        # subprocess.run returns a CompletedProcess on every platform, giving
        # both ``returncode`` and the captured ``stdout``.
        res = subprocess.run(cmd, stdout=subprocess.PIPE)
        # errors='replace': output that is not valid GBK (e.g. UTF-8 on
        # Linux) must not abort the recording of this host.
        res_out = res.stdout.decode('gbk', errors='replace')

        if res.returncode == 0:
            status, log_name = 'Online', 'Online_Ping_result.txt'
        else:
            status, log_name = 'Offline', 'Offline_Ping_result.txt'

        result.append([target_ip, status])
        show_info('%-20s%-20s' % (target_ip, status))
        # Per-status summary file.
        with open('{}/{}'.format(Savelog, log_name), 'a+') as f:
            f.write('%-20s%-20s' % (target_ip, status) + '\n')

        # Raw ping output, followed by a separator line. Assembled first and
        # written with one call so the record goes out in a single write.
        record = '-' * 50
        if res_out != '':
            record = '\n' + res_out + record
        with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
            f.write(record)
    except Exception as e:
        # Pass text, not the exception object, to the display helper.
        show_info(str(e))
def get_ip_List(ip):
    """Expand a network in CIDR notation into its usable host addresses.

    Args:
        ip: Network string such as ``'10.210.40.0/24'``. An address with host
            bits set (``'10.210.40.7/24'``) is accepted and expanded as the
            network that contains it.

    Returns:
        A list of host addresses as strings. The list is empty when ``ip``
        is not a valid network; a warning is logged in that case.
    """
    try:
        # strict=False: do not reject entries whose host bits are set.
        network = ipaddress.ip_network(ip, strict=False)
    except ValueError as e:
        # Keep the best-effort behaviour (skip the entry) but say why.
        logging.warning('Skipping invalid network %r: %s', ip, e)
        return []
    return [str(host) for host in network.hosts()]
if __name__ == '__main__':
    # Script entry point: read the targets, ping them all through the worker
    # pool, then write the text logs and an Excel workbook under ./result/.
    # ``result`` and ``Savelog`` stay module-level names because MultiPing
    # reads them as globals.

    # Must run before any other work when the script is frozen into a
    # Windows executable; it is a no-op otherwise.
    freeze_support()
    start_time = datetime.now()
    LogTime = start_time.strftime('%Y-%m-%d_%H:%M:%S')
    print('---------- {} ----------'.format(LogTime))

    # ':' is not allowed in Windows file names, so it is replaced in the
    # directory name only; displayed timestamps keep their usual form.
    Savelog = './result/{}'.format(LogTime.replace(':', '-'))
    os.makedirs(Savelog, exist_ok=True)
    with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
        f.write('----- {} 操作开始 -----\n'.format(datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))

    # For a single host, call SinglePing('127.0.0.1') instead of the batch below.

    # Batch ping: one pool task per host.
    ips_List = get_ips_info()
    print(ips_List)
    # First row is the header of the results sheet; MultiPing appends to it.
    result = [['Ip Address', 'Ping']]
    # Entries that look like IPv4 CIDR networks are expanded to their hosts;
    # anything else is pinged as given.
    cidr_pattern = re.compile(r'\d+\.\d+\.\d+\.\d+/\d+')
    for ips in ips_List:
        if cidr_pattern.match(ips):
            ip_List = get_ip_List(ips)
        else:
            ip_List = [ips]
        for ip in ip_List:
            pool.apply_async(MultiPing, args=(ip,))
    # Wait for every queued ping to finish before reading ``result``.
    pool.close()
    pool.join()

    end_time = datetime.now()
    m, s = divmod((end_time - start_time).total_seconds(), 60)
    h, m = divmod(m, 60)
    end_str = '\nAll done.总花费时间 %02d:%02d:%02d' % (h, m, s)
    with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
        f.write('{}\n----- {} 操作结束 -----'.format(end_str, datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))
    print(end_str)
    result.append([end_str])

    # Excel report: sheet 1 lists the targets as read from the input file,
    # sheet 2 holds the per-host results followed by the summary row.
    workbook = Workbook()
    worksheet = workbook.active
    worksheet.title = "目标"
    for target in ips_List:
        worksheet.append([target])
    worksheet = workbook.create_sheet("扫描结果", 1)
    # Written exactly once, after the summary row has been added.
    for rs in result:
        worksheet.append(rs)
    workbook.save('{}/ping_result.xlsx'.format(Savelog))
    workbook.close()
微信扫一扫,打赏作者吧~
最活跃的读者