diff --git a/lib/os/mpsc_pbuf.c b/lib/os/mpsc_pbuf.c index 4b3b68b1c9a..52a26b5d416 100644 --- a/lib/os/mpsc_pbuf.c +++ b/lib/os/mpsc_pbuf.c @@ -124,7 +124,7 @@ static inline bool is_invalid(union mpsc_pbuf_generic *item) } static inline uint32_t idx_inc(struct mpsc_pbuf_buffer *buffer, - uint32_t idx, uint32_t val) + uint32_t idx, int32_t val) { uint32_t i = idx + val; @@ -145,7 +145,7 @@ static inline uint32_t get_skip(union mpsc_pbuf_generic *item) } -static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen) +static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, int32_t wlen) { buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen); if (buffer->tmp_wr_idx == buffer->rd_idx) { @@ -153,7 +153,7 @@ static ALWAYS_INLINE void tmp_wr_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32 } } -static void rd_idx_inc(struct mpsc_pbuf_buffer *buffer, uint32_t wlen) +static void rd_idx_inc(struct mpsc_pbuf_buffer *buffer, int32_t wlen) { buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen); buffer->flags &= ~MPSC_PBUF_FULL; @@ -170,66 +170,119 @@ static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen) buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen); } -/* Attempts to drop a packet. If user packets dropping is allowed then any - * type of packet is dropped. Otherwise only skip packets (internal padding). - * - * If user packet was dropped @p user_packet is set to true. Function returns - * a pointer to a dropped packet or null if nothing was dropped. It may point - * to user packet (@p user_packet set to true) or internal, skip packet. - */ -static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer, - uint32_t free_wlen, - bool allow_drop, - bool *user_packet) +static bool drop_item_locked(struct mpsc_pbuf_buffer *buffer, + uint32_t free_wlen, + union mpsc_pbuf_generic **item_to_drop, + uint32_t *tmp_wr_idx_shift) { union mpsc_pbuf_generic *item; - uint32_t rd_wlen; uint32_t skip_wlen; - *user_packet = false; item = (union mpsc_pbuf_generic *)&buffer->buf[buffer->rd_idx]; skip_wlen = get_skip(item); + *item_to_drop = NULL; + *tmp_wr_idx_shift = 0; - rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item); if (skip_wlen) { - MPSC_PBUF_DBG(NULL, "Skip packet found (len: %u)", skip_wlen); - allow_drop = true; - } else if (allow_drop) { - if (item->hdr.busy) { - MPSC_PBUF_DBG(NULL, "Busy user packet found"); - /* item is currently processed and cannot be overwritten. */ - if (free_wlen) { - add_skip_item(buffer, free_wlen); - } - buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen); - tmp_wr_idx_inc(buffer, rd_wlen); + /* Skip packet found, can be dropped to free some space */ + MPSC_PBUF_DBG(buffer, "no space: Found skip packet %d len", skip_wlen); - /* Get next itme followed the busy one. */ - uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen); - - item = (union mpsc_pbuf_generic *)&buffer->buf[next_rd_idx]; - skip_wlen = get_skip(item); - if (skip_wlen) { - rd_wlen += skip_wlen; - } else { - rd_wlen += buffer->get_wlen(item); - *user_packet = true; - } - } else { - MPSC_PBUF_DBG(NULL, "User packet to drop (len %u)", rd_wlen); - *user_packet = true; - } - } else { - item = NULL; + rd_idx_inc(buffer, skip_wlen); + buffer->tmp_rd_idx = buffer->rd_idx; + return true; } - if (allow_drop) { + /* Other options for dropping available only in overwrite mode. */ + if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE)) { + return false; + } + + uint32_t rd_wlen = buffer->get_wlen(item); + + /* If packet is busy need to be ommited. */ + if (!is_valid(item)) { + return false; + } else if (item->hdr.busy) { + MPSC_PBUF_DBG(buffer, "no space: Found busy packet %p (len:%d)", item, rd_wlen); + /* Add skip packet before claimed packet. */ + if (free_wlen) { + add_skip_item(buffer, free_wlen); + MPSC_PBUF_DBG(buffer, "no space: Added skip packet (len:%d)", free_wlen); + } + /* Move all indexes forward, after claimed packet. */ + buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen); + + /* If allocation wrapped around the buffer and found busy packet + * that was already ommited, skip it again. + */ + if (buffer->rd_idx == buffer->tmp_rd_idx) { + buffer->tmp_rd_idx = idx_inc(buffer, buffer->tmp_rd_idx, rd_wlen); + } + + buffer->tmp_wr_idx = buffer->tmp_rd_idx; + buffer->rd_idx = buffer->tmp_rd_idx; + buffer->flags |= MPSC_PBUF_FULL; + } else { + /* Prepare packet dropping. */ rd_idx_inc(buffer, rd_wlen); buffer->tmp_rd_idx = buffer->rd_idx; - MPSC_PBUF_DBG(buffer, "Incremented rd indexes after drop"); + /* Temporary move tmp_wr idx forward to ensure that packet + * will not be dropped twice and content will not be + * overwritten. + */ + if (free_wlen) { + /* Free location mark as invalid to prevent + * reading incomplete data. + */ + union mpsc_pbuf_generic invalid = { + .hdr = { + .valid = 0, + .busy = 0 + } + }; + + buffer->buf[buffer->tmp_wr_idx] = invalid.raw; + } + + *tmp_wr_idx_shift = rd_wlen + free_wlen; + buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, *tmp_wr_idx_shift); + buffer->flags |= MPSC_PBUF_FULL; + item->hdr.valid = 0; + *item_to_drop = item; + MPSC_PBUF_DBG(buffer, "no space: dropping packet %p (len: %d)", + item, rd_wlen); } - return item; + return true; +} + +static void post_drop_action(struct mpsc_pbuf_buffer *buffer, + uint32_t prev_tmp_wr_idx, + uint32_t tmp_wr_idx_shift) +{ + uint32_t cmp_tmp_wr_idx = idx_inc(buffer, prev_tmp_wr_idx, tmp_wr_idx_shift); + + if (cmp_tmp_wr_idx == buffer->tmp_wr_idx) { + /* Operation not interrupted by another alloc. */ + buffer->tmp_wr_idx = prev_tmp_wr_idx; + buffer->flags &= ~MPSC_PBUF_FULL; + return; + } + + /* Operation interrupted, mark area as to be skipped. */ + union mpsc_pbuf_generic skip = { + .skip = { + .valid = 0, + .busy = 1, + .len = tmp_wr_idx_shift + } + }; + + buffer->buf[prev_tmp_wr_idx] = skip.raw; + buffer->wr_idx = idx_inc(buffer, + buffer->wr_idx, + tmp_wr_idx_shift); + /* full flag? */ } void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer, @@ -239,11 +292,17 @@ void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer, uint32_t free_wlen; k_spinlock_key_t key; union mpsc_pbuf_generic *dropped_item = NULL; - bool valid_drop; + uint32_t tmp_wr_idx_shift = 0; + uint32_t tmp_wr_idx_val = 0; do { - cont = false; key = k_spin_lock(&buffer->lock); + + if (tmp_wr_idx_shift) { + post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift); + tmp_wr_idx_shift = 0; + } + (void)free_space(buffer, &free_wlen); MPSC_PBUF_DBG(buffer, "put_word (%d free space)", (int)free_wlen); @@ -251,26 +310,25 @@ void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer, if (free_wlen) { buffer->buf[buffer->tmp_wr_idx] = item.raw; tmp_wr_idx_inc(buffer, 1); + cont = false; buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1); max_utilization_update(buffer); } else { - bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE; - - dropped_item = drop_item_locked(buffer, free_wlen, - user_drop, &valid_drop); - cont = dropped_item != NULL; + tmp_wr_idx_val = buffer->tmp_wr_idx; + cont = drop_item_locked(buffer, free_wlen, + &dropped_item, &tmp_wr_idx_shift); } k_spin_unlock(&buffer->lock, key); - if (cont && valid_drop) { + if (dropped_item) { /* Notify about item being dropped. */ if (buffer->notify_drop) { buffer->notify_drop(buffer, dropped_item); } + dropped_item = NULL; } } while (cont); - } union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, @@ -278,9 +336,10 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, { union mpsc_pbuf_generic *item = NULL; union mpsc_pbuf_generic *dropped_item = NULL; - bool cont; + bool cont = true; uint32_t free_wlen; - bool valid_drop; + uint32_t tmp_wr_idx_shift = 0; + uint32_t tmp_wr_idx_val = 0; MPSC_PBUF_DBG(buffer, "alloc %d words", (int)wlen); @@ -293,8 +352,12 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, k_spinlock_key_t key; bool wrap; - cont = false; key = k_spin_lock(&buffer->lock); + if (tmp_wr_idx_shift) { + post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift); + tmp_wr_idx_shift = 0; + } + wrap = free_space(buffer, &free_wlen); if (free_wlen >= wlen) { @@ -303,30 +366,25 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, item->hdr.valid = 0; item->hdr.busy = 0; tmp_wr_idx_inc(buffer, wlen); + cont = false; } else if (wrap) { add_skip_item(buffer, free_wlen); cont = true; - } else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) && - !k_is_in_isr()) { + } else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) && !k_is_in_isr()) { int err; k_spin_unlock(&buffer->lock, key); err = k_sem_take(&buffer->sem, timeout); key = k_spin_lock(&buffer->lock); - if (err == 0) { - cont = true; - } - } else { - bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE; - - dropped_item = drop_item_locked(buffer, free_wlen, - user_drop, &valid_drop); - cont = dropped_item != NULL; + cont = (err == 0) ? true : false; + } else if (cont) { + tmp_wr_idx_val = buffer->tmp_wr_idx; + cont = drop_item_locked(buffer, free_wlen, + &dropped_item, &tmp_wr_idx_shift); } - k_spin_unlock(&buffer->lock, key); - if (cont && dropped_item && valid_drop) { + if (dropped_item) { /* Notify about item being dropped. */ if (buffer->notify_drop) { buffer->notify_drop(buffer, dropped_item); @@ -335,6 +393,7 @@ union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer, } } while (cont); + MPSC_PBUF_DBG(buffer, "allocated %p", item); if (IS_ENABLED(CONFIG_MPSC_CLEAR_ALLOCATED) && item) { @@ -367,15 +426,21 @@ void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer, (sizeof(item) + sizeof(data)) / sizeof(uint32_t); union mpsc_pbuf_generic *dropped_item = NULL; bool cont; - bool valid_drop; + uint32_t tmp_wr_idx_shift = 0; + uint32_t tmp_wr_idx_val = 0; do { k_spinlock_key_t key; uint32_t free_wlen; bool wrap; - cont = false; key = k_spin_lock(&buffer->lock); + + if (tmp_wr_idx_shift) { + post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift); + tmp_wr_idx_shift = 0; + } + wrap = free_space(buffer, &free_wlen); if (free_wlen >= l) { @@ -386,21 +451,20 @@ void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer, *p = (void *)data; tmp_wr_idx_inc(buffer, l); buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l); + cont = false; max_utilization_update(buffer); } else if (wrap) { add_skip_item(buffer, free_wlen); cont = true; } else { - bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE; - - dropped_item = drop_item_locked(buffer, free_wlen, - user_drop, &valid_drop); - cont = dropped_item != NULL; + tmp_wr_idx_val = buffer->tmp_wr_idx; + cont = drop_item_locked(buffer, free_wlen, + &dropped_item, &tmp_wr_idx_shift); } k_spin_unlock(&buffer->lock, key); - if (cont && dropped_item && valid_drop) { + if (dropped_item) { /* Notify about item being dropped. */ if (buffer->notify_drop) { buffer->notify_drop(buffer, dropped_item); @@ -415,15 +479,21 @@ void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data, { bool cont; union mpsc_pbuf_generic *dropped_item = NULL; - bool valid_drop; + uint32_t tmp_wr_idx_shift = 0; + uint32_t tmp_wr_idx_val = 0; do { uint32_t free_wlen; k_spinlock_key_t key; bool wrap; - cont = false; key = k_spin_lock(&buffer->lock); + + if (tmp_wr_idx_shift) { + post_drop_action(buffer, tmp_wr_idx_val, tmp_wr_idx_shift); + tmp_wr_idx_shift = 0; + } + wrap = free_space(buffer, &free_wlen); if (free_wlen >= wlen) { @@ -431,22 +501,22 @@ void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data, wlen * sizeof(uint32_t)); buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen); tmp_wr_idx_inc(buffer, wlen); + cont = false; max_utilization_update(buffer); } else if (wrap) { add_skip_item(buffer, free_wlen); cont = true; } else { - bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE; - - dropped_item = drop_item_locked(buffer, free_wlen, - user_drop, &valid_drop); - cont = dropped_item != NULL; + tmp_wr_idx_val = buffer->tmp_wr_idx; + cont = drop_item_locked(buffer, free_wlen, + &dropped_item, &tmp_wr_idx_shift); } k_spin_unlock(&buffer->lock, key); - if (cont && dropped_item && valid_drop) { + if (dropped_item) { /* Notify about item being dropped. */ + dropped_item->hdr.valid = 0; if (buffer->notify_drop) { buffer->notify_drop(buffer, dropped_item); } @@ -471,6 +541,7 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer) &buffer->buf[buffer->tmp_rd_idx]; if (!a || is_invalid(item)) { + MPSC_PBUF_DBG(buffer, "invalid claim %d: %p", a, item); item = NULL; } else { uint32_t skip = get_skip(item); @@ -492,7 +563,7 @@ const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer) } if (!cont) { - MPSC_PBUF_DBG(buffer, "claimed: %p", item); + MPSC_PBUF_DBG(buffer, ">>claimed %d: %p", a, item); } k_spin_unlock(&buffer->lock, key); } while (cont); @@ -511,12 +582,22 @@ void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer, if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) || ((uint32_t *)item == &buffer->buf[buffer->rd_idx])) { witem->hdr.busy = 0; + if (buffer->rd_idx == buffer->tmp_rd_idx) { + /* There is a chance that there are so many new packets + * added between claim and free that rd_idx points again + * at claimed item. In that case tmp_rd_idx points at + * the same location. In that case increment also tmp_rd_idx + * which will mark freed buffer as the only free space in + * the buffer. + */ + buffer->tmp_rd_idx = idx_inc(buffer, buffer->tmp_rd_idx, wlen); + } rd_idx_inc(buffer, wlen); } else { MPSC_PBUF_DBG(buffer, "Allocation occurred during claim"); witem->skip.len = wlen; } - MPSC_PBUF_DBG(buffer, "freed: %p", item); + MPSC_PBUF_DBG(buffer, "<lock, key); k_sem_give(&buffer->sem);