zephyr/subsys/rtio/rtio_executor.c
Tom Burdick 054f453ea7 rtio: Callback chaining and testing
Callbacks were a bit neglected in terms of test coverage, especially
when used in chains. It was clear from the code that chained callbacks
may not actually work, and callback ordering then was hard to verify.
Test callbacks chained to transactions work as expected.

The test iodev had built up some cruft over time and in the process
showed a few bugs once callback chaining was fixed so the test iodev now
better matches typical iodev implementations at this point.

Cancellation testing now includes an added case for cancelling a the
second submission in the chain prior to calling submit noting that no
completions notifications should be given back for those.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
2024-05-29 08:41:12 +02:00

200 lines
5.5 KiB
C

/*
* Copyright (c) 2022 Intel Corporation.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);
/**
* @brief Executor handled submissions
*/
static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
{
const struct rtio_sqe *sqe = &iodev_sqe->sqe;
switch (sqe->op) {
case RTIO_OP_CALLBACK:
sqe->callback(iodev_sqe->r, sqe, sqe->arg0);
rtio_iodev_sqe_ok(iodev_sqe, 0);
break;
default:
rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
}
}
/**
* @brief Submit to an iodev a submission to work on
*
* Should be called by the executor when it wishes to submit work
* to an iodev.
*
* @param iodev_sqe Submission to work on
*/
static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
{
if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
return;
}
/* No iodev means its an executor specific operation */
if (iodev_sqe->sqe.iodev == NULL) {
rtio_executor_op(iodev_sqe);
return;
}
iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
}
/**
* @brief Submit operations in the queue to iodevs
*
* @param r RTIO context
*
* @retval 0 Always succeeds
*/
void rtio_executor_submit(struct rtio *r)
{
const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq);
while (node != NULL) {
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
/* If this submission was cancelled before submit, then generate no response */
if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) {
iodev_sqe->sqe.flags |= cancel_no_response;
}
iodev_sqe->r = r;
struct rtio_iodev_sqe *curr = iodev_sqe, *next;
/* Link up transaction or queue list if needed */
while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
#ifdef CONFIG_ASSERT
bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
__ASSERT(transaction != chained,
"Expected chained or transaction flag, not both");
#endif
node = rtio_mpsc_pop(&iodev_sqe->r->sq);
next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
/* If the current submission was cancelled before submit,
* then cancel the next one and generate no response
*/
if (curr->sqe.flags & RTIO_SQE_CANCELED) {
next->sqe.flags |= cancel_no_response;
}
curr->next = next;
curr = next;
curr->r = r;
__ASSERT(
curr != NULL,
"Expected a valid sqe following transaction or chain flag");
}
curr->next = NULL;
curr->r = r;
rtio_iodev_submit(iodev_sqe);
node = rtio_mpsc_pop(&r->sq);
}
}
/**
* @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
*
* @param[in] r RTIO context
* @param[in] curr Current IODev SQE that's being marked for finished.
* @param[in] is_canceled Whether or not the SQE is canceled
*/
static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_iodev_sqe *curr,
bool is_canceled)
{
/* Reset the mempool if needed */
if (curr->sqe.op == RTIO_OP_RX && FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, curr->sqe.flags)) {
if (is_canceled) {
/* Free the memory first since no CQE will be generated */
LOG_DBG("Releasing memory @%p size=%u", (void *)curr->sqe.buf,
curr->sqe.buf_len);
rtio_release_buffer(r, curr->sqe.buf, curr->sqe.buf_len);
}
/* Reset the buffer info so the next request can get a new one */
curr->sqe.buf = NULL;
curr->sqe.buf_len = 0;
}
if (!is_canceled) {
/* Request was not canceled, put the SQE back in the queue */
rtio_mpsc_push(&r->sq, &curr->q);
rtio_executor_submit(r);
}
}
static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
{
const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
struct rtio *r = iodev_sqe->r;
struct rtio_iodev_sqe *curr = iodev_sqe, *next;
void *userdata;
uint32_t sqe_flags, cqe_flags;
do {
userdata = curr->sqe.userdata;
sqe_flags = curr->sqe.flags;
cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
next = rtio_iodev_sqe_next(curr);
if (is_multishot) {
rtio_executor_handle_multishot(r, curr, is_canceled);
}
if (!is_multishot || is_canceled) {
/* SQE is no longer needed, release it */
rtio_sqe_pool_free(r->sqe_pool, curr);
}
if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
/* Request was not canceled, generate a CQE */
rtio_cqe_submit(r, result, userdata, cqe_flags);
}
curr = next;
if (!is_ok) {
/* This is an error path, so cancel any chained SQEs */
result = -ECANCELED;
}
} while (sqe_flags & RTIO_SQE_TRANSACTION);
/* curr should now be the last sqe in the transaction if that is what completed */
if (sqe_flags & RTIO_SQE_CHAINED) {
rtio_iodev_submit(curr);
}
}
/**
* @brief Callback from an iodev describing success
*/
void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
rtio_executor_done(iodev_sqe, result, true);
}
/**
* @brief Callback from an iodev describing error
*
* Some assumptions are made and should have been validated on rtio_submit
* - a sqe marked as chained or transaction has a next sqe
* - a sqe is marked either chained or transaction but not both
*/
void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
rtio_executor_done(iodev_sqe, result, false);
}