corutines implementation test

This commit is contained in:
Bartosz Wieczorek 2024-11-05 10:41:40 +01:00
parent 8be725916c
commit 10c36bc30c
13 changed files with 664 additions and 167 deletions

View File

@ -4,10 +4,45 @@ project(Ranczo-IO)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(Protobuf REQUIRED)
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# find_package(Protobuf REQUIRED)
# include_directories(${Protobuf_INCLUDE_DIRS})
find_package(cppzmq)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
set(BOOST_ROOT /usr/local)
find_package(Boost REQUIRED COMPONENTS system json)
include(FetchContent)
# spdlog
FetchContent_Declare(
spdlog
GIT_REPOSITORY https://github.com/gabime/spdlog
GIT_TAG v1.14.1
GIT_SHALLOW TRUE
)
FetchContent_GetProperties(spdlog)
if(NOT spdlog_POPULATED)
FetchContent_Populate(spdlog)
# set(SPDLOG_FMT_EXTERNAL_HO ON)
set(SPDLOG_USE_STD_FORMAT ON)
endif()
add_subdirectory(
${spdlog_SOURCE_DIR}
${spdlog_BINARY_DIR}
)
FetchContent_Declare(
asyncmqtt5
GIT_REPOSITORY https://github.com/mireo/async-mqtt5
GIT_TAG master
GIT_SHALLOW TRUE
)
set(MQTT_BUILD_TESTS OFF)
set(MQTT_BUILD_EXAMPLES OFF)
set(async-mqtt5_INCLUDES_WITH_SYSTEM OFF)
FetchContent_MakeAvailable(asyncmqtt5)
add_subdirectory(floorheat_hub)

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE QtCreatorProject>
<!-- Written by QtCreator 14.0.1, 2024-09-26T11:10:15. -->
<!-- Written by QtCreator 14.0.1, 2024-11-05T07:28:35. -->
<qtcreator>
<data>
<variable>EnvironmentId</variable>
@ -109,15 +109,15 @@
<value type="int" key="CMake.Configure.BaseEnvironment">2</value>
<value type="bool" key="CMake.Configure.ClearSystemEnvironment">false</value>
<valuelist type="QVariantList" key="CMake.Configure.UserEnvironmentChanges"/>
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_BUILD_TYPE:STRING=Debug
-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable}
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake
-DCMAKE_GENERATOR:STRING=Ninja
-DCMAKE_BUILD_TYPE:STRING=Debug
-DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}</value>
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake</value>
<value type="QString" key="ProjectExplorer.BuildConfiguration.BuildDirectory">/home/bartoszek/builds/build-ranczo-io-GCC_12-Debug</value>
<valuemap type="QVariantMap" key="ProjectExplorer.BuildConfiguration.BuildStepList.0">
<valuemap type="QVariantMap" key="ProjectExplorer.BuildStepList.Step.0">
@ -166,15 +166,15 @@
<value type="int" key="CMake.Configure.BaseEnvironment">2</value>
<value type="bool" key="CMake.Configure.ClearSystemEnvironment">false</value>
<valuelist type="QVariantList" key="CMake.Configure.UserEnvironmentChanges"/>
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_BUILD_TYPE:STRING=Release
-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable}
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake
-DCMAKE_GENERATOR:STRING=Ninja
-DCMAKE_BUILD_TYPE:STRING=Release
-DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}</value>
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake</value>
<value type="QString" key="ProjectExplorer.BuildConfiguration.BuildDirectory">/home/bartoszek/builds/build-ranczo-io-GCC_12-Release</value>
<valuemap type="QVariantMap" key="ProjectExplorer.BuildConfiguration.BuildStepList.0">
<valuemap type="QVariantMap" key="ProjectExplorer.BuildStepList.Step.0">
@ -223,15 +223,15 @@
<value type="int" key="CMake.Configure.BaseEnvironment">2</value>
<value type="bool" key="CMake.Configure.ClearSystemEnvironment">false</value>
<valuelist type="QVariantList" key="CMake.Configure.UserEnvironmentChanges"/>
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo
-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable}
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake
-DCMAKE_GENERATOR:STRING=Ninja
-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo
-DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}</value>
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake</value>
<value type="QString" key="ProjectExplorer.BuildConfiguration.BuildDirectory">/home/bartoszek/builds/build-ranczo-io-GCC_12-RelWithDebInfo</value>
<valuemap type="QVariantMap" key="ProjectExplorer.BuildConfiguration.BuildStepList.0">
<valuemap type="QVariantMap" key="ProjectExplorer.BuildStepList.Step.0">
@ -278,15 +278,15 @@
<value type="int" key="CMake.Configure.BaseEnvironment">2</value>
<value type="bool" key="CMake.Configure.ClearSystemEnvironment">false</value>
<valuelist type="QVariantList" key="CMake.Configure.UserEnvironmentChanges"/>
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo
-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable}
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake
-DCMAKE_GENERATOR:STRING=Ninja
-DCMAKE_BUILD_TYPE:STRING=RelWithDebInfo
-DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}</value>
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake</value>
<value type="int" key="EnableQmlDebugging">0</value>
<value type="QString" key="ProjectExplorer.BuildConfiguration.BuildDirectory">/home/bartoszek/builds/build-ranczo-io-GCC_12-Profile</value>
<valuemap type="QVariantMap" key="ProjectExplorer.BuildConfiguration.BuildStepList.0">
@ -334,15 +334,15 @@
<value type="int" key="CMake.Configure.BaseEnvironment">2</value>
<value type="bool" key="CMake.Configure.ClearSystemEnvironment">false</value>
<valuelist type="QVariantList" key="CMake.Configure.UserEnvironmentChanges"/>
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_BUILD_TYPE:STRING=MinSizeRel
-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
<value type="QString" key="CMake.Initial.Parameters">-DCMAKE_PREFIX_PATH:PATH=%{Qt:QT_INSTALL_PREFIX}
-DQT_QMAKE_EXECUTABLE:FILEPATH=%{Qt:qmakeExecutable}
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake
-DCMAKE_GENERATOR:STRING=Ninja
-DCMAKE_BUILD_TYPE:STRING=MinSizeRel
-DCMAKE_CXX_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}</value>
-DCMAKE_C_COMPILER_LAUNCHER:UNINITIALIZED=distcc
-DCMAKE_CXX_COMPILER:FILEPATH=%{Compiler:Executable:Cxx}
-DCMAKE_C_COMPILER:FILEPATH=%{Compiler:Executable:C}
-DCMAKE_PROJECT_INCLUDE_BEFORE:FILEPATH=%{BuildConfig:BuildDirectory:NativeFilePath}/.qtc/package-manager/auto-setup.cmake</value>
<value type="QString" key="ProjectExplorer.BuildConfiguration.BuildDirectory">/home/bartoszek/builds/build-ranczo-io-GCC_12-MinSizeRel</value>
<valuemap type="QVariantMap" key="ProjectExplorer.BuildConfiguration.BuildStepList.0">
<valuemap type="QVariantMap" key="ProjectExplorer.BuildStepList.Step.0">

View File

@ -1,14 +1,16 @@
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
../proto/message.proto
../proto/heating.proto
../proto/ACPower.proto
../proto/ACPowerInternal.proto
)
# protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
# ../proto/message.proto
# ../proto/heating.proto
# ../proto/ACPower.proto
# ../proto/ACPowerInternal.proto
# )
add_executable(ranczo-io_floorheating
main.cpp
heater.cpp
relay.cpp
mqtt_client.cpp mqtt_client.hpp
timer.cpp timer.hpp
${PROTO_SRCS}
)
@ -19,5 +21,9 @@ target_include_directories(ranczo-io_floorheating
target_link_libraries(ranczo-io_floorheating
PUBLIC
${Protobuf_LIBRARIES}
# ${Protobuf_LIBRARIES}
Async::MQTT5
Boost::system
Boost::json
spdlog::spdlog
)

View File

@ -1,4 +1,4 @@
#include "heater.hpp"
/*
* TODO

View File

@ -0,0 +1,12 @@
#pragma once
namespace ranczo{
class IHeater {
public:
virtual ~IHeater() = default;
virtual void setTargetTemperature(double temperatureC) = 0;
virtual void setMaxHisteresis(double temperatureC) = 0;
};
}

View File

@ -1,154 +1,160 @@
#include <algorithm>
#include <array>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <google/protobuf/stubs/strutil.h>
#include <google/protobuf/wire_format_lite.h>
#include <iostream>
#include <iterator>
#include <ostream>
#include <string>
#include <type_traits>
#include "timer.hpp"
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/this_coro.hpp>
#include <memory>
#include <memory_resource>
#include <spdlog/spdlog.h>
#include <boost/asio.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/basic_repeating_timer.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/system/error_code.hpp>
#include <vector>
#include <format>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/util/json_util.h>
namespace ranczo {
#include <message.pb.h>
#include <heating.pb.h>
#include <ACPower.pb.h>
struct CANID {
u_int8_t priority : 2; // 4 priority level
u_int16_t id : 11; // 2048 id's per device
u_int8_t sourceAddress : 8; // 255 devices max 0xff -> broadcast
u_int8_t destinationAddress : 8;
class IHeater {
public:
virtual ~IHeater() = default;
};
static_assert((2 + 11 + 8 + 8) <= 29);
constexpr std::size_t maxDataAtMessage(std::size_t msgNum, std::size_t maxFrameSize, uint32_t type) {
if(msgNum == 1) {
return maxFrameSize - 2 - google::protobuf::internal::WireFormatLite::UInt32Size(type) - 1;
} else if(msgNum > 1 and msgNum <= 128) {
return maxFrameSize - 4;
} else if(msgNum > 128 and msgNum <= 16384) {
return maxFrameSize - 5;
} else {
return maxFrameSize - 6;
}
}
class HeaterBase : public IHeater {
private:
boost::asio::repeating_timer timer;
constexpr std::size_t msgnum(int32_t datasize, std::size_t maxFrameSize, uint32_t type) {
std::size_t messages = 0;
protected:
double _targetTemperature;
double _histeresis;
while(datasize > 0) {
messages++;
datasize -= maxDataAtMessage(messages, maxFrameSize, type);
}
public:
virtual void tick() {
spdlog::info("ping");
};
return messages;
}
template < typename T >
struct IncrementAtExit {
T & value;
constexpr ~IncrementAtExit() {
value++;
HeaterBase(boost::asio::any_io_executor & io_executor) : timer{io_executor} {
timer.start(boost::posix_time::seconds{1}, [&](const boost::system::error_code & e) {
// TODO error handling
spdlog::debug("Running timer");
this->tick();
});
}
};
std::vector< Frame > encapsulate(const google::protobuf::MessageLite & message, uint32_t type) {
std::vector< Frame > frames;
auto data = message.SerializeAsString();
frames.resize(msgnum(data.size(), 64, type));
class IRelay {
public:
virtual ~IRelay() = default;
virtual void setState(bool on) = 0;
virtual void state() = 0;
};
const char * ptr = reinterpret_cast< const char * >(data.data());
const char * end = reinterpret_cast< const char * >(data.data() + data.size());
class ResistiveFloorHeater : public HeaterBase {};
int current = 0;
for(auto & frame : frames) {
IncrementAtExit _{current};
frame.set_parts(current);
auto bytesToCopy = std::distance(ptr, std::min(ptr + maxDataAtMessage(current + 1, 64, type), end));
class ModbusRelay : public IRelay {};
frame.set_data(ptr, bytesToCopy);
/// TODO
/// * Przypisanie prze³±cznika do maty grzewczej
/// * Zapis danych w DB
/// * Zapis ustawieñ
/// * Nas³uchiwanie na MQTT
ptr += bytesToCopy;
}
} // namespace ranczo
// Modified completion token that will prevent co_await from throwing exceptions.
using namespace std::chrono_literals;
using boost::asio::yield_context;
using timer = boost::asio::steady_timer;
static auto now = timer::clock_type::now;
// struct X : private boost::noncopyable {
// int _id;
// boost::asio::any_io_executor _executor;
// std::string _topic;
frames[0].set_type(type);
return frames;
}
// X(boost::asio::any_io_executor io_executor, int id, std::string topic) : _executor{io_executor}, _c{_executor}, _id{id}, _topic{topic} {
// spdlog::info("X for id/topic {}/{} created", _id, _topic);
// spdlog::info("timer for id {} start", _id);
// co_spawn(_executor, run_timer_id(), boost::asio::detached);
std::vector< uint8_t > genData(std::size_t size) {
std::vector< uint8_t > data;
data.resize(size);
std::fill(data.begin(), data.end(), uint8_t{0xde});
return data;
}
// spdlog::info("mqtt for id/topic {}/{} start", _id, _topic);
// co_spawn(_executor, subscribe_and_receive(_c, _topic), boost::asio::detached);
// }
template < typename T >
void set_arg(PrintfMessage & msg, T && t) {
using X = std::remove_cvref_t< T >;
auto * arg = msg.add_arguments();
// ~X() {
// spdlog::info("X {} destroyed", _id);
// }
if constexpr(std::is_integral_v< X >) {
arg->set_int_val(t);
return;
}
// boost::asio::awaitable< void > run_timer_id() {
// auto executor = co_await boost::asio::this_coro::executor;
// int counter{0};
// boost::asio::steady_timer timer{executor};
if constexpr(std::is_same_v< float, X >) {
arg->set_float_val(t);
return;
}
// while(true) {
// timer.expires_after(std::chrono::seconds(1));
// co_await timer.async_wait(boost::asio::use_awaitable);
// spdlog::info("Hello from {}:{}", counter++, _id);
// }
if constexpr(std::is_same_v< double, X >) {
arg->set_double_val(t);
return;
}
// co_return;
// }
// };
if constexpr(std::is_same_v< const char *, X >) {
arg->set_string_val(t);
return;
}
}
#include "mqtt_client.hpp"
void canbud_send(std::vector< Frame > && frames) {}
#include <boost/json.hpp>
template < typename... Args >
void canbus_log(int severity, std::string fmt, Args... args) {
PrintfMessage msg;
msg.set_severity(severity);
msg.set_format_string(std::move(fmt));
(set_arg(msg, args), ...);
canbud_send(encapsulate(msg, 0));
boost::asio::awaitable<void> spawn(ranczo::AsyncMqttClient & c1){
spdlog::info("SPAWN subscribe");
co_await c1.subscribe("home/corridor/floor/temperature",[](const boost::json::value &){
spdlog::info("cb called");
});
spdlog::info("SPAWN listen");
co_await c1.listen();
spdlog::info("SPAWN return");
co_return;
}
int main() {
std::ostream_iterator< char > out(std::cout);
canbus_log(0, "Debug {}:{}:{}", __FILE_NAME__, __PRETTY_FUNCTION__, __LINE__);
acpowercontrol::ACPower m;
auto & ac = *m.mutable_set_channel_request();
ac.set_channel(7);
auto &ps = *ac.mutable_swipe();
ps.set_start_percentage(0);
ps.set_end_percentage(1000);
ps.set_duration_ms(3200);
ps.set_function(acpowercontrol::internal::ACPowerSwipe_Function::ACPowerSwipe_Function_EaseInOut);
std::string o;
auto frames = encapsulate(m, SetACPowerChannelControlID);
std::cout << frames.at(0).SerializeAsString().size() << '\n';
spdlog::set_level(spdlog::level::trace);
std::vector< std::unique_ptr< ranczo::IHeater > > _heaters;
boost::asio::io_service io_service;
/// Strand powoduje ¿e zadania do niego przypisane zostaj± wykonane sekwencyjnie,
/// get_executor pobrany z io_service nie daje takiej mo¿liwo¶ci i wtedy mo¿na wykonywaæ zadania równloegle
// boost::asio::any_io_executor io_executor = io_service.get_executor();
boost::asio::any_io_executor io_executor = boost::asio::make_strand(io_service);
char buffer[2048];
std::pmr::monotonic_buffer_resource mbr(buffer, 2048, std::pmr::null_memory_resource());
// "home/corridor/floor/temperature"
// "home/utilityRoom/floor/temperature"
// "home/wardrobe/floor/temperature"
// ranczo::AsyncMqttClient c1{io_executor};
// c1.add_subscribtion("home/corridor/floor/temperature", [](const boost::json::value &) { spdlog::info("cb called"); });
ranczo::PeriodicTimer timer{io_executor, std::chrono::seconds{1}, []() { spdlog::info("timer called"); }};
// xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 1, "home/corridor/floor/temperature"));
// xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 2, "home/utilityRoom/floor/temperature"));
// xes.emplace_back(std::allocate_shared< X >(alloc, io_executor, 3, "home/wardrobe/floor/temperature"));
boost::asio::io_service::work work(io_service);
io_service.run();
return 0;
}

View File

@ -0,0 +1,136 @@
#include "mqtt_client.hpp"
#include <boost/asio.hpp>
#include <boost/json.hpp>
#include <memory>
#include <spdlog/spdlog.h>
#include <async_mqtt5.hpp>
#include <unordered_map>
namespace ranczo {
constexpr auto use_nothrow_awaitable = boost::asio::as_tuple(boost::asio::use_awaitable);
using client_type = async_mqtt5::mqtt_client< boost::asio::ip::tcp::socket >;
struct AsyncMqttClient::AsyncMqttClientImpl {
const boost::asio::any_io_executor & _executor;
client_type _mqtt_client;
std::unordered_map< std::string, std::function< void(const boost::json::value &) > > _callbacks;
AsyncMqttClientImpl(const boost::asio::any_io_executor & executor) : _executor{executor}, _mqtt_client{_executor} {
spdlog::trace("Creating mqtt client");
// Configure the Client.
// It is mandatory to call brokers() and async_run() to configure the Brokers to connect to and start the Client.
_mqtt_client
.brokers("192.168.100.6", 1883) // Broker that we want to connect to. 1883 is the default TCP port.
.async_run(boost::asio::detached); // Start the client.
start();
spdlog::trace("Creating mqtt client done");
}
~AsyncMqttClientImpl() = default;
boost::asio::awaitable< bool > subscribe(std::string_view topic) {
// Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{topic.data(),
async_mqtt5::subscribe_options{
async_mqtt5::qos_e::exactly_once, // All messages will arrive at QoS 2.
async_mqtt5::no_local_e::no, // Forward message from Clients with same ID.
async_mqtt5::retain_as_published_e::retain, // Keep the original RETAIN flag.
async_mqtt5::retain_handling_e::send // Send retained messages when the subscription is established.
}};
// Subscribe to a single Topic.
auto && [ec, sub_codes, sub_props] =
co_await _mqtt_client.async_subscribe(sub_topic, async_mqtt5::subscribe_props{}, use_nothrow_awaitable);
// Note: you can subscribe to multiple Topics in one mqtt_client::async_subscribe call.
// An error can occur as a result of:
// a) wrong subscribe parameters
// b) mqtt_client::cancel is called while the Client is in the process of subscribing
if(ec)
spdlog::error("Subscribe error occurred: {}", ec.message());
else
spdlog::info("Result of subscribe request: {}", sub_codes[0].message());
co_return !ec && !sub_codes[0]; // True if the subscription was successfully established.
}
/// TODO should be const
boost::asio::awaitable< void > listen() {
spdlog::trace("listen invoked");
for(;;) {
// Receive an Appplication Message from the subscribed Topic(s).
auto && [ec, topic, payload, publish_props] = co_await _mqtt_client.async_receive(use_nothrow_awaitable);
if(ec == async_mqtt5::client::error::session_expired) {
/// TODO connect first, then subscribe to all topics
// The Client has reconnected, and the prior session has expired.
// As a result, any previous subscriptions have been lost and must be reinstated.
if(co_await subscribe(topic))
continue;
else
break;
} else if(ec)
break;
spdlog::info("Received message from the Broker topic {}, payload {}", topic, payload);
auto value = boost::json::parse(payload); /// TODO pass memory resource
if(_callbacks.contains(topic)) {
/// TODO topic filtering as part of unordered map?
/// unordered map can map opny one thing, maybe vector would be better?
spdlog::info("Callback for topic {} found, executing", topic);
_callbacks.at(topic)(value); /// TODO pass topic to callbacl?
} else {
spdlog::info("Callback for topic {} not found", topic);
}
}
co_return;
}
void start() {
spdlog::trace("co_spawn mqtt client");
boost::asio::co_spawn(_mqtt_client.get_executor(), listen(), boost::asio::detached);
}
};
AsyncMqttClient::AsyncMqttClient(const boost::asio::any_io_executor & executor)
: _impl{std::make_unique< AsyncMqttClient::AsyncMqttClientImpl >(executor)} {}
AsyncMqttClient::awaitable< bool > AsyncMqttClient::subscribe(std::string_view topic,
std::function< void(const boost::json::value &) > cb) {
assert(_impl);
spdlog::trace("subscribtion to {} started", topic);
if(!(co_await _impl->subscribe(topic))) {
spdlog::error("subscribtion to {} failed", topic);
co_return false;
}
spdlog::trace("subscribtion to {} ok, registering callback", topic);
_impl->_callbacks[std::string{topic.data()}] = cb;
co_return true;
}
bool AsyncMqttClient::add_subscribtion(std::string_view topic, std::function< void(const boost::json::value &) > cb) {
assert(_impl);
co_spawn(_impl->_mqtt_client.get_executor(),
subscribe(topic, [](const boost::json::value &) { spdlog::info("cb called"); }),
boost::asio::detached);
return true;
}
AsyncMqttClient::awaitable< void > AsyncMqttClient::listen() const {
assert(_impl);
spdlog::info("client listen start");
return _impl->listen();
}
AsyncMqttClient::~AsyncMqttClient() = default;
} // namespace ranczo

View File

@ -0,0 +1,35 @@
#pragma once
#include <boost/asio/awaitable.hpp>
#include <memory>
#include <string_view>
namespace boost::json {
class value;
}
namespace boost::asio {
class any_io_executor;
}
namespace ranczo {
class AsyncMqttClient {
public:
template < typename T >
using awaitable = boost::asio::awaitable< T >;
using executor = boost::asio::any_io_executor;
struct AsyncMqttClientImpl;
std::unique_ptr< AsyncMqttClientImpl > _impl;
AsyncMqttClient(const executor & executor);
~AsyncMqttClient();
awaitable< bool > subscribe(std::string_view topic, std::function< void(const boost::json::value & value) >);
bool add_subscribtion(std::string_view topic, std::function< void(const boost::json::value & value) >cb);
awaitable< void > listen() const;
};
} // namespace ranczo

View File

@ -0,0 +1,3 @@

58
floorheat_hub/timer.cpp Normal file
View File

@ -0,0 +1,58 @@
#include "timer.hpp"
#include "spdlog/spdlog.h"
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <chrono>
#include <functional>
#include <memory>
namespace ranczo {
struct PeriodicTimer::Impl {
private:
std::chrono::milliseconds _interval;
boost::asio::steady_timer _timer;
std::function< void() > _callback;
public:
Impl(boost::asio::any_io_executor & io_context, std::chrono::milliseconds interval, std::function< void() > cb)
: _interval{interval}, _timer(io_context, _interval), _callback{std::move(cb)} {
start();
}
// Coroutine function to handle the timer expiration
boost::asio::awaitable< void > timerCoroutine() {
assert(_callback);
try {
while(true) {
// call my callback
_callback();
// Wait for the timer to expire
co_await _timer.async_wait(boost::asio::use_awaitable);
// Reset the timer for the next interval
_timer.expires_after(_interval);
}
} catch(const std::exception & e) {
spdlog::error("Error: {}", e.what());
}
}
void start() {
spdlog::trace("co_spawn timer");
// Start the coroutine by invoking the timerCoroutine() with boost::asio::co_spawn
boost::asio::co_spawn(_timer.get_executor(), timerCoroutine(), boost::asio::detached);
}
};
PeriodicTimer::PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb)
: _impl{std::make_unique< Impl >(executor, period, std::move(cb))} {}
PeriodicTimer::~PeriodicTimer() = default;
} // namespace ranczo

28
floorheat_hub/timer.hpp Normal file
View File

@ -0,0 +1,28 @@
#pragma once
#include <boost/asio/awaitable.hpp>
#include <chrono>
#include <functional>
#include <memory>
namespace boost::asio {
class any_io_executor;
}
namespace ranczo {
class PeriodicTimer {
public:
template < typename T >
using awaitable = boost::asio::awaitable< T >;
using executor = boost::asio::any_io_executor;
struct Impl;
std::unique_ptr< Impl > _impl;
PeriodicTimer(executor & executor, std::chrono::milliseconds period, std::function< void() > cb);
~PeriodicTimer();
};
} // namespace ranczo

Binary file not shown.

View File

@ -0,0 +1,178 @@
#ifndef BOOST_ASIO_REPEATING_TIMER_H_INCLUDED
#define BOOST_ASIO_REPEATING_TIMER_H_INCLUDED
// Adapted from the original, developed 2007 by David C. Wyles (http:///www.codegorilla.co.uk)
// released for public consumption under the same
// licensing policy as the boost library http://www.boost.org/LICENSE_1_0.txt
//
// most people will use the repeating_timer typedef for working with posix time
//
// Is is based on the basic_deadline_timer
//
// Updated 2021 by Michael Haben, for compatibility with Boost 1.77 / ASIO 1.18.2
// - uses an executor instead of an io_service or io_context.
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/weak_ptr.hpp>
namespace boost {
namespace asio {
// basic repeating timer with start/stop semantics.
template < typename Time, typename TimeTraits = boost::asio::time_traits< Time >, typename Executor = boost::asio::any_io_executor >
class basic_repeating_timer {
public:
typedef boost::asio::basic_deadline_timer< Time > timer_type;
explicit basic_repeating_timer(Executor ex) : ex_(ex) {}
~basic_repeating_timer() {
stop();
}
template < typename WaitHandler >
void start(typename timer_type::duration_type const & repeat_interval, WaitHandler handler) {
boost::recursive_mutex::scoped_lock guard(lock_);
{
// cleanup code, cancel any existing timer
handler_.reset();
if(timer_) {
timer_->cancel();
}
timer_.reset();
}
// create new handler.
handler_.reset(new handler_impl< WaitHandler >(handler));
// create new timer
timer_ = internal_timer::create(this->ex_, repeat_interval, handler_);
}
void stop() {
boost::recursive_mutex::scoped_lock guard(lock_);
{
// cleanup code.
handler_.reset();
if(timer_) {
timer_->cancel();
}
timer_.reset();
}
}
void cancel() {
stop();
}
// changes the interval the next time the timer is fired.
void change_interval(typename timer_type::duration_type const & repeat_interval) {
boost::recursive_mutex::scoped_lock guard(lock_);
if(timer_) {
timer_->change_interval(repeat_interval);
}
}
private:
const Executor & ex_;
boost::recursive_mutex lock_;
class internal_timer;
typedef boost::shared_ptr< internal_timer > internal_timer_ptr;
typedef boost::asio::executor executor_type;
internal_timer_ptr timer_;
class handler_base;
boost::shared_ptr< handler_base > handler_;
class handler_base {
public:
virtual ~handler_base() {}
virtual void handler(boost::system::error_code const &) = 0;
};
template < typename HandlerFunc >
class handler_impl : public handler_base {
public:
handler_impl(HandlerFunc func) : handler_func_(func) {}
virtual void handler(boost::system::error_code const & result) {
handler_func_(result);
}
HandlerFunc handler_func_;
};
class internal_timer : public boost::enable_shared_from_this< internal_timer > {
public:
static internal_timer_ptr create(const Executor & ex,
typename timer_type::duration_type const & repeat_interval,
boost::shared_ptr< handler_base > const & handler) {
internal_timer_ptr timer(new internal_timer(ex));
timer->start(repeat_interval, handler);
return timer;
}
void cancel() {
boost::recursive_mutex::scoped_lock guard(lock_);
timer_.cancel();
}
void change_interval(typename timer_type::duration_type const & repeat_interval) {
boost::recursive_mutex::scoped_lock guard(lock_);
interval_ = repeat_interval;
}
private:
timer_type timer_;
boost::weak_ptr< handler_base > handler_;
typename timer_type::duration_type interval_;
boost::recursive_mutex lock_;
internal_timer(boost::asio::any_io_executor ex) : timer_(ex) {}
void start(typename timer_type::duration_type const & repeat_interval, boost::shared_ptr< handler_base > const & handler) {
// only EVER called once, via create
interval_ = repeat_interval;
handler_ = handler;
timer_.expires_from_now(interval_);
timer_.async_wait(boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error));
}
void handle_timeout(boost::system::error_code const & error) {
// we lock in the timeout to block the cancel operation until this timeout completes
boost::recursive_mutex::scoped_lock guard(lock_);
{
// do the fire.
boost::shared_ptr< handler_base > Handler = handler_.lock();
if(Handler) {
try {
Handler->handler(error);
} catch(std::exception const & e) {
// consume for now, no much else we can do, we don't want to damage the
// io_context thread
( void ) e;
}
}
}
if(!error) {
// check if we need to reschedule.
boost::shared_ptr< handler_base > Handler = handler_.lock();
if(Handler) {
timer_.expires_from_now(interval_);
timer_.async_wait(
boost::bind(&internal_timer::handle_timeout, this->shared_from_this(), boost::asio::placeholders::error));
}
}
}
};
};
typedef basic_repeating_timer< boost::posix_time::ptime > repeating_timer;
} // namespace asio
} // namespace boost
#endif