add wireid type

This commit is contained in:
Bartosz Wieczorek 2025-04-24 16:31:47 +02:00
parent 0e67e17712
commit f50d9c7b48
7 changed files with 235 additions and 134 deletions

125
rims_app/src/id.hpp Normal file
View File

@ -0,0 +1,125 @@
#pragma once
#include <cstdint>
namespace rims {
/**
* @brief The ID class
* ID is max 29 bytes.
*/
enum Priority {
High = 0x7, //
Mid = 0x3,
Low = 0
};
enum Type {
Broadcast = 0, //
Request = 1,
Response = 2
};
struct WireFormatID {
int32_t reserved : 6 {0}; // bits 5..0
uint32_t sender : 9 {0}; // bits 14..6
uint32_t receiver : 9 {0}; // bits 23..15
uint32_t type : 2 {0}; // bits 25..24
uint32_t priority : 3 {0}; // bits 28..26
uint32_t checksum : 3 {0}; // bits 31..29 (MSBs)
constexpr uint32_t toWire() const {
uint32_t raw = 0;
// raw |= (reserved & 0x3F);
raw |= (sender & 0x1FF) << 6;
raw |= (receiver & 0x1FF) << 15;
raw |= (type & 0x03) << 24;
raw |= (priority & 0x07) << 26;
// Now compute and insert checksum
uint8_t cs = compute_checksum3(raw);
raw |= (cs & 0x07) << 29;
return raw;
}
// swap sender and receiver
constexpr WireFormatID &swapEdpoints() {
auto tmp = sender;
sender = receiver;
receiver = tmp;
return *this;
}
constexpr WireFormatID &makeResponse() {
type = static_cast<uint32_t>(Type::Response);
swapEdpoints();
return *this;
}
constexpr WireFormatID &makeBroadcastFrom(uint32_t sender) {
setSender(sender);
type = Type::Broadcast;
setPriority(Priority::Low);
return *this;
}
constexpr WireFormatID &setSender(uint16_t id) {
sender = id;
return *this;
}
constexpr WireFormatID &setReceiver(uint16_t id) {
receiver = id;
return *this;
}
constexpr WireFormatID &setPriority(Priority prior) {
priority = static_cast<uint32_t>(prior);
return *this;
}
constexpr WireFormatID &setType(Type t) {
type = static_cast<uint32_t>(t);
return *this;
}
constexpr bool isValid() const {
uint32_t raw = 0;
// raw |= (reserved & 0x3F);
raw |= (sender & 0x1FF) << 6;
raw |= (receiver & 0x1FF) << 15;
raw |= (type & 0x03) << 24;
raw |= (priority & 0x07) << 26;
// Now compute and insert checksum
uint8_t expected = compute_checksum3(raw);
return expected == checksum;
}
constexpr WireFormatID &fromWire(uint32_t packed) {
// reserved = packed & 0x3F; // bits 0..5
sender = (packed >> 6) & 0x1FF; // bits 6..14
receiver = (packed >> 15) & 0x1FF; // bits 15..23
type = (packed >> 24) & 0x03; // bits 24..25
priority = (packed >> 26) & 0x07; // bits 26..28
checksum = (packed >> 29) & 0x07; // bits 29..31 (MSBs)
return *this;
}
private:
constexpr static uint8_t compute_checksum3(uint32_t raw_id) {
// Mask out top 3 bits — ensure we only use bits 0..28
raw_id &= 0x1FFFFFFF;
// Simple XOR folding to reduce to 3 bits
uint8_t checksum = 0;
// Fold 29 bits into 3 using 3-bit chunks
for (int i = 0; i < 29; i += 3) {
checksum ^= (raw_id >> i) & 0x07; // XOR each 3-bit group
}
return checksum & 0x07; // final 3-bit result
}
};
static_assert(sizeof(uint32_t) == sizeof(WireFormatID));
} // namespace rims

View File

@ -72,19 +72,17 @@ gpio_dt_spec ch_1_zcd = GPIO_DT_SPEC_GET(DT_NODELABEL(ch1_zcd), gpios);
gpio_dt_spec ch_2_zcd = GPIO_DT_SPEC_GET(DT_NODELABEL(ch2_zcd), gpios); // pompka
void mythread_analyzer_cb(struct thread_analyzer_info *info) {
ULOG_DEBUG(
"thread name, memfree, cpu : %s, %d, %d",
info->name,
info->stack_size - info->stack_used,
info->utilization // cpu usage in %
);
// ULOG_DEBUG(
// "thread name, memfree, cpu : %s, %d, %d",
// info->name,
// info->stack_size - info->stack_used,
// info->utilization // cpu usage in %
// );
}
int main() {
using namespace rims;
const auto wait = []() { std::this_thread::sleep_for(std::chrono::milliseconds{100}); };
messengerTread.init(messengerStack);
uartThread.init(uartStack);
temperatureSampler.init(temperatureSamplerStack);
@ -93,23 +91,18 @@ int main() {
messengerTread->init_hw();
messengerTread->start();
wait();
uartThread->init_hw();
uartThread->start();
wait();
temperatureSampler->init_hw();
temperatureSampler->start();
wait();
zcd->init_hw();
zcd->start();
wait();
phaseModulation->init_hw();
phaseModulation->start();
wait();
zephyr::gpio::pin_configure(status, GPIO_OUTPUT_INACTIVE);
zephyr::gpio::pin_configure(gprelay_1_en, GPIO_OUTPUT_INACTIVE);

View File

@ -17,6 +17,8 @@
#include "proto/message.pb.h"
#include "proto/temperature.pb.h"
#include "id.hpp"
#include <array>
#include <cstring>
#include <exception>
@ -33,48 +35,13 @@
namespace rims {
K_MSGQ_DEFINE(messenger_buffer_arrived_queue, sizeof(rims::buffer), 2, 1);
namespace {
/*
def encode_id(id_value: int) -> int:
"""Encodes a 29-bit ID by adding a 3-bit checksum at the end."""
data = id_value & 0x1FFFFFFF # Mask to ensure only 29 bits
checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7 # Reduce to 3 bits
return (data << 3) | checksum # Store checksum in the last 3 bits
def verify_id(received_id: int) -> bool:
"""Verifies if the received ID has a valid checksum."""
data = received_id >> 3 # Extract the first 29 bits
received_checksum = received_id & 0x7 # Extract the last 3 bits (checksum)
# Recalculate checksum from the extracted 29-bit data
calculated_checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7
return received_checksum == calculated_checksum # Return True if checksum matches
*/
static constexpr bool idIsInvalid(uint32_t receivedID) {
// uint32_t data = receivedID >> 3; // Extract the first 29 bits
// uint32_t receivedChecksum = receivedID & 0x7; // Extract the last 3 bits (checksum)
// // Recalculate checksum from the extracted 29-bit data
// uint32_t calculatedChecksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7;
// return receivedChecksum == calculatedChecksum; // Return true if checksum matches
return false;
}
static bool crcIsInvalid(std::span<uint8_t> data, std::uint32_t crc) {
bool crcIsInvalid(std::span<uint8_t> data, std::uint32_t crc) {
return zephyr::crc::crc32_ieee(data) != crc;
}
// static uint32_t encodeID(uint32_t id) {
// /// TODO error handling
// uint32_t data = id & 0x1FFFFFFF; // Mask to get the first 29 bits
// uint32_t checksum = (data ^ (data >> 3) ^ (data >> 6) ^ (data >> 9) ^ (data >> 12)) & 0x7; // Reduce to 3 bits
// return (data << 3) | checksum; // Store checksum in the last 3 bits
// }
} // namespace
namespace cobs {
class error : public std::exception {};
@ -217,9 +184,10 @@ MessengerThread::MessengerThread(TStackBase &stack) : ZephyrThread{stack, 9, 0,
zephyr::event_pool::k_init(_events.at(0), messenger_buffer_arrived_queue);
for (auto &ar : _activeRequests) {
ar.type = -1;
ar.cb = nullptr;
ar.requestId = 0;
ar.wireId = {};
ar.wireId.reserved = -1; // maark unused slot
ar.cb = {};
ar.requestId = {};
}
}
@ -245,50 +213,25 @@ void MessengerThread::threadMain() {
}
}
void rims::MessengerThread::event_temperatureEgress() {
temperatureEgressQueue.try_consume([&](temperature_EgressMessages &in) {
egressPussh(&in, temperature_EgressMessages_msg, 1, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void rims::MessengerThread::event_ctrlEgress() {
ctrlEgressQueue.try_consume([&](ctrl_EgressMessages &in) {
egressPussh(&in, ctrl_EgressMessages_msg, 2, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void rims::MessengerThread::event_configEgress() {
configEgressQueue.try_consume([&](config_EgressMessages &in) {
egressPussh(&in, config_EgressMessages_msg, 3, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void rims::MessengerThread::event_logEgress() {
logEgressFifoQueueBuffer.try_consume([&](log_EgressMessages &in) {
egressPussh(&in, log_EgressMessages_msg, 4, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void MessengerThread::event_dataArrived() {
try {
ULOG_INFO("Data arrived");
buffer buf;
k_msgq_get(&messenger_buffer_arrived_queue, &buf, K_NO_WAIT);
const std::size_t decoded_len = cobs::decode(buf.data, buf.data);
pb_istream_t stream = pb_istream_from_buffer(buf.data.data(), decoded_len);
std::span<uint8_t> submessage;
uint32_t id{}, crc{};
std::span<uint8_t> submessage{};
WireFormatID id{};
uint32_t crc{};
const auto decode_id = [&]() {
try {
id = pb::decode_fixed32(stream);
id.fromWire(pb::decode_fixed32(stream));
} catch (const pb::error &e) {
throw messenger::no_id{};
}
if (idIsInvalid(id)) {
throw messenger::bad_id{};
}
};
const auto decode_crc = [&]() {
@ -305,7 +248,6 @@ void MessengerThread::event_dataArrived() {
auto submessageBegin = (pb_byte_t *)stream.state;
pb::skip_bytes(stream, submessageSize); // skip data
submessage = {submessageBegin, submessageSize};
} catch (const pb::error &e) {
throw messenger::no_data{};
}
@ -331,24 +273,28 @@ void MessengerThread::event_dataArrived() {
decode();
decode();
if (not id.isValid()) {
throw messenger::bad_id{};
}
if (crcIsInvalid(submessage, crc)) {
throw messenger::bad_crc{};
}
stream = pb::istream_from_span(submessage);
switch (id) {
case 0x01: { /// TODO temperature endpoint ID
handle_temperatureIngressMsg(stream, buf.device);
/// we now got a proper frame, starting to decode a frame
stream = pb::istream_from_span(submessage); // throws on error, no need to check any status
switch (id.receiver) { // check which endpoint is the receiver
case 0x01: { /// TODO temperature endpoint ID
handle_temperatureIngressMsg(id, stream, buf.device);
break;
}
case 0x02: /// TODO configuration endpoint ID
{
handle_configIngressMsg(stream, buf.device);
handle_configIngressMsg(id, stream, buf.device);
break;
}
case 0x03: /// TODO phase controll endpoint ID
{
handle_ctrlIngressMsg(stream, buf.device);
handle_ctrlIngressMsg(id, stream, buf.device);
break;
}
case 0x04: {
@ -378,56 +324,81 @@ void MessengerThread::event_dataArrived() {
}
}
void MessengerThread::handle_temperatureIngressMsg(pb_istream_t &stream, AsyncUART *cb) {
/// TODO assert acllback
/// register req id
void MessengerThread::event_temperatureEgress() {
temperatureEgressQueue.try_consume([&](temperature_EgressMessages &in) {
egressPush(&in, temperature_EgressMessages_msg, 1, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void MessengerThread::event_ctrlEgress() {
ctrlEgressQueue.try_consume([&](ctrl_EgressMessages &in) {
egressPush(&in, ctrl_EgressMessages_msg, 2, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void MessengerThread::event_configEgress() {
configEgressQueue.try_consume([&](config_EgressMessages &in) {
egressPush(&in, config_EgressMessages_msg, 3, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void MessengerThread::event_logEgress() {
logEgressFifoQueueBuffer.try_consume([&](log_EgressMessages &in) {
egressPush(&in, log_EgressMessages_msg, 4, in.has_requestID ? std::optional{in.requestID} : std::nullopt);
});
}
void MessengerThread::handle_temperatureIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) {
temperatureIngressQueue.try_produce([&](temperature_IngressMessages &request) {
request = temperature_IngressMessages_init_zero;
decode(stream, temperature_IngressMessages_msg, &request);
putActiveRequest(1, request.requestID, cb);
putActiveRequest(id, request.requestID, cb);
return true;
});
}
void MessengerThread::handle_ctrlIngressMsg(pb_istream_t &stream, AsyncUART *cb) {
// register req id
ctrlIngressQueue.try_consume([&](ctrl_IngressMessages &request) {
void MessengerThread::handle_ctrlIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) {
ctrlIngressQueue.try_produce([&](ctrl_IngressMessages &request) {
request = ctrl_IngressMessages_init_zero;
decode(stream, ctrl_IngressMessages_msg, &request);
putActiveRequest(2, request.requestID, cb);
putActiveRequest(id, request.requestID, cb);
return true;
});
}
void MessengerThread::handle_configIngressMsg(pb_istream_t &stream, AsyncUART *cb) {
// register req id
configIngressQueue.try_consume([&](config_IngressMessages &request) {
void MessengerThread::handle_configIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb) {
configIngressQueue.try_produce([&](config_IngressMessages &request) {
request = config_IngressMessages_init_zero;
decode(stream, config_IngressMessages_msg, &request);
putActiveRequest(3, request.requestID, cb);
putActiveRequest(id, request.requestID, cb);
return true;
});
}
void MessengerThread::egressPush(Message &message, int id, std::optional<uint32_t> requestId) {
// set IF od sender
message.id = id; /// TODO make response ID
void MessengerThread::egressPush(Message &message, int wireId_receiver, std::optional<uint32_t> requestId) {
// set as broadcast message by default, this way no message will be left without a ID
message.id = WireFormatID{}.makeBroadcastFrom(wireId_receiver).toWire();
message.crc = zephyr::crc::crc32_ieee({message.data.bytes, message.data.size});
// for requests, we have to have a callback function to return from where we received message
if (requestId.has_value()) {
auto cb = takeActiveRequest(message.id, requestId.value());
if (cb == nullptr) {
cb = defaultUart();
auto requestData = takeActiveRequest(message.id, requestId.value());
if (requestData.has_value()) {
if (requestData->cb == nullptr) {
requestData->cb = defaultUart();
}
requestData->wireId.makeResponse(); // switch sender/receiver, set type to response
message.id = requestData->wireId.toWire();
transmit(message, requestData->cb);
}
transmit(message, cb);
// if no request was sent before, just drop the message
} else {
transmit(message, defaultUart());
}
}
void MessengerThread::egressPussh(void *submessage, const pb_msgdesc_t &fields, int id, std::optional<uint32_t> requestId) {
void MessengerThread::egressPush(void *submessage, const pb_msgdesc_t &fields, int wireId_receiver, std::optional<uint32_t> requestId) {
// encode embedded message directly to the buffer of output message to save stack
Message message = Message_init_zero;
@ -435,7 +406,7 @@ void MessengerThread::egressPussh(void *submessage, const pb_msgdesc_t &fields,
pb::encode(ostream, fields, submessage);
message.data.size = ostream.bytes_written;
egressPush(message, id, requestId);
egressPush(message, wireId_receiver, requestId);
}
void MessengerThread::decode(pb_istream_t &stream, const pb_msgdesc_t &fields, void *dest) const {
@ -462,26 +433,36 @@ bool MessengerThread::transmit(Message &msg, AsyncUART *cb) {
return true;
}
void MessengerThread::putActiveRequest(int type, uint32_t id, AsyncUART *cb) {
void MessengerThread::putActiveRequest(WireFormatID wireId, uint32_t id, AsyncUART *cb) {
if (wireId.type != Type::Request) // We only need to track requests
{
return;
}
for (auto &req : _activeRequests) {
if (req.type == -1) { // Find an empty slot
req.type = type;
if (req.wireId.reserved != 0) { // Find an empty slot
req.wireId = wireId;
req.requestId = id;
req.cb = cb;
return;
}
}
throw messenger::request_queue_full{};
}
AsyncUART *MessengerThread::takeActiveRequest(int type, uint32_t id) {
std::optional<MessengerThread::requestData> MessengerThread::takeActiveRequest(int wireId_receiver, uint32_t id) {
std::optional<MessengerThread::requestData> ret{};
// find first of active type/requestId pair
for (auto &req : _activeRequests) {
if (req.type == type && req.requestId == id) {
req.type = -1; // Mark as available
return req.cb;
if (req.wireId.receiver == wireId_receiver && req.requestId == id) {
ret = req; // copy to return variable
req.wireId.reserved = -1; // mark as empty slot
break;
}
}
return {};
return ret;
}
} // namespace rims

View File

@ -4,6 +4,8 @@
#include "uart.hpp"
#include "zephyr/kernel.h"
#include "id.hpp"
#include "proto/message.pb.h"
#include <cstdint>
@ -30,9 +32,9 @@ class MessengerThread : public ZephyrThread {
private:
struct requestData {
int type;
uint32_t requestId;
AsyncUART *cb;
WireFormatID wireId; // original ID of IngressMessage
uint32_t requestId; // requestID from message
AsyncUART *cb;
};
void event_dataArrived();
@ -41,18 +43,18 @@ class MessengerThread : public ZephyrThread {
void event_configEgress();
void event_logEgress();
void handle_temperatureIngressMsg(pb_istream_t &stream, AsyncUART *cb);
void handle_ctrlIngressMsg(pb_istream_t &stream, AsyncUART *cb);
void handle_configIngressMsg(pb_istream_t &stream, AsyncUART *cb);
void handle_temperatureIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb);
void handle_ctrlIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb);
void handle_configIngressMsg(WireFormatID id, pb_istream_t &stream, AsyncUART *cb);
void decode(pb_istream_t &stream, const pb_msgdesc_t &fields, void *dest) const;
bool transmit(Message &msg, AsyncUART *cb);
void egressPush(Message &message, int id, std::optional<uint32_t> requestId);
void egressPussh(void *submessage, const pb_msgdesc_t &fields, int id, std::optional<uint32_t> requestId);
void putActiveRequest(int type, uint32_t id, AsyncUART *cb);
AsyncUART *takeActiveRequest(int type, uint32_t id);
void egressPush(void *submessage, const pb_msgdesc_t &fields, int id, std::optional<uint32_t> requestId);
void putActiveRequest(WireFormatID wireId, uint32_t id, AsyncUART *cb);
std::optional<requestData> takeActiveRequest(int type, uint32_t id);
std::array<k_poll_event, 5> _events;
std::array<requestData, 8> _activeRequests;

View File

@ -80,7 +80,7 @@ class TemperatureSampler {
RecurringSemaphoreTimer _samplerTimer{_samplerSem, std::chrono::milliseconds{75}};
zephyr::semaphore::sem _broadcastSem{0, 1};
RecurringSemaphoreTimer _broadcastTimer{_broadcastSem, std::chrono::seconds{10}};
RecurringSemaphoreTimer _broadcastTimer{_broadcastSem, std::chrono::seconds{1000}};
ring_buffer<adc_sample, MaxSampleSize> _samples;
std::uint8_t _samplesNumber{MaxSampleSize};

View File

@ -135,7 +135,6 @@ void AsyncUART::uartCallback(const device *dev, void *user_data) {
}
void AsyncUART::uartISR() {
__ASSERT(device_is_ready(_dev), "device needs to work");
try {
while (uart_irq_update(_dev) && uart_irq_is_pending(_dev)) {
if (uart_irq_rx_ready(_dev)) {
@ -189,6 +188,7 @@ uint8_t AsyncUART::rxByte() {
}
void AsyncUART::processMessage() {
int a;
buffer buf{.data = {rxBuffer().data(), rxBuffer().size()}, .device = this};
switchRxBuffer();
k_msgq_put(&messenger_buffer_arrived_queue, &buf, K_MSEC(10));

View File

@ -85,7 +85,7 @@ class AsyncUART {
class UARTThread : public ZephyrThread {
public:
// set high priority, to not die when sending logs, but low enough that control is working without interrupt
UARTThread(TStackBase &stack) : ZephyrThread(stack, 4, 0){};
UARTThread(TStackBase &stack) : ZephyrThread(stack, 4, 0, "UART"){};
// ZephyrThread interface
protected: