python 批量ping

 
更多

文件 ip_List.txt

10.210.40.0/24
10.210.41.0/24
10.210.42.0/24
10.210.43.0/24
10.210.44.0/24
10.210.45.0/24
10.210.46.0/24
10.210.47.0/24
10.210.48.0/24
10.210.49.0/24
10.210.50.0/24
10.210.51.0/24
10.210.52.0/24
10.210.53.0/24
10.210.54.0/24
10.210.55.0/24

python 脚本

#!/usr/bin/env python3
# -*- Coding:UTF-8 -*-
from multiprocessing import freeze_support
from multiprocessing.pool import ThreadPool
from datetime import datetime
import subprocess, ipaddress, openpyxl, threading, logging, re, os, sys
from openpyxl import Workbook

# ip网段文本路径(当前目录下)
IP_INFO_PATH = 'ip_List.txt'
#超时时间
Timeout = 5
# 线程数()
THREADING_NUM = 50
#发送包的数量
Packconut = 5
# 进程池
pool = ThreadPool(THREADING_NUM)
# 线程锁
queueLock = threading.Lock()
# 打印消息
def show_info(msg):
    queueLock.acquire()
    # print(msg)
    sys.stdout.write('\r' + msg)
    sys.stdout.flush()
    queueLock.release()
# 读取文本,获取ip网段信息
def get_ips_info():
    ips=[]
    try:
        with open(IP_INFO_PATH, 'r') as f:
            for line in f.readlines():
                # 去掉前后空白
                line = line.strip()
                # 忽略空格行,len=1
                if ( len(line) == 1 or line.startswith('#')):
                    continue
                else:
                    ips.append(line)
    except fileNotFoundError as e:
        show_info('Can not find "{}"'.format(IP_INFO_PATH))
    except IndexError as e:
        show_info('"{}" format error'.format(IP_INFO_PATH))
    return ips
def SinglePing(target_ip):
    """
    单机Ping测试
    """
    res = ''
    if sys.platform.lower() == 'linux':
        res = subprocess.run(['timeout', '{}s'.format(Timeout), 'ping', '-c', str(Packconut), '-W', '1000', target_ip], stdout=subprocess.PIPE)
        res_out = str(res.stdout.decode('gbk'))
    elif sys.platform.lower() == 'win32':
        res = subprocess.call(['ping', '-n', str(Packconut), '-w', '1000', target_ip ], stdout = subprocess.PIPE)
    else:
        show_info('不支持该平台系统,非常抱歉!')
    print(f'res状态是: {res}')
    if res.returncode == 0:
        show_info('%-20s%-20s' % (target_ip, 'Online'))
    else:
        show_info('%-20s%-20s' % (target_ip, 'Offline'))
def MultiPing(target_ip):
    """
    批量Ping测试
    """
    try:
        res , res_out  = '', ''
        # 判断系统平台,执行对应命令
        if sys.platform.lower() == 'linux': 
            res = subprocess.run(['timeout', '{}s'.format(Timeout), 'ping', '-c', str(Packconut), '-W', '1000', target_ip], stdout=subprocess.PIPE)
            res_out = str(res.stdout.decode('gbk'))
        # 判断系统平台,执行对应命令
        elif sys.platform.lower() == 'win32':
            res = subprocess.call(['ping', '-n', '2', str(Packconut), '1000', target_ip ], stdout = subprocess.PIPE)
            res_out = str(res.stdout.decode('gbk'))
        else:
            show_info('不支持该平台系统,非常抱歉!')
        # print(f'res状态是: {res.returncode}')
        if res.returncode == 0:
            result.append([target_ip,'Online'])
            show_info('%-20s%-20s' % (target_ip, 'Online'))
            # show_info('%-20s%-20s' % (target_ip, 'Online'))
            # Ping成功
            with open('{}/Online_Ping_result.txt'.format(Savelog), 'a+') as f:
                f.write('%-20s%-20s' % (target_ip, 'Online ') + '\n')
        else:
            result.append([target_ip,'Offline'])
            show_info('%-20s%-20s' % (target_ip, 'Offline'))
            # show_info('%-20s%-20s' % (target_ip, 'Offline'))
            # Ping失败
            with open('{}/Offline_Ping_result.txt'.format(Savelog), 'a+') as f:
                f.write('%-20s%-20s' % (target_ip, 'Offline') + '\n')

        # Ping结果记录
        with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
            if res_out!='':
                f.write('\n' + res_out)
                f.write('-' * 50)
    except Exception as e:
        print(e)
        show_info(e)
def get_ip_List(ip):
    """
    获取ip列表
    """
    ip_List = []
    try:
        temp = ipaddress.ip_network(ip).hosts()
        for ip in temp:
            ip_List.append(str(ip))
    except Exception as e:
        pass
    # print(ip_List)
    return ip_List


if __name__=='__main__':
    LogTime = datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
    start_time = datetime.now()
    freeze_support() 
    print('---------- {} ----------'.format(LogTime))
    Savelog='./result/{}'.format(LogTime)
    if(os.path.exists(Savelog) == False):
        os.makedirs(Savelog)
    with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
        f.write('----- {} 操作开始 -----\n'.format(datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))
    # 单主机Ping
    # SinglePing('127.0.0.1')
    # 批量执行Ping
    ips_List = get_ips_info()
    print(ips_List)
    result=[['Ip Address','Ping']]
    for i,ips in enumerate(ips_List):
        ip_List = []
        if re.match(r'\d+.\d+.\d+.\d+/\d+', ips):
            ip_List = get_ip_List(ips)
        else:
            ip_List.append(ips)
        for ip in ip_List:
            pool.apply_async(MultiPing, args=(ip,))
    pool.close()
    pool.join()
    workbook   = Workbook()
    worksheet  = workbook.active
    worksheet.title = "目标"
    for target in ips_List:
        worksheet.append([target]) #写入Excel
    worksheet = workbook.create_sheet("扫描结果",1)
    for rs in result:
        worksheet.append(rs)
    end_time = datetime.now()
    m, s = divmod((end_time - start_time).total_seconds(), 60)
    h, m = divmod(m, 60)
    end_str='\nAll done.总花费时间 %02d:%02d:%02d' % (h, m, s)
    with open('{}/record_Ping.txt'.format(Savelog), 'a+') as f:
        f.write('{}\n----- {} 操作结束 -----'.format(end_str,datetime.now().strftime('%Y-%m-%d_%H:%M:%S')))
    print(end_str)
    result.append([end_str])
    for rs in result:
        worksheet.append(rs)
    workbook.save('{}/ping_result.xlsx'.format(Savelog))
    workbook.close() 
打赏

本文固定链接: https://www.cxy163.net/archives/2053 | 绝缘体

该日志由 绝缘体.. 于 2022年07月19日 发表在 首页 分类下,
原创文章转载请注明: python 批量ping | 绝缘体

报歉!评论已关闭.