PyPortScanner
A multithreaded port scanner written in Python.
GitHub
The source code, documentation, and details of how to call this port scanner can be found on GitHub: PythonPortScanner by Yaokai.
Background
When doing network-related research, we sometimes need to measure specific parameters, and port scanning is one of the more common and important measurements. Port scanning is the process of determining whether certain ports on a given host are open (listening), through a TCP handshake or other means. The most widely used port scanning tool is nmap, which is undoubtedly powerful and easy to use. However, nmap is a terminal program, so it is not always convenient to call from other code, and a corresponding library may not even be available. In addition, nmap depends on many other libraries, so an older system may not be able to run a newer nmap, which makes scanning inconvenient. Also, several nmap scan types (SYN scans, for example) require root privileges. For these reasons, I developed an efficient multithreaded port scanner using only the libraries that ship with Python 2.7.
Implementation
I. Scanning a given (ip, port) pair with a TCP handshake
To implement port scanning, we first need to understand how to use a Python socket to perform a TCP handshake with a given (ip, port) pair. To complete the handshake, we first have to initialize a TCP socket. In Python, the code to create a new TCP socket is as follows:
    TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)     # (1)
    TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)  # (2)
    TCP_sock.settimeout(delay)                                       # (3)
Here, (1) is the code that initializes the socket. The socket.AF_INET parameter selects an IPv4 socket, and socket.SOCK_STREAM selects a TCP socket. This gives us a socket that uses IPv4 and TCP.
(2) uses setsockopt() to set additional socket options. socket.SOL_SOCKET specifies that the option that follows in the setsockopt() call applies at the socket level, and socket.SO_REUSEPORT indicates that the socket uses the reusable-port setting. The exact meaning of socket.SO_REUSEPORT is explained in another of my articles.
(3) sets the socket's connection timeout to the value of the delay variable, in seconds. This prevents us from waiting too long on a connection.
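As a hedged illustration of steps (1) to (3), the sketch below wraps the socket setup in a helper. The name make_tcp_socket is hypothetical and not part of the original scanner. The hasattr check is an added assumption: socket.SO_REUSEPORT is not defined on every platform (Windows, for example), so the sketch sets the option only where it exists.

```python
import socket


def make_tcp_socket(delay):
    """Create an IPv4 TCP socket with a connection timeout (hypothetical helper)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # (1) IPv4 + TCP
    # (2) SO_REUSEPORT is missing on some platforms, so set it only if available.
    if hasattr(socket, "SO_REUSEPORT"):
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        except socket.error:
            pass  # the system refused the option; connecting works without it
    sock.settimeout(delay)  # (3) give up on a connection after `delay` seconds
    return sock
```

Calling make_tcp_socket(2) would return a socket whose connection attempts time out after two seconds.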
Now that we know how to create a new socket, we can use it to make a TCP connection to the given (ip, port) pair. The code is as follows:
    try:
        result = TCP_sock.connect_ex((ip, int(port_number)))

        # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
        if result == 0:
            output[port_number] = 'OPEN'
        else:
            output[port_number] = 'CLOSE'

        TCP_sock.close()

    except socket.error as e:
        output[port_number] = 'CLOSE'
        pass
Because this is an I/O operation, we wrap it in a try/except block to handle possible exceptions. We then connect to the target address with the socket.connect_ex() method and use the status code it returns to determine whether the connection succeeded. A return value of 0 means the connection succeeded, so in that case the port is recorded as open; otherwise it is recorded as closed. We also record the port as closed when the connection attempt raises an exception, because the port could not be connected to (possibly because of a firewall or packet filtering).
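A nonzero value returned by connect_ex() is an errno code from the underlying connect call, so it can be turned into a readable reason when debugging a scan. The sketch below is only an illustration; the helper name describe_connect_result and the output format are assumptions, not part of the original scanner.

```python
import errno
import os


def describe_connect_result(code):
    """Turn a connect_ex() return value into a readable status string."""
    if code == 0:
        return 'OPEN'
    # errno.errorcode maps numeric codes to names such as 'ECONNREFUSED'.
    name = errno.errorcode.get(code, 'UNKNOWN')
    return 'CLOSE (%s: %s)' % (name, os.strerror(code))
```

The scanner itself only needs the OPEN/CLOSE distinction; the extra detail mainly helps tell a refused connection apart from a timeout.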
Note that after the connection is complete we must call the close() method to close the TCP connection to the remote port. Otherwise, the scan may leave behind so-called hanging TCP connections.
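One way to make sure close() runs on every path, including the exception path, is a try/finally block. The sketch below illustrates that idea under stated assumptions; probe_port is a hypothetical helper that returns the status instead of writing it into a dictionary, and it is not the original implementation.

```python
import socket


def probe_port(ip, port_number, delay):
    """Return 'OPEN' or 'CLOSE' for one (ip, port) pair (hypothetical helper)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(delay)
    try:
        result = sock.connect_ex((ip, int(port_number)))
        return 'OPEN' if result == 0 else 'CLOSE'
    except socket.error:
        # Errors raised outside the C-level connect call, e.g. a failed lookup
        return 'CLOSE'
    finally:
        sock.close()  # runs on every path, so no connection is left hanging
```

For example, probe_port('127.0.0.1', 22, 2) checks the local SSH port with a two-second timeout.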
To summarize, the overall code for the TCP handshake scan is as follows:
    """
    Perform status checking for a given port on a given ip address using TCP handshake

    Keyword arguments:
    ip -- the ip address that is being scanned
    port_number -- the port that is going to be checked
    delay -- the time in seconds that a TCP socket waits until timeout
    output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
    """
    def __TCP_connect(ip, port_number, delay, output):
        # Initialize the TCP socket object
        TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        TCP_sock.settimeout(delay)

        try:
            result = TCP_sock.connect_ex((ip, int(port_number)))

            # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
            if result == 0:
                output[port_number] = 'OPEN'
            else:
                output[port_number] = 'CLOSE'

            TCP_sock.close()

        except socket.error as e:
            output[port_number] = 'CLOSE'
            pass
II. Multi-threaded scanning of ports
Single-threaded scanning, although logically simple, is undoubtedly very inefficient. Scanning is an I/O-intensive operation because a large number of packets are sent and received in the process. If the scan runs in a single thread, the program wastes a lot of time waiting for replies, so multithreading is necessary. A natural idea is to start a separate thread for each port to be scanned.
Here the list of ports to be scanned is set to the 1,000 most frequently used ports according to Nmap.
For a given IP address, the scanning process looks like this:
1. Take a port from the list.
2. Create a new thread that uses the __TCP_connect() function to perform the connection operation on this (ip, port) pair.
3. Call the thread.start() and thread.join() methods, which start the scanning child thread and make the main thread wait for the child thread to finish before terminating.
4. Repeat this process until all ports have been scanned.
Based on the above idea, the code for multithreaded scanning is as follows:
    """
    Open multiple threads to perform port scanning

    Keyword arguments:
    ip -- the ip address that is being scanned
    delay -- the time in seconds that a TCP socket waits until timeout
    output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
    """
    def __scan_ports_helper(ip, delay, output):
        '''
        Multithreading port scanning
        '''
        port_index = 0

        while port_index < len(__port_list):
            # Ensure that the number of concurrently running threads does not exceed the thread limit
            while threading.activeCount() < __thread_limit and port_index < len(__port_list):
                # Start threads
                thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
                thread.start()
                # lock the thread until all threads complete
                thread.join()
                port_index = port_index + 1
Here the __thread_limit parameter limits the number of threads, and output is a dictionary that stores the scan results in (port: status) form. thread.join() guarantees that the main thread continues only after the child threads have finished, which ensures that every port is scanned.
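Because join() blocks until the thread it is called on has finished, calling it right after start() makes the probes run one after another. One possible variation, sketched here under assumptions rather than taken from the original code, starts every worker first and joins them afterwards, with a threading.BoundedSemaphore capping the number of concurrent workers. The names scan_ports, probe, and thread_limit are hypothetical; probe stands for any function that takes (ip, port) and returns 'OPEN' or 'CLOSE'.

```python
import threading


def scan_ports(ip, ports, probe, thread_limit=100):
    """Probe every port in its own thread, at most thread_limit at a time."""
    output = {}
    slots = threading.BoundedSemaphore(thread_limit)

    def worker(port):
        try:
            output[port] = probe(ip, port)
        finally:
            slots.release()  # free the slot even if probe() raised

    threads = []
    for port in ports:
        slots.acquire()  # blocks while thread_limit workers are running
        thread = threading.Thread(target=worker, args=(port,))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()  # wait for every worker before returning
    return output
```

Joining only after all threads have been started keeps the guarantee that every port is scanned before the function returns, while letting the waits for replies overlap.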
III. Multi-threaded scanning of multiple websites
While scanning ports with multiple threads, efficiency improves further if we can also scan multiple websites in parallel. To achieve this, we need another thread for each website that manages all the child threads scanning that website's ports.
In addition, in this case we have to remove the thread.join() call in __scan_ports_helper(). Otherwise the main thread is blocked by the port scanning child threads and we cannot scan multiple websites in parallel.
Without join(), how can we ensure that a website's scanning thread returns only after it has finished scanning all of its ports? The method I use here is to check the length of the output dictionary, because once all the scanning is done, the length of output must equal the length of __port_list.
The changed code is as follows:
    def __scan_ports_helper(ip, delay, output):
        '''
        Multithreading port scanning
        '''
        port_index = 0

        while port_index < len(__port_list):
            # Ensure that the number of concurrently running threads does not exceed the thread limit
            while threading.activeCount() < __thread_limit and port_index < len(__port_list):
                # Start threads
                thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
                thread.start()
                port_index = port_index + 1

        # Wait until every port in __port_list has reported a result
        while (len(output) < len(__port_list)):
            continue
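A loop that only executes continue keeps a CPU core busy while it waits. As a hedged alternative, the sketch below polls the dictionary length with a short sleep and an optional timeout. The helper name wait_for_results and its poll_interval and timeout parameters are assumptions introduced for illustration, not part of the original code.

```python
import time


def wait_for_results(output, expected_count, poll_interval=0.05, timeout=None):
    """Block until output holds expected_count entries; return False on timeout."""
    deadline = None if timeout is None else time.time() + timeout
    while len(output) < expected_count:
        if deadline is not None and time.time() > deadline:
            return False  # some workers never reported a result in time
        time.sleep(poll_interval)  # yield the CPU instead of spinning
    return True
```

With such a helper, the final loop could become a single call such as wait_for_results(output, len(__port_list)), and the timeout protects against a worker that never writes its result.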
Based on the above code for the scanning thread, the code for the management thread for port scanning is shown below:
    """
    Controller of the __scan_ports_helper() function

    Keyword arguments:
    websites -- the websites that are being scanned
    output_ip -- not used in this excerpt
    delay -- the time in seconds that a TCP socket waits until timeout
    """
    def __scan_ports(websites, output_ip, delay):
        scan_result = {}

        for website in websites:
            website = str(website)
            scan_result[website] = {}

            # The host name is passed as the address; connect_ex() also accepts host names
            thread = threading.Thread(target = __scan_ports_helper, args = (website, delay, scan_result[website]))
            thread.start()
            # lock the script until all threads complete
            thread.join()

        return scan_result
At this point, we have completed all the code for a multi-threaded port scanner.
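For the websites to actually be scanned at the same time, the management threads have to be started before any of them is joined. The sketch below shows one way this might look; it is an illustration under assumptions, not the original code. The names scan_websites and scan_one are hypothetical, and scan_one stands for any function with the signature (website, delay, output) that fills output with {port: status} pairs.

```python
import threading


def scan_websites(websites, scan_one, delay):
    """Run scan_one(website, delay, output) for every website concurrently."""
    scan_result = {}
    threads = []
    for website in websites:
        website = str(website)
        scan_result[website] = {}
        thread = threading.Thread(target=scan_one,
                                  args=(website, delay, scan_result[website]))
        thread.start()
        threads.append(thread)
    # Join only after every management thread has been started.
    for thread in threads:
        thread.join()
    return scan_result
```

Each website gets its own result dictionary, so the management threads never write to the same dictionary.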
IV. Summary: using this code to scan a given website and print the results
For convenience of output, I did not combine scanning multiple websites in parallel with scanning multiple ports in parallel for each website. This example scans ports with multiple threads but handles only one website at a time. The integrated code is as follows:
    import sys
    import subprocess
    import socket
    import threading
    import time

    class PortScanner:

        # default ports to be scanned
        # or put any ports you want to scan here!
        __port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563]
        # default thread number limit
        __thread_limit = 1000
        # default connection timeout time in seconds
        __delay = 10

        """
        Constructor of a PortScanner object

        Keyword arguments:
        target_ports -- the list of ports that is going to be scanned (default self.__port_list)
        """
        def __init__(self, target_ports = None):
            # If target ports not given in the arguments, use default ports
            # If target ports is given in the arguments, use given port lists
            if target_ports is None:
                self.target_ports = self.__port_list
            else:
                self.target_ports = target_ports

        """
        Return the usage information for invalid input host name.
        """
        def __usage(self):
            print('python Port Scanner v0.1')
            print('please make sure the input host name is in the form of "" or "!"\n')

        """
        This is the function need to be called to perform port scanning

        Keyword arguments:
        host_name -- the hostname that is going to be scanned
        message -- the message that is going to be included in the scanning packets, in order to prevent
            ethical problem (default: '')
        """
        def scan(self, host_name, message = ''):
            # Strip the URL scheme so that only the host name remains
            if 'http://' in host_name or 'https://' in host_name:
                host_name = host_name[host_name.find('://') + 3 : ]

            print('*' * 60 + '\n')
            print('start scanning website: ' + str(host_name))

            try:
                server_ip = socket.gethostbyname(str(host_name))
                print('server ip is: ' + str(server_ip))

            except socket.error as e:
                # If the DNS resolution of a website cannot be finished, abort that website.
                #print(e)
                print('hostname %s unknown!!!' % host_name)
                self.__usage()
                return {}
                # May need to return specified values to the DB in the future

            start_time = time.time()
            output = self.__scan_ports(server_ip, self.__delay, message)
            stop_time = time.time()

            print('host %s scanned in %f seconds' %(host_name, stop_time - start_time))
            print('finish scanning!\n')

            return output

        """
        Set the maximum number of thread for port scanning

        Keyword argument:
        num -- the maximum number of thread running concurrently (default 1000)
        """
        def set_thread_limit(self, num):
            num = int(num)

            if num <= 0 or num > 50000:
                print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!')
                print('The scanning process will use default thread limit!')
                return

            self.__thread_limit = num

        """
        Set the time out delay for port scanning in seconds

        Keyword argument:
        delay -- the time in seconds that a TCP socket waits until timeout (default 10)
        """
        def set_delay(self, delay):
            delay = int(delay)

            if delay <= 0 or delay > 100:
                print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)')
                print('The scanning process will use the default delay time')
                return

            self.__delay = delay

        """
        Print out the list of ports being scanned
        """
        def show_target_ports(self):
            print ('Current port list is:')
            print (self.target_ports)

        """
        Print out the delay in seconds that a TCP socket waits until timeout
        """
        def show_delay(self):
            print ('Current timeout delay is :%d' %(int(self.__delay)))

        """
        Open multiple threads to perform port scanning

        Keyword arguments:
        ip -- the ip address that is being scanned
        delay -- the time in seconds that a TCP socket waits until timeout
        output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
        message -- the message that is going to be included in the scanning packets, in order to prevent
            ethical problem (default: '')
        """
        def __scan_ports_helper(self, ip, delay, output, message):
            '''
            Multithreading port scanning
            '''
            port_index = 0

            while port_index < len(self.target_ports):
                # Ensure that the number of concurrently running threads does not exceed the thread limit
                while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports):
                    # Start threads
                    thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message))
                    thread.start()
                    port_index = port_index + 1

        """
        Controller of the __scan_ports_helper() function

        Keyword arguments:
        ip -- the ip address that is being scanned
        delay -- the time in seconds that a TCP socket waits until timeout
        message -- the message that is going to be included in the scanning packets, in order to prevent
            ethical problem (default: '')
        """
        def __scan_ports(self, ip, delay, message):
            output = {}

            thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message))
            thread.start()

            # Wait until all port scanning threads finished
            while (len(output) < len(self.target_ports)):
                continue

            # Print open ports from small to large
            for port in self.target_ports:
                if output[port] == 'OPEN':
                    print(str(port) + ': ' + output[port] + '\n')

            return output

        """
        Perform status checking for a given port on a given ip address using TCP handshake

        Keyword arguments:
        ip -- the ip address that is being scanned
        port_number -- the port that is going to be checked
        delay -- the time in seconds that a TCP socket waits until timeout
        output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
        message -- the message that is going to be included in the scanning packets, in order to prevent
            ethical problem (default: '')
        """
        def __TCP_connect(self, ip, port_number, delay, output, message):
            # Initialize the TCP socket object
            TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            TCP_sock.settimeout(delay)

            # Initialize a UDP socket to send scanning alert message if there exists a non-empty message
            if message != '':
                UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                UDP_sock.sendto(str(message), (ip, int(port_number)))

            try:
                result = TCP_sock.connect_ex((ip, int(port_number)))
                if message != '':
                    TCP_sock.sendall(str(message))

                # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
                if result == 0:
                    output[port_number] = 'OPEN'
                else:
                    output[port_number] = 'CLOSE'

                TCP_sock.close()

            except socket.error as e:
                output[port_number] = 'CLOSE'
                pass