From 17795d9835c2f9cecc4db0d1de4d1036edd6987f Mon Sep 17 00:00:00 2001 From: Bartosz Wieczorek Date: Thu, 14 Aug 2025 10:59:01 +0200 Subject: [PATCH] Add thermometer class + refactor --- libs/CMakeLists.txt | 4 +- libs/date_utils.cpp | 26 ++++++++++++ libs/json_helpers.cpp | 19 +++++++++ libs/mqtt_client.cpp | 60 ++++++++++++++++++++++++--- libs/ranczo-io/utils/date_utils.hpp | 11 +++++ libs/ranczo-io/utils/json_helpers.hpp | 13 ++++++ 6 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 libs/date_utils.cpp create mode 100644 libs/json_helpers.cpp create mode 100644 libs/ranczo-io/utils/date_utils.hpp create mode 100644 libs/ranczo-io/utils/json_helpers.hpp diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 4a3a850..3155601 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -1,5 +1,7 @@ add_library(ranczo-io_utils - mqtt_client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ranczo-io/utils/mqtt_client.hpp + mqtt_client.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ranczo-io/utils/mqtt_client.hpp + json_helpers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ranczo-io/utils/json_helpers.hpp + date_utils.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ranczo-io/utils/date_utils.hpp ) add_library(ranczo-io::utils ALIAS ranczo-io_utils) diff --git a/libs/date_utils.cpp b/libs/date_utils.cpp new file mode 100644 index 0000000..2093844 --- /dev/null +++ b/libs/date_utils.cpp @@ -0,0 +1,26 @@ +#include + +#include + +namespace ranczo::date { +std::optional< std::chrono::system_clock::time_point > parse_timestamp_utc(std::string_view sv) noexcept { + try { + // Strip trailing 'Z' if present; Boost expects no timezone marker + std::string s(sv); + if(!s.empty() && (s.back() == 'Z' || s.back() == 'z')) + s.pop_back(); + + // Parse as extended ISO string + using namespace boost::posix_time; + ptime t = from_iso_extended_string(s); // may throw on invalid + static const ptime epoch{boost::gregorian::date{1970, 1, 1}}; + time_duration d = t - epoch; + + // Keep sub-second precision if present (microseconds is enough for the given example) + auto us = d.total_microseconds(); + return std::chrono::system_clock::time_point{std::chrono::microseconds{us}}; + } catch(...) { + return std::nullopt; + } +} +} // namespace ranczo::date diff --git a/libs/json_helpers.cpp b/libs/json_helpers.cpp new file mode 100644 index 0000000..4552e20 --- /dev/null +++ b/libs/json_helpers.cpp @@ -0,0 +1,19 @@ +#include + +#include + +std::optional ranczo::json::as_number(const boost::json::value &v) { + if(v.is_double()) + return v.as_double(); + if(v.is_int64()) + return static_cast< double >(v.as_int64()); + if(v.is_uint64()) + return static_cast< double >(v.as_uint64()); + if(v.is_string()) { + const auto & s = v.as_string(); + double out{}; + if(auto res = std::from_chars(s.data(), s.data() + s.size(), out); res.ec == std::errc{}) + return out; + } + return std::nullopt; +} diff --git a/libs/mqtt_client.cpp b/libs/mqtt_client.cpp index f1871a8..672a87e 100644 --- a/libs/mqtt_client.cpp +++ b/libs/mqtt_client.cpp @@ -1,3 +1,7 @@ +#include "boost/mqtt5/types.hpp" +#include +#include +#include #include #include @@ -101,7 +105,7 @@ using client_type = boost::mqtt5::mqtt_client< boost::asio::i using mqtt_expected_void = expected< void, boost::system::error_code >; -///TODO this must old a reference count for all the possible callbacks +/// TODO this must old a reference count for all the possible callbacks struct AsyncMqttClient::SubscribtionToken { boost::uuids::uuid uuid; }; @@ -127,6 +131,10 @@ struct AsyncMqttClient::AsyncMqttClientImpl { awaitable_expected< void > subscribe(std::string_view topic) { spdlog::trace("MQTT subscribe to {}", topic); + + // switch to mqtt strand + co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable); + // Configure the request to subscribe to a Topic. boost::mqtt5::subscribe_topic sub_topic = boost::mqtt5::subscribe_topic{topic.data(), boost::mqtt5::subscribe_options{ @@ -163,6 +171,37 @@ struct AsyncMqttClient::AsyncMqttClientImpl { co_return mqtt_expected_void{}; // Success } + awaitable_expected publish(std::string_view topic, const boost::json::value & value) noexcept { + spdlog::debug("MQTT publish on: {}", topic); + + // 1) execute in context of mqtt strand + co_await boost::asio::dispatch(_mqtt_client.get_executor(), boost::asio::use_awaitable); + + // 2) owning copies for topic and payload. + std::string t{topic}; + std::string payload; + try { + payload = boost::json::serialize(value); // może rzucić bad_alloc + } catch(const std::bad_alloc & e) { + spdlog::error("METT cought bad_alloca exception: {}", e.what()); + co_return unexpected{make_error_code(std::errc::not_enough_memory)}; + } + + // 3) QoS 0: only error code returned + auto [ec] = co_await _mqtt_client.async_publish< boost::mqtt5::qos_e::at_most_once >(std::string{topic}, + boost::json::serialize(value), + boost::mqtt5::retain_e::no, + boost::mqtt5::publish_props{}, + boost::asio::as_tuple(boost::asio::use_awaitable)); + + if(ec){ + spdlog::error("MQTT publish returned an exception: {}", ec.message()); + co_return unexpected{ec}; + } + + co_return mqtt_expected_void{}; + } + /// TODO should be const awaitable_expected< void > listen() { std::array< uint8_t, 2048 > buf{}; @@ -238,7 +277,8 @@ struct AsyncMqttClient::AsyncMqttClientImpl { AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} -awaitable_expected AsyncMqttClient::subscribe(std::string_view topic, callback_t cb) noexcept { +awaitable_expected< const AsyncMqttClient::SubscribtionToken * > AsyncMqttClient::subscribe(std::string_view topic, + callback_t cb) noexcept { BOOST_ASSERT(_impl); BOOST_ASSERT(not topic.empty()); BOOST_ASSERT(cb); @@ -249,11 +289,21 @@ awaitable_expected AsyncMqttClient spdlog::trace("MQTT subscribtion to {} ok, registering callback", topic); const auto & [_1, _2, token] = _impl->_callbacks.emplace_back(Topic{topic}, cb, AsyncMqttClient::SubscribtionToken{boost::uuids::random_generator()()}); - - /// TODO return token + + spdlog::trace("MQTT subscribtion to {} done", topic); co_return &token; } +awaitable_expected< void > AsyncMqttClient::publish(std::string_view topic, const boost::json::value & value) noexcept { + BOOST_ASSERT(_impl); + BOOST_ASSERT(not topic.empty()); + + spdlog::info("MQTT client publish on {}", topic); + ASYNC_CHECK(_impl->publish(topic, value)); + + co_return mqtt_expected_void{}; +} + awaitable_expected< void > AsyncMqttClient::listen() const noexcept { BOOST_ASSERT(_impl); @@ -265,8 +315,8 @@ awaitable_expected< void > AsyncMqttClient::listen() const noexcept { void AsyncMqttClient::cancel() { BOOST_ASSERT(_impl); + spdlog::info("MQTT client cancel"); - _impl->cancel(); } diff --git a/libs/ranczo-io/utils/date_utils.hpp b/libs/ranczo-io/utils/date_utils.hpp new file mode 100644 index 0000000..45cdd8f --- /dev/null +++ b/libs/ranczo-io/utils/date_utils.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include +#include +#include + +namespace ranczo::date { +// Convert ISO-8601/RFC3339-ish string ("YYYY-MM-DDTHH:MM:SS[.fff]Z") to system_clock::time_point. +// Falls back to nullopt on parse failure. +std::optional< std::chrono::system_clock::time_point > parse_timestamp_utc(std::string_view sv) noexcept; +} // namespace ranczo::date diff --git a/libs/ranczo-io/utils/json_helpers.hpp b/libs/ranczo-io/utils/json_helpers.hpp new file mode 100644 index 0000000..caa5847 --- /dev/null +++ b/libs/ranczo-io/utils/json_helpers.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace boost::json{ +class value; +} + +namespace ranczo::json{ + +std::optional< double > as_number(const boost::json::value& v); + +}