ranczo-io/services/floorheat_svc/mqtt_client.cpp
Bartosz Wieczorek 549ddfc615 refactor
2025-08-05 14:37:36 +02:00

191 lines
7.8 KiB
C++

#include "mqtt_client.hpp"
#include <boost/random/uniform_smallint.hpp>
#include <boost/system/system_error.hpp>
#include <config.hpp>
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <boost/json/object.hpp>
#include <boost/system/detail/error_code.hpp>
#include <boost/system/errc.hpp>
#include <boost/system/result.hpp>
#include <cstdint>
#include <exception>
#include <memory>
#include <spdlog/spdlog.h>
#include <boost/mqtt5.hpp>
#include <expected>
#include <unordered_map>
#include <boost/json/memory_resource.hpp>
#include <memory_resource>
// Adapter to use std::pmr::memory_resource with boost::json
class pmr_memory_resource_adapter : public boost::json::memory_resource {
std::pmr::memory_resource * std_resource_;
public:
explicit pmr_memory_resource_adapter(std::pmr::memory_resource * res) : std_resource_(res) {}
protected:
void * do_allocate(std::size_t size, std::size_t alignment) override {
return std_resource_->allocate(size, alignment);
}
void do_deallocate(void * p, std::size_t size, std::size_t alignment) override {
std_resource_->deallocate(p, size, alignment);
}
bool do_is_equal(const memory_resource & other) const noexcept override {
auto * o = dynamic_cast< const pmr_memory_resource_adapter * >(&other);
if(!o)
return false;
return std_resource_->is_equal(*o->std_resource_);
}
};
namespace ranczo {
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
using client_type = boost::mqtt5::mqtt_client< boost::asio::ip::tcp::socket >;
using mqtt_expected_void = expected< void, boost::system::error_code >;
struct AsyncMqttClient::AsyncMqttClientImpl {
const boost::asio::any_io_executor & _executor;
client_type _mqtt_client;
std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks;
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} {
spdlog::trace("Creating mqtt client");
// Configure the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
_mqtt_client
.brokers("192.168.100.6", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.async_run(boost::asio::detached); // Start the client.
start();
spdlog::trace("Creating mqtt client done");
}
~AsyncMqttClientImpl() = default;
awaitable_expected< void > subscribe(std::string_view topic) {
spdlog::trace("MQTT subscribe to {}", topic);
// Configure the request to subscribe to a Topic.
boost::mqtt5::subscribe_topic sub_topic = boost::mqtt5::subscribe_topic{topic.data(),
boost::mqtt5::subscribe_options{
.max_qos = boost::mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
.no_local = boost::mqtt5::no_local_e::yes, // Forward message from Clients with same ID.
.retain_as_published = boost::mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag.
.retain_handling = boost::mqtt5::retain_handling_e::send // Send retained messages when the subscription is established.
}};
// Subscribe to a single Topic.
auto && [ec, sub_codes, sub_props] =
co_await _mqtt_client.async_subscribe(sub_topic, boost::mqtt5::subscribe_props{}, use_nothrow_awaitable);
// Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call.
// An error can occur as a result of:
// a) wrong subscribe parameters
// b) mqtt_client::cancel is called while the Client is in the process of subscribing
if(ec) {
spdlog::error("MQTT subscribe error: {}", ec.message());
for(int i{}; i < sub_codes.size(); i++)
spdlog::error("MQTT subscribe suberror[{}]: {}", sub_codes[i].message());
co_return std::unexpected{ec};
}
if(sub_codes.empty()) {
spdlog::error("MQTT subscribe result contains no subcodes");
co_return std::unexpected{make_error_code(boost::system::errc::protocol_error)};
}
spdlog::info("MQTT subscribed to {}", topic);
for(int i{}; i < sub_codes.size(); i++)
spdlog::info("MQTT subscribe accepted: {}", sub_codes[i].message());
co_return mqtt_expected_void{}; // Success
}
/// TODO should be const
awaitable_expected< void > listen() {
std::array< uint8_t, 2048 > buf{};
spdlog::trace("MQTT client start");
for(;;) {
// Receive an Appplication Message from the subscribed Topic(s).
auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable);
if(ec) {
spdlog::warn("MQTT receive fail: {}", ec.message());
co_return unexpected{ec};
}
spdlog::debug("MQTT received topic {}, payload {}", topic, payload);
try {
buf.fill(0); // zero stack memory to avaio any junk
std::pmr::monotonic_buffer_resource mr{buf.data(), buf.size()}; // Create our memory resource on the stack
pmr_memory_resource_adapter boost_adapter(&mr); // Create a boost adapter
boost::json::storage_ptr sp(&boost_adapter); // Construct a non-owning pointer to the resource
auto value = boost::json::parse(payload, sp); // throws on parse error
if(auto it = _callbacks.find(topic); it != _callbacks.end()) {
/// TODO topic filtering as part of unordered map?
/// unordered map can map only one thing, maybe vector would be better?
spdlog::debug("MQTT executing callback for: {}", topic);
it->second(value); /// TODO pass topic to callbacl?
} else {
spdlog::warn("MQTT received unsupported topic: {}", topic);
}
} catch(const boost::system::system_error & e) {
spdlog::error("MQTT received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
continue;
}catch(const std::exception& e){
spdlog::error("MQTT catched an exception: {}, during callback execution", e.what());
continue;
}
}
co_return mqtt_expected_void{};
}
void start() {
spdlog::trace("co_spawn mqtt client");
boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached);
}
};
AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor)
: _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {}
awaitable_expected<void> AsyncMqttClient::subscribe(std::string_view topic, std::function< void(const boost::json::value &) > cb) noexcept {
BOOST_ASSERT(_impl);
BOOST_ASSERT(not topic.empty());
BOOST_ASSERT(cb);
spdlog::trace("MQTT subscribtion to {} started", topic);
ASYNC_CHECK_MSG(_impl->subscribe(topic), "MQTT subscribtion to {} failed", topic);
spdlog::trace("MQTT subscribtion to {} ok, registering callback", topic);
_impl->_callbacks[std::string{topic.data()}] = cb;
co_return mqtt_expected_void{};
}
awaitable_expected<void> AsyncMqttClient::listen() const noexcept{
BOOST_ASSERT(_impl);
spdlog::info("MQTT client listen");
ASYNC_CHECK(_impl->listen());
co_return mqtt_expected_void{};
}
AsyncMqttClient::~AsyncMqttClient() = default;
} // namespace ranczo