In this article, the example for you to share the python implementation of a simple chat room specific code for your reference, the details are as follows
I just got into python programming, and I've been interested in the socket module since I got into java, so I made a little chat room program.
The program consists of a client and a server, using UDP services, with the server side bound to a local IP and port, and the client side with a port randomly selected by the system.
It realizes the functions of group sending, private sending and peer-to-peer file transfer.
client (computing)A self-built class inherits the Cmd module and operates with a custom command command, calling the corresponding do_command method.
Use the json module for encapsulated serialization of messages to be parsed at the receiver.
The client code is as follows:
import socket import threading import json import os from cmd import Cmd class Client(Cmd): """ Client """ prompt = '>>>' intro = '[Welcome] Simple Chat Room Client(Cliblock of printing)\n' + '[Welcome] importationhelpto get help\n' buffersize = 1024 def __init__(self, host): """ Tectonic """ super().__init__() self.__socket = (socket.AF_INET, socket.SOCK_DGRAM) # self.__id = None self.__nickname = None self.__host = host self.thread_recv = None = False # Whether documents are being received = False # Is the file being sent = False = None = None # Receive file packet count = None # Receive file name = None # Send filename = None # Sender = None # Recipient = None # Receive file streams self.file_recv = None # Send file streams self.file_send = None # Receive file address self.filefrom_addr = None # Send file address self.fileto_addr = None def __receive_message_thread(self): """ Accepting message threads """ while : # noinspection PyBroadException try: buffer, addr = self.__socket.recvfrom(1024) ''' The file stream is sent directly from the sender without going through the server, so when a message is sent from the sender, the data received is stored in the file ''' if (addr != self.__host) & (addr == self.filefrom_addr) & : self.file_recv.write(buffer) += 1 if * 1024 >= : self.file_recv.close() print(, 'is received.') = False continue js = (()) # If the received data is a message message, display the if js['type'] == 'message': print(js['message']) # If the received data is a file send request, store the file information and display the elif js['type'] == 'filequest': if : self.__socket.sendto(({ 'type': 'fileres', 'fileres': 'no', 'nickname': self.__nickname, 'who': js['nickname'], 'errormessage': 'is transfroming files.', }).encode(), self.__host) continue filename = js['filename'] who = js['nickname'] filesize = js['filesize'] = True = filesize = filename = 0 = who self.filefrom_addr = (js['send_ip'], js['send_port']) print('[system]:', who, ' send a file(', filename, ') to you. receive? ') # Received data as a request reply, if agree to receive then store the address of the receiver from the server and open the send thread elif js['type'] == 'fileres': if js['fileres'] == 'yes': print(js['recv_ip'], js['recv_port']) self.fileto_addr = (js['recv_ip'], js['recv_port']) thread = ( target=self.__send_file_thread) () else: print(js['nickname'], js['errormessage']) = False except Exception as e: print(e) print('[Client] Unable to get data from server') def __send_broadcast_message_thread(self, message): """ Sending a broadcast message thread :param message: message content """ self.__socket.sendto(({ 'type': 'broadcast', 'nickname': self.__nickname, 'message': message, }).encode(), self.__host) def __send_file_thread(self): """ Send file thread :param message: message content """ filecount = 0 print('[system]', 'sending the file...') while filecount * 1024 <= : self.__socket.sendto( self.file_send.read(1024), self.fileto_addr) filecount += 1 self.file_send.close() = False print('[system]', 'the file is sended.') def __send_whisper_message_thread(self, who, message): """ Send private message thread :param message: message content """ self.__socket.sendto(({ 'type': 'sendto', 'who': who, 'nickname': self.__nickname, 'message': message }).encode(), self.__host) def send_exit(self): self.__socket.sendto(({ 'type': 'offline', 'nickname': self.__nickname, }).encode(), self.__host) def start(self): """ Launching the client """ () def do_login(self, args): """ Logging in to the chat room :param args: parameter """ nickname = (' ')[0] # Send nicknames to the server, get user ids self.__socket.sendto(({ 'type': 'login', 'nickname': nickname, }).encode(), self.__host) # Trying to receive data buffer = self.__socket.recvfrom(1300)[0].decode() obj = (buffer) if obj['login'] == 'success': self.__nickname = nickname print('[Client] Successfully logged into chat room') = True # Open sub-threads for receiving data self.thread_recv = ( target=self.__receive_message_thread) self.thread_recv.setDaemon(True) self.thread_recv.start() else: print('[Client] Unable to log in to chat room', obj['errormessage']) def do_send(self, args): """ Send message :param args: parameter """ if self.__nickname is None: print('Please login first! login nickname') return message = args # Open a sub-thread for sending data thread = ( target=self.__send_broadcast_message_thread, args=(message, )) (True) () def do_sendto(self, args): """ Send private message :param args: parameter """ if self.__nickname is None: print('Please login first! login nickname') return who = (' ')[0] message = (' ')[1] # # Show messages sent by yourself # print('[' + str(self.__nickname) + '(' + str(self.__id) + ')' + ']', message) # Open a sub-thread for sending data thread = ( target=self.__send_whisper_message_thread, args=(who, message)) (True) () def do_catusers(self, arg): if self.__nickname is None: print('Please login first! login nickname') return catmessage = ({'type': 'catusers'}) self.__socket.sendto((), self.__host) def do_catip(self, args): if self.__nickname is None: print('Please login first! login nickname') return who = args catipmessage = ({'type': 'catip', 'who': who}) self.__socket.sendto((), self.__host) def do_help(self, arg): """ Help :param arg: parameter """ command = (' ')[0] if command == '': print('[Help] login nickname - log in to the chat room, nickname is your chosen nickname') print('[Help] send message - send a message, message is the message you type') print('[Help] sendto who message - private message, who is the username, message is the message you type') print('[Help] catusers - see all users') print('[Help] catip who - view user IP, who is the username') print('[Help] sendfile who filedir - send a file to a user, who is the username, filedir is the file path') print('[Help] getfile filename who yes/no - Receive a file, filename is the filename, who is the sender, yes/no is whether to receive or not') elif command == 'login': print('[Help] login nickname - log in to the chat room, nickname is your chosen nickname') elif command == 'send': print('[Help] send message - send a message, message is the message you type') elif command == 'sendto': print('[Help] sendto who message - sends a private message, message is the message you type') else: print('[Help] didn't look up the command you wanted to know about.') def do_exit(self, arg): # Commands beginning with do_* print("Exit") self.send_exit() try: = False self.thread_recv.join() except Exception as e: print(e) # self.__socket.close() def do_sendfile(self, args): who = (' ')[0] filepath = (' ')[1] filename = ('\\')[-1] # Determine if a document is being sent if : print('you are sending files, please try later.') return if not (filepath): print('the file is not exist.') return filesize = (filepath) # print(who, filename, filesize) = True = who = filename = filesize self.file_send = open(filepath, 'rb') self.__socket.sendto(({ 'type': 'filequest', 'nickname': self.__nickname, 'filename': , 'filesize': , 'who': , 'send_ip': '', 'send_port': '', }).encode(), self.__host) print('request send...') # fileres = self.__socket.recvfrom(1024)[0].decode() # js = (fileres) def do_getfile(self, args): filename = (' ')[0] who = (' ')[1] ch = (' ')[2] # print( is not None, filename, , who, ) if ( is not None) & (filename == ) & (who == ): if ch == 'yes': self.file_recv = open(, 'wb') self.__socket.sendto(({ 'type': 'fileres', 'fileres': 'yes', 'nickname': self.__nickname, 'who': who, 'recv_ip': '', 'recv_port': '', }).encode(), self.__host) print('you agree to reveive the file(', filename, ') from', who) else: self.__socket.sendto(({ 'type': 'fileres', 'fileres': 'no', 'nickname': self.__nickname, 'errormessage': 'deny the file.', 'who': who, 'recv_ip': '', 'recv_port': '', }).encode(), self.__host) print('you deny to reveive the file(', filename, ') from', who) = False else: print('the name or sender of the file is wrong.') c = Client(('127.0.0.1', 12346)) ()
server-sideMainly carries out the categorization and forwarding processing of messages, and the maintenance of user lists and address lists.
The server-side code is as follows:
import socket import json obj = (socket.AF_INET, socket.SOCK_DGRAM) (('127.0.0.1', 12346)) conn_list = [] user_list = [] while True: try: receive_data, client_address = (1024) js = (receive_data.decode()) # Login Messages if js['type'] == 'login': nickname = str(js['nickname']) if nickname in user_list: (({'login': 'fail', 'errormessage': 'the nickname is exists'}).encode(), client_address) else: # Send notifications to other users for i in range(len(conn_list)): (( { 'type': 'message', 'message': '[system]' + nickname + 'Logged in...' }).encode(), conn_list[i]) user_list.append(nickname) conn_list.append(client_address) print(nickname, client_address, 'Login successful!') (({'login': 'success', 'nickname': nickname}).encode(), client_address) # Mass messages elif js['type'] == 'broadcast': message = js['message'] nickname = js['nickname'] for i in range(len(conn_list)): (( { 'type': 'message', 'message': nickname + ':' + message }).encode(), conn_list[i]) # Private messages elif js['type'] == 'sendto': who = js['who'] nickname = js['nickname'] message = js['message'] # Check if the user exists if who not in user_list: (( { 'type': 'message', 'message': who + ' not exist or not try later.' }).encode(), client_address) else: (( { 'type': 'message', 'message': nickname + ' whisper to you: ' + message }).encode(), conn_list[user_list.index(who)]) # View the list of users elif js['type'] == 'catusers': users = (user_list) (( { 'type': 'message', 'message': users, }).encode(), client_address) # View user IP elif js['type'] == 'catip': who = js['who'] if who not in user_list: (( { 'type': 'message', 'message': who + ' not exist or not try later.' }).encode(), client_address) else: addr = (conn_list[user_list.index(who)]) (( { 'type': 'message', 'message': addr, }).encode(), client_address) # Offline messages elif js['type'] == 'offline': user_list.remove(js['nickname']) conn_list.remove(client_address) ( (js['nickname'] + 'offline.').encode(), client_address) # Send notifications to other users for i in range(len(conn_list)): (( { 'type': 'message', 'message': '[system]' + nickname + 'Down the line...' }).encode(), conn_list[i]) # Send document requests elif js['type'] == 'filequest': who = js['who'] if who not in user_list: (( { 'type': 'message', 'message': who + ' not exist or not try later.' }).encode(), client_address) else: js['send_ip'] = client_address[0] js['send_port'] = client_address[1] ((js).encode(), conn_list[user_list.index(who)]) print(js['nickname'], 'request to send file to', who) # Send document request replies elif js['type'] == 'fileres': who = js['who'] if js['fileres'] == 'yes': js['recv_ip'] = client_address[0] js['recv_port'] = client_address[1] print(js['nickname'], 'agree to receive file from', js['who']) else: print(js['nickname'], 'deny to receive file from', js['who']) ((js).encode(), conn_list[user_list.index(who)]) except Exception as e: print(e)
This is the whole content of this article.