SoFunction
Updated on 2024-11-13

Implement a chat room program based on python

This article shares example code for a simple chat room implemented in Python, for your reference. The details are as follows.

I have only just started programming in Python, and I have been interested in the socket module ever since I learned Java, so I wrote a small chat room program.

The program consists of a client and a server, using UDP services, with the server side bound to a local IP and port, and the client side with a port randomly selected by the system.

It supports broadcast (group) messages, private messages and peer-to-peer file transfer.

Client: a custom class inherits from the Cmd class of the cmd module and is driven by custom commands; entering a command calls the corresponding do_command method.

Use the json module for encapsulated serialization of messages to be parsed at the receiver.

The client code is as follows:

import socket
import threading
import json
import os
from cmd import Cmd
 
 
class Client(Cmd):
    """
    Interactive command-line client for the UDP chat room.

    Every ``do_<name>`` method is a shell command dispatched by ``cmd.Cmd``.
    Control messages are JSON datagrams exchanged with the server; file
    contents are sent straight to the peer as raw datagrams of at most
    ``buffersize`` bytes.

    NOTE(review): the published listing had lost several identifiers; the
    state attribute names used here (``isLogin``, ``recvfile``, ``sendfile``,
    ``filesize``, ...) are reconstructed from how the values are used --
    confirm against the original code if it is available.
    """
    prompt = '>>>'
    intro = ('[Welcome] Simple Chat Room Client (CLI version)\n'
             '[Welcome] Type help to get help\n')
    # Size in bytes of one file-transfer datagram.
    buffersize = 1024
    # Receive buffer size; large enough that a JSON control message is not
    # truncated by recvfrom().
    max_datagram = 65535
    # Seconds to wait for the server's answer to a login request.
    login_timeout = 5

    # One help line per command, in the order shown by a bare ``help``.
    _HELP = {
        'login': '[Help] login nickname - log in to the chat room, nickname is your chosen nickname',
        'send': '[Help] send message - send a message, message is the message you type',
        'sendto': '[Help] sendto who message - private message, who is the username, message is the message you type',
        'catusers': '[Help] catusers - see all users',
        'catip': '[Help] catip who - view user IP, who is the username',
        'sendfile': '[Help] sendfile who filedir - send a file to a user, who is the username, filedir is the file path',
        'getfile': '[Help] getfile filename who yes/no - Receive a file, filename is the filename, who is the sender, yes/no is whether to receive or not',
    }

    def __init__(self, host):
        """
        Create the UDP socket and reset all login / file-transfer state.

        :param host: ``(ip, port)`` address of the chat server
        """
        super().__init__()
        self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__nickname = None
        self.__host = host
        self.thread_recv = None
        self.isLogin = False
        # Whether a file offer is pending or a file is being received
        self.recvfile = False
        # Whether a file is being sent (or offered)
        self.sendfile = False
        # Size in bytes of the incoming / outgoing file
        self.filesize = None
        self.sendfilesize = None

        # Number of file datagrams received so far
        self.filecount = None
        # Name of the incoming file
        self.filename = None
        # Name of the outgoing file
        self.sendfilename = None

        # Nickname of the peer sending us a file
        self.filefrom = None
        # Nickname of the peer we are sending a file to
        self.fileto = None

        # Open file object the incoming data is written to
        self.file_recv = None
        # Open file object the outgoing data is read from
        self.file_send = None

        # (ip, port) the incoming file data comes from
        self.filefrom_addr = None
        # (ip, port) the outgoing file data is sent to
        self.fileto_addr = None

    @staticmethod
    def _basename(path):
        """Return the last component of ``path``, accepting both ``/`` and ``\\``."""
        return path.replace('\\', '/').rsplit('/', 1)[-1]

    def __send_json(self, payload):
        """Serialize ``payload`` as JSON and send it to the server."""
        self.__socket.sendto(json.dumps(payload).encode(), self.__host)

    def __finish_receive(self):
        """Close the incoming file and leave the receiving state."""
        self.file_recv.close()
        self.file_recv = None
        print(self.filename, 'is received.')
        self.recvfile = False

    def __store_file_chunk(self, buffer):
        """Append one raw datagram to the incoming file; finish when complete."""
        self.file_recv.write(buffer)
        self.filecount += 1
        if self.filecount * self.buffersize >= self.filesize:
            self.__finish_receive()

    def __handle_file_request(self, js):
        """Record a file offer relayed by the server, or refuse it when busy."""
        if self.recvfile:
            self.__send_json({
                'type': 'fileres',
                'fileres': 'no',
                'nickname': self.__nickname,
                'who': js['nickname'],
                'errormessage': 'is transfroming files.',
            })
            return
        # Read every field first so a malformed offer leaves the state untouched.
        # Only the base name is kept: the name comes from the network and is
        # later used as a local path.
        filename = self._basename(js['filename'])
        who = js['nickname']
        filesize = js['filesize']
        sender_addr = (js['send_ip'], js['send_port'])

        self.recvfile = True
        self.filesize = filesize
        self.filename = filename
        self.filecount = 0
        self.filefrom = who
        self.filefrom_addr = sender_addr

        print('[system]:', who, ' send a file(',
              filename, ') to you. receive? ')

    def __handle_file_reply(self, js):
        """React to the peer's answer to our file offer."""
        if js['fileres'] == 'yes':
            if not self.sendfile or self.file_send is None:
                # Unsolicited acceptance: there is nothing to send.
                return
            print(js['recv_ip'], js['recv_port'])
            self.fileto_addr = (js['recv_ip'], js['recv_port'])
            thread = threading.Thread(target=self.__send_file_thread)
            thread.start()
        else:
            print(js['nickname'], js['errormessage'])
            # The offer was refused: release the file opened by do_sendfile.
            if self.file_send is not None:
                self.file_send.close()
                self.file_send = None
            self.sendfile = False

    def __receive_message_thread(self):
        """
        Receive datagrams until ``isLogin`` becomes false.

        File data comes directly from the sending client, not through the
        server, so a datagram from the expected peer address while a file is
        open for receiving is written to that file.  Everything else is
        parsed as a JSON control message and dispatched on its ``type``.
        """
        while self.isLogin:
            try:
                buffer, addr = self.__socket.recvfrom(self.max_datagram)
                if (addr != self.__host and addr == self.filefrom_addr
                        and self.recvfile and self.file_recv is not None):
                    self.__store_file_chunk(buffer)
                    continue

                js = json.loads(buffer.decode())
                kind = js['type']

                if kind == 'message':
                    # Plain chat / system message: display it
                    print(js['message'])
                elif kind == 'filequest':
                    self.__handle_file_request(js)
                elif kind == 'fileres':
                    self.__handle_file_reply(js)

            except (OSError, ValueError, KeyError, TypeError) as e:
                print(e)
                print('[Client] Unable to get data from server')

    def __send_broadcast_message_thread(self, message):
        """
        Send a broadcast message to the server.

        :param message: message content
        """
        self.__send_json({
            'type': 'broadcast',
            'nickname': self.__nickname,
            'message': message,
        })

    def __send_file_thread(self):
        """
        Send the opened file to ``fileto_addr`` in ``buffersize`` chunks.

        Only non-empty chunks are sent, so no trailing empty datagram reaches
        the receiver.  The file is always closed and the sending flag reset,
        even if a send fails.
        """
        stream, target = self.file_send, self.fileto_addr
        print('[system]', 'sending the file...')
        try:
            for chunk in iter(lambda: stream.read(self.buffersize), b''):
                self.__socket.sendto(chunk, target)
        finally:
            stream.close()
            self.file_send = None
            self.sendfile = False
        print('[system]', 'the file is sended.')

    def __send_whisper_message_thread(self, who, message):
        """
        Send a private message via the server.

        :param who: nickname of the recipient
        :param message: message content
        """
        self.__send_json({
            'type': 'sendto',
            'who': who,
            'nickname': self.__nickname,
            'message': message
        })

    def send_exit(self):
        """Tell the server that this user is going offline."""
        self.__send_json({
            'type': 'offline',
            'nickname': self.__nickname,
        })

    def start(self):
        """
        Launch the client: run the interactive command loop.
        """
        self.cmdloop()

    def do_login(self, args):
        """
        Log in to the chat room.

        :param args: command arguments; the first word is the nickname
        """
        nickname = args.split(' ')[0]
        if not nickname:
            print(self._HELP['login'])
            return
        if self.isLogin:
            # The receive thread would consume the server's reply and this
            # method would wait forever.
            print('[Client] Unable to log in to chat room', 'already logged in')
            return

        # Send the nickname to the server and wait for its verdict
        self.__socket.settimeout(self.login_timeout)
        try:
            self.__send_json({
                'type': 'login',
                'nickname': nickname,
            })
            buffer = self.__socket.recvfrom(self.max_datagram)[0].decode()
            obj = json.loads(buffer)
        except (OSError, ValueError) as e:
            print('[Client] Unable to log in to chat room', e)
            return
        finally:
            # The receive thread expects a blocking socket.
            self.__socket.settimeout(None)

        if not isinstance(obj, dict):
            obj = {}
        if obj.get('login') == 'success':
            self.__nickname = nickname
            print('[Client] Successfully logged into chat room')
            self.isLogin = True
            # Start the background thread that receives data
            self.thread_recv = threading.Thread(
                target=self.__receive_message_thread, daemon=True)
            self.thread_recv.start()
        else:
            print('[Client] Unable to log in to chat room',
                  obj.get('errormessage', ''))

    def do_send(self, args):
        """
        Send a message to everybody.

        :param args: the message text
        """
        if self.__nickname is None:
            print('Please login first! login nickname')
            return
        # Send from a background thread
        thread = threading.Thread(
            target=self.__send_broadcast_message_thread,
            args=(args, ), daemon=True)
        thread.start()

    def do_sendto(self, args):
        """
        Send a private message.

        :param args: ``who message``; everything after the first word is
                     the message
        """
        if self.__nickname is None:
            print('Please login first! login nickname')
            return
        parts = args.split(None, 1)
        if len(parts) < 2:
            print(self._HELP['sendto'])
            return
        who, message = parts
        # Send from a background thread
        thread = threading.Thread(
            target=self.__send_whisper_message_thread,
            args=(who, message), daemon=True)
        thread.start()

    def do_catusers(self, arg):
        """Ask the server for the list of users."""
        if self.__nickname is None:
            print('Please login first! login nickname')
            return
        self.__send_json({'type': 'catusers'})

    def do_catip(self, args):
        """
        Ask the server for a user's address.

        :param args: nickname of the user to look up
        """
        if self.__nickname is None:
            print('Please login first! login nickname')
            return
        self.__send_json({'type': 'catip', 'who': args})

    def do_help(self, arg):
        """
        Show help for all commands or for a single command.

        :param arg: optional command name
        """
        command = arg.split(' ')[0]
        if command == '':
            for line in self._HELP.values():
                print(line)
        elif command in self._HELP:
            print(self._HELP[command])
        else:
            print("[Help] didn't look up the command you wanted to know about.")

    def do_exit(self, arg):
        """
        Go offline and leave the command loop.

        :param arg: unused
        :return: ``True``, which makes ``cmd.Cmd.cmdloop`` stop
        """
        print("Exit")
        was_logged_in = self.isLogin
        # Clear the flag first so the receive thread stops after its next datagram.
        self.isLogin = False
        if was_logged_in:
            try:
                self.send_exit()
            except OSError as e:
                print(e)
        if self.thread_recv is not None:
            # The thread may be blocked in recvfrom(); do not wait forever.
            self.thread_recv.join(timeout=1)
        self.__nickname = None
        return True

    def do_sendfile(self, args):
        """
        Offer a file to another user.

        :param args: ``who filedir``; the path may contain spaces
        """
        if self.__nickname is None:
            print('Please login first! login nickname')
            return
        parts = args.split(None, 1)
        if len(parts) < 2:
            print(self._HELP['sendfile'])
            return
        who, filepath = parts
        # Only one outgoing transfer at a time
        if self.sendfile:
            print('you are sending files, please try later.')
            return
        if not os.path.isfile(filepath):
            print('the file is not exist.')
            return
        try:
            filesize = os.path.getsize(filepath)
            stream = open(filepath, 'rb')
        except OSError as e:
            print(e)
            return

        self.sendfile = True
        self.fileto = who
        self.sendfilename = self._basename(filepath)
        self.sendfilesize = filesize
        self.file_send = stream

        # send_ip / send_port are filled in by the server
        self.__send_json({
            'type': 'filequest',
            'nickname': self.__nickname,
            'filename': self.sendfilename,
            'filesize': self.sendfilesize,
            'who': self.fileto,
            'send_ip': '',
            'send_port': '',
        })

        print('request send...')

    def do_getfile(self, args):
        """
        Accept or refuse a pending file offer.

        :param args: ``filename who yes/no``
        """
        parts = args.rsplit(None, 2)
        if len(parts) < 3:
            print(self._HELP['getfile'])
            return
        filename, who, ch = parts
        # The answer must match a pending offer that has not been accepted yet;
        # otherwise an already received file could be truncated by reopening it.
        pending = self.recvfile and self.file_recv is None
        if (not pending or self.filename is None
                or filename != self.filename or who != self.filefrom):
            print('the name or sender of the file is wrong.')
            return

        if ch == 'yes':
            self.file_recv = open(self.filename, 'wb')
            # recv_ip / recv_port are filled in by the server
            self.__send_json({
                'type': 'fileres',
                'fileres': 'yes',
                'nickname': self.__nickname,
                'who': who,
                'recv_ip': '',
                'recv_port': '',
            })
            print('you agree to reveive the file(', filename, ') from', who)
            if not self.filesize:
                # An empty file produces no data datagrams: finish right away.
                self.__finish_receive()
        else:
            self.__send_json({
                'type': 'fileres',
                'fileres': 'no',
                'nickname': self.__nickname,
                'errormessage': 'deny the file.',
                'who': who,
                'recv_ip': '',
                'recv_port': '',
            })
            print('you deny to reveive the file(', filename, ') from', who)
            self.recvfile = False
 
if __name__ == '__main__':
    # Create a client for the server at 127.0.0.1:12346 and run its command loop.
    c = Client(('127.0.0.1', 12346))
    c.start()

Server side: it mainly classifies and forwards messages, and maintains the user list and the address list.

The server-side code is as follows:

import socket
import json
 
# Parallel lists: conn_list[i] is the (ip, port) address of user_list[i].
conn_list = []
user_list = []


def _not_online(who):
    """Build the chat message reporting that ``who`` is unknown."""
    return {
        'type': 'message',
        'message': who + ' not exist or not online. please try later.'
    }


def handle_packet(js, client_address):
    """
    Process one decoded client message and compute the datagrams to send.

    Updates ``user_list`` / ``conn_list`` for login and offline messages.

    :param js: the decoded JSON message (a dict with a ``type`` key)
    :param client_address: ``(ip, port)`` the message came from
    :return: list of ``(payload_dict, address)`` pairs, in sending order
    :raises KeyError: if a field required by the message type is missing
    """
    kind = js['type']
    out = []

    # Login messages
    if kind == 'login':
        nickname = str(js['nickname'])
        if nickname in user_list:
            out.append(({'login': 'fail',
                         'errormessage': 'the nickname is exists'},
                        client_address))
        else:
            # Notify the users that are already online
            notice = {
                'type': 'message',
                'message': '[system]' + nickname + 'Logged in...'
            }
            out.extend((notice, address) for address in conn_list)
            user_list.append(nickname)
            conn_list.append(client_address)
            print(nickname, client_address, 'Login successful!')
            out.append(({'login': 'success', 'nickname': nickname},
                        client_address))

    # Broadcast messages
    elif kind == 'broadcast':
        payload = {
            'type': 'message',
            'message': js['nickname'] + ':' + js['message']
        }
        out.extend((payload, address) for address in conn_list)

    # Private messages
    elif kind == 'sendto':
        who = js['who']
        if who not in user_list:
            out.append((_not_online(who), client_address))
        else:
            out.append(({
                'type': 'message',
                'message': js['nickname'] + ' whisper to you: ' + js['message']
            }, conn_list[user_list.index(who)]))

    # View the list of users
    elif kind == 'catusers':
        out.append(({
            'type': 'message',
            'message': json.dumps(user_list),
        }, client_address))

    # View a user's IP
    elif kind == 'catip':
        who = js['who']
        if who not in user_list:
            out.append((_not_online(who), client_address))
        else:
            out.append(({
                'type': 'message',
                'message': json.dumps(conn_list[user_list.index(who)]),
            }, client_address))

    # Offline messages
    elif kind == 'offline':
        nickname = js['nickname']
        if nickname in user_list:
            index = user_list.index(nickname)
            # Only the user's own address may log the user out; removing both
            # entries by index keeps the two lists aligned.
            if conn_list[index] == client_address:
                del user_list[index]
                del conn_list[index]
                # JSON reply, so the client's receive loop can parse it
                out.append(({
                    'type': 'message',
                    'message': nickname + 'offline.'
                }, client_address))
                # Notify the remaining users
                notice = {
                    'type': 'message',
                    'message': '[system]' + nickname + 'Down the line...'
                }
                out.extend((notice, address) for address in conn_list)

    # File transfer requests
    elif kind == 'filequest':
        who = js['who']
        if who not in user_list:
            # Answer as a refusal so the sender leaves its "sending" state.
            out.append(({
                'type': 'fileres',
                'fileres': 'no',
                'nickname': who,
                'who': js['nickname'],
                'errormessage': ' not exist or not online. please try later.',
            }, client_address))
        else:
            # Tell the receiver where the file data will come from
            js['send_ip'] = client_address[0]
            js['send_port'] = client_address[1]
            out.append((js, conn_list[user_list.index(who)]))
            print(js['nickname'], 'request to send file to', who)

    # Replies to file transfer requests
    elif kind == 'fileres':
        who = js['who']
        if who not in user_list:
            out.append((_not_online(who), client_address))
        else:
            if js['fileres'] == 'yes':
                # Tell the sender where to send the file data
                js['recv_ip'] = client_address[0]
                js['recv_port'] = client_address[1]
                print(js['nickname'], 'agree to receive file from', who)
            else:
                print(js['nickname'], 'deny to receive file from', who)
            out.append((js, conn_list[user_list.index(who)]))

    return out


def serve(host='127.0.0.1', port=12346):
    """
    Bind a UDP socket and answer client messages until interrupted.

    :param host: local IP address to bind
    :param port: local UDP port to bind
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as obj:
        obj.bind((host, port))
        while True:
            # A bad packet must not stop the server: report it and carry on.
            try:
                receive_data, client_address = obj.recvfrom(65535)
                js = json.loads(receive_data.decode())
                for payload, address in handle_packet(js, client_address):
                    obj.sendto(json.dumps(payload).encode(), address)
            except Exception as e:
                print(e)


if __name__ == '__main__':
    serve()

This is the whole content of this article.