431 lines
16 KiB
C++
431 lines
16 KiB
C++
#include <functional>
|
|
#include <optional>
|
|
|
|
#include <ranczo-io/utils/mqtt_client.hpp>
|
|
#include <ranczo-io/utils/json_helpers.hpp>
|
|
#include <ranczo-io/utils/memory_resource.hpp>
|
|
#include <ranczo-io/utils/logger.hpp>
|
|
|
|
#include <config.hpp>
|
|
|
|
#include <boost/asio.hpp>
|
|
#include <boost/asio/as_tuple.hpp>
|
|
#include <boost/asio/use_awaitable.hpp>
|
|
#include <boost/json.hpp>
|
|
#include <boost/json/object.hpp>
|
|
#include <boost/json/serialize.hpp>
|
|
#include <boost/mqtt5.hpp>
|
|
#include <boost/mqtt5/types.hpp>
|
|
#include <boost/random/uniform_smallint.hpp>
|
|
#include <boost/system/detail/error_code.hpp>
|
|
#include <boost/system/errc.hpp>
|
|
#include <boost/system/result.hpp>
|
|
#include <boost/system/system_error.hpp>
|
|
#include <boost/uuid/uuid.hpp>
|
|
#include <boost/uuid/uuid_generators.hpp>
|
|
|
|
#include <algorithm>
|
|
#include <cstdint>
|
|
#include <exception>
|
|
#include <memory_resource>
|
|
#include <spdlog/spdlog.h>
|
|
|
|
class perf_counter {
|
|
using clock = std::chrono::steady_clock;
|
|
|
|
std::string _name;
|
|
clock::time_point _begin;
|
|
|
|
ranczo::ModuleLogger _log{spdlog::default_logger(), "PerfCounter " + _name};
|
|
|
|
public:
|
|
explicit perf_counter(std::string_view name)
|
|
: _name(name),
|
|
_begin(clock::now())
|
|
{}
|
|
|
|
~perf_counter() {
|
|
const auto end = clock::now();
|
|
const auto delta = std::chrono::duration_cast<std::chrono::microseconds>(end - _begin).count();
|
|
|
|
_log.debug("{} took {} us", _name, delta);
|
|
}
|
|
};
|
|
|
|
namespace ranczo {
|
|
class Topic {
|
|
public:
|
|
std::string pattern;
|
|
|
|
Topic() = default;
|
|
Topic(std::string_view p) : pattern(p) {}
|
|
|
|
bool operator==(const std::string_view other) const {
|
|
return match(other);
|
|
}
|
|
|
|
private:
|
|
static std::vector< std::string_view > split(std::string_view str) {
|
|
std::vector< std::string_view > tokens;
|
|
size_t start = 0;
|
|
while(start < str.size()) {
|
|
size_t end = str.find('/', start);
|
|
if(end == std::string_view::npos)
|
|
end = str.size();
|
|
tokens.emplace_back(str.substr(start, end - start));
|
|
start = end + 1;
|
|
}
|
|
return tokens;
|
|
}
|
|
|
|
bool match(std::string_view topic) const {
|
|
auto pattern_parts = split(pattern);
|
|
auto topic_parts = split(topic);
|
|
|
|
size_t i = 0;
|
|
for(; i < pattern_parts.size(); ++i) {
|
|
if(pattern_parts[i] == "#") {
|
|
return true;
|
|
}
|
|
|
|
if(i >= topic_parts.size()) {
|
|
return false;
|
|
}
|
|
|
|
if(pattern_parts[i] != "+" && pattern_parts[i] != topic_parts[i]) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return i == topic_parts.size(); // dokładnie tyle samo segmentów
|
|
}
|
|
};
|
|
|
|
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
|
|
using client_type = boost::mqtt5::mqtt_client< boost::asio::ip::tcp::socket >;
|
|
|
|
struct AsyncMqttClient::SubscribtionToken {
|
|
boost::uuids::uuid uuid;
|
|
};
|
|
|
|
struct AsyncMqttClient::AsyncMqttClientImpl {
|
|
const boost::asio::any_io_executor & _executor;
|
|
ModuleLogger _log;
|
|
client_type _mqtt_client;
|
|
std::vector< std::tuple< Topic, callback_t, std::unique_ptr< AsyncMqttClient::SubscribtionToken > > > _callbacks;
|
|
|
|
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _log{spdlog::default_logger(), "MQTT Impl"}, _mqtt_client{_executor} {
|
|
_log.trace("Creating client");
|
|
// Configure the Client.
|
|
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
|
|
_mqtt_client
|
|
.brokers("192.168.10.10", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
|
|
.async_run(boost::asio::detached); // Start the client.
|
|
|
|
start();
|
|
_log.trace("Creating client done");
|
|
}
|
|
|
|
~AsyncMqttClientImpl() = default;
|
|
|
|
const AsyncMqttClient::SubscribtionToken * add_callback(std::string_view topic, callback_t cb) {
|
|
_log.trace("registering callback for {}", topic);
|
|
|
|
auto token = std::make_unique< AsyncMqttClient::SubscribtionToken >(boost::uuids::random_generator()());
|
|
auto * token_ptr = token.get();
|
|
|
|
_callbacks.emplace_back(Topic{topic}, std::move(cb), std::move(token));
|
|
|
|
_log.trace("registering callback for {} DONE", topic);
|
|
return token_ptr;
|
|
}
|
|
|
|
void remove_callback(const AsyncMqttClient::SubscribtionToken * token) {
|
|
_log.trace("removing callback by token");
|
|
|
|
auto it = std::find_if(_callbacks.begin(), _callbacks.end(), [&](const auto & tuple) {
|
|
return std::get< std::unique_ptr< AsyncMqttClient::SubscribtionToken > >(tuple).get() == token;
|
|
});
|
|
|
|
if(it != _callbacks.end()) {
|
|
_callbacks.erase(it);
|
|
} else {
|
|
_log.error("token not found");
|
|
}
|
|
|
|
_log.trace("removing callback DONE");
|
|
}
|
|
|
|
bool has_subscription_for_topic(std::string_view topic) const {
|
|
// sprawdzamy dokładne dopasowanie patternu, nie wildcardowe dopasowanie (==)
|
|
return std::any_of(
|
|
_callbacks.begin(), _callbacks.end(), [&](const auto & tuple) { return std::get< Topic >(tuple).pattern == topic; });
|
|
}
|
|
|
|
awaitable_expected< const AsyncMqttClient::SubscribtionToken * > subscribe_with_callback(std::string_view topic, callback_t cb) {
|
|
BOOST_ASSERT(!topic.empty());
|
|
BOOST_ASSERT(cb);
|
|
|
|
_log.trace("subscribe_with_callback on topic: {}", topic);
|
|
|
|
const bool already = has_subscription_for_topic(topic);
|
|
|
|
// zawsze rejestrujemy callback
|
|
const auto * tok = add_callback(topic, std::move(cb));
|
|
|
|
if(already) {
|
|
_log.trace("subscription to {} already active, only callback registered", topic);
|
|
co_return tok;
|
|
}
|
|
|
|
_log.trace("subscription to {} started", topic);
|
|
auto status = co_await subscribe(topic);
|
|
if(!status) {
|
|
_log.error("failed to subscribe for topic {}", topic);
|
|
remove_callback(tok);
|
|
co_return unexpected{status.error()};
|
|
}
|
|
|
|
_log.trace("subscription to {} DONE", topic);
|
|
co_return tok;
|
|
}
|
|
|
|
awaitable_expected< void > subscribe(std::string_view topic) {
|
|
_log.trace("subscribe to {}", topic);
|
|
|
|
// switch to mqtt strand
|
|
co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable);
|
|
|
|
// Configure the request to subscribe to a Topic.
|
|
boost::mqtt5::subscribe_topic sub_topic = boost::mqtt5::subscribe_topic{topic.data(),
|
|
boost::mqtt5::subscribe_options{
|
|
.max_qos = boost::mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
|
|
.no_local = boost::mqtt5::no_local_e::yes, // Forward message from Clients with same ID.
|
|
.retain_as_published = boost::mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag.
|
|
.retain_handling = boost::mqtt5::retain_handling_e::send // Send retained messages when the subscription is established.
|
|
}};
|
|
|
|
// Subscribe to a single Topic.
|
|
_log.trace("calling async_subscribe");
|
|
auto && [ec, sub_codes, sub_props] =
|
|
co_await _mqtt_client.async_subscribe(sub_topic, boost::mqtt5::subscribe_props{}, use_nothrow_awaitable);
|
|
_log.trace("calling async_subscribe DONE");
|
|
// Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call.
|
|
|
|
// An error can occur as a result of:
|
|
// a) wrong subscribe parameters
|
|
// b) mqtt_client::cancel is called while the Client is in the process of subscribing
|
|
if(ec) {
|
|
_log.error("subscribe error: {}", ec.message());
|
|
for(int i{}; i < sub_codes.size(); i++)
|
|
_log.error("subscribe suberror[{}]: {}", i, sub_codes[i].message());
|
|
co_return unexpected{ec};
|
|
}
|
|
|
|
if(sub_codes.empty()) {
|
|
_log.error("subscribe result contains no subcodes");
|
|
co_return unexpected{make_error_code(boost::system::errc::protocol_error)};
|
|
}
|
|
|
|
_log.info("subscribed to {}", topic);
|
|
for(int i{}; i < sub_codes.size(); i++)
|
|
_log.info("subscribe accepted: {}", sub_codes[i].message());
|
|
|
|
co_return _void{}; // Success
|
|
}
|
|
|
|
awaitable_expected< void > publish(std::string_view topic, const boost::json::value & value) noexcept {
|
|
_log.trace("publish on: {}", topic);
|
|
|
|
// 1) execute in context of mqtt strand
|
|
co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable);
|
|
|
|
// 2) owning copies for topic and payload.
|
|
std::string t{topic};
|
|
std::string payload;
|
|
try {
|
|
payload = boost::json::serialize(value);
|
|
} catch(const std::bad_alloc & e) {
|
|
_log.error("cought bad_alloca exception: {}", e.what());
|
|
co_return unexpected{make_error_code(std::errc::not_enough_memory)};
|
|
}
|
|
|
|
// 3) QoS 0: only error code returned
|
|
auto [ec] = co_await _mqtt_client.async_publish< boost::mqtt5::qos_e::at_most_once >(
|
|
std::move(t), std::move(payload), boost::mqtt5::retain_e::no, boost::mqtt5::publish_props{}, boost::asio::as_tuple(boost::asio::use_awaitable));
|
|
|
|
if(ec) {
|
|
_log.error("publish returned an exception: {}", ec.message());
|
|
co_return unexpected{ec};
|
|
}
|
|
|
|
co_return _void{};
|
|
}
|
|
|
|
void
|
|
handle_received_message(const std::string & topic, const std::string & payload, const boost::mqtt5::publish_props & publish_props) {
|
|
perf_counter _ {"handle_received_message"};
|
|
_log.debug("received topic {}, payload {}", topic, payload);
|
|
|
|
bool run = false;
|
|
|
|
for(const auto & [registeredTopic, cb, token] : _callbacks) {
|
|
if(registeredTopic == topic) {
|
|
run = true;
|
|
|
|
// kopiujemy topic/payload do lambda, żeby mieć pewny lifetime
|
|
std::string topic_copy = topic;
|
|
std::string payload_copy = payload;
|
|
|
|
boost::asio::co_spawn(
|
|
_executor,
|
|
[this, handler = cb, topic = std::move(topic_copy), payload = std::move(payload_copy), publish_props]() mutable
|
|
-> boost::asio::awaitable< void > {
|
|
try {
|
|
// --- PMR dla requestu ---
|
|
memory_resource::MonotonicStack_2k_Resource mr;
|
|
|
|
json::pmr_memory_resource_adapter adapter_req(&mr);
|
|
boost::json::storage_ptr sp_req(&adapter_req);
|
|
|
|
// parse payload z PMR
|
|
auto value = boost::json::parse(payload, sp_req);
|
|
|
|
// --- response_topic z MQTT props ---
|
|
std::optional< std::string > responseTopic;
|
|
if(auto rt = publish_props[boost::mqtt5::prop::response_topic]; rt) {
|
|
responseTopic = *rt; // std::string
|
|
}
|
|
|
|
CallbackData cbdata{
|
|
.topic = topic,
|
|
.request = value,
|
|
.responseTopic = responseTopic // std::optional<std::string>
|
|
};
|
|
|
|
if(responseTopic) {
|
|
_log.trace("got response topic, handling it properly");
|
|
// --- PMR dla response ---
|
|
ResponseData response;
|
|
std::array< std::uint8_t, 2048 > responseBuffer{0};
|
|
|
|
std::pmr::monotonic_buffer_resource mr_resp{responseBuffer.data(), responseBuffer.size()};
|
|
json::pmr_memory_resource_adapter adapter_resp(&mr_resp);
|
|
boost::json::storage_ptr sp_resp(&adapter_resp);
|
|
boost::json::value r(sp_resp);
|
|
response = std::ref(r); // make a reference wrapper object
|
|
// wywołanie callbacka z możliwością wypełnienia response
|
|
if(auto result = co_await handler(cbdata, response); !result) {
|
|
_log.warn("callback error: {}", result.error().message());
|
|
}
|
|
|
|
// jeśli mamy ustawiony topic i wypełniony JSON -> odeślij status
|
|
if(response && cbdata.responseTopic) {
|
|
_log.debug("sending response on topic {}", *cbdata.responseTopic);
|
|
auto pub_res = co_await publish(*cbdata.responseTopic, *response);
|
|
if(!pub_res) {
|
|
_log.warn("response publish error: {}", pub_res.error().message());
|
|
}
|
|
}
|
|
}else{
|
|
// wywołanie callbacka z pustym response
|
|
ResponseData response;
|
|
if(auto result = co_await handler(cbdata, response); !result) {
|
|
_log.warn("callback error: {}", result.error().message());
|
|
}
|
|
}
|
|
} catch(const boost::system::system_error & e) {
|
|
_log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
|
|
} catch(const std::exception & e) {
|
|
_log.error("callback threw exception: {}", e.what());
|
|
} catch(...) {
|
|
_log.error("callback threw unknown exception");
|
|
}
|
|
|
|
co_return;
|
|
},
|
|
boost::asio::detached);
|
|
}
|
|
}
|
|
|
|
if(!run) {
|
|
_log.warn("received unsupported topic: {}", topic);
|
|
}
|
|
}
|
|
|
|
awaitable_expected< void > listen() {
|
|
_log.trace("client start");
|
|
for(;;) {
|
|
auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable);
|
|
|
|
if(ec) {
|
|
_log.warn("receive fail: {}", ec.message());
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
handle_received_message(topic, payload, publish_props);
|
|
} catch(const boost::system::system_error & e) {
|
|
_log.error("received bad json, parsing failed with error: {}/{}", e.code().message(), e.what());
|
|
continue;
|
|
} catch(const std::exception & e) {
|
|
_log.error("caught an exception: {}, during callback execution", e.what());
|
|
continue;
|
|
}
|
|
}
|
|
|
|
co_return _void{};
|
|
}
|
|
void start() {
|
|
_log.trace("co_spawn mqtt client");
|
|
boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached);
|
|
}
|
|
|
|
void cancel() {
|
|
_mqtt_client.cancel();
|
|
}
|
|
};
|
|
|
|
AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor)
|
|
: _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {}
|
|
|
|
awaitable_expected< const AsyncMqttClient::SubscribtionToken * > AsyncMqttClient::subscribe(std::string_view topic,
|
|
callback_t cb) noexcept {
|
|
BOOST_ASSERT(_impl);
|
|
BOOST_ASSERT(not topic.empty());
|
|
BOOST_ASSERT(cb);
|
|
|
|
_log.debug("subscribe");
|
|
co_return ASYNC_TRY(_impl->subscribe_with_callback(topic, std::move(cb)));
|
|
}
|
|
|
|
awaitable_expected< void > AsyncMqttClient::publish(std::string_view topic, const boost::json::value & value) noexcept {
|
|
BOOST_ASSERT(_impl);
|
|
BOOST_ASSERT(not topic.empty());
|
|
|
|
_log.debug("publish on {}", topic);
|
|
ASYNC_CHECK(_impl->publish(topic, value));
|
|
|
|
co_return _void{};
|
|
}
|
|
|
|
awaitable_expected< void > AsyncMqttClient::listen() const noexcept {
|
|
BOOST_ASSERT(_impl);
|
|
|
|
_log.debug("listen");
|
|
ASYNC_CHECK(_impl->listen());
|
|
|
|
co_return _void{};
|
|
}
|
|
|
|
void AsyncMqttClient::cancel() {
|
|
BOOST_ASSERT(_impl);
|
|
|
|
_log.debug("cancel");
|
|
_impl->cancel();
|
|
}
|
|
|
|
AsyncMqttClient::~AsyncMqttClient() = default;
|
|
|
|
} // namespace ranczo
|