change callbacks to be async

This commit is contained in:
Bartosz Wieczorek 2025-08-05 14:45:02 +02:00
parent 549ddfc615
commit b8cc238b5e
3 changed files with 27 additions and 18 deletions

View File

@ -133,30 +133,32 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable {
} }
} }
awaitable_expected< void > subscribe(std::string_view topic, std::function< void(const boost::json::value & object) > cb) { awaitable_expected< void > subscribe(std::string_view topic,
std::function< awaitable_expected< void >(const boost::json::value & object) > cb) {
spdlog::trace("Heater subscribing to {}", topic); spdlog::trace("Heater subscribing to {}", topic);
ASYNC_CHECK_MSG(_mqtt_client.subscribe(topic, std::move(cb)), "Heater faild to subscribe on: {}", topic); ASYNC_CHECK_MSG(_mqtt_client.subscribe(topic, std::move(cb)), "Heater faild to subscribe on: {}", topic);
co_return heater_expected_void{}; co_return heater_expected_void{};
} }
inline double to_double(const boost::json::value& v) const { inline double to_double(const boost::json::value & v) const {
if (v.is_double()) if(v.is_double())
return v.as_double(); return v.as_double();
else if (v.is_int64()) else if(v.is_int64())
return static_cast<double>(v.as_int64()); return static_cast< double >(v.as_int64());
else if (v.is_uint64()) else if(v.is_uint64())
return static_cast<double>(v.as_uint64()); return static_cast< double >(v.as_uint64());
throw std::runtime_error("Invalid type for double conversion"); throw std::runtime_error("Invalid type for double conversion");
} }
awaitable_expected< void > subscribeToTemperatureUpdate() { awaitable_expected< void > subscribeToTemperatureUpdate() {
auto topic = std::format("home/{}/floor/temperature", _room); auto topic = std::format("home/{}/floor/temperature", _room);
auto cb = [=, this](const boost::json::value & object) { auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > {
temperature = to_double(object.at("value")); temperature = to_double(object.at("value"));
spdlog::trace("Heater temperature update {} for {}", temperature, _room); spdlog::trace("Heater temperature update {} for {}", temperature, _room);
_measurements.push_back({std::chrono::system_clock::now(), temperature}); _measurements.push_back({std::chrono::system_clock::now(), temperature});
update = true; update = true;
co_return heater_expected_void{};
}; };
ASYNC_CHECK(subscribe(topic, std::move(cb))); ASYNC_CHECK(subscribe(topic, std::move(cb)));
@ -166,10 +168,11 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable {
awaitable_expected< void > subscribeToTargetTemperatureUpdate() { awaitable_expected< void > subscribeToTargetTemperatureUpdate() {
auto topic = std::format("home/heating/{}/floor/temperature/command", _room); auto topic = std::format("home/heating/{}/floor/temperature/command", _room);
auto cb = [=, this](const boost::json::value & object) { auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > {
targetTemperature = to_double(object.at("value")); targetTemperature = to_double(object.at("value"));
spdlog::trace("Heater target temperature update {} for {}", targetTemperature, _room); spdlog::trace("Heater target temperature update {} for {}", targetTemperature, _room);
update = true; update = true;
co_return heater_expected_void{};
}; };
ASYNC_CHECK(subscribe(topic, std::move(cb))); ASYNC_CHECK(subscribe(topic, std::move(cb)));
@ -179,8 +182,9 @@ struct ResistiveFloorHeater::Impl : private boost::noncopyable {
awaitable_expected< void > subscribeToTargetProfileUpdate() { awaitable_expected< void > subscribeToTargetProfileUpdate() {
auto topic = std::format("home/{}/floor/heating/profile/set", _room); auto topic = std::format("home/{}/floor/heating/profile/set", _room);
auto cb = [=, this](const boost::json::value & object) { // auto cb = [=, this](const boost::json::value & object) -> awaitable_expected< void > { //
spdlog::warn("not implemented"); spdlog::warn("not implemented");
co_return heater_expected_void{};
}; };
ASYNC_CHECK(subscribe(topic, std::move(cb))); ASYNC_CHECK(subscribe(topic, std::move(cb)));
@ -205,7 +209,7 @@ ResistiveFloorHeater::ResistiveFloorHeater(boost::asio::any_io_executor & io, st
ResistiveFloorHeater::~ResistiveFloorHeater() = default; ResistiveFloorHeater::~ResistiveFloorHeater() = default;
awaitable_expected< void > ResistiveFloorHeater::start() noexcept { awaitable_expected< void > ResistiveFloorHeater::start() noexcept {
BOOST_ASSERT(_impl); BOOST_ASSERT(_impl);
return _impl->start(); return _impl->start();
} }

View File

@ -56,7 +56,7 @@ using mqtt_expected_void = expected< void, boost::system::error_code >;
struct AsyncMqttClient::AsyncMqttClientImpl { struct AsyncMqttClient::AsyncMqttClientImpl {
const boost::asio::any_io_executor & _executor; const boost::asio::any_io_executor & _executor;
client_type _mqtt_client; client_type _mqtt_client;
std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks; std::unordered_map< std::string, std::function< awaitable_expected< void >(const boost::json::value &) > > _callbacks;
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} { AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} {
spdlog::trace("Creating mqtt client"); spdlog::trace("Creating mqtt client");
@ -137,7 +137,10 @@ struct AsyncMqttClient::AsyncMqttClientImpl {
/// TODO topic filtering as part of unordered map? /// TODO topic filtering as part of unordered map?
/// unordered map can map only one thing, maybe vector would be better? /// unordered map can map only one thing, maybe vector would be better?
spdlog::debug("MQTT executing callback for: {}", topic); spdlog::debug("MQTT executing callback for: {}", topic);
it->second(value); /// TODO pass topic to callbacl? if(auto ret = co_await it->second(value); not ret) {
spdlog::warn("MQTT catched error: {}, during callback execution", ret.error().message());
continue;
}
} else { } else {
spdlog::warn("MQTT received unsupported topic: {}", topic); spdlog::warn("MQTT received unsupported topic: {}", topic);
} }
@ -162,7 +165,7 @@ struct AsyncMqttClient::AsyncMqttClientImpl {
AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor)
: _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {}
awaitable_expected<void> AsyncMqttClient::subscribe(std::string_view topic, std::function< void(const boost::json::value &) > cb) noexcept { awaitable_expected<void> AsyncMqttClient::subscribe(std::string_view topic, std::function< awaitable_expected< void >(const boost::json::value &) > cb) noexcept {
BOOST_ASSERT(_impl); BOOST_ASSERT(_impl);
BOOST_ASSERT(not topic.empty()); BOOST_ASSERT(not topic.empty());
BOOST_ASSERT(cb); BOOST_ASSERT(cb);

View File

@ -3,6 +3,7 @@
#include <config.hpp> #include <config.hpp>
#include <memory> #include <memory>
#include <optional>
#include <string_view> #include <string_view>
namespace boost::json { namespace boost::json {
@ -24,12 +25,13 @@ class AsyncMqttClient {
std::string_view topic; std::string_view topic;
/* response topic */ /* response topic */
std::string_view responseTopic; std::optional<std::string_view> responseTopic;
/* value assosiated to request */ /* value assosiated to request */
const boost::json::value & value; const boost::json::value & value;
/* memory_resource of client */ /* memory_resource of client */
// TBD
}; };
struct AsyncMqttClientImpl; struct AsyncMqttClientImpl;
@ -38,7 +40,7 @@ class AsyncMqttClient {
AsyncMqttClient(const executor & executor); AsyncMqttClient(const executor & executor);
~AsyncMqttClient(); ~AsyncMqttClient();
awaitable_expected< void > subscribe(std::string_view topic, std::function< void(const boost::json::value & value) >) noexcept; awaitable_expected< void > subscribe(std::string_view topic, std::function< awaitable_expected< void >(const boost::json::value & value) >) noexcept;
awaitable_expected< void > listen() const noexcept; awaitable_expected< void > listen() const noexcept;
}; };