Add thermometer class + refactor

This commit is contained in:
Bartosz Wieczorek 2025-08-14 11:06:13 +02:00
parent 17795d9835
commit 228401f5bb
6 changed files with 233 additions and 157 deletions

View File

@ -11,9 +11,12 @@ namespace ranczo {
template < typename T >
using awaitable_expected = boost::asio::awaitable< std::expected< T, boost::system::error_code > >;
template < typename Tp, typename Er >
using expected = std::expected< Tp, Er >;
using _void = expected< void, boost::system::error_code >;
template < typename T >
using unexpected = std::unexpected< T >;
@ -24,13 +27,13 @@ using ::boost::system::errc::make_error_code;
// if(auto _res = co_await (expr); !_res) \
// co_return std::unexpected(_res.error())
#define SYNC_TRY(failable) \
({ \
auto _result = failable; \
if(!_result) \
return unexpected{_result.error()}; \
*_result; \
})
#define SYNC_TRY(failable) \
({ \
auto _result = failable; \
if(!_result) \
return unexpected{_result.error()}; \
*_result; \
})
#define TRY(failable) \
({ \
@ -48,22 +51,22 @@ using ::boost::system::errc::make_error_code;
*_result; \
})
#define ASYNC_TRY_TRANSFORM_ERROR(failable, transform) \
({ \
auto _result = co_await (failable); \
if(!_result) \
#define ASYNC_TRY_TRANSFORM_ERROR(failable, transform) \
({ \
auto _result = co_await (failable); \
if(!_result) \
co_return unexpected{transform(_result.error())}; \
*_result; \
*_result; \
})
#define ASYNC_TRY_MSG(failable, ...) \
({ \
auto _result = co_await (failable); \
if(!_result) { \
spdlog::error(__VA_ARGS__); \
#define ASYNC_TRY_MSG(failable, ...) \
({ \
auto _result = co_await (failable); \
if(!_result) { \
spdlog::error(__VA_ARGS__); \
co_return unexpected{_result.error()}; \
} \
*_result; \
} \
*_result; \
})
// // Same as above, but log a custom message on error before propagating
@ -73,20 +76,20 @@ using ::boost::system::errc::make_error_code;
// co_return std::unexpected(_res.error()); \
// }
#define ASYNC_CHECK(failable) \
do { \
auto _result = co_await (failable); \
if(!_result) \
co_return unexpected(_result.error()); \
#define ASYNC_CHECK(failable) \
do { \
auto _result = co_await (failable); \
if(!_result) \
co_return std::unexpected(_result.error()); \
} while(0)
#define ASYNC_CHECK_MSG(failable, ...) \
do { \
auto _result = co_await (failable); \
if(!_result) { \
spdlog::error(__VA_ARGS__); \
#define ASYNC_CHECK_MSG(failable, ...) \
do { \
auto _result = co_await (failable); \
if(!_result) { \
spdlog::error(__VA_ARGS__); \
co_return unexpected{_result.error()}; \
} \
} \
} while(0)
// // Check an awaitable that returns expected<T, E>, but DO NOT propagate

View File

@ -1,6 +1,7 @@
#include <memory>
#include <boost/algorithm/string/compare.hpp>
#include <ranczo-io/utils/mqtt_client.hpp>
#include <boost/json/serialize.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
@ -8,14 +9,20 @@
#include <boost/asio/strand.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/algorithm/string/split.hpp>
#include <spdlog/spdlog.h>
#include "config.hpp"
#include "services/floorheat_svc/relay.hpp"
#include "services/floorheat_svc/thermometer.hpp"
#include "temperature_controller.hpp"
#include <ranczo-io/utils/mqtt_client.hpp>
#include <expected>
#include <memory>
#include <csignal>
#include <string>
namespace ranczo {
@ -35,6 +42,39 @@ using namespace std::string_view_literals;
std::atomic< bool > running = true;
boost::asio::io_context * g_io = nullptr;
inline ranczo::awaitable_expected< void > forward_floor_temperature_all_rooms(ranczo::AsyncMqttClient & mqtt) {
const std::string filter = "home/+/floor/temperature";
spdlog::info("Registering subscribtion for {}", filter);
auto ec =
co_await mqtt.subscribe(filter, [&mqtt](const ranczo::AsyncMqttClient::CallbackData & data) -> ranczo::awaitable_expected< void > {
spdlog::debug("Got temperature on old topic: {}, trying to republish", data.topic);
std::string room;
{
// topic format: home/{room}/floor/temperature
std::vector< std::string > parts{};
boost::algorithm::split(parts, data.topic, [](auto ch) { return ch == '/'; });
if(parts.size() == 4) // got ok topic
room = parts.at(1);
}
if(!room.empty()) {
std::string dst = "home/" + room + "/sensor/floor/temperature";
spdlog::info("Republishing temperature from: {} to: {}", data.topic, dst);
ASYNC_CHECK(mqtt.publish(dst, data.payload));
}
co_return ranczo::_void{};
});
spdlog::trace("Subscribtion pass, checking for errors");
if(!ec) {
spdlog::error("Got error from subscribe: {}", ec.error().message());
}
spdlog::trace("All good, returning");
co_return ranczo::_void{};
}
void signal_handler(int signum) {
spdlog::warn("Signal received: {}, stopping io_context...", signum);
running = false;
@ -58,6 +98,8 @@ int main() {
boost::asio::any_io_executor io_executor = io_context.get_executor();
ranczo::AsyncMqttClient mqttClient{io_executor};
co_spawn(io_executor, forward_floor_temperature_all_rooms(mqttClient), boost::asio::detached);
auto relayThermostatFactory = [&](auto room) {
spdlog::debug("ThermostatFactory room {}: create relay", room);
auto relay = std::make_unique< ranczo::MqttRelay >(io_executor, mqttClient);
@ -71,7 +113,7 @@ int main() {
return std::make_shared< ranczo::RelayThermostat >(io_executor, mqttClient, std::move(relay), std::move(thermo), room);
};
// // PARTER
// Floor 0
_heaters.emplace_back(relayThermostatFactory("corridor"sv));
_heaters.emplace_back(relayThermostatFactory("utilityRoom"sv));
_heaters.emplace_back(relayThermostatFactory("wardrobe"sv));
@ -83,7 +125,7 @@ int main() {
_heaters.emplace_back(relayThermostatFactory("livingroom_zone2"sv));
_heaters.emplace_back(relayThermostatFactory("livingroom_zone3"sv));
// // PIĘTRO
// Floor 1
_heaters.emplace_back(relayThermostatFactory("askaRoom"sv));
_heaters.emplace_back(relayThermostatFactory("maciejRoom"sv));
_heaters.emplace_back(relayThermostatFactory("playroom"sv));

View File

@ -1,5 +1,6 @@
#pragma once
#include "services/floorheat_svc/thermometer.hpp"
#include <config.hpp>
#include <ranczo-io/utils/mqtt_client.hpp>
@ -16,11 +17,13 @@ class Relay {
class MqttRelay : public Relay {
// Relay interface
public:
MqttRelay(boost::asio::any_io_executor ex, AsyncMqttClient & mqtt){
MqttRelay(boost::asio::any_io_executor ex, AsyncMqttClient & mqtt) {}
awaitable_expected< void > on() noexcept override {
co_return _void{};
}
awaitable_expected< void > off() noexcept override {
co_return _void{};
}
ranczo::awaitable_expected< void > on() noexcept override {}
ranczo::awaitable_expected< void > off() noexcept override {}
};
} // namespace ranczo

View File

@ -1,7 +1,6 @@
#include "temperature_controller.hpp"
#include "config.hpp"
#include "services/floorheat_svc/thermometer.hpp"
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
@ -9,11 +8,15 @@
#include <boost/system/detail/errc.hpp>
#include <boost/system/errc.hpp>
#include <boost/system/result.hpp>
#include <functional>
#include <memory>
#include <spdlog/spdlog.h>
#include "thermometer.hpp"
#include <ranczo-io/utils/mqtt_client.hpp>
#include <ranczo-io/utils/mqtt_topic_builder.hpp>
#include <ranczo-io/utils/json_helpers.hpp>
#include <chrono>
@ -33,8 +36,6 @@
*/
namespace ranczo {
using heater_expected_void = expected< void, boost::system::error_code >;
struct TemperatureMeasurement {
std::chrono::system_clock::time_point when;
double temperature_C;
@ -68,8 +69,8 @@ struct RelayThermostat::Impl : private boost::noncopyable {
std::unique_ptr< Thermometer > thermometer,
std::string_view room)
: _io{io}, _mqtt{mqtt}, _relay{std::move(relay)}, _temp{std::move(thermometer)}, _tickTimer{_io}, _room{room}, _measurements{100} {
BOOST_ASSERT(relay);
BOOST_ASSERT(not room.empty());
BOOST_ASSERT(_relay);
BOOST_ASSERT(not _room.empty());
}
~Impl() = default;
@ -148,29 +149,15 @@ struct RelayThermostat::Impl : private boost::noncopyable {
/// TODO fix
spdlog::trace("RelayThermostat room {} subscribing to {}", _room, topic);
ASYNC_CHECK_MSG(_mqtt.subscribe(topic, std::move(cb)), "Heater faild to subscribe on: {}", topic);
co_return heater_expected_void{};
}
inline expected< double, boost::system::error_code > to_double(const boost::json::value & v) const {
if(v.is_double())
return v.as_double();
if(v.is_int64())
return static_cast< double >(v.as_int64());
if(v.is_uint64())
return static_cast< double >(v.as_uint64());
if(v.is_string()) {
const auto & s = v.as_string();
double out{};
if(auto res = std::from_chars(s.data(), s.data() + s.size(), out); res.ec == std::errc{})
return out;
}
return unexpected{make_error_code(boost::system::errc::invalid_argument)};
co_return _void{};
}
inline expected< double, boost::system::error_code > get_value(const boost::json::value & jv, std::string_view key) {
if(auto * obj = jv.if_object()) {
if(auto * pv = obj->if_contains(key)) {
return SYNC_TRY(to_double(*pv));
auto ovalue = json::as_number(*pv).value();
if(ovalue)
return ovalue;
}
}
return unexpected{make_error_code(boost::system::errc::invalid_argument)};
@ -183,26 +170,41 @@ struct RelayThermostat::Impl : private boost::noncopyable {
targetTemperature = TRY(get_value(data.payload, "value"));
spdlog::trace("Heater target temperature update {} for {}", targetTemperature, _room);
update = true;
co_return heater_expected_void{};
co_return _void{};
};
ASYNC_CHECK(subscribe(topic, std::move(cb)));
co_return heater_expected_void{};
co_return _void{};
}
awaitable_expected<void> temperatureUpdate(const Thermometer::ThermometerData &data){
spdlog::info("Got temperature update for: {}", _room);
/// TODO sprawdzenie czy temp < treshold
/// TODO ustawienie przekaźniczka
/// TODO update watchdog'a
co_return _void{};
}
awaitable_expected< void > start() {
using namespace std::placeholders;
// subscribe to a thermometer readings
boost::asio::co_spawn(_io, _temp->listen(), boost::asio::detached);
ASYNC_CHECK(_temp->on_update(std::bind(&Impl::temperatureUpdate, this, _1)));
// subscribe to a thermostat commands feed
ASYNC_CHECK_MSG(subscribeToCommandUpdate(), "subscribe to temp update failed");
ASYNC_CHECK_MSG(subscribeToCommandUpdate(), "subscribe to command stream failed");
/// TODO subscribe to energy measurements
// detaching listening thread
boost::asio::co_spawn(_io, _temp->listen(), boost::asio::detached);
// ASYNC_CHECK_MSG(_tickTimer.start(), "failed to start timer");
// ASYNC_CHECK_MSG(_mqtt.listen(), "failed to listen");
co_return heater_expected_void{};
co_return _void{};
}
};

View File

@ -0,0 +1,95 @@
#include "thermometer.hpp"
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/json/serialize.hpp>
#include <spdlog/spdlog.h>
#include <ranczo-io/utils/date_utils.hpp>
#include <ranczo-io/utils/json_helpers.hpp>
#include <ranczo-io/utils/mqtt_client.hpp>
#include <ranczo-io/utils/mqtt_topic_builder.hpp>
namespace ranczo {
awaitable_expected< void > MqttThermometer::on_update(async_cb handler) noexcept {
handler_ = std::move(handler);
auto token = ASYNC_TRY(mqtt_.subscribe(
topic::temperature::floor(cfg_.room), [this](const AsyncMqttClient::CallbackData & data) -> awaitable_expected< void > {
// Ensure controller state is touched on our strand
co_await boost::asio::dispatch(strand_, boost::asio::use_awaitable);
// Parse numeric value (root number or object[key])
if(auto v = to_thermometer_data(data.payload); v.has_value()) {
// current_ = v->value;
if(handler_)
ASYNC_CHECK(handler_(*v));
} else {
spdlog::warn("Thermometer data {} is invalid", boost::json::serialize(data.payload));
}
co_return _void{};
}));
co_return _void{};
}
awaitable_expected< void > MqttThermometer::listen() noexcept {
// Not owning the underlying client loop: nothing to do here.
// Keep an idle await so co_spawn'd task lives until cancel() (optional design).
boost::asio::steady_timer idle{strand_};
for(;;) {
idle.expires_after(std::chrono::hours(24));
auto [ec] = co_await idle.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec == boost::asio::error::operation_aborted) {
spdlog::info("Temperature readding on {} aborted, exiting", cfg_.room);
co_return _void{};
}
}
co_return _void{};
}
// Parse function: value required; others optional with defaults (now, "°C", true)
expected< Thermometer::ThermometerData, boost::system::error_code > MqttThermometer::to_thermometer_data(const boost::json::value & v) {
const auto * obj = v.if_object();
if(!obj) {
return std::unexpected{make_error_code(std::errc::invalid_argument)};
}
// ---- value (required) ----
const auto * pv = obj->if_contains("value");
if(!pv) {
return std::unexpected{make_error_code(std::errc::invalid_argument)};
}
auto val = ranczo::json::as_number(*pv);
if(!val) {
return std::unexpected{make_error_code(std::errc::invalid_argument)};
}
// ---- timestamp (optional; default: now()) ----
std::chrono::system_clock::time_point ts = std::chrono::system_clock::now();
if(const auto * pt = obj->if_contains("timestamp")) {
if(pt->is_string()) {
auto s = pt->as_string(); // boost::json::string_view-like
if(auto tp = date::parse_timestamp_utc({s.data(), s.size()})) {
ts = *tp;
} // else keep default 'now'
}
}
// ---- unit (optional; default: "°C") ----
std::string unit = R"(°C)";
if(const auto * pu = obj->if_contains("unit")) {
if(pu->is_string()) {
auto s = pu->as_string();
unit.assign(s.begin(), s.end());
}
}
// ---- update (optional; default: true) ----
bool update = true;
if(const auto * pu = obj->if_contains("update")) {
if(pu->is_bool())
update = pu->as_bool();
}
return ThermometerData{*val, ts, std::move(unit), update};
}
} // namespace ranczo

View File

@ -1,24 +1,27 @@
#pragma once
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/json/value.hpp>
#include <charconv>
#include <config.hpp>
#include <ranczo-io/utils/mqtt_client.hpp>
#include <boost/asio/strand.hpp>
namespace ranczo {
using _void = expected< void, boost::system::error_code >;
namespace boost::json{
class value;
}
namespace ranczo {
struct Thermometer {
virtual ~Thermometer() = default;
//{"value":23.6875,"timestamp":"2025-08-14T04:00:27.614Z","unit":"°C","update":false}
struct ThermometerData {
double value;
std::chrono::system_clock::time_point timestamp;
std::string unit;
bool update;
};
// Register async handler for temperature updates (and wire subscriptions under the hood).
// Return quickly after the subscription is issued.
using async_cb = std::function< awaitable_expected< void >(double) >;
using async_cb = std::function< awaitable_expected< void >(ThermometerData) >;
virtual awaitable_expected< void > on_update(async_cb handler) noexcept = 0;
@ -27,31 +30,12 @@ struct Thermometer {
// Stop internal activity (e.g., cancel). Optional in clean shutdown.
virtual void cancel() noexcept = 0;
// Convenience access to the latest value (if any).
virtual std::optional< double > current() const = 0;
};
inline expected< double, boost::system::error_code > to_double(const boost::json::value & v) {
if(v.is_double())
return v.as_double();
if(v.is_int64())
return static_cast< double >(v.as_int64());
if(v.is_uint64())
return static_cast< double >(v.as_uint64());
if(v.is_string()) {
const auto & s = v.as_string();
double out{};
if(auto res = std::from_chars(s.data(), s.data() + s.size(), out); res.ec == std::errc{})
return out;
}
return unexpected{make_error_code(boost::system::errc::invalid_argument)};
}
class AsyncMqttClient;
class MqttThermometer : public Thermometer {
public:
using json = boost::json::value;
struct Settings {
std::string room; // e.g. "bathroom"
std::string key{"temperature"}; // JSON key to read if payload is an object
@ -61,70 +45,18 @@ class MqttThermometer : public Thermometer {
: strand_(boost::asio::make_strand(ex)), mqtt_(mqtt), cfg_(std::move(cfg)) {}
// Register handler + subscribe to the room topic. Do not block.
awaitable_expected< void > on_update(async_cb handler) noexcept override {
handler_ = std::move(handler);
co_await mqtt_.subscribe(topic_temperature(), [this](const AsyncMqttClient::CallbackData & data) -> awaitable_expected< void > {
// Ensure controller state is touched on our strand
co_await boost::asio::dispatch(strand_, boost::asio::use_awaitable);
// Parse numeric value (root number or object[key])
if(auto v = extract_value(data.payload)) {
current_ = *v;
if(handler_)
co_await handler_(*v);
}
co_return _void{};
});
co_return _void{};
}
awaitable_expected< void > on_update(async_cb handler) noexcept override;
// Perpetual loop
awaitable_expected< void > listen() noexcept override {
// Not owning the underlying client loop: nothing to do here.
// Keep an idle await so co_spawn'd task lives until cancel() (optional design).
boost::asio::steady_timer idle{strand_};
for(;;) {
idle.expires_after(std::chrono::hours(24));
auto [ec] = co_await idle.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable));
if(ec == boost::asio::error::operation_aborted)
co_return _void{};
}
co_return _void{};
}
awaitable_expected< void > listen() noexcept override;
void cancel() noexcept override {
// subscriptions are managed by mqtt_ implementation
///TODO unsubscribe
/// TODO unsubscribe
}
std::optional< double > current() const override {
return current_;
}
private:
std::string topic_temperature() const {
return "home/" + cfg_.room + "/heating/floor/temperature";
}
static std::optional< double > as_number(const json & v) {
if(v.is_double())
return v.as_double();
if(v.is_int64())
return static_cast< double >(v.as_int64());
if(v.is_uint64())
return static_cast< double >(v.as_uint64());
return std::nullopt;
}
std::optional< double > extract_value(const json & payload) const {
if(auto num = as_number(payload))
return num;
if(auto * obj = payload.if_object()) {
if(auto * p = obj->if_contains(cfg_.key))
return as_number(*p);
}
return std::nullopt;
}
expected< ThermometerData, boost::system::error_code > to_thermometer_data(const boost::json::value & v);
private:
boost::asio::strand< boost::asio::any_io_executor > strand_;
@ -132,7 +64,6 @@ class MqttThermometer : public Thermometer {
Settings cfg_{};
async_cb handler_{};
std::optional< double > current_{};
};
} // namespace ranczo