diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a29564..d9531e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,10 +4,45 @@ project(Ranczo-IO) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) -find_package(Protobuf REQUIRED) -include_directories(${Protobuf_INCLUDE_DIRS}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) +# find_package(Protobuf REQUIRED) +# include_directories(${Protobuf_INCLUDE_DIRS}) -find_package(cppzmq) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) +set(BOOST_ROOT /usr/local) +find_package(Boost REQUIRED COMPONENTS system json) + +include(FetchContent) + +# spdlog +FetchContent_Declare( + spdlog + GIT_REPOSITORY https://github.com/gabime/spdlog + GIT_TAG v1.14.1 + GIT_SHALLOW TRUE +) + +FetchContent_GetProperties(spdlog) +if(NOT spdlog_POPULATED) + FetchContent_Populate(spdlog) + # set(SPDLOG_FMT_EXTERNAL_HO ON) + set(SPDLOG_USE_STD_FORMAT ON) +endif() + +add_subdirectory( + ${spdlog_SOURCE_DIR} + ${spdlog_BINARY_DIR} +) + +FetchContent_Declare( + asyncmqtt5 + GIT_REPOSITORY https://github.com/mireo/async-mqtt5 + GIT_TAG master + GIT_SHALLOW TRUE +) + +set(MQTT_BUILD_TESTS OFF) +set(MQTT_BUILD_EXAMPLES OFF) +set(async-mqtt5_INCLUDES_WITH_SYSTEM OFF) +FetchContent_MakeAvailable(asyncmqtt5) add_subdirectory(floorheat_hub) diff --git a/CMakeLists.txt.user b/CMakeLists.txt.user index 4949f2a..d0a1972 100644 --- a/CMakeLists.txt.user +++ b/CMakeLists.txt.user @@ -1,6 +1,6 @@ - + EnvironmentId @@ -109,15 +109,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=Debug --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=Debug -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-Debug @@ -166,15 +166,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=Release --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=Release -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-Release @@ -223,15 +223,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-RelWithDebInfo @@ -278,15 +278,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake 0 /home/bartoszek/builds/build-ranczo-io-GCC_12-Profile @@ -334,15 +334,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=MinSizeRel --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=MinSizeRel -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-MinSizeRel diff --git a/floorheat_hub/CMakeLists.txt b/floorheat_hub/CMakeLists.txt index d9bc024..c8df479 100644 --- a/floorheat_hub/CMakeLists.txt +++ b/floorheat_hub/CMakeLists.txt @@ -1,14 +1,16 @@ -protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS - ../proto/message.proto - ../proto/heating.proto - ../proto/ACPower.proto - ../proto/ACPowerInternal.proto -) +# protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS +# ../proto/message.proto +# ../proto/heating.proto +# ../proto/ACPower.proto +# ../proto/ACPowerInternal.proto +# ) add_executable(ranczo-io_floorheating main.cpp heater.cpp relay.cpp + mqtt_client.cpp mqtt_client.hpp + timer.cpp timer.hpp ${PROTO_SRCS} ) @@ -19,5 +21,9 @@ target_include_directories(ranczo-io_floorheating target_link_libraries(ranczo-io_floorheating PUBLIC - ${Protobuf_LIBRARIES} + # ${Protobuf_LIBRARIES} + Async::MQTT5 + Boost::system + Boost::json + spdlog::spdlog ) diff --git a/floorheat_hub/heater.cpp b/floorheat_hub/heater.cpp index 3cb5141..717f4ea 100644 --- a/floorheat_hub/heater.cpp +++ b/floorheat_hub/heater.cpp @@ -1,4 +1,4 @@ - +#include "heater.hpp" /* * TODO diff --git a/floorheat_hub/heater.hpp b/floorheat_hub/heater.hpp index e69de29..c718de8 100644 --- a/floorheat_hub/heater.hpp +++ b/floorheat_hub/heater.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace ranczo{ + +class IHeater { + public: + virtual ~IHeater() = default; + + virtual void setTargetTemperature(double temperatureC) = 0; + virtual void setMaxHisteresis(double temperatureC) = 0; +}; +} diff --git a/floorheat_hub/main.cpp b/floorheat_hub/main.cpp index e2b214c..52d3788 100644 --- a/floorheat_hub/main.cpp +++ b/floorheat_hub/main.cpp @@ -1,154 +1,160 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "timer.hpp" +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + + + + #include -#include -#include -#include +namespace ranczo { -#include -#include - -#include - -struct CANID { - u_int8_t priority : 2; // 4 priority level - u_int16_t id : 11; // 2048 id's per device - u_int8_t sourceAddress : 8; // 255 devices max 0xff -> broadcast - u_int8_t destinationAddress : 8; +class IHeater { + public: + virtual ~IHeater() = default; }; -static_assert((2 + 11 + 8 + 8) <= 29); -constexpr std::size_t maxDataAtMessage(std::size_t msgNum, std::size_t maxFrameSize, uint32_t type) { - if(msgNum == 1) { - return maxFrameSize - 2 - google::protobuf::internal::WireFormatLite::UInt32Size(type) - 1; - } else if(msgNum > 1 and msgNum <= 128) { - return maxFrameSize - 4; - } else if(msgNum > 128 and msgNum <= 16384) { - return maxFrameSize - 5; - } else { - return maxFrameSize - 6; - } -} +class HeaterBase : public IHeater { + private: + boost::asio::repeating_timer timer; -constexpr std::size_t msgnum(int32_t datasize, std::size_t maxFrameSize, uint32_t type) { - std::size_t messages = 0; + protected: + double _targetTemperature; + double _histeresis; - while(datasize > 0) { - messages++; - datasize -= maxDataAtMessage(messages, maxFrameSize, type); - } + public: + virtual void tick() { + spdlog::info("ping"); + }; - return messages; -} - -template < typename T > -struct IncrementAtExit { - T & value; - constexpr ~IncrementAtExit() { - value++; + HeaterBase(boost::asio::any_io_executor & io_executor) : timer{io_executor} { + timer.start(boost::posix_time::seconds{1}, [&](const boost::system::error_code & e) { + // TODO error handling + spdlog::debug("Running timer"); + this->tick(); + }); } }; -std::vector< Frame > encapsulate(const google::protobuf::MessageLite & message, uint32_t type) { - std::vector< Frame > frames; - auto data = message.SerializeAsString(); - frames.resize(msgnum(data.size(), 64, type)); +class IRelay { + public: + virtual ~IRelay() = default; + virtual void setState(bool on) = 0; + virtual void state() = 0; +}; - const char * ptr = reinterpret_cast< const char * >(data.data()); - const char * end = reinterpret_cast< const char * >(data.data() + data.size()); +class ResistiveFloorHeater : public HeaterBase {}; - int current = 0; - for(auto & frame : frames) { - IncrementAtExit _{current}; - frame.set_parts(current); - auto bytesToCopy = std::distance(ptr, std::min(ptr + maxDataAtMessage(current + 1, 64, type), end)); +class ModbusRelay : public IRelay {}; - frame.set_data(ptr, bytesToCopy); +/// TODO +/// * Przypisanie przełącznika do maty grzewczej +/// * Zapis danych w DB +/// * Zapis ustawień +/// * Nasłuchiwanie na MQTT - ptr += bytesToCopy; - } +} // namespace ranczo + +// Modified completion token that will prevent co_await from throwing exceptions. + + +using namespace std::chrono_literals; +using boost::asio::yield_context; +using timer = boost::asio::steady_timer; + +static auto now = timer::clock_type::now; + +// struct X : private boost::noncopyable { +// int _id; +// boost::asio::any_io_executor _executor; +// std::string _topic; - frames[0].set_type(type); - return frames; -} +// X(boost::asio::any_io_executor io_executor, int id, std::string topic) : _executor{io_executor}, _c{_executor}, _id{id}, _topic{topic} { +// spdlog::info("X for id/topic {}/{} created", _id, _topic); + +// spdlog::info("timer for id {} start", _id); +// co_spawn(_executor, run_timer_id(), boost::asio::detached); -std::vector< uint8_t > genData(std::size_t size) { - std::vector< uint8_t > data; - data.resize(size); - std::fill(data.begin(), data.end(), uint8_t{0xde}); - return data; -} +// spdlog::info("mqtt for id/topic {}/{} start", _id, _topic); +// co_spawn(_executor, subscribe_and_receive(_c, _topic), boost::asio::detached); +// } -template < typename T > -void set_arg(PrintfMessage & msg, T && t) { - using X = std::remove_cvref_t< T >; - auto * arg = msg.add_arguments(); +// ~X() { +// spdlog::info("X {} destroyed", _id); +// } - if constexpr(std::is_integral_v< X >) { - arg->set_int_val(t); - return; - } +// boost::asio::awaitable< void > run_timer_id() { +// auto executor = co_await boost::asio::this_coro::executor; +// int counter{0}; +// boost::asio::steady_timer timer{executor}; - if constexpr(std::is_same_v< float, X >) { - arg->set_float_val(t); - return; - } +// while(true) { +// timer.expires_after(std::chrono::seconds(1)); +// co_await timer.async_wait(boost::asio::use_awaitable); +// spdlog::info("Hello from {}:{}", counter++, _id); +// } - if constexpr(std::is_same_v< double, X >) { - arg->set_double_val(t); - return; - } +// co_return; +// } +// }; - if constexpr(std::is_same_v< const char *, X >) { - arg->set_string_val(t); - return; - } -} +#include "mqtt_client.hpp" -void canbud_send(std::vector< Frame > && frames) {} +#include -template < typename... Args > -void canbus_log(int severity, std::string fmt, Args... args) { - PrintfMessage msg; - msg.set_severity(severity); - msg.set_format_string(std::move(fmt)); - - (set_arg(msg, args), ...); - - canbud_send(encapsulate(msg, 0)); +boost::asio::awaitable spawn(ranczo::AsyncMqttClient & c1){ + spdlog::info("SPAWN subscribe"); + co_await c1.subscribe("home/corridor/floor/temperature",[](const boost::json::value &){ + spdlog::info("cb called"); + }); + + spdlog::info("SPAWN listen"); + co_await c1.listen(); + spdlog::info("SPAWN return"); + co_return; } int main() { - std::ostream_iterator< char > out(std::cout); - canbus_log(0, "Debug {}:{}:{}", __FILE_NAME__, __PRETTY_FUNCTION__, __LINE__); - - acpowercontrol::ACPower m; - auto & ac = *m.mutable_set_channel_request(); - ac.set_channel(7); - auto &ps = *ac.mutable_swipe(); - ps.set_start_percentage(0); - ps.set_end_percentage(1000); - - ps.set_duration_ms(3200); - ps.set_function(acpowercontrol::internal::ACPowerSwipe_Function::ACPowerSwipe_Function_EaseInOut); - - std::string o; - - auto frames = encapsulate(m, SetACPowerChannelControlID); - std::cout << frames.at(0).SerializeAsString().size() << '\n'; + spdlog::set_level(spdlog::level::trace); + std::vector< std::unique_ptr< ranczo::IHeater > > _heaters; + boost::asio::io_service io_service; + + /// Strand powoduje że zadania do niego przypisane zostają wykonane sekwencyjnie, + /// get_executor pobrany z io_service nie daje takiej możliwości i wtedy można wykonywać zadania równloegle + // boost::asio::any_io_executor io_executor = io_service.get_executor(); + boost::asio::any_io_executor io_executor = boost::asio::make_strand(io_service); + + char buffer[2048]; + std::pmr::monotonic_buffer_resource mbr(buffer, 2048, std::pmr::null_memory_resource()); + // "home/corridor/floor/temperature" + // "home/utilityRoom/floor/temperature" + // "home/wardrobe/floor/temperature" + // ranczo::AsyncMqttClient c1{io_executor}; + // c1.add_subscribtion("home/corridor/floor/temperature", [](const boost::json::value &) { spdlog::info("cb called"); }); + + ranczo::PeriodicTimer timer{io_executor, std::chrono::seconds{1}, []() { spdlog::info("timer called"); }}; + + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 1, "home/corridor/floor/temperature")); + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 2, "home/utilityRoom/floor/temperature")); + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 3, "home/wardrobe/floor/temperature")); + + boost::asio::io_service::work work(io_service); + io_service.run(); + return 0; } diff --git a/floorheat_hub/mqtt_client.cpp b/floorheat_hub/mqtt_client.cpp new file mode 100644 index 0000000..e9e6a7c --- /dev/null +++ b/floorheat_hub/mqtt_client.cpp @@ -0,0 +1,136 @@ +#include "mqtt_client.hpp" + +#include +#include + +#include +#include + +#include +#include + +namespace ranczo { +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); +using client_type = async_mqtt5::mqtt_client< boost::asio::ip::tcp::socket >; + +struct AsyncMqttClient::AsyncMqttClientImpl { + const boost::asio::any_io_executor & _executor; + client_type _mqtt_client; + std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks; + + AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} { + spdlog::trace("Creating mqtt client"); + // Configure the Client. + // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. + _mqtt_client + .brokers("192.168.100.6", 1883) // Broker that we want to connect to. 1883 is the default TCP port. + .async_run(boost::asio::detached); // Start the client. + + start(); + spdlog::trace("Creating mqtt client done"); + } + + ~AsyncMqttClientImpl() = default; + + boost::asio::awaitable< bool > subscribe(std::string_view topic) { + // Configure the request to subscribe to a Topic. + async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{topic.data(), + async_mqtt5::subscribe_options{ + async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2. + async_mqtt5::no_local_e::no, // Forward message from Clients with same ID. + async_mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag. + async_mqtt5::retain_handling_e::send // Send retained messages when the subscription is established. + }}; + + // Subscribe to a single Topic. + auto && [ec, sub_codes, sub_props] = + co_await _mqtt_client.async_subscribe(sub_topic, async_mqtt5::subscribe_props{}, use_nothrow_awaitable); + // Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call. + + // An error can occur as a result of: + // a) wrong subscribe parameters + // b) mqtt_client::cancel is called while the Client is in the process of subscribing + if(ec) + spdlog::error("Subscribe error occurred: {}", ec.message()); + else + spdlog::info("Result of subscribe request: {}", sub_codes[0].message()); + + co_return !ec && !sub_codes[0]; // True if the subscription was successfully established. + } + + /// TODO should be const + boost::asio::awaitable< void > listen() { + spdlog::trace("listen invoked"); + for(;;) { + // Receive an Appplication Message from the subscribed Topic(s). + auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable); + + if(ec == async_mqtt5::client::error::session_expired) { + /// TODO connect first, then subscribe to all topics + // The Client has reconnected, and the prior session has expired. + // As a result, any previous subscriptions have been lost and must be reinstated. + if(co_await subscribe(topic)) + continue; + else + break; + } else if(ec) + break; + + spdlog::info("Received message from the Broker topic {}, payload {}", topic, payload); + + auto value = boost::json::parse(payload); /// TODO pass memory resource + + if(_callbacks.contains(topic)) { + /// TODO topic filtering as part of unordered map? + /// unordered map can map opny one thing, maybe vector would be better? + spdlog::info("Callback for topic {} found, executing", topic); + _callbacks.at(topic)(value); /// TODO pass topic to callbacl? + } else { + spdlog::info("Callback for topic {} not found", topic); + } + } + + co_return; + } + + void start() { + spdlog::trace("co_spawn mqtt client"); + boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached); + } +}; + +AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) + : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} + +AsyncMqttClient::awaitable< bool > AsyncMqttClient::subscribe(std::string_view topic, + std::function< void(const boost::json::value &) > cb) { + assert(_impl); + spdlog::trace("subscribtion to {} started", topic); + if(!(co_await _impl->subscribe(topic))) { + spdlog::error("subscribtion to {} failed", topic); + co_return false; + } + + spdlog::trace("subscribtion to {} ok, registering callback", topic); + _impl->_callbacks[std::string{topic.data()}] = cb; + + co_return true; +} + +bool AsyncMqttClient::add_subscribtion(std::string_view topic, std::function< void(const boost::json::value &) > cb) { + assert(_impl); + co_spawn(_impl->_mqtt_client.get_executor(), + subscribe(topic, [](const boost::json::value &) { spdlog::info("cb called"); }), + boost::asio::detached); + return true; +} + +AsyncMqttClient::awaitable< void > AsyncMqttClient::listen() const { + assert(_impl); + spdlog::info("client listen start"); + return _impl->listen(); +} + +AsyncMqttClient::~AsyncMqttClient() = default; + +} // namespace ranczo diff --git a/floorheat_hub/mqtt_client.hpp b/floorheat_hub/mqtt_client.hpp new file mode 100644 index 0000000..2176d03 --- /dev/null +++ b/floorheat_hub/mqtt_client.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include + +#include +#include + +namespace boost::json { +class value; +} + +namespace boost::asio { +class any_io_executor; +} + +namespace ranczo { + +class AsyncMqttClient { + public: + template < typename T > + using awaitable = boost::asio::awaitable< T >; + using executor = boost::asio::any_io_executor; + + struct AsyncMqttClientImpl; + std::unique_ptr< AsyncMqttClientImpl > _impl; + + AsyncMqttClient(const executor & executor); + ~AsyncMqttClient(); + + awaitable< bool > subscribe(std::string_view topic, std::function< void(const boost::json::value & value) >); + bool add_subscribtion(std::string_view topic, std::function< void(const boost::json::value & value) >cb); + awaitable< void > listen() const; +}; + +} // namespace ranczo diff --git a/floorheat_hub/relay.cpp b/floorheat_hub/relay.cpp index e69de29..b28b04f 100644 --- a/floorheat_hub/relay.cpp +++ b/floorheat_hub/relay.cpp @@ -0,0 +1,3 @@ + + + diff --git a/floorheat_hub/timer.cpp b/floorheat_hub/timer.cpp new file mode 100644 index 0000000..afb826e --- /dev/null +++ b/floorheat_hub/timer.cpp @@ -0,0 +1,58 @@ +#include "timer.hpp" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ranczo { +struct PeriodicTimer::Impl { + private: + std::chrono::milliseconds _interval; + boost::asio::steady_timer _timer; + std::function< void() > _callback; + + public: + Impl(boost::asio::any_io_executor & io_context, std::chrono::milliseconds interval, std::function< void() > cb) + : _interval{interval}, _timer(io_context, _interval), _callback{std::move(cb)} { + start(); + } + + // Coroutine function to handle the timer expiration + boost::asio::awaitable< void > timerCoroutine() { + assert(_callback); + + try { + while(true) { + // call my callback + _callback(); + + // Wait for the timer to expire + co_await _timer.async_wait(boost::asio::use_awaitable); + + // Reset the timer for the next interval + _timer.expires_after(_interval); + } + } catch(const std::exception & e) { + spdlog::error("Error: {}", e.what()); + } + } + + void start() { + spdlog::trace("co_spawn timer"); + // Start the coroutine by invoking the timerCoroutine() with boost::asio::co_spawn + boost::asio::co_spawn(_timer.get_executor(), timerCoroutine(), boost::asio::detached); + } +}; + +PeriodicTimer::PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb) + : _impl{std::make_unique< Impl >(executor, period, std::move(cb))} {} + +PeriodicTimer::~PeriodicTimer() = default; + +} // namespace ranczo diff --git a/floorheat_hub/timer.hpp b/floorheat_hub/timer.hpp new file mode 100644 index 0000000..fad1039 --- /dev/null +++ b/floorheat_hub/timer.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include +#include +#include + +namespace boost::asio { +class any_io_executor; +} + +namespace ranczo { + +class PeriodicTimer { + public: + template < typename T > + using awaitable = boost::asio::awaitable< T >; + using executor = boost::asio::any_io_executor; + + struct Impl; + std::unique_ptr< Impl > _impl; + + PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb); + ~PeriodicTimer(); +}; + +} // namespace ranczo diff --git a/include/boost/asio/.basic_repeating_timer.hpp.swp b/include/boost/asio/.basic_repeating_timer.hpp.swp new file mode 100644 index 0000000..51ec72d Binary files /dev/null and b/include/boost/asio/.basic_repeating_timer.hpp.swp differ diff --git a/include/boost/asio/basic_repeating_timer.hpp b/include/boost/asio/basic_repeating_timer.hpp new file mode 100644 index 0000000..cec9e34 --- /dev/null +++ b/include/boost/asio/basic_repeating_timer.hpp @@ -0,0 +1,178 @@ +#ifndef BOOST_ASIO_REPEATING_TIMER_H_INCLUDED +#define BOOST_ASIO_REPEATING_TIMER_H_INCLUDED + +// Adapted from the original, developed 2007 by David C. Wyles (http:///www.codegorilla.co.uk) +// released for public consumption under the same +// licensing policy as the boost library http://www.boost.org/LICENSE_1_0.txt +// +// most people will use the repeating_timer typedef for working with posix time +// +// Is is based on the basic_deadline_timer +// +// Updated 2021 by Michael Haben, for compatibility with Boost 1.77 / ASIO 1.18.2 +// - uses an executor instead of an io_service or io_context. + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace asio { + // basic repeating timer with start/stop semantics. + + template < typename Time, typename TimeTraits = boost::asio::time_traits< Time >, typename Executor = boost::asio::any_io_executor > + class basic_repeating_timer { + public: + typedef boost::asio::basic_deadline_timer< Time > timer_type; + + explicit basic_repeating_timer(Executor ex) : ex_(ex) {} + + ~basic_repeating_timer() { + stop(); + } + + template < typename WaitHandler > + void start(typename timer_type::duration_type const & repeat_interval, WaitHandler handler) { + boost::recursive_mutex::scoped_lock guard(lock_); + { + // cleanup code, cancel any existing timer + handler_.reset(); + if(timer_) { + timer_->cancel(); + } + timer_.reset(); + } + + // create new handler. + handler_.reset(new handler_impl< WaitHandler >(handler)); + // create new timer + timer_ = internal_timer::create(this->ex_, repeat_interval, handler_); + } + + void stop() { + boost::recursive_mutex::scoped_lock guard(lock_); + { + // cleanup code. + handler_.reset(); + if(timer_) { + timer_->cancel(); + } + timer_.reset(); + } + } + + void cancel() { + stop(); + } + + // changes the interval the next time the timer is fired. + void change_interval(typename timer_type::duration_type const & repeat_interval) { + boost::recursive_mutex::scoped_lock guard(lock_); + if(timer_) { + timer_->change_interval(repeat_interval); + } + } + + private: + const Executor & ex_; + boost::recursive_mutex lock_; + class internal_timer; + typedef boost::shared_ptr< internal_timer > internal_timer_ptr; + typedef boost::asio::executor executor_type; + internal_timer_ptr timer_; + + class handler_base; + boost::shared_ptr< handler_base > handler_; + + class handler_base { + public: + virtual ~handler_base() {} + virtual void handler(boost::system::error_code const &) = 0; + }; + + template < typename HandlerFunc > + class handler_impl : public handler_base { + public: + handler_impl(HandlerFunc func) : handler_func_(func) {} + virtual void handler(boost::system::error_code const & result) { + handler_func_(result); + } + HandlerFunc handler_func_; + }; + + class internal_timer : public boost::enable_shared_from_this< internal_timer > { + public: + static internal_timer_ptr create(const Executor & ex, + typename timer_type::duration_type const & repeat_interval, + boost::shared_ptr< handler_base > const & handler) { + internal_timer_ptr timer(new internal_timer(ex)); + timer->start(repeat_interval, handler); + return timer; + } + + void cancel() { + boost::recursive_mutex::scoped_lock guard(lock_); + timer_.cancel(); + } + + void change_interval(typename timer_type::duration_type const & repeat_interval) { + boost::recursive_mutex::scoped_lock guard(lock_); + interval_ = repeat_interval; + } + + private: + timer_type timer_; + boost::weak_ptr< handler_base > handler_; + typename timer_type::duration_type interval_; + boost::recursive_mutex lock_; + + internal_timer(boost::asio::any_io_executor ex) : timer_(ex) {} + + void start(typename timer_type::duration_type const & repeat_interval, boost::shared_ptr< handler_base > const & handler) { + // only EVER called once, via create + interval_ = repeat_interval; + handler_ = handler; + + timer_.expires_from_now(interval_); + timer_.async_wait(boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error)); + } + + void handle_timeout(boost::system::error_code const & error) { + // we lock in the timeout to block the cancel operation until this timeout completes + boost::recursive_mutex::scoped_lock guard(lock_); + { + // do the fire. + boost::shared_ptr< handler_base > Handler = handler_.lock(); + if(Handler) { + try { + Handler->handler(error); + } catch(std::exception const & e) { + // consume for now, no much else we can do, we don't want to damage the + // io_context thread + ( void ) e; + } + } + } + + if(!error) { + // check if we need to reschedule. + boost::shared_ptr< handler_base > Handler = handler_.lock(); + if(Handler) { + timer_.expires_from_now(interval_); + timer_.async_wait( + boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error)); + } + } + } + }; + }; + + typedef basic_repeating_timer< boost::posix_time::ptime > repeating_timer; +} // namespace asio +} // namespace boost + +#endif