SoFunction
Updated on 2024-11-18

Script sharing for network testing using Python

preamble

Recently a classmate asked me to help write a tool for testing networks. Due to work stuff, I've been dragging my feet off and on for a long time before giving a relatively complete version. Actually, I use Python less, so I basically wrote the program while looking up information.

The main logic of the program is as follows:

Read a list of ip's in an excel file, then use a multithreaded call to ping to count the network parameters for each ip, and finally output the results to an excel file.

The code is shown below:

#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# File: pingtest_test.py
# Date: 2008-09-28
# Author: Michael Field
# Modified By:intheworld
# Date: 2017-4-17
import sys
import os
import getopt
import commands
import subprocess
import re
import time
import threading
import xlrd
import xlwt

TEST = [
  '220.181.57.217',
  '166.111.8.28',
  '202.114.0.242',
  '202.117.0.20',
  '202.112.26.34',
  '202.203.128.33',
  '202.115.64.33',
  '202.201.48.2',
  '202.114.0.242',
  '202.116.160.33',
  '202.202.128.33',
]
RESULT={}
def usage():
 print "USEAGE:"
 print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" %[0]
 print "\t TESTsimpleIPlistings"
 print "\t-t times Number of tests;default1000;"
 print "\t-c concurrent number Number of parallel threads:default10"
 print "\t-h|-?, Helpful Information"
 print "\t The output is the current directory fileping_result.txt cap (a poem) ping_result.xls"
 print "for example:"
 print "\t./ping_test.py -n TEST -t 1 -c 10"

def div_list(ls,n):
 if not isinstance(ls,list) or not isinstance(n,int):
  return []
 ls_len = len(ls)
 print 'ls length = %s' %ls_len
 if n<=0 or 0==ls_len:
  return []
 if n > ls_len:
  return []
 elif n == ls_len:
  return [[i] for i in ls]
 else:
  j = ls_len/n
  k = ls_len%n
  ### j,j,j,... (preceded by n-1 j),j+k
  #Steps j, times n-1
  ls_return = []
  for i in xrange(0,(n-1)*j,j):
   ls_return.append(ls[i:i+j])
  # Counting the j+k at the end
  ls_return.append(ls[(n-1)*j:])
  return ls_return

def pin(IP):
 try:
  xpin=subprocess.check_output("ping -n 1 -w 100 %s" %IP, shell=True)
 except Exception:
  xpin = 'empty'
 ms = '=[0-9]+ms'.decode("utf8")
 print "%s" %ms
 print "%s" %xpin
 mstime=(ms,xpin)
 if not mstime:
  MS='timeout'
  return MS
 else:
  MS=().split('=')[1]
  return ('ms')
def count(total_count,I):
 global RESULT
 nowsecond = int(())
 nums = 0
 oknums = 0
 timeout = 0
 lostpacket = 0.0
 total_ms = 0.0
 avgms = 0.0
 maxms = -1
 while nums < total_count:
  nums += 1
  MS = pin(I)
  print 'pin output = %s' %MS
  if MS == 'timeout':
   timeout += 1
   lostpacket = timeout*100.0 / nums
  else:
   oknums += 1
   total_ms = total_ms + float(MS)
   if oknums == 0:
    oknums = 1
    maxms = float(MS)
    avgms = total_ms / oknums
   else:
    avgms = total_ms / oknums
    maxms = max(maxms, float(MS))
  RESULT[I] = (I, avgms, maxms, lostpacket)

def thread_func(t, ipList):
 if not isinstance(ipList,list):
  return
 else:
  for ip in ipList:
   count(t, ip)

def readIpsInFile(excelName):
 data = xlrd.open_workbook(excelName)
 table = ()[0]
 nrows = 
 print 'nrows %s' %nrows
 ips = []
 for i in range(nrows):
  (table.cell_value(i, 0))
  print table.cell_value(i, 0)
 return ips
 

if __name__ == '__main__':
 file = 'ping_result.txt'
 times = 10
 network = ''
 thread_num = 10
 args = [1:]
 try:
  (opts, getopts) = (args, 'n:t:c:h?')
 except:
  print "\nInvalid command line option detected."
  usage()
  (1)
 for opt, arg in opts:
  if opt in ('-n'):
   network = arg
  if opt in ('-h', '-?'):
   usage()
   (0)
  if opt in ('-t'):
   times = int(arg)
  if opt in ('-c'):
   thread_num = int(arg)
 f = open(file, 'w')
 workbook = ()
 sheet1 = workbook.add_sheet("sheet1", cell_overwrite_ok=True)
 if not isinstance(times,int):
  usage()
  (0)
 if network not in ['TEST'] and not (((__file__), network)):
  print "The network is wrong or excel file does not exist. please check it."
  usage()
  (0)
 else:
  if network == 'TEST':
   ips = TEST
  else:
   ips = readIpsInFile(network)
  print 'Starting...'
  threads = []
  nest_list = div_list(ips, thread_num)
  loops = range(len(nest_list))
  print 'Total %s Threads is working...' %len(nest_list)
  for ipList in nest_list:
   t = (target=thread_func,args=(times,ipList))
   (t)
  for i in loops:
   threads[i].start()
  for i in loops:
   threads[i].join()
  it = 0
  for line in RESULT:
   value = RESULT[line]
   (it, 0, line)
   (it, 1, str('%.2f'%value[1]))
   (it, 2, str('%.2f'%value[2]))
   (it, 3, str('%.2f'%value[3]))
   it+=1
   (line + '\t'+ str('%.2f'%value[1]) + '\t'+ str('%.2f'%value[2]) + '\t'+ str('%.2f'%value[3]) + '\n')
  ()
  ('ping_result.xls')
  print 'Work Done. please check result %s and ping_result.xls.'%file

This code refers to someone else's implementation, although it is not particularly complex, here is a brief explanation.

  • The excel read and write uses xlrd and xlwt, basically using some simple api's.
  • Threading is used to implement multithreaded concurrency, much like the POSIX standard interface. thread_func is the thread's handler function, and its input contains a List of ip's, so the individual ip's are processed by looping through the function internally.
  • In addition, Python commands are not compatible under Windows, so the subprocess module is used.

So far, I don't have a good understanding of character sets inside Python, so the code for regular expression matching isn't strong enough, but it barely works for now, and I'll change it later if necessary!

summarize

The above is the entire content of this article, I hope that the content of this article on your learning or work can bring some help, if there are questions you can leave a message to exchange, thank you for my support.