ping 命令是我们检查网络时最常用的命令。作为网络人员,基本上每天都会用到,它可以很好地帮助我们分析和判定网络故障。
如果有 10 台设备、100 台设备、1000 台设备怎么办?一个个 ping 过去人都要疯掉了,这种情况在大型网络中我们很可能遇到,那怎么办呢?今天我们来看看如何用 Python 实现批量 ping 测试主机。
代码如下:
#!/usr/bin/python3
# -*- coding: utf-8 -*-

import argparse
import errno
import os
import select
import socket
import struct
import time



ICMP_ECHO_REQUEST = 8  # ICMP message type of an echo request (platform specific)
ICMP_ECHO_REPLY = 0  # ICMP message type of an echo reply
DEFAULT_TIMEOUT = 0.1  # seconds to wait for a reply
DEFAULT_COUNT = 4  # number of echo requests sent by Pinger.ping()


class Pinger(object):
    """Ping a host with raw ICMP echo requests -- the Pythonic way.

    Opening a raw ICMP socket normally requires administrator/root
    privileges, so ``ping_once`` and ``ping`` must be run accordingly.
    """

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        """Store the ping parameters.

        Args:
            target_host: Host name or IPv4 address to ping.
            count: Number of echo requests ``ping`` sends.
            timeout: Seconds to wait for each reply.
        """
        self.target_host = target_host
        self.count = count
        self.timeout = timeout

    def do_checksum(self, source_string):
        """Compute the 16-bit ones'-complement checksum of a packet.

        Args:
            source_string: Bytes-like packet contents (header plus data).

        Returns:
            The checksum as an int in the range 0..0xffff, with its two
            bytes swapped exactly as ``send_ping`` expects.
        """
        total = 0
        # Integer division: the number of bytes that form complete 16-bit words.
        even_length = len(source_string) // 2 * 2
        for index in range(0, even_length, 2):
            word = source_string[index + 1] * 256 + source_string[index]
            total = (total + word) & 0xffffffff

        # An odd-length packet leaves one trailing byte to add on its own.
        if even_length < len(source_string):
            total = (total + source_string[len(source_string) - 1]) & 0xffffffff

        # Fold the carries back into the low 16 bits.
        total = (total >> 16) + (total & 0xffff)
        total = total + (total >> 16)

        answer = ~total & 0xffff
        # Swap the two bytes of the result.
        return (answer >> 8) | ((answer << 8) & 0xff00)

    def receive_pong(self, sock, ID, timeout):
        """Wait for the echo reply that matches ``ID``.

        Args:
            sock: Socket to read replies from.
            ID: Identifier that was placed in the echo request.
            timeout: Maximum number of seconds to wait.

        Returns:
            The round-trip delay in seconds, or ``None`` on timeout.
        """
        bytes_in_double = struct.calcsize("d")
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable, _, _ = select.select([sock], [], [], time_remaining)
            time_spent = time.time() - start_time
            if not readable:  # Timeout
                return None

            time_received = time.time()
            recv_packet, _addr = sock.recvfrom(1024)

            # Assumes a 20-byte IP header (no IP options) precedes the ICMP header.
            icmp_header = recv_packet[20:28]
            payload = recv_packet[28:28 + bytes_in_double]
            # Skip truncated packets instead of failing inside struct.unpack.
            if len(icmp_header) == 8 and len(payload) == bytes_in_double:
                icmp_type, _code, _checksum, packet_id, _sequence = struct.unpack(
                    "bbHHh", icmp_header
                )
                # Only a genuine echo reply carrying our identifier counts;
                # other ICMP traffic seen on the socket is ignored.
                if icmp_type == ICMP_ECHO_REPLY and packet_id == ID:
                    time_sent = struct.unpack("d", payload)[0]
                    return time_received - time_sent

            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return None

    def send_ping(self, sock, ID):
        """Send one ICMP echo request to the target host.

        Args:
            sock: Raw ICMP socket to send on.
            ID: Identifier to place in the ICMP header.

        Raises:
            socket.gaierror: If the target host name cannot be resolved.
        """
        target_addr = socket.gethostbyname(self.target_host)

        my_checksum = 0

        # Create a dummy header with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
        bytes_in_double = struct.calcsize("d")
        # Payload: the send timestamp followed by padding up to 192 bytes.
        data = struct.pack("d", time.time()) + b"Q" * (192 - bytes_in_double)

        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
        )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))

    def ping_once(self):
        """Send a single echo request and wait for its reply.

        Returns:
            The delay in seconds, or ``None`` on timeout.

        Raises:
            socket.error: If the raw socket cannot be created; when the
                cause is missing privileges the message says so.
            socket.gaierror: If the target host name cannot be resolved.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno in (errno.EPERM, errno.EACCES):
                # Not superuser, so operation not permitted
                raise socket.error(
                    e.errno,
                    "%s - ICMP messages can only be sent from root user processes"
                    % e.strerror,
                ) from e
            # Any other failure is propagated instead of being swallowed.
            raise

        # The context manager closes the socket even if send/receive raises.
        with sock:
            my_id = os.getpid() & 0xFFFF
            self.send_ping(sock, my_id)
            return self.receive_pong(sock, my_id, self.timeout)

    def ping(self):
        """Ping the target ``self.count`` times, printing each result.

        Stops early if the host name cannot be resolved.
        """
        for _ in range(self.count):
            print("Ping to %s..." % self.target_host)
            try:
                delay = self.ping_once()
            except socket.gaierror as e:
                print("Ping failed. (socket error: '%s')" % e.strerror)
                break

            if delay is None:
                print("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay = delay * 1000
                print("Get pong in %0.4fms" % delay)



if __name__ == '__main__':
    # Sweep 192.168.242.1 - 192.168.242.254 with one echo request per host
    # and collect the hosts that answered in ``alive``.
    alive = []
    host_prefix = '192.168.242.'
    for i in range(1, 255):
        host = host_prefix + str(i)
        pinger = Pinger(target_host=host)

        delay = pinger.ping_once()
        if delay is None:
            # Report the timeout that was actually used for this ping.
            print("Ping %s 失败,超时%s秒" % (host, pinger.timeout))
        else:
            print("ping %s = %s ms" % (host, round(delay * 1000, 4)))
            alive.append(host)
测试如下:
原文链接:www.yjsec.com/2020/11/07
文章转载:Python编程学习圈
(版权归原作者所有,侵删)

点击下方“阅读原文”查看更多
继续阅读
阅读原文