From f50d9c7b480aa5bcc4c3be0f6da5aa3661d07d5d Mon Sep 17 00:00:00 2001 From: Bartosz Wieczorek Date: Thu, 24 Apr 2025 16:31:47 +0200 Subject: [PATCH] add wireid type --- rims_app/src/id.hpp | 125 ++++++++++++++ rims_app/src/main.cpp | 19 +-- rims_app/src/messenger.cpp | 197 ++++++++++------------ rims_app/src/messenger.hpp | 22 +-- rims_app/src/temperature_measurements.hpp | 2 +- rims_app/src/uart.cpp | 2 +- rims_app/src/uart.hpp | 2 +- 7 files changed, 235 insertions(+), 134 deletions(-) create mode 100644 rims_app/src/id.hpp diff --git a/rims_app/src/id.hpp b/rims_app/src/id.hpp new file mode 100644 index 00000000000..52811a2a856 --- /dev/null +++ b/rims_app/src/id.hpp @@ -0,0 +1,125 @@ +#pragma once + +#include + +namespace rims { + +/** + * @brief The ID class + * ID is max 29 bytes. + */ + +enum Priority { + High = 0x7, // + Mid = 0x3, + Low = 0 +}; +enum Type { + Broadcast = 0, // + Request = 1, + Response = 2 +}; + +struct WireFormatID { + int32_t reserved : 6 {0}; // bits 5..0 + uint32_t sender : 9 {0}; // bits 14..6 + uint32_t receiver : 9 {0}; // bits 23..15 + uint32_t type : 2 {0}; // bits 25..24 + uint32_t priority : 3 {0}; // bits 28..26 + uint32_t checksum : 3 {0}; // bits 31..29 (MSBs) + + constexpr uint32_t toWire() const { + uint32_t raw = 0; + // raw |= (reserved & 0x3F); + raw |= (sender & 0x1FF) << 6; + raw |= (receiver & 0x1FF) << 15; + raw |= (type & 0x03) << 24; + raw |= (priority & 0x07) << 26; + + // Now compute and insert checksum + uint8_t cs = compute_checksum3(raw); + raw |= (cs & 0x07) << 29; + + return raw; + } + + // swap sender and receiver + constexpr WireFormatID &swapEdpoints() { + auto tmp = sender; + sender = receiver; + receiver = tmp; + return *this; + } + + constexpr WireFormatID &makeResponse() { + type = static_cast(Type::Response); + swapEdpoints(); + return *this; + } + + constexpr WireFormatID &makeBroadcastFrom(uint32_t sender) { + setSender(sender); + type = Type::Broadcast; + setPriority(Priority::Low); + return *this; + } + + constexpr WireFormatID &setSender(uint16_t id) { + sender = id; + return *this; + } + constexpr WireFormatID &setReceiver(uint16_t id) { + receiver = id; + return *this; + } + constexpr WireFormatID &setPriority(Priority prior) { + priority = static_cast(prior); + return *this; + } + constexpr WireFormatID &setType(Type t) { + type = static_cast(t); + return *this; + } + constexpr bool isValid() const { + uint32_t raw = 0; + // raw |= (reserved & 0x3F); + raw |= (sender & 0x1FF) << 6; + raw |= (receiver & 0x1FF) << 15; + raw |= (type & 0x03) << 24; + raw |= (priority & 0x07) << 26; + + // Now compute and insert checksum + uint8_t expected = compute_checksum3(raw); + return expected == checksum; + } + + constexpr WireFormatID &fromWire(uint32_t packed) { + // reserved = packed & 0x3F; // bits 0..5 + sender = (packed >> 6) & 0x1FF; // bits 6..14 + receiver = (packed >> 15) & 0x1FF; // bits 15..23 + type = (packed >> 24) & 0x03; // bits 24..25 + priority = (packed >> 26) & 0x07; // bits 26..28 + checksum = (packed >> 29) & 0x07; // bits 29..31 (MSBs) + return *this; + } + + private: + constexpr static uint8_t compute_checksum3(uint32_t raw_id) { + // Mask out top 3 bits — ensure we only use bits 0..28 + raw_id &= 0x1FFFFFFF; + + // Simple XOR folding to reduce to 3 bits + uint8_t checksum = 0; + + // Fold 29 bits into 3 using 3-bit chunks + for (int i = 0; i < 29; i += 3) { + checksum ^= (raw_id >> i) & 0x07; // XOR each 3-bit group + } + + return checksum & 0x07; // final 3-bit result + } +}; + +static_assert(sizeof(uint32_t) == sizeof(WireFormatID)); + +} // namespace rims diff --git a/rims_app/src/main.cpp b/rims_app/src/main.cpp index 46c5a5dcfa0..a60e5be58b5 100644 --- a/rims_app/src/main.cpp +++ b/rims_app/src/main.cpp @@ -72,19 +72,17 @@ gpio_dt_spec ch_1_zcd = GPIO_DT_SPEC_GET(DT_NODELABEL(ch1_zcd), gpios); gpio_dt_spec ch_2_zcd = GPIO_DT_SPEC_GET(DT_NODELABEL(ch2_zcd), gpios); // pompka void mythread_analyzer_cb(struct thread_analyzer_info *info) { - ULOG_DEBUG( - "thread name, memfree, cpu : %s, %d, %d", - info->name, - info->stack_size - info->stack_used, - info->utilization // cpu usage in % - ); +// ULOG_DEBUG( +// "thread name, memfree, cpu : %s, %d, %d", +// info->name, +// info->stack_size - info->stack_used, +// info->utilization // cpu usage in % +// ); } int main() { using namespace rims; - const auto wait = []() { std::this_thread::sleep_for(std::chrono::milliseconds{100}); }; - messengerTread.init(messengerStack); uartThread.init(uartStack); temperatureSampler.init(temperatureSamplerStack); @@ -93,23 +91,18 @@ int main() { messengerTread->init_hw(); messengerTread->start(); - wait(); uartThread->init_hw(); uartThread->start(); - wait(); temperatureSampler->init_hw(); temperatureSampler->start(); - wait(); zcd->init_hw(); zcd->start(); - wait(); phaseModulation->init_hw(); phaseModulation->start(); - wait(); zephyr::gpio::pin_configure(status, GPIO_OUTPUT_INACTIVE); zephyr::gpio::pin_configure(gprelay_1_en, GPIO_OUTPUT_INACTIVE); diff --git a/rims_app/src/messenger.cpp b/rims_app/src/messenger.cpp index 0fd053f386e..b86e0849f50 100644 --- a/rims_app/src/messenger.cpp +++ b/rims_app/src/messenger.cpp @@ -17,6 +17,8 @@ #include "proto/message.pb.h" #include "proto/temperature.pb.h" +#include "id.hpp" + #include #include #include @@ -33,48 +35,13 @@ namespace rims { K_MSGQ_DEFINE(messenger_buffer_arrived_queue, sizeof(rims::buffer), 2, 1); +namespace { -/* -def encode_id(id_value: int) -> int: - """Encodes a 29-bit ID by adding a 3-bit checksum at the end.""" - data = id_value & 0x1FFFFFFF # Mask to ensure only 29 bits - checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7 # Reduce to 3 bits - return (data << 3) | checksum # Store checksum in the last 3 bits - - -def verify_id(received_id: int) -> bool: - """Verifies if the received ID has a valid checksum.""" - data = received_id >> 3 # Extract the first 29 bits - received_checksum = received_id & 0x7 # Extract the last 3 bits (checksum) - - # Recalculate checksum from the extracted 29-bit data - calculated_checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7 - - return received_checksum == calculated_checksum # Return True if checksum matches - - */ - -static constexpr bool idIsInvalid(uint32_t receivedID) { - // uint32_t data = receivedID >> 3; // Extract the first 29 bits - // uint32_t receivedChecksum = receivedID & 0x7; // Extract the last 3 bits (checksum) - - // // Recalculate checksum from the extracted 29-bit data - // uint32_t calculatedChecksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7; - - // return receivedChecksum == calculatedChecksum; // Return true if checksum matches - return false; -} - -static bool crcIsInvalid(std::span data, std::uint32_t crc) { +bool crcIsInvalid(std::span data, std::uint32_t crc) { return zephyr::crc::crc32_ieee(data) != crc; } -// static uint32_t encodeID(uint32_t id) { -// /// TODO error handling -// uint32_t data = id & 0x1FFFFFFF; // Mask to get the first 29 bits -// uint32_t checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7; // Reduce to 3 bits -// return (data << 3) | checksum; // Store checksum in the last 3 bits -// } +} // namespace namespace cobs { class error : public std::exception {}; @@ -217,9 +184,10 @@ MessengerThread::MessengerThread(TStackBase &stack) : ZephyrThread{stack, 9, 0, zephyr::event_pool::k_init(_events.at(0), messenger_buffer_arrived_queue); for (auto &ar : _activeRequests) { - ar.type = -1; - ar.cb = nullptr; - ar.requestId = 0; + ar.wireId = {}; + ar.wireId.reserved = -1; // maark unused slot + ar.cb = {}; + ar.requestId = {}; } } @@ -245,50 +213,25 @@ void MessengerThread::threadMain() { } } -void rims::MessengerThread::event_temperatureEgress() { - temperatureEgressQueue.try_consume([&](temperature_EgressMessages &in) { - egressPussh(&in, temperature_EgressMessages_msg, 1, in.has_requestID ? std::optional{in.requestID} : std::nullopt); - }); -} - -void rims::MessengerThread::event_ctrlEgress() { - ctrlEgressQueue.try_consume([&](ctrl_EgressMessages &in) { - egressPussh(&in, ctrl_EgressMessages_msg, 2, in.has_requestID ? std::optional{in.requestID} : std::nullopt); - }); -} - -void rims::MessengerThread::event_configEgress() { - configEgressQueue.try_consume([&](config_EgressMessages &in) { - egressPussh(&in, config_EgressMessages_msg, 3, in.has_requestID ? std::optional{in.requestID} : std::nullopt); - }); -} - -void rims::MessengerThread::event_logEgress() { - logEgressFifoQueueBuffer.try_consume([&](log_EgressMessages &in) { - egressPussh(&in, log_EgressMessages_msg, 4, in.has_requestID ? std::optional{in.requestID} : std::nullopt); - }); -} - void MessengerThread::event_dataArrived() { try { + ULOG_INFO("Data arrived"); buffer buf; k_msgq_get(&messenger_buffer_arrived_queue, &buf, K_NO_WAIT); const std::size_t decoded_len = cobs::decode(buf.data, buf.data); pb_istream_t stream = pb_istream_from_buffer(buf.data.data(), decoded_len); - std::span submessage; - uint32_t id{}, crc{}; + std::span submessage{}; + WireFormatID id{}; + uint32_t crc{}; const auto decode_id = [&]() { try { - id = pb::decode_fixed32(stream); + id.fromWire(pb::decode_fixed32(stream)); } catch (const pb::error &e) { throw messenger::no_id{}; } - if (idIsInvalid(id)) { - throw messenger::bad_id{}; - } }; const auto decode_crc = [&]() { @@ -305,7 +248,6 @@ void MessengerThread::event_dataArrived() { auto submessageBegin = (pb_byte_t *)stream.state; pb::skip_bytes(stream, submessageSize); // skip data submessage = {submessageBegin, submessageSize}; - } catch (const pb::error &e) { throw messenger::no_data{}; } @@ -331,24 +273,28 @@ void MessengerThread::event_dataArrived() { decode(); decode(); + if (not id.isValid()) { + throw messenger::bad_id{}; + } if (crcIsInvalid(submessage, crc)) { throw messenger::bad_crc{}; } - stream = pb::istream_from_span(submessage); - switch (id) { - case 0x01: { /// TODO temperature endpoint ID - handle_temperatureIngressMsg(stream, buf.device); + /// we now got a proper frame, starting to decode a frame + stream = pb::istream_from_span(submessage); // throws on error, no need to check any status + switch (id.receiver) { // check which endpoint is the receiver + case 0x01: { /// TODO temperature endpoint ID + handle_temperatureIngressMsg(id, stream, buf.device); break; } case 0x02: /// TODO configuration endpoint ID { - handle_configIngressMsg(stream, buf.device); + handle_configIngressMsg(id, stream, buf.device); break; } case 0x03: /// TODO phase controll endpoint ID { - handle_ctrlIngressMsg(stream, buf.device); + handle_ctrlIngressMsg(id, stream, buf.device); break; } case 0x04: { @@ -378,56 +324,81 @@ void MessengerThread::event_dataArrived() { } } -void MessengerThread::handle_temperatureIngressMsg(pb_istream_t &stream, AsyncUART *cb) { - /// TODO assert acllback - /// register req id +void MessengerThread::event_temperatureEgress() { + temperatureEgressQueue.try_consume([&](temperature_EgressMessages &in) { + egressPush(&in, temperature_EgressMessages_msg, 1, in.has_requestID ? std::optional{in.requestID} : std::nullopt); + }); +} +void MessengerThread::event_ctrlEgress() { + ctrlEgressQueue.try_consume([&](ctrl_EgressMessages &in) { + egressPush(&in, ctrl_EgressMessages_msg, 2, in.has_requestID ? std::optional{in.requestID} : std::nullopt); + }); +} + +void MessengerThread::event_configEgress() { + configEgressQueue.try_consume([&](config_EgressMessages &in) { + egressPush(&in, config_EgressMessages_msg, 3, in.has_requestID ? std::optional{in.requestID} : std::nullopt); + }); +} + +void MessengerThread::event_logEgress() { + logEgressFifoQueueBuffer.try_consume([&](log_EgressMessages &in) { + egressPush(&in, log_EgressMessages_msg, 4, in.has_requestID ? std::optional{in.requestID} : std::nullopt); + }); +} + +void MessengerThread::handle_temperatureIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) { temperatureIngressQueue.try_produce([&](temperature_IngressMessages &request) { request = temperature_IngressMessages_init_zero; decode(stream, temperature_IngressMessages_msg, &request); - putActiveRequest(1, request.requestID, cb); + putActiveRequest(id, request.requestID, cb); return true; }); } -void MessengerThread::handle_ctrlIngressMsg(pb_istream_t &stream, AsyncUART *cb) { - // register req id - ctrlIngressQueue.try_consume([&](ctrl_IngressMessages &request) { +void MessengerThread::handle_ctrlIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) { + ctrlIngressQueue.try_produce([&](ctrl_IngressMessages &request) { request = ctrl_IngressMessages_init_zero; decode(stream, ctrl_IngressMessages_msg, &request); - putActiveRequest(2, request.requestID, cb); + putActiveRequest(id, request.requestID, cb); return true; }); } -void MessengerThread::handle_configIngressMsg(pb_istream_t &stream, AsyncUART *cb) { - // register req id - configIngressQueue.try_consume([&](config_IngressMessages &request) { +void MessengerThread::handle_configIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) { + configIngressQueue.try_produce([&](config_IngressMessages &request) { request = config_IngressMessages_init_zero; decode(stream, config_IngressMessages_msg, &request); - putActiveRequest(3, request.requestID, cb); + putActiveRequest(id, request.requestID, cb); return true; }); } -void MessengerThread::egressPush(Message &message, int id, std::optional requestId) { - // set IF od sender - message.id = id; /// TODO make response ID +void MessengerThread::egressPush(Message &message, int wireId_receiver, std::optional requestId) { + // set as broadcast message by default, this way no message will be left without a ID + + message.id = WireFormatID{}.makeBroadcastFrom(wireId_receiver).toWire(); message.crc = zephyr::crc::crc32_ieee({message.data.bytes, message.data.size}); - // for requests, we have to have a callback function to return from where we received message if (requestId.has_value()) { - auto cb = takeActiveRequest(message.id, requestId.value()); - if (cb == nullptr) { - cb = defaultUart(); + auto requestData = takeActiveRequest(message.id, requestId.value()); + if (requestData.has_value()) { + if (requestData->cb == nullptr) { + requestData->cb = defaultUart(); + } + + requestData->wireId.makeResponse(); // switch sender/receiver, set type to response + message.id = requestData->wireId.toWire(); + transmit(message, requestData->cb); } - transmit(message, cb); + // if no request was sent before, just drop the message } else { transmit(message, defaultUart()); } } -void MessengerThread::egressPussh(void *submessage, const pb_msgdesc_t &fields, int id, std::optional requestId) { +void MessengerThread::egressPush(void *submessage, const pb_msgdesc_t &fields, int wireId_receiver, std::optional requestId) { // encode embedded message directly to the buffer of output message to save stack Message message = Message_init_zero; @@ -435,7 +406,7 @@ void MessengerThread::egressPussh(void *submessage, const pb_msgdesc_t &fields, pb::encode(ostream, fields, submessage); message.data.size = ostream.bytes_written; - egressPush(message, id, requestId); + egressPush(message, wireId_receiver, requestId); } void MessengerThread::decode(pb_istream_t &stream, const pb_msgdesc_t &fields, void *dest) const { @@ -462,26 +433,36 @@ bool MessengerThread::transmit(Message &msg, AsyncUART *cb) { return true; } -void MessengerThread::putActiveRequest(int type, uint32_t id, AsyncUART *cb) { +void MessengerThread::putActiveRequest(WireFormatID wireId, uint32_t id, AsyncUART *cb) { + if (wireId.type != Type::Request) // We only need to track requests + { + return; + } + for (auto &req : _activeRequests) { - if (req.type == -1) { // Find an empty slot - req.type = type; + if (req.wireId.reserved != 0) { // Find an empty slot + req.wireId = wireId; req.requestId = id; req.cb = cb; return; } } + throw messenger::request_queue_full{}; } -AsyncUART *MessengerThread::takeActiveRequest(int type, uint32_t id) { +std::optional MessengerThread::takeActiveRequest(int wireId_receiver, uint32_t id) { + std::optional ret{}; + // find first of active type/requestId pair for (auto &req : _activeRequests) { - if (req.type == type && req.requestId == id) { - req.type = -1; // Mark as available - return req.cb; + if (req.wireId.receiver == wireId_receiver && req.requestId == id) { + ret = req; // copy to return variable + req.wireId.reserved = -1; // mark as empty slot + break; } } - return {}; + + return ret; } } // namespace rims diff --git a/rims_app/src/messenger.hpp b/rims_app/src/messenger.hpp index 31fe0e938be..538191a048e 100644 --- a/rims_app/src/messenger.hpp +++ b/rims_app/src/messenger.hpp @@ -4,6 +4,8 @@ #include "uart.hpp" #include "zephyr/kernel.h" +#include "id.hpp" + #include "proto/message.pb.h" #include @@ -30,9 +32,9 @@ class MessengerThread : public ZephyrThread { private: struct requestData { - int type; - uint32_t requestId; - AsyncUART *cb; + WireFormatID wireId; // original ID of IngressMessage + uint32_t requestId; // requestID from message + AsyncUART *cb; }; void event_dataArrived(); @@ -41,18 +43,18 @@ class MessengerThread : public ZephyrThread { void event_configEgress(); void event_logEgress(); - void handle_temperatureIngressMsg(pb_istream_t &stream, AsyncUART *cb); - void handle_ctrlIngressMsg(pb_istream_t &stream, AsyncUART *cb); - void handle_configIngressMsg(pb_istream_t &stream, AsyncUART *cb); + void handle_temperatureIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb); + void handle_ctrlIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb); + void handle_configIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb); void decode(pb_istream_t &stream, const pb_msgdesc_t &fields, void *dest) const; bool transmit(Message &msg, AsyncUART *cb); void egressPush(Message &message, int id, std::optional requestId); - void egressPussh(void *submessage, const pb_msgdesc_t &fields, int id, std::optional requestId); - - void putActiveRequest(int type, uint32_t id, AsyncUART *cb); - AsyncUART *takeActiveRequest(int type, uint32_t id); + void egressPush(void *submessage, const pb_msgdesc_t &fields, int id, std::optional requestId); + + void putActiveRequest(WireFormatID wireId, uint32_t id, AsyncUART *cb); + std::optional takeActiveRequest(int type, uint32_t id); std::array _events; std::array _activeRequests; diff --git a/rims_app/src/temperature_measurements.hpp b/rims_app/src/temperature_measurements.hpp index b6b8ccd43c6..11b47eb404c 100644 --- a/rims_app/src/temperature_measurements.hpp +++ b/rims_app/src/temperature_measurements.hpp @@ -80,7 +80,7 @@ class TemperatureSampler { RecurringSemaphoreTimer _samplerTimer{_samplerSem, std::chrono::milliseconds{75}}; zephyr::semaphore::sem _broadcastSem{0, 1}; - RecurringSemaphoreTimer _broadcastTimer{_broadcastSem, std::chrono::seconds{10}}; + RecurringSemaphoreTimer _broadcastTimer{_broadcastSem, std::chrono::seconds{1000}}; ring_buffer _samples; std::uint8_t _samplesNumber{MaxSampleSize}; diff --git a/rims_app/src/uart.cpp b/rims_app/src/uart.cpp index ef7e4f3f962..eec65222766 100644 --- a/rims_app/src/uart.cpp +++ b/rims_app/src/uart.cpp @@ -135,7 +135,6 @@ void AsyncUART::uartCallback(const device *dev, void *user_data) { } void AsyncUART::uartISR() { - __ASSERT(device_is_ready(_dev), "device needs to work"); try { while (uart_irq_update(_dev) && uart_irq_is_pending(_dev)) { if (uart_irq_rx_ready(_dev)) { @@ -189,6 +188,7 @@ uint8_t AsyncUART::rxByte() { } void AsyncUART::processMessage() { + int a; buffer buf{.data = {rxBuffer().data(), rxBuffer().size()}, .device = this}; switchRxBuffer(); k_msgq_put(&messenger_buffer_arrived_queue, &buf, K_MSEC(10)); diff --git a/rims_app/src/uart.hpp b/rims_app/src/uart.hpp index d70f7436cb5..c24a333a8b0 100644 --- a/rims_app/src/uart.hpp +++ b/rims_app/src/uart.hpp @@ -85,7 +85,7 @@ class AsyncUART { class UARTThread : public ZephyrThread { public: // set high priority, to not die when sending logs, but low enough that control is working without interrupt - UARTThread(TStackBase &stack) : ZephyrThread(stack, 4, 0){}; + UARTThread(TStackBase &stack) : ZephyrThread(stack, 4, 0, "UART"){}; // ZephyrThread interface protected: