Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
A portscanner is simply a script that is trying to make a connection ta a device through a list of ports, in that way we can see which ports are open on a device. It’s a useful tool for penetration testing. In this tutorial we will make a portscanner using the socket library in Python. We will also make it multithreaded so it can scan multiple ports at the same time. That will increase the performance and the time it takes to perform a scan.
As a bonus we will also make it possible to parse arguments on the command line when running the script. So it will be possible to provide host to scan and portrange directly when running it, no need to change the code everytime. The code will be heavely commented just so it’s extra clear what everything does.
import argparse
import socket
import time
import sys
from threading import Thread, Lock
from queue import Queue
threads = 100 # Number of simultaneous threads to use, feel free to tweak
queue = Queue() # The queue for the threads
open_ports = [] # For storing all the open ports
print_lock = Lock() # Print lock so only one thread at a time can print
socket.setdefaulttimeout(0.15) # The timeout when attempting a connection
def scan_ports(port):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Create socket
s.connect((host, port)) # Attempt connection on specified host and port
with print_lock: # Use lock to prevent multiple threads writing at the same time
print(port, 'is open')
open_ports.append(port) # Save all open ports for printing later
s.close()
except (socket.timeout, ConnectionRefusedError): # If we get ConnectionRefused exception, do nothing
pass
def scan_thread():
global queue
while True:
worker = queue.get() # Retrieve port number from queue
scan_ports(worker) # Scan the port using scan_ports() function
queue.task_done()
def main(host, ports):
global queue
startTime = time.time() # Timestamp to measure the time taken to scan
for t in range(threads):
t = Thread(target=scan_thread)
t.daemon = True
t.start() # Start each thread
for worker in ports: # Put each port into the queue
queue.put(worker)
queue.join() # Wait until are threads are finished
runtime = float("%0.2f" % (time.time() - startTime)) # Second timestamp, calculate time taken
print("Run time: ", runtime, "seconds")
print("Summary of open ports at the host", host, open_ports) # Print a summary of all open ports
if __name__ == "__main__":
try:
parser = argparse.ArgumentParser(description="Simple port scanner")
parser.add_argument("--target", "-t", dest="host", help="Target host to scan.")
parser.add_argument("--ports", "-p", dest="port_range", default="1-65535", help="Specify port range to scan, if left blank ports 1 - 65535 will be scanned.")
args = parser.parse_args()
host, port_range = args.host, args.port_range
start_port, end_port = port_range.split("-")
start_port, end_port = int(start_port), int(end_port)
ports = [ p for p in range(start_port, end_port)]
main(host, ports)
except KeyboardInterrupt:
print("Interrupt received, scanning will stop...") # Catch keyboard interrupt exception (Ctrl - c)
You can find the full source code at my Github, or just execute command git clone https://github.com/alfadhir/Multithreaded-port-scanner.git
at your command line.
This website uses cookies. By continuing to use this site, you accept our use of cookies.