From 228401f5bbb14fc415347b74df3f94738a6e76cf Mon Sep 17 00:00:00 2001 From: Bartosz Wieczorek Date: Thu, 14 Aug 2025 11:06:13 +0200 Subject: [PATCH] Add thermometer class + refactor --- config.hpp | 63 +++++----- services/floorheat_svc/main.cpp | 48 +++++++- services/floorheat_svc/relay.hpp | 13 ++- .../floorheat_svc/temperature_controller.cpp | 62 +++++----- services/floorheat_svc/thermometer.cpp | 95 +++++++++++++++ services/floorheat_svc/thermometer.hpp | 109 ++++-------------- 6 files changed, 233 insertions(+), 157 deletions(-) diff --git a/config.hpp b/config.hpp index f5483ed..5d4f6e3 100644 --- a/config.hpp +++ b/config.hpp @@ -11,9 +11,12 @@ namespace ranczo { template < typename T > using awaitable_expected = boost::asio::awaitable< std::expected< T, boost::system::error_code > >; + template < typename Tp, typename Er > using expected = std::expected< Tp, Er >; +using _void = expected< void, boost::system::error_code >; + template < typename T > using unexpected = std::unexpected< T >; @@ -24,13 +27,13 @@ using ::boost::system::errc::make_error_code; // if(auto _res = co_await (expr); !_res) \ // co_return std::unexpected(_res.error()) -#define SYNC_TRY(failable) \ -({ \ - auto _result = failable; \ - if(!_result) \ - return unexpected{_result.error()}; \ - *_result; \ -}) +#define SYNC_TRY(failable) \ + ({ \ + auto _result = failable; \ + if(!_result) \ + return unexpected{_result.error()}; \ + *_result; \ + }) #define TRY(failable) \ ({ \ @@ -48,22 +51,22 @@ using ::boost::system::errc::make_error_code; *_result; \ }) -#define ASYNC_TRY_TRANSFORM_ERROR(failable, transform) \ - ({ \ - auto _result = co_await (failable); \ - if(!_result) \ +#define ASYNC_TRY_TRANSFORM_ERROR(failable, transform) \ + ({ \ + auto _result = co_await (failable); \ + if(!_result) \ co_return unexpected{transform(_result.error())}; \ - *_result; \ + *_result; \ }) -#define ASYNC_TRY_MSG(failable, ...) \ - ({ \ - auto _result = co_await (failable); \ - if(!_result) { \ - spdlog::error(__VA_ARGS__); \ +#define ASYNC_TRY_MSG(failable, ...) \ + ({ \ + auto _result = co_await (failable); \ + if(!_result) { \ + spdlog::error(__VA_ARGS__); \ co_return unexpected{_result.error()}; \ - } \ - *_result; \ + } \ + *_result; \ }) // // Same as above, but log a custom message on error before propagating @@ -73,20 +76,20 @@ using ::boost::system::errc::make_error_code; // co_return std::unexpected(_res.error()); \ // } -#define ASYNC_CHECK(failable) \ - do { \ - auto _result = co_await (failable); \ - if(!_result) \ - co_return unexpected(_result.error()); \ +#define ASYNC_CHECK(failable) \ + do { \ + auto _result = co_await (failable); \ + if(!_result) \ + co_return std::unexpected(_result.error()); \ } while(0) -#define ASYNC_CHECK_MSG(failable, ...) \ - do { \ - auto _result = co_await (failable); \ - if(!_result) { \ - spdlog::error(__VA_ARGS__); \ +#define ASYNC_CHECK_MSG(failable, ...) \ + do { \ + auto _result = co_await (failable); \ + if(!_result) { \ + spdlog::error(__VA_ARGS__); \ co_return unexpected{_result.error()}; \ - } \ + } \ } while(0) // // Check an awaitable that returns expected, but DO NOT propagate diff --git a/services/floorheat_svc/main.cpp b/services/floorheat_svc/main.cpp index e8f656b..3d92ff2 100644 --- a/services/floorheat_svc/main.cpp +++ b/services/floorheat_svc/main.cpp @@ -1,6 +1,7 @@ -#include +#include #include +#include #include #include #include @@ -8,14 +9,20 @@ #include #include +#include + #include +#include "config.hpp" #include "services/floorheat_svc/relay.hpp" #include "services/floorheat_svc/thermometer.hpp" #include "temperature_controller.hpp" #include +#include +#include #include +#include namespace ranczo { @@ -35,6 +42,39 @@ using namespace std::string_view_literals; std::atomic< bool > running = true; boost::asio::io_context * g_io = nullptr; +inline ranczo::awaitable_expected< void > forward_floor_temperature_all_rooms(ranczo::AsyncMqttClient & mqtt) { + const std::string filter = "home/+/floor/temperature"; + spdlog::info("Registering subscribtion for {}", filter); + + auto ec = + co_await mqtt.subscribe(filter, [&mqtt](const ranczo::AsyncMqttClient::CallbackData & data) -> ranczo::awaitable_expected< void > { + spdlog::debug("Got temperature on old topic: {}, trying to republish", data.topic); + std::string room; + { + // topic format: home/{room}/floor/temperature + std::vector< std::string > parts{}; + boost::algorithm::split(parts, data.topic, [](auto ch) { return ch == '/'; }); + if(parts.size() == 4) // got ok topic + room = parts.at(1); + } + + if(!room.empty()) { + std::string dst = "home/" + room + "/sensor/floor/temperature"; + spdlog::info("Republishing temperature from: {} to: {}", data.topic, dst); + ASYNC_CHECK(mqtt.publish(dst, data.payload)); + } + + co_return ranczo::_void{}; + }); + + spdlog::trace("Subscribtion pass, checking for errors"); + if(!ec) { + spdlog::error("Got error from subscribe: {}", ec.error().message()); + } + spdlog::trace("All good, returning"); + co_return ranczo::_void{}; +} + void signal_handler(int signum) { spdlog::warn("Signal received: {}, stopping io_context...", signum); running = false; @@ -58,6 +98,8 @@ int main() { boost::asio::any_io_executor io_executor = io_context.get_executor(); ranczo::AsyncMqttClient mqttClient{io_executor}; + co_spawn(io_executor, forward_floor_temperature_all_rooms(mqttClient), boost::asio::detached); + auto relayThermostatFactory = [&](auto room) { spdlog::debug("ThermostatFactory room {}: create relay", room); auto relay = std::make_unique< ranczo::MqttRelay >(io_executor, mqttClient); @@ -71,7 +113,7 @@ int main() { return std::make_shared< ranczo::RelayThermostat >(io_executor, mqttClient, std::move(relay), std::move(thermo), room); }; - // // PARTER + // Floor 0 _heaters.emplace_back(relayThermostatFactory("corridor"sv)); _heaters.emplace_back(relayThermostatFactory("utilityRoom"sv)); _heaters.emplace_back(relayThermostatFactory("wardrobe"sv)); @@ -83,7 +125,7 @@ int main() { _heaters.emplace_back(relayThermostatFactory("livingroom_zone2"sv)); _heaters.emplace_back(relayThermostatFactory("livingroom_zone3"sv)); - // // PIĘTRO + // Floor 1 _heaters.emplace_back(relayThermostatFactory("askaRoom"sv)); _heaters.emplace_back(relayThermostatFactory("maciejRoom"sv)); _heaters.emplace_back(relayThermostatFactory("playroom"sv)); diff --git a/services/floorheat_svc/relay.hpp b/services/floorheat_svc/relay.hpp index 4279e1d..61040f6 100644 --- a/services/floorheat_svc/relay.hpp +++ b/services/floorheat_svc/relay.hpp @@ -1,5 +1,6 @@ #pragma once +#include "services/floorheat_svc/thermometer.hpp" #include #include @@ -16,11 +17,13 @@ class Relay { class MqttRelay : public Relay { // Relay interface public: - MqttRelay(boost::asio::any_io_executor ex, AsyncMqttClient & mqtt){ - + MqttRelay(boost::asio::any_io_executor ex, AsyncMqttClient & mqtt) {} + + awaitable_expected< void > on() noexcept override { + co_return _void{}; + } + awaitable_expected< void > off() noexcept override { + co_return _void{}; } - - ranczo::awaitable_expected< void > on() noexcept override {} - ranczo::awaitable_expected< void > off() noexcept override {} }; } // namespace ranczo diff --git a/services/floorheat_svc/temperature_controller.cpp b/services/floorheat_svc/temperature_controller.cpp index 2de7d36..aff9545 100644 --- a/services/floorheat_svc/temperature_controller.cpp +++ b/services/floorheat_svc/temperature_controller.cpp @@ -1,7 +1,6 @@ #include "temperature_controller.hpp" #include "config.hpp" -#include "services/floorheat_svc/thermometer.hpp" #include #include #include @@ -9,11 +8,15 @@ #include #include #include +#include #include #include +#include "thermometer.hpp" + #include #include +#include #include @@ -33,8 +36,6 @@ */ namespace ranczo { -using heater_expected_void = expected< void, boost::system::error_code >; - struct TemperatureMeasurement { std::chrono::system_clock::time_point when; double temperature_C; @@ -68,8 +69,8 @@ struct RelayThermostat::Impl : private boost::noncopyable { std::unique_ptr< Thermometer > thermometer, std::string_view room) : _io{io}, _mqtt{mqtt}, _relay{std::move(relay)}, _temp{std::move(thermometer)}, _tickTimer{_io}, _room{room}, _measurements{100} { - BOOST_ASSERT(relay); - BOOST_ASSERT(not room.empty()); + BOOST_ASSERT(_relay); + BOOST_ASSERT(not _room.empty()); } ~Impl() = default; @@ -148,29 +149,15 @@ struct RelayThermostat::Impl : private boost::noncopyable { /// TODO fix spdlog::trace("RelayThermostat room {} subscribing to {}", _room, topic); ASYNC_CHECK_MSG(_mqtt.subscribe(topic, std::move(cb)), "Heater faild to subscribe on: {}", topic); - co_return heater_expected_void{}; - } - - inline expected< double, boost::system::error_code > to_double(const boost::json::value & v) const { - if(v.is_double()) - return v.as_double(); - if(v.is_int64()) - return static_cast< double >(v.as_int64()); - if(v.is_uint64()) - return static_cast< double >(v.as_uint64()); - if(v.is_string()) { - const auto & s = v.as_string(); - double out{}; - if(auto res = std::from_chars(s.data(), s.data() + s.size(), out); res.ec == std::errc{}) - return out; - } - return unexpected{make_error_code(boost::system::errc::invalid_argument)}; + co_return _void{}; } inline expected< double, boost::system::error_code > get_value(const boost::json::value & jv, std::string_view key) { if(auto * obj = jv.if_object()) { if(auto * pv = obj->if_contains(key)) { - return SYNC_TRY(to_double(*pv)); + auto ovalue = json::as_number(*pv).value(); + if(ovalue) + return ovalue; } } return unexpected{make_error_code(boost::system::errc::invalid_argument)}; @@ -183,26 +170,41 @@ struct RelayThermostat::Impl : private boost::noncopyable { targetTemperature = TRY(get_value(data.payload, "value")); spdlog::trace("Heater target temperature update {} for {}", targetTemperature, _room); update = true; - co_return heater_expected_void{}; + co_return _void{}; }; ASYNC_CHECK(subscribe(topic, std::move(cb))); - co_return heater_expected_void{}; + co_return _void{}; + } + + awaitable_expected temperatureUpdate(const Thermometer::ThermometerData &data){ + spdlog::info("Got temperature update for: {}", _room); + + /// TODO sprawdzenie czy temp < treshold + /// TODO ustawienie przekaźniczka + /// TODO update watchdog'a + + co_return _void{}; } awaitable_expected< void > start() { + using namespace std::placeholders; + // subscribe to a thermometer readings - boost::asio::co_spawn(_io, _temp->listen(), boost::asio::detached); - + ASYNC_CHECK(_temp->on_update(std::bind(&Impl::temperatureUpdate, this, _1))); + // subscribe to a thermostat commands feed - ASYNC_CHECK_MSG(subscribeToCommandUpdate(), "subscribe to temp update failed"); - + ASYNC_CHECK_MSG(subscribeToCommandUpdate(), "subscribe to command stream failed"); + /// TODO subscribe to energy measurements + + // detaching listening thread + boost::asio::co_spawn(_io, _temp->listen(), boost::asio::detached); // ASYNC_CHECK_MSG(_tickTimer.start(), "failed to start timer"); // ASYNC_CHECK_MSG(_mqtt.listen(), "failed to listen"); - co_return heater_expected_void{}; + co_return _void{}; } }; diff --git a/services/floorheat_svc/thermometer.cpp b/services/floorheat_svc/thermometer.cpp index e69de29..1ddbc78 100644 --- a/services/floorheat_svc/thermometer.cpp +++ b/services/floorheat_svc/thermometer.cpp @@ -0,0 +1,95 @@ +#include "thermometer.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace ranczo { +awaitable_expected< void > MqttThermometer::on_update(async_cb handler) noexcept { + handler_ = std::move(handler); + auto token = ASYNC_TRY(mqtt_.subscribe( + topic::temperature::floor(cfg_.room), [this](const AsyncMqttClient::CallbackData & data) -> awaitable_expected< void > { + // Ensure controller state is touched on our strand + co_await boost::asio::dispatch(strand_, boost::asio::use_awaitable); + // Parse numeric value (root number or object[key]) + if(auto v = to_thermometer_data(data.payload); v.has_value()) { + // current_ = v->value; + if(handler_) + ASYNC_CHECK(handler_(*v)); + } else { + spdlog::warn("Thermometer data {} is invalid", boost::json::serialize(data.payload)); + } + co_return _void{}; + })); + co_return _void{}; +} + +awaitable_expected< void > MqttThermometer::listen() noexcept { + // Not owning the underlying client loop: nothing to do here. + // Keep an idle await so co_spawn'd task lives until cancel() (optional design). + boost::asio::steady_timer idle{strand_}; + for(;;) { + idle.expires_after(std::chrono::hours(24)); + auto [ec] = co_await idle.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable)); + if(ec == boost::asio::error::operation_aborted) { + spdlog::info("Temperature readding on {} aborted, exiting", cfg_.room); + co_return _void{}; + } + } + co_return _void{}; +} + +// Parse function: value required; others optional with defaults (now, "°C", true) +expected< Thermometer::ThermometerData, boost::system::error_code > MqttThermometer::to_thermometer_data(const boost::json::value & v) { + const auto * obj = v.if_object(); + if(!obj) { + return std::unexpected{make_error_code(std::errc::invalid_argument)}; + } + + // ---- value (required) ---- + const auto * pv = obj->if_contains("value"); + if(!pv) { + return std::unexpected{make_error_code(std::errc::invalid_argument)}; + } + auto val = ranczo::json::as_number(*pv); + if(!val) { + return std::unexpected{make_error_code(std::errc::invalid_argument)}; + } + + // ---- timestamp (optional; default: now()) ---- + std::chrono::system_clock::time_point ts = std::chrono::system_clock::now(); + if(const auto * pt = obj->if_contains("timestamp")) { + if(pt->is_string()) { + auto s = pt->as_string(); // boost::json::string_view-like + if(auto tp = date::parse_timestamp_utc({s.data(), s.size()})) { + ts = *tp; + } // else keep default 'now' + } + } + + // ---- unit (optional; default: "°C") ---- + std::string unit = R"(°C)"; + if(const auto * pu = obj->if_contains("unit")) { + if(pu->is_string()) { + auto s = pu->as_string(); + unit.assign(s.begin(), s.end()); + } + } + + // ---- update (optional; default: true) ---- + bool update = true; + if(const auto * pu = obj->if_contains("update")) { + if(pu->is_bool()) + update = pu->as_bool(); + } + + return ThermometerData{*val, ts, std::move(unit), update}; +} +} // namespace ranczo diff --git a/services/floorheat_svc/thermometer.hpp b/services/floorheat_svc/thermometer.hpp index 8dc79c2..6001056 100644 --- a/services/floorheat_svc/thermometer.hpp +++ b/services/floorheat_svc/thermometer.hpp @@ -1,24 +1,27 @@ #pragma once -#include -#include -#include -#include -#include + #include - -#include - #include -namespace ranczo { -using _void = expected< void, boost::system::error_code >; +namespace boost::json{ +class value; +} + +namespace ranczo { struct Thermometer { virtual ~Thermometer() = default; + //{"value":23.6875,"timestamp":"2025-08-14T04:00:27.614Z","unit":"°C","update":false} + struct ThermometerData { + double value; + std::chrono::system_clock::time_point timestamp; + std::string unit; + bool update; + }; // Register async handler for temperature updates (and wire subscriptions under the hood). // Return quickly after the subscription is issued. - using async_cb = std::function< awaitable_expected< void >(double) >; + using async_cb = std::function< awaitable_expected< void >(ThermometerData) >; virtual awaitable_expected< void > on_update(async_cb handler) noexcept = 0; @@ -27,31 +30,12 @@ struct Thermometer { // Stop internal activity (e.g., cancel). Optional in clean shutdown. virtual void cancel() noexcept = 0; - - // Convenience access to the latest value (if any). - virtual std::optional< double > current() const = 0; }; -inline expected< double, boost::system::error_code > to_double(const boost::json::value & v) { - if(v.is_double()) - return v.as_double(); - if(v.is_int64()) - return static_cast< double >(v.as_int64()); - if(v.is_uint64()) - return static_cast< double >(v.as_uint64()); - if(v.is_string()) { - const auto & s = v.as_string(); - double out{}; - if(auto res = std::from_chars(s.data(), s.data() + s.size(), out); res.ec == std::errc{}) - return out; - } - return unexpected{make_error_code(boost::system::errc::invalid_argument)}; -} +class AsyncMqttClient; class MqttThermometer : public Thermometer { public: - using json = boost::json::value; - struct Settings { std::string room; // e.g. "bathroom" std::string key{"temperature"}; // JSON key to read if payload is an object @@ -61,70 +45,18 @@ class MqttThermometer : public Thermometer { : strand_(boost::asio::make_strand(ex)), mqtt_(mqtt), cfg_(std::move(cfg)) {} // Register handler + subscribe to the room topic. Do not block. - awaitable_expected< void > on_update(async_cb handler) noexcept override { - handler_ = std::move(handler); - - co_await mqtt_.subscribe(topic_temperature(), [this](const AsyncMqttClient::CallbackData & data) -> awaitable_expected< void > { - // Ensure controller state is touched on our strand - co_await boost::asio::dispatch(strand_, boost::asio::use_awaitable); - // Parse numeric value (root number or object[key]) - if(auto v = extract_value(data.payload)) { - current_ = *v; - if(handler_) - co_await handler_(*v); - } - co_return _void{}; - }); - co_return _void{}; - } + awaitable_expected< void > on_update(async_cb handler) noexcept override; // Perpetual loop - awaitable_expected< void > listen() noexcept override { - // Not owning the underlying client loop: nothing to do here. - // Keep an idle await so co_spawn'd task lives until cancel() (optional design). - boost::asio::steady_timer idle{strand_}; - for(;;) { - idle.expires_after(std::chrono::hours(24)); - auto [ec] = co_await idle.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable)); - if(ec == boost::asio::error::operation_aborted) - co_return _void{}; - } - co_return _void{}; - } + awaitable_expected< void > listen() noexcept override; void cancel() noexcept override { // subscriptions are managed by mqtt_ implementation - ///TODO unsubscribe + /// TODO unsubscribe } - - std::optional< double > current() const override { - return current_; - } - private: - std::string topic_temperature() const { - return "home/" + cfg_.room + "/heating/floor/temperature"; - } - - static std::optional< double > as_number(const json & v) { - if(v.is_double()) - return v.as_double(); - if(v.is_int64()) - return static_cast< double >(v.as_int64()); - if(v.is_uint64()) - return static_cast< double >(v.as_uint64()); - return std::nullopt; - } - - std::optional< double > extract_value(const json & payload) const { - if(auto num = as_number(payload)) - return num; - if(auto * obj = payload.if_object()) { - if(auto * p = obj->if_contains(cfg_.key)) - return as_number(*p); - } - return std::nullopt; - } + + expected< ThermometerData, boost::system::error_code > to_thermometer_data(const boost::json::value & v); private: boost::asio::strand< boost::asio::any_io_executor > strand_; @@ -132,7 +64,6 @@ class MqttThermometer : public Thermometer { Settings cfg_{}; async_cb handler_{}; - std::optional< double > current_{}; }; } // namespace ranczo