ranczo-io/libs/mqtt_client.cpp
2025-11-27 13:40:45 +01:00

431 lines
16 KiB
C++

#include <functional>
#include <optional>
#include <ranczo-io/utils/mqtt_client.hpp>
#include <ranczo-io/utils/json_helpers.hpp>
#include <ranczo-io/utils/memory_resource.hpp>
#include <ranczo-io/utils/logger.hpp>
#include <config.hpp>
#include <boost/asio.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/json.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <boost/mqtt5.hpp>
#include <boost/mqtt5/types.hpp>
#include <boost/random/uniform_smallint.hpp>
#include <boost/system/detail/error_code.hpp>
#include <boost/system/errc.hpp>
#include <boost/system/result.hpp>
#include <boost/system/system_error.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <algorithm>
#include <cstdint>
#include <exception>
#include <memory_resource>
#include <spdlog/spdlog.h>
class perf_counter {
using clock = std::chrono::steady_clock;
std::string _name;
clock::time_point _begin;
ranczo::ModuleLogger _log{spdlog::default_logger(), "PerfCounter " + _name};
public:
explicit perf_counter(std::string_view name)
: _name(name),
_begin(clock::now())
{}
~perf_counter() {
const auto end = clock::now();
const auto delta = std::chrono::duration_cast<std::chrono::microseconds>(end - _begin).count();
_log.debug("{} took {} us", _name, delta);
}
};
namespace ranczo {
class Topic {
public:
std::string pattern;
Topic() = default;
Topic(std::string_view p) : pattern(p) {}
bool operator==(const std::string_view other) const {
return match(other);
}
private:
static std::vector< std::string_view > split(std::string_view str) {
std::vector< std::string_view > tokens;
size_t start = 0;
while(start < str.size()) {
size_t end = str.find('/', start);
if(end == std::string_view::npos)
end = str.size();
tokens.emplace_back(str.substr(start, end - start));
start = end + 1;
}
return tokens;
}
bool match(std::string_view topic) const {
auto pattern_parts = split(pattern);
auto topic_parts = split(topic);
size_t i = 0;
for(; i < pattern_parts.size(); ++i) {
if(pattern_parts[i] == "#") {
return true;
}
if(i >= topic_parts.size()) {
return false;
}
if(pattern_parts[i] != "+" && pattern_parts[i] != topic_parts[i]) {
return false;
}
}
return i == topic_parts.size(); // dokładnie tyle samo segmentów
}
};
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
using client_type = boost::mqtt5::mqtt_client< boost::asio::ip::tcp::socket >;
struct AsyncMqttClient::SubscribtionToken {
boost::uuids::uuid uuid;
};
struct AsyncMqttClient::AsyncMqttClientImpl {
const boost::asio::any_io_executor & _executor;
ModuleLogger _log;
client_type _mqtt_client;
std::vector< std::tuple< Topic, callback_t, std::unique_ptr< AsyncMqttClient::SubscribtionToken > > > _callbacks;
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _log{spdlog::default_logger(), "MQTT Impl"}, _mqtt_client{_executor} {
_log.trace("Creating client");
// Configure the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
_mqtt_client
.brokers("192.168.10.10", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.async_run(boost::asio::detached); // Start the client.
start();
_log.trace("Creating client done");
}
~AsyncMqttClientImpl() = default;
const AsyncMqttClient::SubscribtionToken * add_callback(std::string_view topic, callback_t cb) {
_log.trace("registering callback for {}", topic);
auto token = std::make_unique< AsyncMqttClient::SubscribtionToken >(boost::uuids::random_generator()());
auto * token_ptr = token.get();
_callbacks.emplace_back(Topic{topic}, std::move(cb), std::move(token));
_log.trace("registering callback for {} DONE", topic);
return token_ptr;
}
void remove_callback(const AsyncMqttClient::SubscribtionToken * token) {
_log.trace("removing callback by token");
auto it = std::find_if(_callbacks.begin(), _callbacks.end(), [&](const auto & tuple) {
return std::get< std::unique_ptr< AsyncMqttClient::SubscribtionToken > >(tuple).get() == token;
});
if(it != _callbacks.end()) {
_callbacks.erase(it);
} else {
_log.error("token not found");
}
_log.trace("removing callback DONE");
}
bool has_subscription_for_topic(std::string_view topic) const {
// sprawdzamy dokładne dopasowanie patternu, nie wildcardowe dopasowanie (==)
return std::any_of(
_callbacks.begin(), _callbacks.end(), [&](const auto & tuple) { return std::get< Topic >(tuple).pattern == topic; });
}
awaitable_expected< const AsyncMqttClient::SubscribtionToken * > subscribe_with_callback(std::string_view topic, callback_t cb) {
BOOST_ASSERT(!topic.empty());
BOOST_ASSERT(cb);
_log.trace("subscribe_with_callback on topic: {}", topic);
const bool already = has_subscription_for_topic(topic);
// zawsze rejestrujemy callback
const auto * tok = add_callback(topic, std::move(cb));
if(already) {
_log.trace("subscription to {} already active, only callback registered", topic);
co_return tok;
}
_log.trace("subscription to {} started", topic);
auto status = co_await subscribe(topic);
if(!status) {
_log.error("failed to subscribe for topic {}", topic);
remove_callback(tok);
co_return unexpected{status.error()};
}
_log.trace("subscription to {} DONE", topic);
co_return tok;
}
awaitable_expected< void > subscribe(std::string_view topic) {
_log.trace("subscribe to {}", topic);
// switch to mqtt strand
co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable);
// Configure the request to subscribe to a Topic.
boost::mqtt5::subscribe_topic sub_topic = boost::mqtt5::subscribe_topic{topic.data(),
boost::mqtt5::subscribe_options{
.max_qos = boost::mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
.no_local = boost::mqtt5::no_local_e::yes, // Forward message from Clients with same ID.
.retain_as_published = boost::mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag.
.retain_handling = boost::mqtt5::retain_handling_e::send // Send retained messages when the subscription is established.
}};
// Subscribe to a single Topic.
_log.trace("calling async_subscribe");
auto && [ec, sub_codes, sub_props] =
co_await _mqtt_client.async_subscribe(sub_topic, boost::mqtt5::subscribe_props{}, use_nothrow_awaitable);
_log.trace("calling async_subscribe DONE");
// Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call.
// An error can occur as a result of:
// a) wrong subscribe parameters
// b) mqtt_client::cancel is called while the Client is in the process of subscribing
if(ec) {
_log.error("subscribe error: {}", ec.message());
for(int i{}; i < sub_codes.size(); i++)
_log.error("subscribe suberror[{}]: {}", i, sub_codes[i].message());
co_return unexpected{ec};
}
if(sub_codes.empty()) {
_log.error("subscribe result contains no subcodes");
co_return unexpected{make_error_code(boost::system::errc::protocol_error)};
}
_log.info("subscribed to {}", topic);
for(int i{}; i < sub_codes.size(); i++)
_log.info("subscribe accepted: {}", sub_codes[i].message());
co_return _void{}; // Success
}
awaitable_expected< void > publish(std::string_view topic, const boost::json::value & value) noexcept {
_log.trace("publish on: {}", topic);
// 1) execute in context of mqtt strand
co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable);
// 2) owning copies for topic and payload.
std::string t{topic};
std::string payload;
try {
payload = boost::json::serialize(value);
} catch(const std::bad_alloc & e) {
_log.error("cought bad_alloca exception: {}", e.what());
co_return unexpected{make_error_code(std::errc::not_enough_memory)};
}
// 3) QoS 0: only error code returned
auto [ec] = co_await _mqtt_client.async_publish< boost::mqtt5::qos_e::at_most_once >(
std::move(t), std::move(payload), boost::mqtt5::retain_e::no, boost::mqtt5::publish_props{}, boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec) {
_log.error("publish returned an exception: {}", ec.message());
co_return unexpected{ec};
}
co_return _void{};
}
void
handle_received_message(const std::string & topic, const std::string & payload, const boost::mqtt5::publish_props & publish_props) {
perf_counter _ {"handle_received_message"};
_log.debug("received topic {}, payload {}", topic, payload);
bool run = false;
for(const auto & [registeredTopic, cb, token] : _callbacks) {
if(registeredTopic == topic) {
run = true;
// kopiujemy topic/payload do lambda, żeby mieć pewny lifetime
std::string topic_copy = topic;
std::string payload_copy = payload;
boost::asio::co_spawn(
_executor,
[this, handler = cb, topic = std::move(topic_copy), payload = std::move(payload_copy), publish_props]() mutable
-> boost::asio::awaitable< void > {
try {
// --- PMR dla requestu ---
memory_resource::MonotonicStack_2k_Resource mr;
json::pmr_memory_resource_adapter adapter_req(&mr);
boost::json::storage_ptr sp_req(&adapter_req);
// parse payload z PMR
auto value = boost::json::parse(payload, sp_req);
// --- response_topic z MQTT props ---
std::optional< std::string > responseTopic;
if(auto rt = publish_props[boost::mqtt5::prop::response_topic]; rt) {
responseTopic = *rt; // std::string
}
CallbackData cbdata{
.topic = topic,
.request = value,
.responseTopic = responseTopic // std::optional<std::string>
};
if(responseTopic) {
_log.trace("got response topic, handling it properly");
// --- PMR dla response ---
ResponseData response;
std::array< std::uint8_t, 2048 > responseBuffer{0};
std::pmr::monotonic_buffer_resource mr_resp{responseBuffer.data(), responseBuffer.size()};
json::pmr_memory_resource_adapter adapter_resp(&mr_resp);
boost::json::storage_ptr sp_resp(&adapter_resp);
boost::json::value r(sp_resp);
response = std::ref(r); // make a reference wrapper object
// wywołanie callbacka z możliwością wypełnienia response
if(auto result = co_await handler(cbdata, response); !result) {
_log.warn("callback error: {}", result.error().message());
}
// jeśli mamy ustawiony topic i wypełniony JSON -> odeślij status
if(response && cbdata.responseTopic) {
_log.debug("sending response on topic {}", *cbdata.responseTopic);
auto pub_res = co_await publish(*cbdata.responseTopic, *response);
if(!pub_res) {
_log.warn("response publish error: {}", pub_res.error().message());
}
}
}else{
// wywołanie callbacka z pustym response
ResponseData response;
if(auto result = co_await handler(cbdata, response); !result) {
_log.warn("callback error: {}", result.error().message());
}
}
} catch(const boost::system::system_error & e) {
_log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
} catch(const std::exception & e) {
_log.error("callback threw exception: {}", e.what());
} catch(...) {
_log.error("callback threw unknown exception");
}
co_return;
},
boost::asio::detached);
}
}
if(!run) {
_log.warn("received unsupported topic: {}", topic);
}
}
awaitable_expected< void > listen() {
_log.trace("client start");
for(;;) {
auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable);
if(ec) {
_log.warn("receive fail: {}", ec.message());
continue;
}
try {
handle_received_message(topic, payload, publish_props);
} catch(const boost::system::system_error & e) {
_log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
continue;
} catch(const std::exception & e) {
_log.error("caught an exception: {}, during callback execution", e.what());
continue;
}
}
co_return _void{};
}
void start() {
_log.trace("co_spawn mqtt client");
boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached);
}
void cancel() {
_mqtt_client.cancel();
}
};
AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor)
: _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {}
awaitable_expected< const AsyncMqttClient::SubscribtionToken * > AsyncMqttClient::subscribe(std::string_view topic,
callback_t cb) noexcept {
BOOST_ASSERT(_impl);
BOOST_ASSERT(not topic.empty());
BOOST_ASSERT(cb);
_log.debug("subscribe");
co_return ASYNC_TRY(_impl->subscribe_with_callback(topic, std::move(cb)));
}
awaitable_expected< void > AsyncMqttClient::publish(std::string_view topic, const boost::json::value & value) noexcept {
BOOST_ASSERT(_impl);
BOOST_ASSERT(not topic.empty());
_log.debug("publish on {}", topic);
ASYNC_CHECK(_impl->publish(topic, value));
co_return _void{};
}
awaitable_expected< void > AsyncMqttClient::listen() const noexcept {
BOOST_ASSERT(_impl);
_log.debug("listen");
ASYNC_CHECK(_impl->listen());
co_return _void{};
}
void AsyncMqttClient::cancel() {
BOOST_ASSERT(_impl);
_log.debug("cancel");
_impl->cancel();
}
AsyncMqttClient::~AsyncMqttClient() = default;
} // namespace ranczo