From 10c36bc30cfbab236680e983f3cc3e68b9ab3134 Mon Sep 17 00:00:00 2001 From: Bartosz Wieczorek Date: Tue, 5 Nov 2024 10:41:40 +0100 Subject: [PATCH] corutines implementation test --- CMakeLists.txt | 43 ++- CMakeLists.txt.user | 62 ++--- floorheat_hub/CMakeLists.txt | 20 +- floorheat_hub/heater.cpp | 2 +- floorheat_hub/heater.hpp | 12 + floorheat_hub/main.cpp | 254 +++++++++--------- floorheat_hub/mqtt_client.cpp | 136 ++++++++++ floorheat_hub/mqtt_client.hpp | 35 +++ floorheat_hub/relay.cpp | 3 + floorheat_hub/timer.cpp | 58 ++++ floorheat_hub/timer.hpp | 28 ++ .../boost/asio/.basic_repeating_timer.hpp.swp | Bin 0 -> 20480 bytes include/boost/asio/basic_repeating_timer.hpp | 178 ++++++++++++ 13 files changed, 664 insertions(+), 167 deletions(-) create mode 100644 floorheat_hub/mqtt_client.cpp create mode 100644 floorheat_hub/mqtt_client.hpp create mode 100644 floorheat_hub/timer.cpp create mode 100644 floorheat_hub/timer.hpp create mode 100644 include/boost/asio/.basic_repeating_timer.hpp.swp create mode 100644 include/boost/asio/basic_repeating_timer.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a29564..d9531e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,10 +4,45 @@ project(Ranczo-IO) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED ON) -find_package(Protobuf REQUIRED) -include_directories(${Protobuf_INCLUDE_DIRS}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) +# find_package(Protobuf REQUIRED) +# include_directories(${Protobuf_INCLUDE_DIRS}) -find_package(cppzmq) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) +set(BOOST_ROOT /usr/local) +find_package(Boost REQUIRED COMPONENTS system json) + +include(FetchContent) + +# spdlog +FetchContent_Declare( + spdlog + GIT_REPOSITORY https://github.com/gabime/spdlog + GIT_TAG v1.14.1 + GIT_SHALLOW TRUE +) + +FetchContent_GetProperties(spdlog) +if(NOT spdlog_POPULATED) + FetchContent_Populate(spdlog) + # set(SPDLOG_FMT_EXTERNAL_HO ON) + set(SPDLOG_USE_STD_FORMAT ON) +endif() + +add_subdirectory( + ${spdlog_SOURCE_DIR} + ${spdlog_BINARY_DIR} +) + +FetchContent_Declare( + asyncmqtt5 + GIT_REPOSITORY https://github.com/mireo/async-mqtt5 + GIT_TAG master + GIT_SHALLOW TRUE +) + +set(MQTT_BUILD_TESTS OFF) +set(MQTT_BUILD_EXAMPLES OFF) +set(async-mqtt5_INCLUDES_WITH_SYSTEM OFF) +FetchContent_MakeAvailable(asyncmqtt5) add_subdirectory(floorheat_hub) diff --git a/CMakeLists.txt.user b/CMakeLists.txt.user index 4949f2a..d0a1972 100644 --- a/CMakeLists.txt.user +++ b/CMakeLists.txt.user @@ -1,6 +1,6 @@ - + EnvironmentId @@ -109,15 +109,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=Debug --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=Debug -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-Debug @@ -166,15 +166,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=Release --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=Release -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-Release @@ -223,15 +223,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-RelWithDebInfo @@ -278,15 +278,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake 0 /home/bartoszek/builds/build-ranczo-io-GCC_12-Profile @@ -334,15 +334,15 @@ 2 false - -DCMAKE_BUILD_TYPE:STRING=MinSizeRel --DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} --DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} + -DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX} -DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable} --DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake -DCMAKE_GENERATOR:STRING=Ninja +-DCMAKE_BUILD_TYPE:STRING=MinSizeRel -DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc --DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc +-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx} +-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C} +-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake /home/bartoszek/builds/build-ranczo-io-GCC_12-MinSizeRel diff --git a/floorheat_hub/CMakeLists.txt b/floorheat_hub/CMakeLists.txt index d9bc024..c8df479 100644 --- a/floorheat_hub/CMakeLists.txt +++ b/floorheat_hub/CMakeLists.txt @@ -1,14 +1,16 @@ -protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS - ../proto/message.proto - ../proto/heating.proto - ../proto/ACPower.proto - ../proto/ACPowerInternal.proto -) +# protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS +# ../proto/message.proto +# ../proto/heating.proto +# ../proto/ACPower.proto +# ../proto/ACPowerInternal.proto +# ) add_executable(ranczo-io_floorheating main.cpp heater.cpp relay.cpp + mqtt_client.cpp mqtt_client.hpp + timer.cpp timer.hpp ${PROTO_SRCS} ) @@ -19,5 +21,9 @@ target_include_directories(ranczo-io_floorheating target_link_libraries(ranczo-io_floorheating PUBLIC - ${Protobuf_LIBRARIES} + # ${Protobuf_LIBRARIES} + Async::MQTT5 + Boost::system + Boost::json + spdlog::spdlog ) diff --git a/floorheat_hub/heater.cpp b/floorheat_hub/heater.cpp index 3cb5141..717f4ea 100644 --- a/floorheat_hub/heater.cpp +++ b/floorheat_hub/heater.cpp @@ -1,4 +1,4 @@ - +#include "heater.hpp" /* * TODO diff --git a/floorheat_hub/heater.hpp b/floorheat_hub/heater.hpp index e69de29..c718de8 100644 --- a/floorheat_hub/heater.hpp +++ b/floorheat_hub/heater.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace ranczo{ + +class IHeater { + public: + virtual ~IHeater() = default; + + virtual void setTargetTemperature(double temperatureC) = 0; + virtual void setMaxHisteresis(double temperatureC) = 0; +}; +} diff --git a/floorheat_hub/main.cpp b/floorheat_hub/main.cpp index e2b214c..52d3788 100644 --- a/floorheat_hub/main.cpp +++ b/floorheat_hub/main.cpp @@ -1,154 +1,160 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "timer.hpp" +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + + + + #include -#include -#include -#include +namespace ranczo { -#include -#include - -#include - -struct CANID { - u_int8_t priority : 2; // 4 priority level - u_int16_t id : 11; // 2048 id's per device - u_int8_t sourceAddress : 8; // 255 devices max 0xff -> broadcast - u_int8_t destinationAddress : 8; +class IHeater { + public: + virtual ~IHeater() = default; }; -static_assert((2 + 11 + 8 + 8) <= 29); -constexpr std::size_t maxDataAtMessage(std::size_t msgNum, std::size_t maxFrameSize, uint32_t type) { - if(msgNum == 1) { - return maxFrameSize - 2 - google::protobuf::internal::WireFormatLite::UInt32Size(type) - 1; - } else if(msgNum > 1 and msgNum <= 128) { - return maxFrameSize - 4; - } else if(msgNum > 128 and msgNum <= 16384) { - return maxFrameSize - 5; - } else { - return maxFrameSize - 6; - } -} +class HeaterBase : public IHeater { + private: + boost::asio::repeating_timer timer; -constexpr std::size_t msgnum(int32_t datasize, std::size_t maxFrameSize, uint32_t type) { - std::size_t messages = 0; + protected: + double _targetTemperature; + double _histeresis; - while(datasize > 0) { - messages++; - datasize -= maxDataAtMessage(messages, maxFrameSize, type); - } + public: + virtual void tick() { + spdlog::info("ping"); + }; - return messages; -} - -template < typename T > -struct IncrementAtExit { - T & value; - constexpr ~IncrementAtExit() { - value++; + HeaterBase(boost::asio::any_io_executor & io_executor) : timer{io_executor} { + timer.start(boost::posix_time::seconds{1}, [&](const boost::system::error_code & e) { + // TODO error handling + spdlog::debug("Running timer"); + this->tick(); + }); } }; -std::vector< Frame > encapsulate(const google::protobuf::MessageLite & message, uint32_t type) { - std::vector< Frame > frames; - auto data = message.SerializeAsString(); - frames.resize(msgnum(data.size(), 64, type)); +class IRelay { + public: + virtual ~IRelay() = default; + virtual void setState(bool on) = 0; + virtual void state() = 0; +}; - const char * ptr = reinterpret_cast< const char * >(data.data()); - const char * end = reinterpret_cast< const char * >(data.data() + data.size()); +class ResistiveFloorHeater : public HeaterBase {}; - int current = 0; - for(auto & frame : frames) { - IncrementAtExit _{current}; - frame.set_parts(current); - auto bytesToCopy = std::distance(ptr, std::min(ptr + maxDataAtMessage(current + 1, 64, type), end)); +class ModbusRelay : public IRelay {}; - frame.set_data(ptr, bytesToCopy); +/// TODO +/// * Przypisanie przełącznika do maty grzewczej +/// * Zapis danych w DB +/// * Zapis ustawień +/// * Nasłuchiwanie na MQTT - ptr += bytesToCopy; - } +} // namespace ranczo + +// Modified completion token that will prevent co_await from throwing exceptions. + + +using namespace std::chrono_literals; +using boost::asio::yield_context; +using timer = boost::asio::steady_timer; + +static auto now = timer::clock_type::now; + +// struct X : private boost::noncopyable { +// int _id; +// boost::asio::any_io_executor _executor; +// std::string _topic; - frames[0].set_type(type); - return frames; -} +// X(boost::asio::any_io_executor io_executor, int id, std::string topic) : _executor{io_executor}, _c{_executor}, _id{id}, _topic{topic} { +// spdlog::info("X for id/topic {}/{} created", _id, _topic); + +// spdlog::info("timer for id {} start", _id); +// co_spawn(_executor, run_timer_id(), boost::asio::detached); -std::vector< uint8_t > genData(std::size_t size) { - std::vector< uint8_t > data; - data.resize(size); - std::fill(data.begin(), data.end(), uint8_t{0xde}); - return data; -} +// spdlog::info("mqtt for id/topic {}/{} start", _id, _topic); +// co_spawn(_executor, subscribe_and_receive(_c, _topic), boost::asio::detached); +// } -template < typename T > -void set_arg(PrintfMessage & msg, T && t) { - using X = std::remove_cvref_t< T >; - auto * arg = msg.add_arguments(); +// ~X() { +// spdlog::info("X {} destroyed", _id); +// } - if constexpr(std::is_integral_v< X >) { - arg->set_int_val(t); - return; - } +// boost::asio::awaitable< void > run_timer_id() { +// auto executor = co_await boost::asio::this_coro::executor; +// int counter{0}; +// boost::asio::steady_timer timer{executor}; - if constexpr(std::is_same_v< float, X >) { - arg->set_float_val(t); - return; - } +// while(true) { +// timer.expires_after(std::chrono::seconds(1)); +// co_await timer.async_wait(boost::asio::use_awaitable); +// spdlog::info("Hello from {}:{}", counter++, _id); +// } - if constexpr(std::is_same_v< double, X >) { - arg->set_double_val(t); - return; - } +// co_return; +// } +// }; - if constexpr(std::is_same_v< const char *, X >) { - arg->set_string_val(t); - return; - } -} +#include "mqtt_client.hpp" -void canbud_send(std::vector< Frame > && frames) {} +#include -template < typename... Args > -void canbus_log(int severity, std::string fmt, Args... args) { - PrintfMessage msg; - msg.set_severity(severity); - msg.set_format_string(std::move(fmt)); - - (set_arg(msg, args), ...); - - canbud_send(encapsulate(msg, 0)); +boost::asio::awaitable spawn(ranczo::AsyncMqttClient & c1){ + spdlog::info("SPAWN subscribe"); + co_await c1.subscribe("home/corridor/floor/temperature",[](const boost::json::value &){ + spdlog::info("cb called"); + }); + + spdlog::info("SPAWN listen"); + co_await c1.listen(); + spdlog::info("SPAWN return"); + co_return; } int main() { - std::ostream_iterator< char > out(std::cout); - canbus_log(0, "Debug {}:{}:{}", __FILE_NAME__, __PRETTY_FUNCTION__, __LINE__); - - acpowercontrol::ACPower m; - auto & ac = *m.mutable_set_channel_request(); - ac.set_channel(7); - auto &ps = *ac.mutable_swipe(); - ps.set_start_percentage(0); - ps.set_end_percentage(1000); - - ps.set_duration_ms(3200); - ps.set_function(acpowercontrol::internal::ACPowerSwipe_Function::ACPowerSwipe_Function_EaseInOut); - - std::string o; - - auto frames = encapsulate(m, SetACPowerChannelControlID); - std::cout << frames.at(0).SerializeAsString().size() << '\n'; + spdlog::set_level(spdlog::level::trace); + std::vector< std::unique_ptr< ranczo::IHeater > > _heaters; + boost::asio::io_service io_service; + + /// Strand powoduje że zadania do niego przypisane zostają wykonane sekwencyjnie, + /// get_executor pobrany z io_service nie daje takiej możliwości i wtedy można wykonywać zadania równloegle + // boost::asio::any_io_executor io_executor = io_service.get_executor(); + boost::asio::any_io_executor io_executor = boost::asio::make_strand(io_service); + + char buffer[2048]; + std::pmr::monotonic_buffer_resource mbr(buffer, 2048, std::pmr::null_memory_resource()); + // "home/corridor/floor/temperature" + // "home/utilityRoom/floor/temperature" + // "home/wardrobe/floor/temperature" + // ranczo::AsyncMqttClient c1{io_executor}; + // c1.add_subscribtion("home/corridor/floor/temperature", [](const boost::json::value &) { spdlog::info("cb called"); }); + + ranczo::PeriodicTimer timer{io_executor, std::chrono::seconds{1}, []() { spdlog::info("timer called"); }}; + + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 1, "home/corridor/floor/temperature")); + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 2, "home/utilityRoom/floor/temperature")); + // xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 3, "home/wardrobe/floor/temperature")); + + boost::asio::io_service::work work(io_service); + io_service.run(); + return 0; } diff --git a/floorheat_hub/mqtt_client.cpp b/floorheat_hub/mqtt_client.cpp new file mode 100644 index 0000000..e9e6a7c --- /dev/null +++ b/floorheat_hub/mqtt_client.cpp @@ -0,0 +1,136 @@ +#include "mqtt_client.hpp" + +#include +#include + +#include +#include + +#include +#include + +namespace ranczo { +constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable); +using client_type = async_mqtt5::mqtt_client< boost::asio::ip::tcp::socket >; + +struct AsyncMqttClient::AsyncMqttClientImpl { + const boost::asio::any_io_executor & _executor; + client_type _mqtt_client; + std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks; + + AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} { + spdlog::trace("Creating mqtt client"); + // Configure the Client. + // It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client. + _mqtt_client + .brokers("192.168.100.6", 1883) // Broker that we want to connect to. 1883 is the default TCP port. + .async_run(boost::asio::detached); // Start the client. + + start(); + spdlog::trace("Creating mqtt client done"); + } + + ~AsyncMqttClientImpl() = default; + + boost::asio::awaitable< bool > subscribe(std::string_view topic) { + // Configure the request to subscribe to a Topic. + async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{topic.data(), + async_mqtt5::subscribe_options{ + async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2. + async_mqtt5::no_local_e::no, // Forward message from Clients with same ID. + async_mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag. + async_mqtt5::retain_handling_e::send // Send retained messages when the subscription is established. + }}; + + // Subscribe to a single Topic. + auto && [ec, sub_codes, sub_props] = + co_await _mqtt_client.async_subscribe(sub_topic, async_mqtt5::subscribe_props{}, use_nothrow_awaitable); + // Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call. + + // An error can occur as a result of: + // a) wrong subscribe parameters + // b) mqtt_client::cancel is called while the Client is in the process of subscribing + if(ec) + spdlog::error("Subscribe error occurred: {}", ec.message()); + else + spdlog::info("Result of subscribe request: {}", sub_codes[0].message()); + + co_return !ec && !sub_codes[0]; // True if the subscription was successfully established. + } + + /// TODO should be const + boost::asio::awaitable< void > listen() { + spdlog::trace("listen invoked"); + for(;;) { + // Receive an Appplication Message from the subscribed Topic(s). + auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable); + + if(ec == async_mqtt5::client::error::session_expired) { + /// TODO connect first, then subscribe to all topics + // The Client has reconnected, and the prior session has expired. + // As a result, any previous subscriptions have been lost and must be reinstated. + if(co_await subscribe(topic)) + continue; + else + break; + } else if(ec) + break; + + spdlog::info("Received message from the Broker topic {}, payload {}", topic, payload); + + auto value = boost::json::parse(payload); /// TODO pass memory resource + + if(_callbacks.contains(topic)) { + /// TODO topic filtering as part of unordered map? + /// unordered map can map opny one thing, maybe vector would be better? + spdlog::info("Callback for topic {} found, executing", topic); + _callbacks.at(topic)(value); /// TODO pass topic to callbacl? + } else { + spdlog::info("Callback for topic {} not found", topic); + } + } + + co_return; + } + + void start() { + spdlog::trace("co_spawn mqtt client"); + boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached); + } +}; + +AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor) + : _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {} + +AsyncMqttClient::awaitable< bool > AsyncMqttClient::subscribe(std::string_view topic, + std::function< void(const boost::json::value &) > cb) { + assert(_impl); + spdlog::trace("subscribtion to {} started", topic); + if(!(co_await _impl->subscribe(topic))) { + spdlog::error("subscribtion to {} failed", topic); + co_return false; + } + + spdlog::trace("subscribtion to {} ok, registering callback", topic); + _impl->_callbacks[std::string{topic.data()}] = cb; + + co_return true; +} + +bool AsyncMqttClient::add_subscribtion(std::string_view topic, std::function< void(const boost::json::value &) > cb) { + assert(_impl); + co_spawn(_impl->_mqtt_client.get_executor(), + subscribe(topic, [](const boost::json::value &) { spdlog::info("cb called"); }), + boost::asio::detached); + return true; +} + +AsyncMqttClient::awaitable< void > AsyncMqttClient::listen() const { + assert(_impl); + spdlog::info("client listen start"); + return _impl->listen(); +} + +AsyncMqttClient::~AsyncMqttClient() = default; + +} // namespace ranczo diff --git a/floorheat_hub/mqtt_client.hpp b/floorheat_hub/mqtt_client.hpp new file mode 100644 index 0000000..2176d03 --- /dev/null +++ b/floorheat_hub/mqtt_client.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include + +#include +#include + +namespace boost::json { +class value; +} + +namespace boost::asio { +class any_io_executor; +} + +namespace ranczo { + +class AsyncMqttClient { + public: + template < typename T > + using awaitable = boost::asio::awaitable< T >; + using executor = boost::asio::any_io_executor; + + struct AsyncMqttClientImpl; + std::unique_ptr< AsyncMqttClientImpl > _impl; + + AsyncMqttClient(const executor & executor); + ~AsyncMqttClient(); + + awaitable< bool > subscribe(std::string_view topic, std::function< void(const boost::json::value & value) >); + bool add_subscribtion(std::string_view topic, std::function< void(const boost::json::value & value) >cb); + awaitable< void > listen() const; +}; + +} // namespace ranczo diff --git a/floorheat_hub/relay.cpp b/floorheat_hub/relay.cpp index e69de29..b28b04f 100644 --- a/floorheat_hub/relay.cpp +++ b/floorheat_hub/relay.cpp @@ -0,0 +1,3 @@ + + + diff --git a/floorheat_hub/timer.cpp b/floorheat_hub/timer.cpp new file mode 100644 index 0000000..afb826e --- /dev/null +++ b/floorheat_hub/timer.cpp @@ -0,0 +1,58 @@ +#include "timer.hpp" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ranczo { +struct PeriodicTimer::Impl { + private: + std::chrono::milliseconds _interval; + boost::asio::steady_timer _timer; + std::function< void() > _callback; + + public: + Impl(boost::asio::any_io_executor & io_context, std::chrono::milliseconds interval, std::function< void() > cb) + : _interval{interval}, _timer(io_context, _interval), _callback{std::move(cb)} { + start(); + } + + // Coroutine function to handle the timer expiration + boost::asio::awaitable< void > timerCoroutine() { + assert(_callback); + + try { + while(true) { + // call my callback + _callback(); + + // Wait for the timer to expire + co_await _timer.async_wait(boost::asio::use_awaitable); + + // Reset the timer for the next interval + _timer.expires_after(_interval); + } + } catch(const std::exception & e) { + spdlog::error("Error: {}", e.what()); + } + } + + void start() { + spdlog::trace("co_spawn timer"); + // Start the coroutine by invoking the timerCoroutine() with boost::asio::co_spawn + boost::asio::co_spawn(_timer.get_executor(), timerCoroutine(), boost::asio::detached); + } +}; + +PeriodicTimer::PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb) + : _impl{std::make_unique< Impl >(executor, period, std::move(cb))} {} + +PeriodicTimer::~PeriodicTimer() = default; + +} // namespace ranczo diff --git a/floorheat_hub/timer.hpp b/floorheat_hub/timer.hpp new file mode 100644 index 0000000..fad1039 --- /dev/null +++ b/floorheat_hub/timer.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include +#include +#include + +namespace boost::asio { +class any_io_executor; +} + +namespace ranczo { + +class PeriodicTimer { + public: + template < typename T > + using awaitable = boost::asio::awaitable< T >; + using executor = boost::asio::any_io_executor; + + struct Impl; + std::unique_ptr< Impl > _impl; + + PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb); + ~PeriodicTimer(); +}; + +} // namespace ranczo diff --git a/include/boost/asio/.basic_repeating_timer.hpp.swp b/include/boost/asio/.basic_repeating_timer.hpp.swp new file mode 100644 index 0000000000000000000000000000000000000000..51ec72d0f0860901168d82f8c0bbc4941ab5d193 GIT binary patch literal 20480 zcmeI4d5|1c8Ndf3m)r>;9%y4A%*yP}CLAG?u##h8mt2_hFCBYl-tIQ(ogTY;_8=r2 z0tHoCS_)bPsNo1$hAI#&Ey5KF{?IB=30S3nsR(j}Tm3^tpR{hYEyKx!*%gRYkPZp z7o63*ptpPFk`9Tuy|rFo-g8TN*Q}H-ak?!hZ&gZmt*r7xj^p`x)8p08=ZDuXh7Y3W z;qCO2Y9Q4>s)1AksRmLFq#8&ykZK^+K&pXM1E~g54SWC^Fw2>Y#O^0bfY1Ja6#wr& zI+J+{Zi8Fl7T5-h;XF7Jj(~@c%48mbop3)`FbWFFumH}1v*1j4_Q*`;_i!8B3b(*E zxCX9ulX(%g!WM9#0y#JxJ_;woZw}35ehv4+58>~hG8KTK=%WRF`7Ol+w?rcs`$#Sn09a! zWnpU)eR8Exa8;@1de*oqj@5iMX&AOs+Ek3CEjbm>H~J?@>pSioLrpfPsJhm;>8k=V zvs*{f5yM6HGF1Zhwt6oDIt~y4s&6 z*St{hN={Xki=qHyq-MJ1tky;Row4_gJ673f6WpJws){+LLNVlLp-`^591U_RMfqt2 zN}bbQ$5x5+`LJ})@JCh9rszihsHjO_zd`@fS~M)r7`9wh?uj=?ASfr8l%4G|HhaEP z&2}xMMOxpN&_r8A&C#bdb#ymq!5&w6%BR%iLeE365FHWt-i)Re1l z8a*W(?^=tEsfgpA31>L7%ptAWVgp&M6tU2P%#m3&8Jm98WYxAxmTx4#s6NG#TV}4- zw@S^+YS;+J8##1ZCMe8_v+80f$l5Jt5q-t#l5tyt+O!+iPAGi&P zvNh5NxhOqq@yIV=6lxI-8!cK+QH5@UK`TXhqiekkk$9cUVBy6}krA%w>qkMoa*bjV ztU?MRMI#O4n66Mb{MGx5F=6?mhUa6wd9e+{Q)6buw@O|(>W6xfS=_6dC1r@haXs-M z>$h}$M&X7 z;`~uY>T*8j%XKKNOgGYYlCL_RH5vcfA=i*qDd*)+hv%Z4@DX89-sm=Jp7I!{Fy+8) zWQy{9#59~?xs7Fb${n{*tmDcr{2?4rAMKNUeO1IP8}oYS%{7LmjFlEHma>fjb4XQk z#xPk*&R7+9YRIxJe=2CrC2|_unA?_qpfv%#$K@v4c3pp4bCLI~DCU zC3T>B`Y|c5s+_8=c!O?F>l=&q;HB~6OOw{%uE}Y*0CO;>xI(&RK=4NRfq3W z7*-&Omfp68T+^L0Mt#3p$mb^}CVI3<^*HWGe#PL@{#9%Hi*t*;J^rMx>*Ojt7O9ZD zJLuiox2%m-#o07!m9ld63p{a1b)Lm#vnuKp%hQh>*BY@16>>&djVoImVRYX+Uv$6B z9LL37+GA{-vT4$6D47;byrdB_+qT(La(Zf;^r$r(RWKSEOIEL5yRIk=QC!o1QUBs~ zgR3qmt{Yt0zos})99*?@#rkFa%cOS&R z4)`{F3s%5#_#1KiU*QG#BRl{*;rnnWTnd}PhG-qY;`=dhG#mgKc!jur4{V3qA&T{% zAs&Ak9)(>H#q;y|el{Ejv)};8z}~}%b>VmLTX+(lfUDrk@CEohEQbqW5C-6R;_~NU zH#`N`!L_gvHo!7i3QJ%yl$IUc!zC!}X-tdJOXb?+0W{wZ<%!KINpW(pXprdCOMwV6 zp>!JN04Q0T=)iH*2ye*+^=-M*&1Yt5Gmv2d+%a9#qkU`&8zVg~@l28jB@&-!O z(;TnNMp~X?G8!uslv1g;(qa7m3iHp96W3f>`-63IfT@HlCBFYqxc z&P2Apf>6}$)m}@<92p+YSon2ea>8h-rbx=zVoiq}QalFXRO~6Ee?$Kon%%bbbX-z7 zW85;su(SPr2@NcTMW3i)J9VFEy(O$rQcu+OqFuuG7SK%7csf+55Vl==_#jnnHKw|$ z1tt=Axs$>ODI~dT%ag`OFMcI7V%=vCEKGjJm$n)Q~O9wTCL&<*D=HB0`hwFmV>{-&L zuJUUx(~XSIF;kGo^3l&2c_j#JTWqeck9C~aTDzizdqIg=y$;s;G^<1U#G(?KI}oq) z-%!^bU4?0kb)5?jo=l^gM{kJ@{5hlDvQROhb!_m`8jBVxgTkV<1leQAwHDL1a3ONW zbeFX6!CFih6@^;s5PUCcZLsz=r351y2$nX_uT@IXL|Gi>NAvbbr05QvD;nx_wk%V1 z&GwTP&|@`~QpYXVudx--e`8TcpHV*YG&m& zn+dVHO~0v{U)X4<=&ty^twxZr;xo`_gK4$dyVY73E>3TP4MgQ_ZI)VgSl+Wzg6g*_ zV1GS75+WvO&@rNiQZNZxOKvrP-8Z%ACQha@I<&1t)>nF>FM6XZCP^%3(-77w0PB|t z!v%#&{QoOr(SwM8CH_BOD#AzN|L5U3cpM&s?QlD6f{P(q4~WhJyhaTFDqIa$!D?6q zr@|?4FdPJXh~Zy`XW(gg7#@O~;Uq!kCLbFvoRz0K?+XQbk+teJE*pNol} z#INjCG!7@QRU;=Yk_B$gNiJE?sp=vN=S6Vp&gRb1o-e%TDDq z-q+T{Hg0?tGO literal 0 HcmV?d00001 diff --git a/include/boost/asio/basic_repeating_timer.hpp b/include/boost/asio/basic_repeating_timer.hpp new file mode 100644 index 0000000..cec9e34 --- /dev/null +++ b/include/boost/asio/basic_repeating_timer.hpp @@ -0,0 +1,178 @@ +#ifndef BOOST_ASIO_REPEATING_TIMER_H_INCLUDED +#define BOOST_ASIO_REPEATING_TIMER_H_INCLUDED + +// Adapted from the original, developed 2007 by David C. Wyles (http:///www.codegorilla.co.uk) +// released for public consumption under the same +// licensing policy as the boost library http://www.boost.org/LICENSE_1_0.txt +// +// most people will use the repeating_timer typedef for working with posix time +// +// Is is based on the basic_deadline_timer +// +// Updated 2021 by Michael Haben, for compatibility with Boost 1.77 / ASIO 1.18.2 +// - uses an executor instead of an io_service or io_context. + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace asio { + // basic repeating timer with start/stop semantics. + + template < typename Time, typename TimeTraits = boost::asio::time_traits< Time >, typename Executor = boost::asio::any_io_executor > + class basic_repeating_timer { + public: + typedef boost::asio::basic_deadline_timer< Time > timer_type; + + explicit basic_repeating_timer(Executor ex) : ex_(ex) {} + + ~basic_repeating_timer() { + stop(); + } + + template < typename WaitHandler > + void start(typename timer_type::duration_type const & repeat_interval, WaitHandler handler) { + boost::recursive_mutex::scoped_lock guard(lock_); + { + // cleanup code, cancel any existing timer + handler_.reset(); + if(timer_) { + timer_->cancel(); + } + timer_.reset(); + } + + // create new handler. + handler_.reset(new handler_impl< WaitHandler >(handler)); + // create new timer + timer_ = internal_timer::create(this->ex_, repeat_interval, handler_); + } + + void stop() { + boost::recursive_mutex::scoped_lock guard(lock_); + { + // cleanup code. + handler_.reset(); + if(timer_) { + timer_->cancel(); + } + timer_.reset(); + } + } + + void cancel() { + stop(); + } + + // changes the interval the next time the timer is fired. + void change_interval(typename timer_type::duration_type const & repeat_interval) { + boost::recursive_mutex::scoped_lock guard(lock_); + if(timer_) { + timer_->change_interval(repeat_interval); + } + } + + private: + const Executor & ex_; + boost::recursive_mutex lock_; + class internal_timer; + typedef boost::shared_ptr< internal_timer > internal_timer_ptr; + typedef boost::asio::executor executor_type; + internal_timer_ptr timer_; + + class handler_base; + boost::shared_ptr< handler_base > handler_; + + class handler_base { + public: + virtual ~handler_base() {} + virtual void handler(boost::system::error_code const &) = 0; + }; + + template < typename HandlerFunc > + class handler_impl : public handler_base { + public: + handler_impl(HandlerFunc func) : handler_func_(func) {} + virtual void handler(boost::system::error_code const & result) { + handler_func_(result); + } + HandlerFunc handler_func_; + }; + + class internal_timer : public boost::enable_shared_from_this< internal_timer > { + public: + static internal_timer_ptr create(const Executor & ex, + typename timer_type::duration_type const & repeat_interval, + boost::shared_ptr< handler_base > const & handler) { + internal_timer_ptr timer(new internal_timer(ex)); + timer->start(repeat_interval, handler); + return timer; + } + + void cancel() { + boost::recursive_mutex::scoped_lock guard(lock_); + timer_.cancel(); + } + + void change_interval(typename timer_type::duration_type const & repeat_interval) { + boost::recursive_mutex::scoped_lock guard(lock_); + interval_ = repeat_interval; + } + + private: + timer_type timer_; + boost::weak_ptr< handler_base > handler_; + typename timer_type::duration_type interval_; + boost::recursive_mutex lock_; + + internal_timer(boost::asio::any_io_executor ex) : timer_(ex) {} + + void start(typename timer_type::duration_type const & repeat_interval, boost::shared_ptr< handler_base > const & handler) { + // only EVER called once, via create + interval_ = repeat_interval; + handler_ = handler; + + timer_.expires_from_now(interval_); + timer_.async_wait(boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error)); + } + + void handle_timeout(boost::system::error_code const & error) { + // we lock in the timeout to block the cancel operation until this timeout completes + boost::recursive_mutex::scoped_lock guard(lock_); + { + // do the fire. + boost::shared_ptr< handler_base > Handler = handler_.lock(); + if(Handler) { + try { + Handler->handler(error); + } catch(std::exception const & e) { + // consume for now, no much else we can do, we don't want to damage the + // io_context thread + ( void ) e; + } + } + } + + if(!error) { + // check if we need to reschedule. + boost::shared_ptr< handler_base > Handler = handler_.lock(); + if(Handler) { + timer_.expires_from_now(interval_); + timer_.async_wait( + boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error)); + } + } + } + }; + }; + + typedef basic_repeating_timer< boost::posix_time::ptime > repeating_timer; +} // namespace asio +} // namespace boost + +#endif