#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class perf_counter { using clock = std::chrono::steady_clock; std::string _name; clock::time_point _begin; ranczo::ModuleLogger _log{spdlog::default_logger(), "PerfCounter " + _name}; public: explicit perf_counter(std::string_view name) : _name(name), _begin(clock::now()) {} ~perf_counter() { const auto end = clock::now(); const auto delta = std::chrono::duration_cast(end - _begin).count(); _log.debug("{} took {} us", _name, delta); } }; namespace ranczo { class Topic { public: std::string pattern; Topic() = default; Topic(std::string_view p) : pattern(p) {} bool operator==(const std::string_view other) const { return match(other); } private: static std::vector< std::string_view > split(std::string_view str) { std::vector< std::string_view > tokens; size_t start = 0; while(start < str.size()) { size_t end = str.find('/', start); if(end == std::string_view::npos) end = str.size(); tokens.emplace_back(str.substr(start, end - start)); start = end + 1; } return tokens; } bool match(std::string_view topic) const { auto pattern_parts = split(pattern); auto topic_parts = split(topic); size_t i = 0; for(; i < pattern_parts.size(); ++i) { if(pattern_parts[i] == "#") { return true; } if(i >= topic_parts.size()) { return false; } if(pattern_parts[i] != "+" && pattern_parts[i] != topic_parts[i]) { return false; } } return i == topic_parts.size(); // dokładnie tyle samo segmentów } }; constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); using client_type = boost::mqtt5::mqtt_client< boost::asio::ip::tcp::socket >; struct AsyncMqttClient::SubscribtionToken { boost::uuids::uuid uuid; }; struct AsyncMqttClient::AsyncMqttClientImpl { const boost::asio::any_io_executor & _executor; ModuleLogger _log; client_type _mqtt_client; std::vector< std::tuple< Topic, callback_t, std::unique_ptr< AsyncMqttClient::SubscribtionToken > > > _callbacks; AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _log{spdlog::default_logger(), "MQTT Impl"}, _mqtt_client{_executor} { _log.trace("Creating client"); // Configure the Client. // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. _mqtt_client .brokers("192.168.10.10", 1883) // Broker that we want to connect to. 1883 is the default TCP port. .async_run(boost::asio::detached); // Start the client. start(); _log.trace("Creating client done"); } ~AsyncMqttClientImpl() = default; const AsyncMqttClient::SubscribtionToken * add_callback(std::string_view topic, callback_t cb) { _log.trace("registering callback for {}", topic); auto token = std::make_unique< AsyncMqttClient::SubscribtionToken >(boost::uuids::random_generator()()); auto * token_ptr = token.get(); _callbacks.emplace_back(Topic{topic}, std::move(cb), std::move(token)); _log.trace("registering callback for {} DONE", topic); return token_ptr; } void remove_callback(const AsyncMqttClient::SubscribtionToken * token) { _log.trace("removing callback by token"); auto it = std::find_if(_callbacks.begin(), _callbacks.end(), [&](const auto & tuple) { return std::get< std::unique_ptr< AsyncMqttClient::SubscribtionToken > >(tuple).get() == token; }); if(it != _callbacks.end()) { _callbacks.erase(it); } else { _log.error("token not found"); } _log.trace("removing callback DONE"); } bool has_subscription_for_topic(std::string_view topic) const { // sprawdzamy dokładne dopasowanie patternu, nie wildcardowe dopasowanie (==) return std::any_of( _callbacks.begin(), _callbacks.end(), [&](const auto & tuple) { return std::get< Topic >(tuple).pattern == topic; }); } awaitable_expected< const AsyncMqttClient::SubscribtionToken * > subscribe_with_callback(std::string_view topic, callback_t cb) { BOOST_ASSERT(!topic.empty()); BOOST_ASSERT(cb); _log.trace("subscribe_with_callback on topic: {}", topic); const bool already = has_subscription_for_topic(topic); // zawsze rejestrujemy callback const auto * tok = add_callback(topic, std::move(cb)); if(already) { _log.trace("subscription to {} already active, only callback registered", topic); co_return tok; } _log.trace("subscription to {} started", topic); auto status = co_await subscribe(topic); if(!status) { _log.error("failed to subscribe for topic {}", topic); remove_callback(tok); co_return unexpected{status.error()}; } _log.trace("subscription to {} DONE", topic); co_return tok; } awaitable_expected< void > subscribe(std::string_view topic) { _log.trace("subscribe to {}", topic); // switch to mqtt strand co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable); // Configure the request to subscribe to a Topic. boost::mqtt5::subscribe_topic sub_topic = boost::mqtt5::subscribe_topic{topic.data(), boost::mqtt5::subscribe_options{ .max_qos = boost::mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2. .no_local = boost::mqtt5::no_local_e::yes, // Forward message from Clients with same ID. .retain_as_published = boost::mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag. .retain_handling = boost::mqtt5::retain_handling_e::send // Send retained messages when the subscription is established. }}; // Subscribe to a single Topic. _log.trace("calling async_subscribe"); auto && [ec, sub_codes, sub_props] = co_await _mqtt_client.async_subscribe(sub_topic, boost::mqtt5::subscribe_props{}, use_nothrow_awaitable); _log.trace("calling async_subscribe DONE"); // Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call. // An error can occur as a result of: // a) wrong subscribe parameters // b) mqtt_client::cancel is called while the Client is in the process of subscribing if(ec) { _log.error("subscribe error: {}", ec.message()); for(int i{}; i < sub_codes.size(); i++) _log.error("subscribe suberror[{}]: {}", i, sub_codes[i].message()); co_return unexpected{ec}; } if(sub_codes.empty()) { _log.error("subscribe result contains no subcodes"); co_return unexpected{make_error_code(boost::system::errc::protocol_error)}; } _log.info("subscribed to {}", topic); for(int i{}; i < sub_codes.size(); i++) _log.info("subscribe accepted: {}", sub_codes[i].message()); co_return _void{}; // Success } awaitable_expected< void > publish(std::string_view topic, const boost::json::value & value) noexcept { _log.trace("publish on: {}", topic); // 1) execute in context of mqtt strand co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable); // 2) owning copies for topic and payload. std::string t{topic}; std::string payload; try { payload = boost::json::serialize(value); } catch(const std::bad_alloc & e) { _log.error("cought bad_alloca exception: {}", e.what()); co_return unexpected{make_error_code(std::errc::not_enough_memory)}; } // 3) QoS 0: only error code returned auto [ec] = co_await _mqtt_client.async_publish< boost::mqtt5::qos_e::at_most_once >( std::move(t), std::move(payload), boost::mqtt5::retain_e::no, boost::mqtt5::publish_props{}, boost::asio::as_tuple(boost::asio::use_awaitable)); if(ec) { _log.error("publish returned an exception: {}", ec.message()); co_return unexpected{ec}; } co_return _void{}; } void handle_received_message(const std::string & topic, const std::string & payload, const boost::mqtt5::publish_props & publish_props) { perf_counter _ {"handle_received_message"}; _log.debug("received topic {}, payload {}", topic, payload); bool run = false; for(const auto & [registeredTopic, cb, token] : _callbacks) { if(registeredTopic == topic) { run = true; // kopiujemy topic/payload do lambda, żeby mieć pewny lifetime std::string topic_copy = topic; std::string payload_copy = payload; boost::asio::co_spawn( _executor, [this, handler = cb, topic = std::move(topic_copy), payload = std::move(payload_copy), publish_props]() mutable -> boost::asio::awaitable< void > { try { // --- PMR dla requestu --- memory_resource::MonotonicStack_2k_Resource mr; json::pmr_memory_resource_adapter adapter_req(&mr); boost::json::storage_ptr sp_req(&adapter_req); // parse payload z PMR auto value = boost::json::parse(payload, sp_req); // --- response_topic z MQTT props --- std::optional< std::string > responseTopic; if(auto rt = publish_props[boost::mqtt5::prop::response_topic]; rt) { responseTopic = *rt; // std::string } CallbackData cbdata{ .topic = topic, .request = value, .responseTopic = responseTopic // std::optional }; if(responseTopic) { _log.trace("got response topic, handling it properly"); // --- PMR dla response --- ResponseData response; std::array< std::uint8_t, 2048 > responseBuffer{0}; std::pmr::monotonic_buffer_resource mr_resp{responseBuffer.data(), responseBuffer.size()}; json::pmr_memory_resource_adapter adapter_resp(&mr_resp); boost::json::storage_ptr sp_resp(&adapter_resp); boost::json::value r(sp_resp); response = std::ref(r); // make a reference wrapper object // wywołanie callbacka z możliwością wypełnienia response if(auto result = co_await handler(cbdata, response); !result) { _log.warn("callback error: {}", result.error().message()); } // jeśli mamy ustawiony topic i wypełniony JSON -> odeślij status if(response && cbdata.responseTopic) { _log.debug("sending response on topic {}", *cbdata.responseTopic); auto pub_res = co_await publish(*cbdata.responseTopic, *response); if(!pub_res) { _log.warn("response publish error: {}", pub_res.error().message()); } } }else{ // wywołanie callbacka z pustym response ResponseData response; if(auto result = co_await handler(cbdata, response); !result) { _log.warn("callback error: {}", result.error().message()); } } } catch(const boost::system::system_error & e) { _log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what()); } catch(const std::exception & e) { _log.error("callback threw exception: {}", e.what()); } catch(...) { _log.error("callback threw unknown exception"); } co_return; }, boost::asio::detached); } } if(!run) { _log.warn("received unsupported topic: {}", topic); } } awaitable_expected< void > listen() { _log.trace("client start"); for(;;) { auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable); if(ec) { _log.warn("receive fail: {}", ec.message()); continue; } try { handle_received_message(topic, payload, publish_props); } catch(const boost::system::system_error & e) { _log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what()); continue; } catch(const std::exception & e) { _log.error("caught an exception: {}, during callback execution", e.what()); continue; } } co_return _void{}; } void start() { _log.trace("co_spawn mqtt client"); boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached); } void cancel() { _mqtt_client.cancel(); } }; AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} awaitable_expected< const AsyncMqttClient::SubscribtionToken * > AsyncMqttClient::subscribe(std::string_view topic, callback_t cb) noexcept { BOOST_ASSERT(_impl); BOOST_ASSERT(not topic.empty()); BOOST_ASSERT(cb); _log.debug("subscribe"); co_return ASYNC_TRY(_impl->subscribe_with_callback(topic, std::move(cb))); } awaitable_expected< void > AsyncMqttClient::publish(std::string_view topic, const boost::json::value & value) noexcept { BOOST_ASSERT(_impl); BOOST_ASSERT(not topic.empty()); _log.debug("publish on {}", topic); ASYNC_CHECK(_impl->publish(topic, value)); co_return _void{}; } awaitable_expected< void > AsyncMqttClient::listen() const noexcept { BOOST_ASSERT(_impl); _log.debug("listen"); ASYNC_CHECK(_impl->listen()); co_return _void{}; } void AsyncMqttClient::cancel() { BOOST_ASSERT(_impl); _log.debug("cancel"); _impl->cancel(); } AsyncMqttClient::~AsyncMqttClient() = default; } // namespace ranczo