diff --git a/services/floorheat_svc/heater.cpp b/services/floorheat_svc/heater.cpp index c903339..0ec3cde 100644 --- a/services/floorheat_svc/heater.cpp +++ b/services/floorheat_svc/heater.cpp @@ -133,30 +133,32 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable { } } - awaitable_expected< void > subscribe(std::string_view topic, std::function< void(const boost::json::value & object) > cb) { + awaitable_expected< void > subscribe(std::string_view topic, + std::function< awaitable_expected< void >(const boost::json::value & object) > cb) { spdlog::trace("Heater subscribing to {}", topic); ASYNC_CHECK_MSG(_mqtt_client.subscribe(topic, std::move(cb)), "Heater faild to subscribe on: {}", topic); co_return heater_expected_void{}; } - - inline double to_double(const boost::json::value& v) const { - if (v.is_double()) + + inline double to_double(const boost::json::value & v) const { + if(v.is_double()) return v.as_double(); - else if (v.is_int64()) - return static_cast(v.as_int64()); - else if (v.is_uint64()) - return static_cast(v.as_uint64()); + else if(v.is_int64()) + return static_cast< double >(v.as_int64()); + else if(v.is_uint64()) + return static_cast< double >(v.as_uint64()); throw std::runtime_error("Invalid type for double conversion"); } - + awaitable_expected< void > subscribeToTemperatureUpdate() { auto topic = std::format("home/{}/floor/temperature", _room); - auto cb = [=, this](const boost::json::value & object) { + auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > { temperature = to_double(object.at("value")); spdlog::trace("Heater temperature update {} for {}", temperature, _room); _measurements.push_back({std::chrono::system_clock::now(), temperature}); update = true; + co_return heater_expected_void{}; }; ASYNC_CHECK(subscribe(topic, std::move(cb))); @@ -166,10 +168,11 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable { awaitable_expected< void > subscribeToTargetTemperatureUpdate() { auto topic = std::format("home/heating/{}/floor/temperature/command", _room); - auto cb = [=, this](const boost::json::value & object) { + auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > { targetTemperature = to_double(object.at("value")); spdlog::trace("Heater target temperature update {} for {}", targetTemperature, _room); update = true; + co_return heater_expected_void{}; }; ASYNC_CHECK(subscribe(topic, std::move(cb))); @@ -179,8 +182,9 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable { awaitable_expected< void > subscribeToTargetProfileUpdate() { auto topic = std::format("home/{}/floor/heating/profile/set", _room); - auto cb = [=, this](const boost::json::value & object) { // + auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > { // spdlog::warn("not implemented"); + co_return heater_expected_void{}; }; ASYNC_CHECK(subscribe(topic, std::move(cb))); @@ -205,7 +209,7 @@ ResistiveFloorHeater::ResistiveFloorHeater(boost::asio::any_io_executor & io, st ResistiveFloorHeater::~ResistiveFloorHeater() = default; awaitable_expected< void > ResistiveFloorHeater::start() noexcept { BOOST_ASSERT(_impl); - + return _impl->start(); } diff --git a/services/floorheat_svc/mqtt_client.cpp b/services/floorheat_svc/mqtt_client.cpp index 286f84a..a23cc25 100644 --- a/services/floorheat_svc/mqtt_client.cpp +++ b/services/floorheat_svc/mqtt_client.cpp @@ -56,7 +56,7 @@ using mqtt_expected_void = expected< void, boost::system::error_code >; struct AsyncMqttClient::AsyncMqttClientImpl { const boost::asio::any_io_executor & _executor; client_type _mqtt_client; - std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks; + std::unordered_map< std::string, std::function< awaitable_expected< void >(const boost::json::value &) > > _callbacks; AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} { spdlog::trace("Creating mqtt client"); @@ -137,7 +137,10 @@ struct AsyncMqttClient::AsyncMqttClientImpl { /// TODO topic filtering as part of unordered map? /// unordered map can map only one thing, maybe vector would be better? spdlog::debug("MQTT executing callback for: {}", topic); - it->second(value); /// TODO pass topic to callbacl? + if(auto ret = co_await it->second(value); not ret) { + spdlog::warn("MQTT catched error: {}, during callback execution", ret.error().message()); + continue; + } } else { spdlog::warn("MQTT received unsupported topic: {}", topic); } @@ -162,7 +165,7 @@ struct AsyncMqttClient::AsyncMqttClientImpl { AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} -awaitable_expected AsyncMqttClient::subscribe(std::string_view topic, std::function< void(const boost::json::value &) > cb) noexcept { +awaitable_expected AsyncMqttClient::subscribe(std::string_view topic, std::function< awaitable_expected< void >(const boost::json::value &) > cb) noexcept { BOOST_ASSERT(_impl); BOOST_ASSERT(not topic.empty()); BOOST_ASSERT(cb); diff --git a/services/floorheat_svc/mqtt_client.hpp b/services/floorheat_svc/mqtt_client.hpp index c9d277c..b61d562 100644 --- a/services/floorheat_svc/mqtt_client.hpp +++ b/services/floorheat_svc/mqtt_client.hpp @@ -3,6 +3,7 @@ #include #include +#include #include namespace boost::json { @@ -24,12 +25,13 @@ class AsyncMqttClient { std::string_view topic; /* response topic */ - std::string_view responseTopic; + std::optional responseTopic; /* value assosiated to request */ const boost::json::value & value; /* memory_resource of client */ + // TBD }; struct AsyncMqttClientImpl; @@ -38,7 +40,7 @@ class AsyncMqttClient { AsyncMqttClient(const executor & executor); ~AsyncMqttClient(); - awaitable_expected< void > subscribe(std::string_view topic, std::function< void(const boost::json::value & value) >) noexcept; + awaitable_expected< void > subscribe(std::string_view topic, std::function< awaitable_expected< void >(const boost::json::value & value) >) noexcept; awaitable_expected< void > listen() const noexcept; };