使用Mqtt C++客户端发送和接收MQTT消息

2020-05-18 Lanffy 更多博文 » 博客 » GitHub »

C++ MQTT

原文链接 https://lanffy.github.io/2020/05/18/SentReceiveMQTTMessageInC++
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。



最近学习了MQTT C++这个库,阅读了其中的部分代码。简要记录如下。 本文章主要介绍在Windwos平台下,如何接入MQTT C++客户端,主要介绍使用MQTT C++客户端接收和发送消息。MQTT C++客户端的安装可以参考上一篇文章:Mqtt简介以及在Windows中编译安装Mqtt C++客户端

依赖库:

这些依赖库需要提前下载安装好。

Mqtt简介

简介这里不再赘述了,可以参考:Mqtt简介以及在Windows中编译安装Mqtt C++客户端

发送消息

引入用到的库和常量:

#include <iostream>
#include <cstdlib>
#include <string>
#include <map>
#include <vector>
#include <cstring>
#include "mqtt/client.h"

const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("33f1c750-01a6-4a26-9057-6a5adf0f80f5");
const std::string TOPIC("lanffy/test");
const int QOS = 1;

定义动作监听器,链接成功,消息发送成功后,都会回调相应的动作:

class user_callback : public virtual mqtt::callback
{
void connection_lost(const std::string& cause) override {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;
}

void delivery_complete(mqtt::delivery_token_ptr tok) override {
std::cout << "\n\t[Delivery complete for token: "
<< (tok ? tok->get_message_id() : -1) << "]" << std::endl;
}

public:
};

链接Mqtt Server并发送消息:

int main(int argc, char* argv[])
{
std::cout << "Initialzing..." << std::endl;
mqtt::client client(SERVER_ADDRESS, CLIENT_ID);

user_callback cb;
client.set_callback(cb);

mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
std::cout << "...OK" << std::endl;

try {
std::cout << "\nConnecting..." << std::endl;
client.connect(connOpts);
std::cout << "...OK" << std::endl;

// First use a message pointer.

std::cout << "\nSending message..." << std::endl;
auto pubmsg = mqtt::make_message(TOPIC, "Hello World,This is a message...");
pubmsg->set_qos(QOS);
client.publish(pubmsg);
std::cout << "...OK" << std::endl;

// Disconnect
std::cout << "\nDisconnecting..." << std::endl;
client.disconnect();
std::cout << "...OK" << std::endl;
}
catch (const mqtt::persistence_exception& exc) {
std::cerr << "Persistence Error: " << exc.what() << " ["
<< exc.get_reason_code() << "]" << std::endl;
return 1;
}
catch (const mqtt::exception& exc) {
std::cerr << exc.what() << std::endl;
return 1;
}

std::cout << "\nExiting" << std::endl;
return 0;
}

运行代码,同时使用MQTT BOX订阅相同的topic,即可收到消息。

接收消息

引入用到的库:

#include <iostream>
#include <cstdlib>
#include <string>
#include <cstring>
#include <cctype>
#include <thread>
#include <chrono>
#include "mqtt/async_client.h"

定义链接消息的常量:

const std::string SERVER_ADDRESS("tcp://broker.hivemq.com:1883");
const std::string CLIENT_ID("33f1c750-01a6-4a26-9057-6a5adf0f80f5");
const std::string TOPIC("lanffy/test");

const int QOS = 1;
const int N_RETRY_ATTEMPTS = 5;

定义消息接收监听器:

class action_listener : public virtual mqtt::iaction_listener
{
std::string name_;

void on_failure(const mqtt::token& tok) override {
std::cout << name_ << " failure";
if (tok.get_message_id() != 0)
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
std::cout << std::endl;
}

void on_success(const mqtt::token& tok) override {
std::cout << name_ << " success";
if (tok.get_message_id() != 0)
std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
auto top = tok.get_topics();
if (top && !top->empty())
std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
std::cout << std::endl;
}

public:
action_listener(const std::string& name) : name_(name) {}
};

/////////////////////////////////////////////////////////////////////////////


class callback : public virtual mqtt::callback,
public virtual mqtt::iaction_listener

{
int nretry_;
mqtt::async_client& cli_;
mqtt::connect_options& connOpts_;
action_listener subListener_;

void reconnect() {
std::this_thread::sleep_for(std::chrono::milliseconds(2500));
try {
cli_.connect(connOpts_, nullptr, *this);
}
catch (const mqtt::exception& exc) {
std::cerr << "Error: " << exc.what() << std::endl;
exit(1);
}
}

void on_failure(const mqtt::token& tok) override {
std::cout << "Connection attempt failed" << std::endl;
if (++nretry_ > N_RETRY_ATTEMPTS)
exit(1);
reconnect();
}

void on_success(const mqtt::token& tok) override {}

void connected(const std::string& cause) override {
std::cout << "\nConnection success" << std::endl;
std::cout << "\nSubscribing to topic '" << TOPIC << "'\n"
<< "\tfor client " << CLIENT_ID
<< " using QoS" << QOS << "\n"
<< "\nPress Q<Enter> to quit\n" << std::endl;

cli_.subscribe(TOPIC, QOS, nullptr, subListener_);
}

void connection_lost(const std::string& cause) override {
std::cout << "\nConnection lost" << std::endl;
if (!cause.empty())
std::cout << "\tcause: " << cause << std::endl;

std::cout << "Reconnecting..." << std::endl;
nretry_ = 0;
reconnect();
}


void message_arrived(mqtt::const_message_ptr msg) override {
std::cout << "Message arrived" << std::endl;
std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
}

void delivery_complete(mqtt::delivery_token_ptr token) override {}

public:
callback(mqtt::async_client& cli, mqtt::connect_options& connOpts)
: nretry_(0), cli_(cli), connOpts_(connOpts), subListener_("Subscription") {}
};

发起调用:

int main(int argc, char* argv[])
{
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);

mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID);

callback cb(client, connOpts);
client.set_callback(cb);

try {
std::cout << "Connecting to the MQTT server..." << std::flush;
client.connect(connOpts, nullptr, cb);
}
catch (const mqtt::exception&) {
std::cerr << "\nERROR: Unable to connect to MQTT server: '"
<< SERVER_ADDRESS << "'" << std::endl;
return 1;
}

while (std::tolower(std::cin.get()) != 'q')
;

try {
std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
client.disconnect()->wait();
std::cout << "OK" << std::endl;
}
catch (const mqtt::exception& exc) {
std::cerr << exc.what() << std::endl;
return 1;
}

return 0;
}

通过MQTT BOX发送消息到相同的topic即可收到消息,如下图: