diff --git a/include/zephyr/zbus/zbus.h b/include/zephyr/zbus/zbus.h index 466a2ad9dcf..8a4188aa821 100644 --- a/include/zephyr/zbus/zbus.h +++ b/include/zephyr/zbus/zbus.h @@ -92,7 +92,8 @@ struct zbus_channel { */ enum __packed zbus_observer_type { ZBUS_OBSERVER_LISTENER_TYPE, - ZBUS_OBSERVER_SUBSCRIBER_TYPE + ZBUS_OBSERVER_SUBSCRIBER_TYPE, + ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, }; /** @@ -127,6 +128,13 @@ struct zbus_observer { /** Observer callback function. It turns the observer into a listener. */ void (*const callback)(const struct zbus_channel *chan); + +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__) + /** Observer message FIFO. It turns the observer into a message subscriber. It only + * exists if the @kconfig{CONFIG_ZBUS_MSG_SUBSCRIBER} is enabled. + */ + struct k_fifo *const message_fifo; +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ }; }; @@ -156,8 +164,10 @@ struct zbus_channel_observation { #if defined(CONFIG_ZBUS_CHANNEL_NAME) #define ZBUS_CHANNEL_NAME_INIT(_name) .name = #_name, +#define _ZBUS_CHAN_NAME(_chan) (_chan)->name #else #define ZBUS_CHANNEL_NAME_INIT(_name) +#define _ZBUS_CHAN_NAME(_chan) "" #endif #if defined(CONFIG_ZBUS_OBSERVER_NAME) @@ -379,6 +389,37 @@ k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks); */ #define ZBUS_LISTENER_DEFINE(_name, _cb) ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, true) +/** + * @brief Define and initialize a message subscriber. + * + * This macro defines an observer of @ref ZBUS_OBSERVER_SUBSCRIBER_TYPE type. It defines a FIFO + * where the subscriber will receive the message asynchronously and initialize the @ref + * zbus_observer defining the subscriber. + * + * @param[in] _name The subscriber's name. + * @param[in] _enable The subscriber's initial state. + */ +#define ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable) \ + static K_FIFO_DEFINE(_zbus_observer_fifo_##_name); \ + STRUCT_SECTION_ITERABLE(zbus_observer, _name) = { \ + ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \ + .type = ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, \ + .enabled = _enable, \ + .message_fifo = &_zbus_observer_fifo_##_name, \ + } + +/** + * @brief Define and initialize an enabled message subscriber. + * + * This macro defines an observer of message subscriber type. It defines a FIFO where the + * subscriber will receive the message asynchronously and initialize the @ref + * zbus_observer defining the subscriber. The message subscribers are defined in the enabled state + * with this macro. + + * + * @param[in] _name The subscriber's name. + */ +#define ZBUS_MSG_SUBSCRIBER_DEFINE(_name) ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, true) /** * * @brief Publish to a channel @@ -741,6 +782,31 @@ static inline const char *zbus_obs_name(const struct zbus_observer *obs) int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout); +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) || defined(__DOXYGEN__) + +/** + * @brief Wait for a channel message. + * + * This routine makes the subscriber wait for the new message in case of channel publication. + * + * @param[in] sub The subscriber's reference. + * @param[out] chan The notification channel's reference. + * @param[out] msg A reference to a copy of the published message. + * @param[in] timeout Waiting period for a notification arrival, + * or one of the special values, K_NO_WAIT and K_FOREVER. + * + * @retval 0 Message received. + * @retval -EINVAL The observer is not a subscriber. + * @retval -ENOMSG Could not retrieve the net_buf from the subscriber FIFO. + * @retval -EILSEQ Received an invalid channel reference. + * @retval -EFAULT A parameter is incorrect, or the function context is invalid (inside an ISR). The + * function only returns this value when the @kconfig{CONFIG_ZBUS_ASSERT_MOCK} is enabled. + */ +int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, + k_timeout_t timeout); + +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ + /** * * @brief Iterate over channels. diff --git a/subsys/zbus/Kconfig b/subsys/zbus/Kconfig index 622f89641a2..f250865b466 100644 --- a/subsys/zbus/Kconfig +++ b/subsys/zbus/Kconfig @@ -18,6 +18,36 @@ config ZBUS_CHANNEL_NAME config ZBUS_OBSERVER_NAME bool "Observer name field" +config ZBUS_MSG_SUBSCRIBER + select NET_BUF + bool "Message subscribers will receive all messages in sequence." + +if ZBUS_MSG_SUBSCRIBER + +choice + prompt "ZBus msg_subscribers buffer allocation" + +config ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC + bool "Use heap to allocate msg_subscriber buffers data" + +config ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC + bool "Use fixed data size for msg_subscriber buffers pool" + +endchoice + +config ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE + default 16 + int "The count of net_buf available to be used simutaneously." + +if ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC + +config ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE + int "The size of the biggest message used with ZBus." + +endif # ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC + +endif # ZBUS_MSG_SUBSCRIBER + config ZBUS_RUNTIME_OBSERVERS bool "Runtime observers support." default n diff --git a/subsys/zbus/zbus.c b/subsys/zbus/zbus.c index abaf92c8641..4ea2d986a3f 100644 --- a/subsys/zbus/zbus.c +++ b/subsys/zbus/zbus.c @@ -8,11 +8,47 @@ #include #include #include +#include #include LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL); +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) + +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC) + +NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE, + sizeof(struct zbus_channel *), NULL); +BUILD_ASSERT(CONFIG_HEAP_MEM_POOL_SIZE > 0, "MSG_SUBSCRIBER feature requires heap memory pool."); + +static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, + k_timeout_t timeout) +{ + return net_buf_alloc_len(&_zbus_msg_subscribers_pool, size, timeout); +} + +#else + +NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool, + (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE), + (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE), + sizeof(struct zbus_channel *), NULL); + +static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size, + k_timeout_t timeout) +{ + __ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE, + "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to " + "%d", + (int)size); + return net_buf_alloc(&_zbus_msg_subscribers_pool, timeout); +} +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_DYNAMIC */ + +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ + int _zbus_init(void) { + const struct zbus_channel *curr = NULL; const struct zbus_channel *prev = NULL; @@ -44,32 +80,62 @@ int _zbus_init(void) SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY); static inline int _zbus_notify_observer(const struct zbus_channel *chan, - const struct zbus_observer *obs, k_timepoint_t end_time) + const struct zbus_observer *obs, k_timepoint_t end_time, + struct net_buf *buf) { - int err = 0; - - if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) { + switch (obs->type) { + case ZBUS_OBSERVER_LISTENER_TYPE: { obs->callback(chan); - - } else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) { - err = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); - } else { - CODE_UNREACHABLE; + break; } - return err; + case ZBUS_OBSERVER_SUBSCRIBER_TYPE: { + return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); + } +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) + case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: { + struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time)); + + if (cloned_buf == NULL) { + return -ENOMEM; + } + memcpy(net_buf_user_data(cloned_buf), &chan, sizeof(struct zbus_channel *)); + + net_buf_put(obs->message_fifo, cloned_buf); + + break; + } +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ + + default: + _ZBUS_ASSERT(false, "Unreachable"); + } + return 0; } static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time) { int err = 0; int last_error = 0; - - _ZBUS_ASSERT(chan != NULL, "chan is required"); + struct net_buf *buf = NULL; /* Static observer event dispatcher logic */ struct zbus_channel_observation *observation; struct zbus_channel_observation_mask *observation_mask; +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) + buf = _zbus_create_net_buf(&_zbus_msg_subscribers_pool, zbus_chan_msg_size(chan), + sys_timepoint_timeout(end_time)); + + _ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is " + "unavailable or heap is full"); + + net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan)); +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ + + LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan)); + + int __maybe_unused index = 0; + for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; i < limit; ++i) { STRUCT_SECTION_GET(zbus_channel_observation, i, &observation); @@ -83,15 +149,21 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t continue; } - err = _zbus_notify_observer(chan, obs, end_time); - - _ZBUS_ASSERT(err == 0, - "could not deliver notification to observer %s. Error code %d", - _ZBUS_OBS_NAME(obs), err); + err = _zbus_notify_observer(chan, obs, end_time, buf); if (err) { last_error = err; + LOG_ERR("could not deliver notification to observer %s. Error code %d", + _ZBUS_OBS_NAME(obs), err); + if (err == -ENOMEM) { + if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) { + net_buf_unref(buf); + } + return err; + } } + + LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs)); } #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS) @@ -100,15 +172,13 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { - _ZBUS_ASSERT(obs_nd != NULL, "observer node is NULL"); - const struct zbus_observer *obs = obs_nd->obs; if (!obs->enabled) { continue; } - err = _zbus_notify_observer(chan, obs, end_time); + err = _zbus_notify_observer(chan, obs, end_time, buf); if (err) { last_error = err; @@ -116,6 +186,8 @@ static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t } #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */ + IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);)) + return last_error; } @@ -215,15 +287,43 @@ int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **c { _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs"); _ZBUS_ASSERT(sub != NULL, "sub is required"); + _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); + _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); _ZBUS_ASSERT(chan != NULL, "chan is required"); - if (sub->queue == NULL) { - return -EINVAL; - } - return k_msgq_get(sub->queue, chan, timeout); } +#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER) + +int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, + k_timeout_t timeout) +{ + _ZBUS_ASSERT(!k_is_in_isr(), "zbus subscribers cannot be used inside ISRs"); + _ZBUS_ASSERT(sub != NULL, "sub is required"); + _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, + "sub must be a MSG_SUBSCRIBER"); + _ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required"); + _ZBUS_ASSERT(chan != NULL, "chan is required"); + _ZBUS_ASSERT(msg != NULL, "msg is required"); + + struct net_buf *buf = net_buf_get(sub->message_fifo, timeout); + + if (buf == NULL) { + return -ENOMSG; + } + + *chan = *((struct zbus_channel **)net_buf_user_data(buf)); + + memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan)); + + net_buf_unref(buf); + + return 0; +} + +#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */ + int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, const struct zbus_channel *chan, bool masked) {