SoFunction
Updated on 2025-05-22

C# Example of using MQTTnet to implement communication between server and client

1. Introduction to MQTT protocol

MQTT (Message Queuing Telemetry Transport) is a lightweightPublish/SubscribeThe protocol is designed for low-bandwidth and high-latency network environments such as the Internet of Things (IoT). Core concepts include:

  • Broker: Message broker (server side), responsible for message routing.
  • Client: The terminal (client) that publishes or subscribes to messages.
  • Topic: Classification identifier of message (such assensor/temperature)。

2. Core features of MQTT protocol

MQTT (Message Queuing Telemetry Transport) is a lightweight communication protocol based on a publish/subscribe model, designed for resource-constrained devices and unreliable network environments. Its core advantages include:

Low bandwidth consumption: Adopt binary message format, with extremely small head overhead, suitable for IoT devices.

Asynchronous communication: Through topics (Topic), the broadcast and directional delivery of messages is realized, and message producers and consumers are decoupled.

Multi-level quality of service (QoS)
  • QoS 0(Maximum once): Message may be lost, no retransmission mechanism.
  • QoS 1(at least once): Ensure that the message is delivered, but may be repeated.
  • QoS 2(Only once): Strictly ensure message uniqueness and apply to key instructions.

Offline support: The server can cache the client's Retained Messages for subsequent subscribers to read.

3. The core functions of the MQTTNET library

MQTTnet is a fully functional MQTT implementation library in the .NET ecosystem, with the following features:

  • Protocol Compatibility: Fully support the MQTTv3.1.1 and MQTTv5 protocols, the latter has added advanced functions such as session timeout control and reason code feedback.
  • High performance design: Based on the asynchronous programming model (async/await), it supports high concurrent connections and message throughput.
  • Cross-platform support: Compatible with Windows, Linux, and macOS, it can be deployed in cloud, edge devices or container environments.
  • Extensibility: Provides flexible interceptors and event hooks to facilitate the integration of business logic (such as message filtering, logging).
  • Security: Supports TLS 1.3 encrypted communication, and can be authenticated by client via certificate or account password.

The framework used

frame Version
.net 4.7.2+
MQTTnet 4.3.3+

4. Detailed explanation of server (BROKER) implementation

Core responsibilities

  • Manage client connections and session status.
  • Route the message to the matching subscriber.
  • Implement security policies (authentication, permission control).

Key configuration items

  • Port binding: The default non-encrypted port is 1883 and the encryption port is 8883.
  • Connection Verification: Customizable verification logic, such as checking the client ID format and account password legality.
  • Session Management: Set the session expiration time and clean up inactive connections.

Event mechanism

  • Client connection/disconnect event: Used to monitor the online status of the device.
  • Message Interceptor: Insert processing logic before and after message is published or delivered (such as data format verification, sensitive information filtering).
  • Subscription Management: Dynamically track topic subscription relationships, support wildcard characters (+Single layer,#Multi-layer).

Persistence extension

  • Integrated databases (such as SQLite, MySQL) can store retained messages or session status to ensure that data is not lost after the service is restarted.

The following is the server code: (The method below can be replaced with its own log method)

public class MQTTServerHelper
{
    private MqttServer _server;//MQTT server object
    // Define a delegate and event (temporary storage of connection client data)    public event EventHandler<InterceptingPublishEventArgs> OnMessageReceived;
    public event EventHandler<bool> ServerStauts;
    public event EventHandler<ClientConnectedEventArgs> ClientConnected;
    public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected;
    public event EventHandler<ClientSubscribedTopicEventArgs> ClientSubscribedTopic;
    public event EventHandler<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic;

    /// <summary>
    /// Initialize the MQTT service and start the service    /// </summary>
    /// <param name="ip">IPV4 address</param>    /// <param name="port">Port: between 0~65535</param>    public Task StartMqtServer(string ip, int port) 
    { 
        MqtServerOptions mqtServerOptions = new MqtServerOptionsBuilder()                     
               .WithDefaultEndpoint()     
               .WithDefaultEndpointBoundIPAdres((ip) 
               .WithDefaultEndpointPort(port) 
               .WithDefaultComunicationTimeout((500) .Build();     
        _server = new MqtFactory().CreateMqtServer(mqtServerOptions); / createMQTServer object 
        _server.ValidatingConectionAsync += Server_ValidatingConectionAsync; /Verify username and password
        _server.ClientConectedAsync += Server_ClientConectedAsync; /Bind client connection event 
        _server.ClientDisconectedAsync += Server_ClientDisconectedAsync; /Bind client disconnect event 
        _server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync; /Bind client subscription topic events 
        _server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync; /Binding client unsubscribe topic events 
        _server.InterceptingPublishAsync += Server_InterceptingPublishAsync; /Message reception event 
        _server.ClientAcknowledgedPublishPacketAsync += Server_ClientAcknowledgedPublishPacketAsync; /Process data packets confirmed by the client         
        _server.InterceptingClientEnqueueAsync += Server_InterceptingClientEnqueueAsync; /Subscribe to intercept client message queue 
        _server.AplicationMesageNotConsumedAsync += Server_AplicationMesageNotConsumedAsync; /Application logic processing 
        _server.StartedAsync += Server_StartedAsync;/Bind server startup event 
        _server.StopedAsync += Server_StopedAsync;/Binding server stop event 
        return _server.StartAsync();
    }

    /// &lt;summary&gt;
    /// Handle client confirmation release event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_AplicationMesageNotConsumedAsync(AplicationMesageNotConsumedEventArgs e)
    { 
        try 
        {
            ($"【MesageNotConsumed】-SenderId:{}-Mesage:{()}");
        }
        catch (Exception ex) 
        { 
            ($"Server_AplicationMesageNotConsumedAsyncAn exception occurred:{}"); 
        }
        return ;
    }

    /// &lt;summary&gt;
    /// Subscribe to intercept client message queue events    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_InterceptingClientEnqueueAsync(InterceptingClientAplicationMesageEnqueueEventArgs e)  
    { 
        try 
        {
            ($"【InterceptingClientEnqueue】-SenderId:{}-Mesage:{()}"); 
        } 
        catch (Exception ex) 
        {
            ($"Server_InterceptingClientEnqueueAsyncAn exception occurred:{}");
        }
        return ;
    }
    
    /// &lt;summary&gt;
    /// Triggered when the client has processed the application message received from the MQT server.    /// This event can be used to confirm that the message has been processed, update the application status,    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs e)         
    { 
        try 
        { 
            ($"【ClientAcknowledgedPublishPacket】-SenderId:{}-Mesage:{Encoding.(()}");         
        } 
        catch (Exception ex) 
        { 
            ($"Server_ClientAcknowledgedPublishPacketAsyncAn exception occurred:{}"); 
        } 
        return ; 
    }

    /// &lt;summary&gt;
    /// Server message reception    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs e) 
    { 
        try 
        { 
            string client = ; string topic = ; 
            string contents = ();
            //Encoding.(();
            OnMesageReceived?.Invoke(this, e); 
            ($"Received a message:Client:【{client}】 Topic:【{topic}】 Mesage:【{contents}】"); 
        } 
        catch (Exception ex) 
        { 
            ($"Server_InterceptingPublishAsyncAn exception occurred:{}");
        } 
        return ;
    }

    /// &lt;summary&gt;
    /// Server disconnect event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_StoppedAsync(EventArgs arg) 
    { 
        return (new Action() =&gt; 
        { 
            ServerStauts?.Invoke(this, false); 
            ($"Server [IP:Port] has stopped MQT"); 
        }); 
    }

    /// &lt;summary&gt;
    /// Server startup event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    public Task Server_StartedAsync(EventArgs e) 
    {
        return (new Action() =&gt; 
        {
            ServerStauts?.Invoke(this, true); 
            ($"MQT is enabled on the server [IP:Port]"); 
        });
    }

    /// &lt;summary&gt;
    /// Client unsubscribes topic event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs e)         
    { 
        return (new Action() =&gt; 
        { 
            ClientUnsubscribedTopic?.Invoke(this, e); 
            ($"Client【{}】Unsubscribe to the topic【{}】"); 
        });
    }

    /// &lt;summary&gt;
    /// Client Subscription Topic Events    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs e) 
    { 
        return (new Action() =&gt;  
        { 
            ClientSubscribedTopic?.Invoke(this, e); 
            ($"Client【{}】Subscribe to topics【{}】");         
        }); 
    }

    /// &lt;summary&gt;
    /// Client disconnect event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ClientDisconectedAsync(ClientDisconectedEventArgs e) 
    { 
        return (new Action() =&gt; 
        { 
            ClientDisconected?.Invoke(this, e); 
            ($"Client已断开.ClientId:【{}】,Endpoint:【{}】.ReasonCode:【{}】,DisconectType:【{}】"); 
        });
    }

    /// &lt;summary&gt;
    /// Bind client connection event    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ClientConectedAsync(ClientConectedEventArgs e) 
    { 
        return (new Action() =&gt; 
        { 
            ClientConected?.Invoke(this, e); 
            ($"Client已连接.ClientId:【{}】,Endpoint:【{}】"); 
        }); 
    }

    /// &lt;summary&gt;
    /// Verify client events    /// &lt;/summary&gt;
    /// &lt;param name="e"&gt;&lt;/param&gt;
    private Task Server_ValidatingConectionAsync(ValidatingConectionEventArgs e) 
    { 
        return (new Action() =&gt; 
        { 
            if ( = "") 
            { 
                 = ; 
                ($"Client已验证成功.ClientId:【{}】,Endpoint:【{}】"); 
            } 
            else 
            { 
                 = ;         
                ($"Client验证失败.ClientId:【{}】,Endpoint:【{}】");         
            } 
        });
    }
}

5. Detailed explanation of client (Client) implementation

Connection Policy

  • Keeping up mechanism: Maintain long connections through Keep Alive to adapt to network fluctuations.

Message interaction mode

  • Post a message: Specify the target topic, load data, and QoS levels, and you can choose to set the retention flag.
  • Subscribe to topics: Supports single topic, multi topic or wildcard subscription, and the server will push matching messages.

Asynchronous processing

  • Use event delegates or asynchronous methods to process received messages to avoid blocking the main thread.

The following is the client code:

/// &lt;sumary&gt;
/// MQT client help class/// &lt;/sumary&gt;
public clas MQTClientHelper 
{ 
    private IMqtClient _client; 
    /// &lt;sumary&gt; 
    /// Receive message    /// &lt;/sumary&gt; 
    public MQTReceivedMesageHandle ReceivedMesage; 
    public bol IsConected { get; set; } = false; 
    public bol IsDisConected { get; set; } = true; 
    private string _serverIp; private int _serverPort; 
    /// &lt;sumary&gt; 
    /// Subscribe to the theme collection    /// &lt;/sumary&gt; 
    private Dictionary&lt;string, bol&gt; _subscribeTopicList = nul; 
    #region Connect/disconnect the server    /// &lt;sumary&gt; 
    /// Connect to the server    /// &lt;/sumary&gt; 
    /// <param name="serverIp">Server IP</param>    /// <param name="serverPort">Service Port Number</param>    public void Start(string serverIp, int serverPort) 
    { 
        this._serverIp = serverIp; 
        this._serverPort = serverPort; 
        if (!(serverIp) &amp; !(serverIp) &amp; serverPort &gt; 0) 
        { 
            try 
            { 
                var options = new MqtClientOptions() 
                { 
                    ClientId = "Client 2"//().ToString("N")
                }; 
                 = new MqtClientTcpOptions() 
                { 
                    Server = serverIp, Port = serverPort 
                }; // = new MqtClientCredentials(UserName, (Pasword);
                 = true; 
                 = (10);
                if (_client != nul) 
                { 
                    _client.DisconectAsync(); 
                    _client = nul; 
                } 
                _client = new MqtFactory().CreateMqtClient(); 
                _client.ConectedAsync += Client_ConectedAsync; //Bind client connection event                _client.DisconectedAsync += Client_DisconectedAsync; //Binding client disconnect event                _client.AplicationMesageReceivedAsync += Client_AplicationMesageReceivedAsync; /Binding message receiving event 
                _client.ConectAsync(options); //connect                } 
                catch (Exception ex) 
                { 
                    /("MQTClient connection server error:{0}", ); 
                }
            }
            else
            { 
                /("The MQT server address or port number cannot be empty!"); 
            }
        }
    } 

    /// &lt;sumary&gt; 
    /// Disconnect the MQT client    /// &lt;/sumary&gt; 
    public void Client_Disconect() 
    { 
        if (_client != nul) 
        { 
            _client.DisconectAsync(); 
            _client.Dispose(); 
            ($"Close the MQT client successfully!"); 
        }
    } 
    /// &lt;sumary&gt; 
    /// The client re-MQT server    /// &lt;/sumary&gt; 
    public void Client_ConectAsync() 
    { 
        if (_client != nul) 
        { 
            _client.ReconectAsync(); 
            ($"Connecting to the MQT server successfully!"); 
        } 
    } 
    #endregion 
    #region MQT method    /// &lt;sumary&gt; 
    /// Establish a connection between the client and the server    /// &lt;/sumary&gt; 
    /// &lt;param name="arg"&gt;&lt;/param&gt; 
    private Task Client_ConectedAsync(MqtClientConectedEventArgs arg) 
    { 
        return (new Action() =&gt; 
        { 
            IsConected = true; 
            IsDisConected = false; 
            ($"Connect toMQTSuccessful server.{}"); 
            //Subscribe to the topic (can receive messages from the server, and the same topic cannot be used for posting messages to the client)            try 
            { 
                if (_subscribeTopicList != nul &amp; _subscribeTopicList.Count &gt; 0) 
                { 
                    List&lt;string&gt; subscribeTopics = _subscribeTopicList.();         
                    foreach (var topic in subscribeTopics) 
                        SubscribeAsync(topic); 
                } 
            } 
            catch (Exception ex) 
            { 
                //("MQT client and server [{0}:{1}] connection establishment topic error: {2}", _serverIp, _serverPort, );            } 
        }); 
    } 

    /// &lt;sumary&gt; 
    /// Disconnect from the client and the server    /// &lt;/sumary&gt; / &lt;param name="arg"&gt;&lt;/param&gt; 
    private Task Client_DisconectedAsync(MqtClientDisconectedEventArgs arg) 
    { 
        return (new Action(async () =&gt; 
        { 
            IsConected = false; 
            IsDisConected = true; 
            ($"Disconnected to MQT server. Try to reconnect"); 
            try 
            { 
                await (30); 
                //MqtClientOptions options = new MqtClientOptions(); 
                //await (options); 
                await _client.ReconectAsync(); 
            } 
            catch (Exception ex) 
            { 
                //("MQT client disconnects from server [{0}:{1}] unsubscribe topic error: {2}", _serverIp, _serverPort, );            } 
        }); 
    } 
    /// &lt;sumary&gt; 
    /// Reconnect the client and server    /// &lt;/sumary&gt; 
    /// &lt;returns&gt;&lt;/returns&gt; 
    public Task ReconectedAsync() 
    { 
        try 
        { 
            if (_client != nul) 
            { 
                _client.ReconectAsync(); 
            } 
        } 
        catch (Exception ex) 
        { 
            // ("MQT client and server [{0}:{1}] reconnect unsubscribe topic error: {2}", _serverIp, _serverPort, );        } 
        return ; 
    } 
    /// &lt;sumary&gt; 
    /// The client receives a message    /// &lt;/sumary&gt; 
    /// &lt;param name="arg"&gt;&lt;/param&gt; 
    private Task Client_AplicationMesageReceivedAsync(MqtAplicationMesageReceivedEventArgs arg) 
    { 
        try 
        { 
            return (new Action() =&gt; 
            { 
                string msg = (); 
                ($"Receive message:{msg}\nQoS={}\nClient={}\ntheme:{}"); 
            });
        } 
        catch (Exception ex) 
        { 
            //("MQT received a message error from the server [{0}]: {1}", arg != nul ? : ", );        }
        return ; 
    } 
    #endregion 
    #region Subscribe to topic    /// &lt;sumary&gt; 
    /// Subscribe to the topic    /// &lt;/sumary&gt; 
    /// <param name="topic">Theme</param>    public void SubscribeAsync(string topic) 
    { 
        try 
        { 
            if (_subscribeTopicList = nul) 
                _subscribeTopicList = new Dictionary&lt;string, bol&gt;(); 
            if (_subscribeTopicList.ContainsKey(topic) &amp; _subscribeTopicList[topic]) 
            { 
                //("MQT client has subscribed to the topic [{0}] and cannot be subscribed repeatedly", topic);                return; 
            } 
            //Subscribe to the topic            _client?.SubscribeAsync(topic, ); 
            //Add a subscription cache            bol isSubscribed = _client != nul &amp; _client.IsConected ? true : false; 
            if (!_subscribeTopicList.ContainsKey(topic) 
                _subscribeTopicList.Ad(topic, isSubscribed); 
            else 
                _subscribeTopicList[topic] = isSubscribed; 
        } 
        catch (Exception ex) 
        { 
            //("MQT client subscription topic[{0}] error: {1}", topic, );        } 
    } 

    /// &lt;sumary&gt; 
    /// Subscribe to the theme collection    /// &lt;/sumary&gt; 
    /// <param name="topicList">Theme collection</param>    public void SubscribeAsync(List&lt;string&gt; topicList) 
    { 
        try 
        { 
            if (topicList = nul |  = 0) 
                return; 
            foreach (var topic in topicList) 
                SubscribeAsync(topic); 
        } 
        catch (Exception ex) 
        { 
            //("MQT client subscription topic collection error: {0}", );        } 
    } 
    /// &lt;sumary&gt; 
    /// Unsubscribe to the topic    /// &lt;/sumary&gt; 
    /// <param name="topic">Theme</param>    /// <param name="isRemove">Whether to remove the cache</param>    public void UnsubscribeAsync(string topic, bol isRemove = true) 
    { 
        try 
        { 
            if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) 
            { 
                //("MQT client unsubscribe topic [{0}] does not exist", topic);                return; 
            } 
            if (!_subscribeTopicList.ContainsKey(topic) 
            { 
                //("MQT client unsubscribe topic [{0}] does not exist", topic);                return; 
            } 
            //Unsubscribe to the topic            _client.UnsubscribeAsync(topic); 
            //Modify the cache status of the subscription topic            if (isRemove) 
                _subscribeTopicList.Remove(topic); 
            else 
                _subscribeTopicList[topic] = false;
        } 
        catch (Exception ex) 
        { 
            //("MQT client unsubscribes topic[{0}] error: {1}", topic, );        } 
    } 
    /// &lt;sumary&gt; 
    /// Unsubscribe to the theme collection    /// &lt;/sumary&gt; 
    /// <param name="topicList">Theme collection</param>    /// <param name="isRemove">Whether to remove the cache</param>    public void UnsubscribeAsync(List&lt;string&gt; topicList, bol isRemove = true) 
    { 
        try 
        { 
            if (topicList = nul |  = 0) 
                return; 
            foreach (var topic in topicList) 
                UnsubscribeAsync(topic, isRemove); 
        } 
        catch (Exception ex) 
        { 
            //("MQT client unsubscribes topic collection error: {0}", );        } 
    } 
    /// &lt;sumary&gt; 
    /// Whether the subscription topic exists    /// &lt;/sumary&gt; 
    /// <param name="topic">Theme</param>    public bol IsExistSubscribeAsync(string topic) 
    { 
        try 
        { 
        if (_subscribeTopicList = nul | _subscribeTopicList.Count = 0) 
            return false; 
        if (!_subscribeTopicList.ContainsKey(topic) 
            return false; 
        return _subscribeTopicList[topic];
        } 
        catch (Exception ex) 
        { 
            //("MQT client subscription topic [{0}] is error: {1}", topic, ); return false;        } 
    } 
    #endregion 
    #region Post a message    /// &lt;sumary&gt; 
    /// Post a message    /// Cannot use the same topic as the client receiving messages    /// &lt;/sumary&gt; 
    /// <param name="topic">Theme</param>    /// <param name="mesage">Message</param>    public async void PublishMesage(string topic, string mesage) 
    { 
        try 
        { 
            if (_client != nul) 
            { 
                if ((mesage) | (mesage) 
                { 
                    //("MQT client cannot publish empty messages!");                    return;
                } 
                MqtClientPublishResult result = await _client.PublishStringAsync(topic,mesage,);//Just once, QoS level 1                ($"Post a message-theme:{topic},information:{mesage},result: {}"); 
            } 
            else 
            { 
                //("MQT client is not connected to the server and cannot publish messages with topic [{0}]: {1}", topic, message);                return; 
            } 
        } 
        catch (Exception ex) 
        { 
            //("MQT client publishes message with topic [{0}]: {1}, error: {2}", topic, message, );        } 
    } 
    #endregion 
}

6. Summary

The MQTT communication system built through MQTTnet can provide efficient and reliable solutions for scenarios such as the Internet of Things, real-time message push, etc. During the development process, we need to focus on communication mode design, security policy implementation and performance tuning, and flexibly use QoS and message retention characteristics based on specific business needs. It is recommended to refer to official documents and community best practices to gradually expand functions (such as cluster deployment, message persistence) to meet the needs of large-scale applications.

This is the article about C# using MQTTnet to implement communication between server and client. For more related C# MQTTnet, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!