Jul
19
2022
python 批量ping
输入文件 ip_List.txt(与脚本放在同一目录;每行一个 IP 或网段,以 # 开头的行会被忽略)
10.210.40.0/24
10.210.41.0/24
10.210.42.0/24
10.210.43.0/24
10.210.44.0/24
10.210.45.0/24
10.210.46.0/24
10.210.47.0/24
10.210.48.0/24
10.210.49.0/24
10.210.50.0/24
10.210.51.0/24
10.210.52.0/24
10.210.53.0/24
10.210.54.0/24
10.210.55.0/24
python 脚本
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Batch ping utility.

Reads IP addresses / IPv4 CIDR networks from ``ip_List.txt``, pings every
host concurrently with a thread pool, and stores the outcome as text logs
(and as an Excel workbook when ``openpyxl`` is installed) under
``./result/<timestamp>/``.
"""
from multiprocessing import freeze_support
from multiprocessing.pool import ThreadPool
from datetime import datetime
import subprocess
import ipaddress
import threading
import logging
import re
import os
import sys

# openpyxl is only needed for the final Excel report; the text logs are still
# produced when it is not installed.
try:
    import openpyxl
    from openpyxl import Workbook
except ImportError:
    openpyxl = None
    Workbook = None

# Path of the text file listing the IPs / networks (current directory).
IP_INFO_PATH = 'ip_List.txt'
# Overall time limit, in seconds, for one ping run (Linux `timeout` command).
Timeout = 5
# Number of worker threads.
THREADING_NUM = 50
# Number of echo requests sent to every host.
Packconut = 5
# Worker pool; created in main() so that importing this module starts no threads.
pool = None
# Lock serialising console output.
queueLock = threading.Lock()
# Lock serialising appends to the shared log files.
_file_lock = threading.Lock()
# Rows collected by MultiPing(); main() resets it before a scan.
result = [['Ip Address', 'Ping']]
# Directory receiving the log files; main() points it at ./result/<timestamp>.
Savelog = '.'

# An IPv4 network in CIDR notation, e.g. "10.210.40.0/24".
_CIDR_PATTERN = re.compile(r'\d+\.\d+\.\d+\.\d+/\d+$')
_UNSUPPORTED_PLATFORM_MSG = '不支持该平台系统,非常抱歉!'


def show_info(msg):
    """Write *msg* over the current console line, serialised across threads.

    *msg* is converted with ``str()`` so exceptions can be passed directly.
    """
    with queueLock:
        sys.stdout.write('\r' + str(msg))
        sys.stdout.flush()


def get_ips_info(path=None):
    """Return the targets listed in the IP file.

    Args:
        path: File to read; defaults to ``IP_INFO_PATH``.

    Returns:
        A list of strings (single IPs or CIDR networks). Blank lines and
        lines starting with ``#`` are skipped; several whitespace-separated
        entries on one line are returned as separate targets. A missing file
        yields an empty list after a console message.
    """
    if path is None:
        path = IP_INFO_PATH
    ips = []
    try:
        with open(path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                ips.extend(line.split())
    except FileNotFoundError:
        show_info('Can not find "{}"'.format(path))
    return ips


def _build_ping_command(target_ip):
    """Return ``(argv, output_encoding)`` for this platform, or ``(None, None)``."""
    platform = sys.platform.lower()
    if platform == 'linux':
        # iputils `ping -W` takes seconds; `timeout` bounds the whole run.
        argv = ['timeout', '{}s'.format(Timeout), 'ping',
                '-c', str(Packconut), '-W', '1', target_ip]
        return argv, 'utf-8'
    if platform == 'win32':
        # Windows `ping -w` takes milliseconds.
        argv = ['ping', '-n', str(Packconut), '-w', '1000', target_ip]
        return argv, 'gbk'
    return None, None


def _run_ping(target_ip):
    """Ping *target_ip* once; return ``(returncode, output_text)``.

    ``returncode`` is ``None`` when the platform is not supported.
    """
    argv, encoding = _build_ping_command(target_ip)
    if argv is None:
        return None, ''
    res = subprocess.run(argv, stdout=subprocess.PIPE)
    # errors='replace' keeps an unexpected console code page from aborting the scan.
    return res.returncode, res.stdout.decode(encoding, errors='replace')


def _append_text(filename, text):
    """Append *text* to *filename* inside the ``Savelog`` directory."""
    with _file_lock:
        # UTF-8 so the Chinese log text is writable whatever the locale encoding is.
        with open('{}/{}'.format(Savelog, filename), 'a', encoding='utf-8') as f:
            f.write(text)


def SinglePing(target_ip):
    """Ping a single host and print whether it is Online or Offline."""
    returncode, _ = _run_ping(target_ip)
    if returncode is None:
        show_info(_UNSUPPORTED_PLATFORM_MSG)
        return
    print(f'res状态是: {returncode}')
    status = 'Online' if returncode == 0 else 'Offline'
    show_info('%-20s%-20s' % (target_ip, status))


def MultiPing(target_ip):
    """Ping one host as part of a batch run and record the outcome.

    Appends ``[target_ip, status]`` to the module-level ``result`` list and
    writes the status and the raw ping output to the log files in ``Savelog``.
    Any error is reported on the console instead of being raised, because
    this function runs inside pool workers whose exceptions would be lost.
    """
    try:
        returncode, res_out = _run_ping(target_ip)
        if returncode is None:
            show_info(_UNSUPPORTED_PLATFORM_MSG)
            return
        status = 'Online' if returncode == 0 else 'Offline'
        result.append([target_ip, status])
        show_info('%-20s%-20s' % (target_ip, status))
        # Per-status list: Online_Ping_result.txt / Offline_Ping_result.txt.
        _append_text('{}_Ping_result.txt'.format(status),
                     '%-20s%-20s' % (target_ip, status) + '\n')
        # Raw ping output of every host.
        if res_out != '':
            _append_text('record_Ping.txt', '\n' + res_out + '-' * 50)
    except Exception as e:
        print(e)
        show_info(e)


def get_ip_List(ip):
    """Expand a CIDR network string into the list of its host addresses.

    Host bits in *ip* are tolerated (``10.0.0.5/24`` means ``10.0.0.0/24``).

    Returns:
        A list of address strings, or an empty list (after a console
        message) when *ip* is not a valid network.
    """
    try:
        network = ipaddress.ip_network(ip, strict=False)
    except ValueError as e:
        show_info('Invalid network "{}": {}\n'.format(ip, e))
        return []
    return [str(host) for host in network.hosts()]


def main():
    """Run the batch scan and write the text logs and the Excel report."""
    global pool, result, Savelog

    freeze_support()
    LogTime = datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
    start_time = datetime.now()
    print('---------- {} ----------'.format(LogTime))

    if sys.platform.lower() not in ('linux', 'win32'):
        show_info(_UNSUPPORTED_PLATFORM_MSG)
        return

    # ':' is not allowed in Windows file names.
    dir_stamp = LogTime.replace(':', '-') if sys.platform.lower() == 'win32' else LogTime
    Savelog = './result/{}'.format(dir_stamp)
    os.makedirs(Savelog, exist_ok=True)
    _append_text('record_Ping.txt', '----- {} 操作开始 -----\n'.format(
        datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))

    # Single-host check, kept for manual debugging:
    # SinglePing('127.0.0.1')

    ips_List = get_ips_info()
    print(ips_List)
    result = [['Ip Address', 'Ping']]

    pool = ThreadPool(THREADING_NUM)
    for ips in ips_List:
        # Networks are expanded to their hosts; anything else is pinged as is.
        ip_List = get_ip_List(ips) if _CIDR_PATTERN.match(ips) else [ips]
        for ip in ip_List:
            pool.apply_async(MultiPing, args=(ip,))
    pool.close()
    pool.join()

    end_time = datetime.now()
    m, s = divmod((end_time - start_time).total_seconds(), 60)
    h, m = divmod(m, 60)
    end_str = '\nAll done.总花费时间 %02d:%02d:%02d' % (h, m, s)
    _append_text('record_Ping.txt', '{}\n----- {} 操作结束 -----'.format(
        end_str, datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))
    print(end_str)
    result.append([end_str])

    if Workbook is None:
        print('openpyxl is not installed; skipping ping_result.xlsx')
        return

    workbook = Workbook()
    worksheet = workbook.active
    worksheet.title = "目标"
    for target in ips_List:
        worksheet.append([target])
    # Second sheet: one row per scanned host, followed by the timing summary.
    worksheet = workbook.create_sheet("扫描结果", 1)
    for rs in result:
        worksheet.append(rs)
    workbook.save('{}/ping_result.xlsx'.format(Savelog))
    workbook.close()


if __name__ == '__main__':
    main()
最活跃的读者