139 lines
4.6 KiB
C++
139 lines
4.6 KiB
C++
#pragma once
|
|
#include <boost/asio/as_tuple.hpp>
|
|
#include <boost/asio/steady_timer.hpp>
|
|
#include <boost/asio/use_awaitable.hpp>
|
|
#include <boost/json/value.hpp>
|
|
#include <charconv>
|
|
#include <config.hpp>
|
|
|
|
#include <ranczo-io/utils/mqtt_client.hpp>
|
|
|
|
#include <boost/asio/strand.hpp>
|
|
namespace ranczo {
|
|
|
|
// Result type for operations that yield no value: holds either success (void)
// or a boost::system::error_code describing the failure.
using _void = expected< void, boost::system::error_code >;
|
|
|
|
struct Thermometer {
|
|
virtual ~Thermometer() = default;
|
|
|
|
// Register async handler for temperature updates (and wire subscriptions under the hood).
|
|
// Return quickly after the subscription is issued.
|
|
using async_cb = std::function< awaitable_expected< void >(double) >;
|
|
|
|
virtual awaitable_expected< void > on_update(async_cb handler) noexcept = 0;
|
|
|
|
// Perpetual listening loop (spawn this, do not co_await in start-up code).
|
|
virtual awaitable_expected< void > listen() noexcept = 0;
|
|
|
|
// Stop internal activity (e.g., cancel). Optional in clean shutdown.
|
|
virtual void cancel() noexcept = 0;
|
|
|
|
// Convenience access to the latest value (if any).
|
|
virtual std::optional< double > current() const = 0;
|
|
};
|
|
|
|
/// Converts a JSON value to a double.
///
/// Accepted inputs:
///  - a JSON double, returned as-is;
///  - a signed or unsigned 64-bit JSON integer, converted with static_cast;
///  - a JSON string that consists *entirely* of a floating-point literal as
///    parsed by std::from_chars (no leading whitespace, no leading '+',
///    no trailing characters).
///
/// @param v JSON value to convert.
/// @return the numeric value, or an unexpected holding
///         boost::system::errc::invalid_argument for any other input.
inline expected< double, boost::system::error_code > to_double(const boost::json::value & v) {
    if(v.is_double())
        return v.as_double();

    if(v.is_int64())
        return static_cast< double >(v.as_int64());

    if(v.is_uint64())
        return static_cast< double >(v.as_uint64());

    if(v.is_string()) {
        const auto & text        = v.as_string();
        const char * const first = text.data();
        const char * const last  = first + text.size();

        double parsed{};
        const auto [stop, ec] = std::from_chars(first, last, parsed);

        // from_chars succeeds as soon as a numeric prefix is found and leaves
        // `stop` at the first unconsumed character. Require the whole string
        // to be consumed so inputs such as "21.5abc" are rejected instead of
        // being silently truncated to 21.5.
        if(ec == std::errc{} && stop == last)
            return parsed;
    }

    return unexpected{make_error_code(boost::system::errc::invalid_argument)};
}
|
|
|
|
class MqttThermometer : public Thermometer {
|
|
public:
|
|
using json = boost::json::value;
|
|
|
|
struct Settings {
|
|
std::string room; // e.g. "bathroom"
|
|
std::string key{"temperature"}; // JSON key to read if payload is an object
|
|
};
|
|
|
|
MqttThermometer(boost::asio::any_io_executor ex, AsyncMqttClient & mqtt, Settings cfg)
|
|
: strand_(boost::asio::make_strand(ex)), mqtt_(mqtt), cfg_(std::move(cfg)) {}
|
|
|
|
// Register handler + subscribe to the room topic. Do not block.
|
|
awaitable_expected< void > on_update(async_cb handler) noexcept override {
|
|
handler_ = std::move(handler);
|
|
|
|
co_await mqtt_.subscribe(topic_temperature(), [this](const AsyncMqttClient::CallbackData & data) -> awaitable_expected< void > {
|
|
// Ensure controller state is touched on our strand
|
|
co_await boost::asio::dispatch(strand_, boost::asio::use_awaitable);
|
|
// Parse numeric value (root number or object[key])
|
|
if(auto v = extract_value(data.payload)) {
|
|
current_ = *v;
|
|
if(handler_)
|
|
co_await handler_(*v);
|
|
}
|
|
co_return _void{};
|
|
});
|
|
co_return _void{};
|
|
}
|
|
|
|
// Perpetual loop
|
|
awaitable_expected< void > listen() noexcept override {
|
|
// Not owning the underlying client loop: nothing to do here.
|
|
// Keep an idle await so co_spawn'd task lives until cancel() (optional design).
|
|
boost::asio::steady_timer idle{strand_};
|
|
for(;;) {
|
|
idle.expires_after(std::chrono::hours(24));
|
|
auto [ec] = co_await idle.async_wait(boost::asio::as_tuple(boost::asio::use_awaitable));
|
|
if(ec == boost::asio::error::operation_aborted)
|
|
co_return _void{};
|
|
}
|
|
co_return _void{};
|
|
}
|
|
|
|
void cancel() noexcept override {
|
|
// subscriptions are managed by mqtt_ implementation
|
|
///TODO unsubscribe
|
|
}
|
|
|
|
std::optional< double > current() const override {
|
|
return current_;
|
|
}
|
|
|
|
private:
|
|
std::string topic_temperature() const {
|
|
return "home/" + cfg_.room + "/heating/floor/temperature";
|
|
}
|
|
|
|
static std::optional< double > as_number(const json & v) {
|
|
if(v.is_double())
|
|
return v.as_double();
|
|
if(v.is_int64())
|
|
return static_cast< double >(v.as_int64());
|
|
if(v.is_uint64())
|
|
return static_cast< double >(v.as_uint64());
|
|
return std::nullopt;
|
|
}
|
|
|
|
std::optional< double > extract_value(const json & payload) const {
|
|
if(auto num = as_number(payload))
|
|
return num;
|
|
if(auto * obj = payload.if_object()) {
|
|
if(auto * p = obj->if_contains(cfg_.key))
|
|
return as_number(*p);
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
private:
|
|
boost::asio::strand< boost::asio::any_io_executor > strand_;
|
|
AsyncMqttClient & mqtt_;
|
|
Settings cfg_{};
|
|
|
|
async_cb handler_{};
|
|
std::optional< double > current_{};
|
|
};
|
|
|
|
} // namespace ranczo
|