1. Introduction
RabbitMQ
:Message QueueComponent, implementing the function of message transmission between two client hosts (publish & subscribe)
- Core concept: Switch, queue, binding, message
-
Switch type:
- Broadcast Exchange: When the switch receives a message, it publishes the message to all bound queues
-
Direct exchange: According to the
bkey
With boundrkey
Compare, put it in the queue if it is consistent -
Theme exchange:use
bkey
With boundrkey
Rule matching is performed, and if successful, it is placed in the queue
2. Installation
- Install:
sudo apt install rabbitmq-server
- Simple use:
# When the installation is completed, there is a user guest by default, but the permissions are not enough. You must create an administrator user to log in and publish subscription messages as remote login.#Add usersudo rabbitmqctl add_user root <PASSWORD> #Set user tagssudo rabbitmqctl set_user_tags root administrator #Set user permissionssudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ comes with a web management interface, execute the following command to enable it, the default port 15672sudo rabbitmq-plugins enable rabbitmq_management
2. Client library
C language library
C++ library
sudo apt install libev-dev #libev Network Library Componentsgit clone /CopernicaMarketingSoftware/ cd AMQP-CPP/ make make install
If the following error occurs during installation, it meansssl
There is a problem with the version
/usr/include/openssl/:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" 147 | # error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" | ^~~~~ In file included from /usr/include/openssl/:18, from linux_tcp/:20, from linux_tcp/:12: /usr/include/openssl/:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0' 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
Solution: Uninstall the current onessl
Repair and install the library
dpkg -l | grep ssl sudo dpkg -P --force-all libevent-openssl-2.1-7 sudo dpkg -P --force-all openssl sudo dpkg -P --force-all libssl-dev sudo apt --fix-broken install
-CPP
Simple use
1. Introduction
-
AMQP-CPP
is used withRabbitMq
C++ library for message middleware communication- It can parse from
RabbitMq
Data sent by the service can also be generated and sent to theRabbitMq
Data packets -
AMQP-CPP
The library will notRabbitMq
Establish network connection, all network IO is completed by the user
- It can parse from
-
AMQP-CPP
An optional network layer interface is provided, which predefinedTCP
The modules are not required to implement network IO by themselves.- You can also choose
libevent、libev、libuv、asio
Asynchronous communication components, the corresponding components need to be installed manually
- You can also choose
-
AMQP-CPP
Completely asynchronous, without blocking system calls, can be applied in high-performance applications without using threads - Notice: It requires support from C++17
2. Use
-
AMQP-CPP
There are two modes of use:- Use the default
TCP
Modules for network communication - Using extended
libevent、libev、libuv、asio
Asynchronous communication components for communication
- Use the default
- Here
libev
For example, you don't need to implement it yourselfmonitor
Functions can be used directlyAMQP::LibEvHandler
4. Classes and Interfaces
-
channel
It is a virtual connection, and multiple channels can be established on one connection- And all
RabbitMq
All instructions are passedchannel
transmission
- And all
- Therefore, the first step after the connection is to establish
channel
- Because all operations are asynchronous,
channel
The return value of the execution instruction on the execution cannot be used as the result of the operation
- Because all operations are asynchronous,
- Actually it returns
Deferred
class, you can use it to install processing functions
namespace AMQP { /** * Generic callbacks that are used by many deferred objects */ using SuccessCallback = std::function<void()>; using ErrorCallback = std::function<void(const char *message)>; using FinalizeCallback = std::function<void()>; /** * Declaring and deleting a queue */ using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>; // When using publisher acknowledgement, AckCallback will be called when the server acknowledgement message has been received and processed. using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; // When using the confirmation package channel, these callbacks will be called when the message is ack/nacked using PublishAckCallback = std::function<void()>; using PublishNackCallback = std::function<void()>; using PublishLostCallback = std::function<void()>; // Channel class class Channel { Channel(Connection *connection); bool connected(); /** *Declare the switch *If an empty name is provided, the server will assign a name. *The following flags can be used for switches: * *-durable persistence, the switch is still valid after restart *-autodelete After deleting all connected queues, automatically delete the exchange *-passive Only passive checks whether the switch exists *-internal Create internal exchange * *@param name The name of the switch *@param-type exchange type enum ExchangeType { fanout, broadcast exchange, bound queues can get messages direct, directly exchange, only handing messages to queues with consistent routingkey topic, topic exchange, handing messages to a queue that complies with bindingkey rules headers, consistent_hash, message_deduplication }; *@param flags switch flag *@param argumentsOther parameters * *This function returns a delay handler. Callbacks can be installed using onSuccess(), onError() and onFinalize() methods. */ Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments); /** *Declare queue *If a name is not provided, the server will be assigned a name. *flags can be a combination of the following values: * *-durable persistent queue remains valid after proxy restart *-autodelete Automatically delete the queue when all connected consumers leave *-passive Only passive checks whether the queue exists *-exclusive queue exists only on this connection and is automatically deleted when the connection is disconnected * *@param name The name of the queue *@param flags combination *@param arguments Optional parameters * *This function returns a delay handler. Callbacks can be installed *Use onSuccess(), onError() and onFinalize() methods. * Deferred &onError(const char *message) * *The onSuccess() callback that can be installed should have the following signature: void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); For example: ("myqueue").onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout << "Queue '" << name << "' "; std::cout << "has been declared with "; std::cout << messageCount; std::cout << " messages and "; std::cout << consumerCount; std::cout << " consumers" << std::endl; * }); */ DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments); /** *Bind the queue to the switch * *@param exchange source switch *@param queue Target queue *@param routingkey *@param arguments Other binding parameters * *This function returns a delay handler. Callbacks can be installed *Use onSuccess(), onError() and onFinalize() methods. */ Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments); /** * Publish messages to exchange *You must provide the name and routing key of the switch. RabbitMQ will then attempt to send the message to one or more queues. Using the optional flags parameter, you can specify what should happen if the message cannot be routed to the queue. By default, unchangeable messages are silently discarded. * *If the 'mandatory' or 'immediate' flags are set, Then unprocessed messages will be returned to the application. Before you start publishing, make sure you have called the recall()- method, And set up all appropriate handlers to handle these returned messages. * *The following flags can be provided: * *-mandatory If set, the server will return a message not sent to the queue *-immediate If set, the server will return a message that cannot be forwarded to the consumer immediately. *@param exchange to be published to *@param routingkey routing key *@param envelope full envelope to send *@param message to send *@param size message size *@param flags optional logo */ bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0); /** *Tell the RabbitMQ server that is ready to use messages - that is, subscribe to queue messages * *After calling this method, RabbitMQ starts delivering messages to the client application. consumer tag is a string identifier. If you want to stop it later via channel::cancel() call, You can use it to identify the user. *If you do not specify a user tag, the server will assign one to you. * *Support the following flags: * *-nolocal If set, messages posted on this channel will not be consumed at the same time *-noack If set, no confirmation of consumed messages is not necessary *-exclusive request exclusive access, only this user can access the queue * *@param queue The queue you want to use *@param tag The consumer tag associated with this consumption operation *@param flags Other tags *@param argumentsOther parameters * *This function returns a delay handler. You can use the onSuccess(), onError() and onFinalize() methods to install the callback The onSuccess() callback that can be installed should have the following format: void myCallback(const std::string_view&tag); Sample: ("myqueue").onSuccess( [](const std::string_view& tag) { std::cout << "Started consume under tag "; std::cout << tag << std::endl; }); */ DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments); /** *Confirm the received message * *The consumer client confirms and answers the received message * *When a message is received in the DeferredConsumer::onReceived() method, The message must be confirmed. So that RabbitMQ will remove it from the queue (unless consumed with the noack option) * *Support the following flags: * *-Multiple acknowledgement multiple messages: All previously unconfirmed messages will also be acknowledged * *@param deliveryTag The only delivery tag for the message *@param flags Optional logo *@return bool */ bool ack(uint64_t deliveryTag, int flags=0); }; class DeferredConsumer { /* Register a callback function that is called when the consumer starts void onSuccess(const std::string &consumertag) */ DeferredConsumer &onSuccess(const ConsumeCallback& callback); /* Register a callback function, used to be called when a complete message is received void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) */ DeferredConsumer &onReceived(const MessageCallback& callback); /* Alias for onReceived() */ DeferredConsumer &onMessage(const MessageCallback& callback); /* Register the function to be called when the server cancels the consumer void CancelCallback(const std::string &tag) */ DeferredConsumer &onCancelled(const CancelCallback& callback); }; class Message : public Envelope { const std::string &exchange(); const std::string &routingkey(); }; class Envelope : public MetaData { const char *body(); // Get the message body uint64_t bodySize(); // Get the message body size }; }
typedef struct ev_async { EV_WATCHER (ev_async); EV_ATOMIC_T sent; /* private */ }ev_async; //break type enum { EVBREAK_CANCEL = 0, /* undo unloop */ EVBREAK_ONE = 1, /* unloop once */ EVBREAK_ALL = 2 /* unloop all loops */ }; // Instantiate and get the IO event monitoring interface handlestruct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0)); # define EV_DEFAULT ev_default_loop (0) // Start running IO event monitoring, this is a blocking interfaceint ev_run (struct ev_loop *loop); /* break out of the loop */ // End IO monitoring// If you perform ev_run() on the main thread, you can call it directly,// If ev_run() is performed in other threads, it needs to be done through asynchronous notificationvoid ev_break (struct ev_loop *loop, int32_t break_type) ; void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents); // Initialize the asynchronous event structure and set the callback functionvoid ev_async_init(ev_async *w, callback cb); // Start asynchronous task processing in event monitoring loopvoid ev_async_start(struct ev_loop *loop, ev_async *w); // Send the current asynchronous event to execute in the asynchronous threadvoid ev_async_send(struct ev_loop *loop, ev_async *w);
5. Use
#include <> #include <> #include <amqpcpp/> #include <openssl/> #include <openssl/> int main() { // 1. Instantiate the IO event monitoring handle of the underlying network communication framework auto *loop = EV_DEFAULT; // 2. Instantiate the libEvHandler handle -> Associate the AMQP framework with event monitoring AMQP::LibEvHandler handler(loop); // 3. Instantiate the connection object AMQP::Address address("amqp://root:[email protected]:5672/"); AMQP::TcpConnection connection(&handler, address); // 4. Instantiate the channel object AMQP::TcpChannel channel(&connection); // 5. Declare the switch ("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "Declare the switch failed: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange switch creation successfully" << std::endl; }); // 6. Declare the queue ("test-queue") .onError([](const char *message) { std::cout << "Declaration queue failed: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-queue queue creation successfully" << std::endl; }); // 7. Bind the switch and queue ("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue binding failed: " \ << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange - test-queue binding succeeded" << std::endl; }); // 8. Publish a message to the switch for (int i = 0; i < 5; ++i) { std::string msg = "Hello SnowK-" + std::to_string(i); if(("test-exchange", "test-queue-key", msg) == false) { std::cout << "Publish failed" << std::endl; } } // 9. Start the underlying network communication framework -> Turn on IO ev_run(loop, 0); return 0; }
#include <> #include <> #include <amqpcpp/> #include <openssl/> #include <openssl/> void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) { std::string msg; ((), ()); // This cannot be used, AMQP::Message does not store '\0' after ' // std::cout << message << std::endl std::cout << msg << std::endl; channel->ack(deliveryTag); } int main() { // 1. Instantiate the IO event monitoring handle of the underlying network communication framework auto *loop = EV_DEFAULT; // 2. Instantiate the libEvHandler handle -> Associate the AMQP framework with event monitoring AMQP::LibEvHandler handler(loop); // 3. Instantiate the connection object AMQP::Address address("amqp://root:[email protected]:5672/"); AMQP::TcpConnection connection(&handler, address); // 4. Instantiate the channel object AMQP::TcpChannel channel(&connection); // 5. Declare the switch ("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "Declare the switch failed: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange switch creation successfully" << std::endl; }); // 6. Declare the queue ("test-queue") .onError([](const char *message) { std::cout << "Declaration queue failed: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-queue queue creation successfully" << std::endl; }); // 7. Bind the switch and queue ("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue binding failed: " \ << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange - test-queue binding succeeded"; }); // 8. Subscribe to message confrontation -> Set message processing callback function auto callback = std::bind(MessageCB, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); ("test-queue", "consume-tag") .onReceived(callback) .onError([](const char *message) { std::cout << "Subscribe to test-queue queue message failed: " << message << std::endl; exit(0); }); // 9. Start the underlying network communication framework -> Turn on IO ev_run(loop, 0); return 0; }
all: publish consume publish: g++ -o $@ $^ -lamqpcpp -lev -std=c++17 consume: g++ -o $@ $^ -lamqpcpp -lev -std=c++17 .PHONY:clean clean: rm publish consume
This is the article about the detailed explanation of C++ third-party library RabbitMq. This is all about this article. For more related C++ third-party library RabbitMq content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!