add uuid for every mqtt callback

This commit is contained in:
Bartosz Wieczorek 2025-08-12 12:21:15 +02:00
parent 899659ab70
commit e29fad52d5

View File

@ -1,26 +1,25 @@
#include <ranczo-io/utils/mqtt_client.hpp>
#include <config.hpp>
#include <boost/random/uniform_smallint.hpp>
#include <boost/system/system_error.hpp>
#include <config.hpp>
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/object.hpp>
#include <boost/system/detail/error_code.hpp>
#include <boost/system/errc.hpp>
#include <boost/system/result.hpp>
#include <boost/mqtt5.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/json/memory_resource.hpp>
#include <cstdint>
#include <exception>
#include <memory>
#include <spdlog/spdlog.h>
#include <boost/mqtt5.hpp>
#include <expected>
#include <unordered_map>
#include <boost/json/memory_resource.hpp>
#include <memory_resource>
// Adapter to use std::pmr::memory_resource with boost::json
@ -103,7 +102,7 @@ using mqtt_expected_void = expected< void, boost::system::error_code >;
struct AsyncMqttClient::AsyncMqttClientImpl {
const boost::asio::any_io_executor & _executor;
client_type _mqtt_client;
std::unordered_map< std::string, std::function< awaitable_expected< void >(const boost::json::value &) > > _callbacks;
std::vector< std::tuple<Topic, callback_t, boost::uuids::uuid> > _callbacks;
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} {
spdlog::trace("Creating mqtt client");
@ -180,29 +179,36 @@ struct AsyncMqttClient::AsyncMqttClientImpl {
auto value = boost::json::parse(payload, sp); // throws on parse error
if(auto it = _callbacks.find(topic); it != _callbacks.end()) {
co_spawn(
_executor,
[handler = it->second, value]() -> boost::asio::awaitable< void > {
try {
if(auto result = co_await handler(value); not result) {
spdlog::warn("MQTT callback error: {}", result.error().message());
bool run = false;
for(const auto & [registeredTopic, cb, uuid] : _callbacks) {
if(registeredTopic == topic) {
run = true;
co_spawn(
_executor,
[handler = cb, value]() -> boost::asio::awaitable< void > {
try {
if(auto result = co_await handler(value); not result) {
spdlog::warn("MQTT callback error: {}", result.error().message());
}
} catch(const std::exception & e) {
spdlog::error("MQTT callback threw exception: {}", e.what());
} catch(...) {
spdlog::error("MQTT callback threw unknown exception");
}
} catch(const std::exception & e) {
spdlog::error("MQTT callback threw exception: {}", e.what());
} catch(...) {
spdlog::error("MQTT callback threw unknown exception");
}
co_return;
}(),
boost::asio::detached);
} else {
co_return;
}(),
boost::asio::detached);
}
}
if(not run) {
spdlog::warn("MQTT received unsupported topic: {}", topic);
}
} catch(const boost::system::system_error & e) {
spdlog::error("MQTT received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
continue;
}catch(const std::exception& e){
} catch(const std::exception & e) {
spdlog::error("MQTT catched an exception: {}, during callback execution", e.what());
continue;
}
@ -229,7 +235,7 @@ awaitable_expected<void> AsyncMqttClient::subscribe(std::string_view topic, call
ASYNC_CHECK_MSG(_impl->subscribe(topic), "MQTT subscribtion to {} failed", topic);
spdlog::trace("MQTT subscribtion to {} ok, registering callback", topic);
_impl->_callbacks[std::string{topic.data()}] = cb;
_impl->_callbacks.emplace_back(Topic{topic}, cb, boost::uuids::random_generator()());
co_return mqtt_expected_void{};
}