From e29fad52d525d22f46d4efe3af66de8e64189a3a Mon Sep 17 00:00:00 2001 From: Bartosz Wieczorek Date: Tue, 12 Aug 2025 12:21:15 +0200 Subject: [PATCH] add uuid for every mqtt callback --- libs/mqtt_client.cpp | 60 ++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/libs/mqtt_client.cpp b/libs/mqtt_client.cpp index d41e02e..7545ea1 100644 --- a/libs/mqtt_client.cpp +++ b/libs/mqtt_client.cpp @@ -1,26 +1,25 @@ #include +#include + #include #include -#include - #include #include - #include #include #include #include +#include +#include +#include +#include + #include #include #include #include - -#include #include -#include - -#include #include // Adapter to use std::pmr::memory_resource with boost::json @@ -103,7 +102,7 @@ using mqtt_expected_void = expected< void, boost::system::error_code >; struct AsyncMqttClient::AsyncMqttClientImpl { const boost::asio::any_io_executor & _executor; client_type _mqtt_client; - std::unordered_map< std::string, std::function< awaitable_expected< void >(const boost::json::value &) > > _callbacks; + std::vector< std::tuple > _callbacks; AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} { spdlog::trace("Creating mqtt client"); @@ -180,29 +179,36 @@ struct AsyncMqttClient::AsyncMqttClientImpl { auto value = boost::json::parse(payload, sp); // throws on parse error - if(auto it = _callbacks.find(topic); it != _callbacks.end()) { - co_spawn( - _executor, - [handler = it->second, value]() -> boost::asio::awaitable< void > { - try { - if(auto result = co_await handler(value); not result) { - spdlog::warn("MQTT callback error: {}", result.error().message()); + bool run = false; + for(const auto & [registeredTopic, cb, uuid] : _callbacks) { + if(registeredTopic == topic) { + run = true; + co_spawn( + _executor, + [handler = cb, value]() -> boost::asio::awaitable< void > { + try { + if(auto result = co_await handler(value); not result) { + spdlog::warn("MQTT callback error: {}", result.error().message()); + } + } catch(const std::exception & e) { + spdlog::error("MQTT callback threw exception: {}", e.what()); + } catch(...) { + spdlog::error("MQTT callback threw unknown exception"); } - } catch(const std::exception & e) { - spdlog::error("MQTT callback threw exception: {}", e.what()); - } catch(...) { - spdlog::error("MQTT callback threw unknown exception"); - } - co_return; - }(), - boost::asio::detached); - } else { + co_return; + }(), + boost::asio::detached); + } + } + + if(not run) { spdlog::warn("MQTT received unsupported topic: {}", topic); } + } catch(const boost::system::system_error & e) { spdlog::error("MQTT received bad json, parsing failed with error: {}/{}", e.code().message(), e.what()); continue; - }catch(const std::exception& e){ + } catch(const std::exception & e) { spdlog::error("MQTT catched an exception: {}, during callback execution", e.what()); continue; } @@ -229,7 +235,7 @@ awaitable_expected AsyncMqttClient::subscribe(std::string_view topic, call ASYNC_CHECK_MSG(_impl->subscribe(topic), "MQTT subscribtion to {} failed", topic); spdlog::trace("MQTT subscribtion to {} ok, registering callback", topic); - _impl->_callbacks[std::string{topic.data()}] = cb; + _impl->_callbacks.emplace_back(Topic{topic}, cb, boost::uuids::random_generator()()); co_return mqtt_expected_void{}; }