By using an mpsc queue for each iodev, the iodev itself is shareable across contexts. Since it's lock-free, submissions may occur even from an ISR context. Rather than a fixed-size queue, with its possibility of running out of pre-allocated slots, each iodev now holds a wait-free mpsc queue head.

This changes the parameter of iodev submit to a struct containing four pointers: the rtio context, the submission queue entry, and the mpsc node for the iodev's submission queue. This solves the problem of busy iodevs working with real devices. For example, a busy SPI bus driver could enqueue, without locking, a request to start once the current request is done.

The queue entries are expected to be owned and allocated by the executor rather than the iodev. This consolidates the potential tuning knobs in one place: the RTIO context and its executor, which an application uses directly.

As the test case shows, iodevs can operate effectively lock-free with the mpsc queue and a single atomic denoting the current task.

Signed-off-by: Tom Burdick <thomas.burdick@intel.com>
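For reference, the submit parameter described above can be pictured roughly as below. This is a hedged sketch based only on the fields used in the file that follows (`sqe`, `r`); the mpsc node member and its names are assumptions, not taken from the actual header:

    /* Sketch of the iodev submit parameter; `q` and `next` are assumed names */
    struct rtio_mpsc_node {
        struct rtio_mpsc_node *next;  /* linked through the wait-free queue */
    };

    struct rtio_iodev_sqe {
        const struct rtio_sqe *sqe;   /* submission queue entry to work on */
        struct rtio *r;               /* RTIO context owning the entry */
        struct rtio_mpsc_node q;      /* node in the iodev's mpsc queue */
    };

With a shape like this, a busy iodev can push the node onto its queue head from any context, including an ISR, and pick the work up once the in-flight request completes.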
/*
 * Copyright (c) 2022 Intel Corporation.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/spinlock.h>
#include <zephyr/rtio/rtio_executor_concurrent.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor_concurrent, CONFIG_RTIO_LOG_LEVEL);

#define CONEX_TASK_COMPLETE BIT(0)
#define CONEX_TASK_SUSPENDED BIT(1)

/**
 * @file
 * @brief Concurrent RTIO Executor
 *
 * The concurrent executor provides a fixed amount of concurrency
 * with minimal overhead, but assumes a small number of concurrent tasks.
 *
 * Many of the task lookup and management functions here are O(N) over N
 * tasks. This is fine when the task set is *small*. Task lookup could be
 * improved in the future with a binary search at the expense of code size.
 *
 * The assumption is that perhaps only 8-16 concurrent tasks are likely,
 * such that simple short for loops over the task array are reasonably fast.
 *
 * A maximum of 65K submission queue entries is possible.
 */
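/* Editorial illustration (not part of the original file): task slots form a
 * power-of-two ring indexed by free-running 16-bit counters, e.g.
 *
 *   task_mask = 0x3            four task slots
 *   free  <=> (uint16_t)(task_in - task_out) < task_mask + 1
 *   slot   =  task_id & task_mask
 *
 * Unsigned 16-bit wraparound keeps the distance correct when the counters
 * overflow, which is also where the 65K entry limit above comes from.
 */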

/**
 * check if there is a free task available
 */
static bool conex_task_free(struct rtio_concurrent_executor *exc)
{
	/* cast keeps the distance correct across 16-bit counter wraparound */
	return (uint16_t)(exc->task_in - exc->task_out) < (exc->task_mask + 1);
}

/**
 * get the next free available task index
 */
static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
{
	uint16_t task_id = exc->task_in;

	exc->task_in++;
	return task_id;
}

static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
				     const struct rtio_iodev_sqe *iodev_sqe)
{
	__ASSERT_NO_MSG(iodev_sqe <= &exc->task_cur[exc->task_mask] &&
			iodev_sqe >= &exc->task_cur[0]);
	return iodev_sqe - &exc->task_cur[0];
}
static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);

	while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) {
		rtio_spsc_release(r->sq);
		sqe = rtio_spsc_consume(r->sq);
	}

	rtio_spsc_release(r->sq);
}
static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	/* In order, sweep up the oldest completed tasks first; the counters
	 * are free-running, so compare with != to survive wraparound.
	 */
	for (uint16_t task_id = exc->task_out; task_id != exc->task_in; task_id++) {
		if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) {
			LOG_INF("sweeping oldest task %d", task_id);
			conex_sweep_task(r, exc);
			exc->task_out++;
		} else {
			break;
		}
	}
}
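/* Editorial illustration (not part of the original file): tasks may complete
 * out of order, but SQEs are reclaimed strictly in submission order, so a
 * completed task in a newer slot stays allocated behind a running one:
 *
 *   task_out -> [running] [COMPLETE] [running] <- task_in
 *               ^ sweep stops here; nothing is released yet
 */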
static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	/* In order, resume suspended tasks; counters are free-running */
	for (uint16_t task_id = exc->task_out; task_id != exc->task_in; task_id++) {
		uint16_t task_idx = task_id & exc->task_mask;

		if (exc->task_status[task_idx] & CONEX_TASK_SUSPENDED) {
			LOG_INF("resuming suspended task %d", task_id);
			exc->task_status[task_idx] &= ~CONEX_TASK_SUSPENDED;
			rtio_iodev_submit(&exc->task_cur[task_idx]);
		}
	}
}
static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
{
	conex_sweep(r, exc);
	conex_resume(r, exc);
}

/**
 * @brief Submit submissions to the concurrent executor
 *
 * @param r RTIO context
 *
 * @retval 0 Always succeeds
 */
int rtio_concurrent_submit(struct rtio *r)
{
	LOG_INF("submit");

	struct rtio_concurrent_executor *exc =
		(struct rtio_concurrent_executor *)r->executor;
	struct rtio_sqe *sqe;
	struct rtio_sqe *last_sqe;
	k_spinlock_key_t key;

	key = k_spin_lock(&exc->lock);

	/* If never submitted before, peek at the first item,
	 * otherwise start back up where the last submit call
	 * left off
	 */
	if (exc->last_sqe == NULL) {
		sqe = rtio_spsc_peek(r->sq);
	} else {
		/* Pick up from the last submit call */
		sqe = rtio_spsc_next(r->sq, exc->last_sqe);
	}

	last_sqe = sqe;
	while (sqe != NULL && conex_task_free(exc)) {
		LOG_INF("head SQE in chain %p", sqe);

		/* Get the next task id if one exists, masked into the task array */
		uint16_t task_idx = conex_task_next(exc) & exc->task_mask;

		LOG_INF("setting up task %d", task_idx);

		/* Setup task (yes, this is it) */
		exc->task_cur[task_idx].sqe = sqe;
		exc->task_cur[task_idx].r = r;
		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;

		LOG_INF("submitted sqe %p", sqe);
		/* Go to the next sqe not in the current chain */
		while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) {
			sqe = rtio_spsc_next(r->sq, sqe);
		}

		LOG_INF("tail SQE in chain %p", sqe);

		last_sqe = sqe;

		/* SQE is the end of the previous chain */
		sqe = rtio_spsc_next(r->sq, sqe);
	}

	/* Out of available tasks, wait until others complete; note the
	 * first pending submission queue entry. May be NULL if nothing is pending.
	 */
	exc->pending_sqe = sqe;

	/* Run through the queue until the last item
	 * and take note of it
	 */
	while (sqe != NULL) {
		last_sqe = sqe;
		sqe = rtio_spsc_next(r->sq, sqe);
	}

	/* Note the last sqe for the next submit call */
	exc->last_sqe = last_sqe;

	/* Resume all suspended tasks */
	conex_resume(r, exc);

	k_spin_unlock(&exc->lock, key);

	return 0;
}
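/* Usage illustration (editorial, not part of the original file), assuming the
 * spsc producer API referenced in the comments below (rtio_spsc_acquire /
 * rtio_spsc_produce). A caller chains SQEs so the executor runs them as one
 * task, e.g. two chained operations on one iodev:
 *
 *   struct rtio_sqe *a = rtio_spsc_acquire(r->sq);
 *   struct rtio_sqe *b = rtio_spsc_acquire(r->sq);
 *
 *   a->flags = RTIO_SQE_CHAINED;   // continue into b on success
 *   b->flags = 0;                  // terminates the chain
 *
 *   rtio_spsc_produce(r->sq);      // publish a
 *   rtio_spsc_produce(r->sq);      // publish b
 *   rtio_concurrent_submit(r);
 */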
/**
 * @brief Callback from an iodev describing success
 */
void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	struct rtio *r = iodev_sqe->r;
	const struct rtio_sqe *sqe = iodev_sqe->sqe;
	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
	const struct rtio_sqe *next_sqe;
	k_spinlock_key_t key;

	/* An interrupt may occur during spsc_acquire, breaking its contract,
	 * so spin around it, effectively preventing another interrupt on
	 * this core, and another core trying to concurrently work in here.
	 *
	 * This can and should be broken up into a few sections with a try
	 * lock around the sweep and resume.
	 */
	key = k_spin_lock(&exc->lock);

	rtio_cqe_submit(r, result, sqe->userdata);

	/* Determine the task id by memory offset, O(1) */
	uint16_t task_id = conex_task_id(exc, iodev_sqe);

	if (sqe->flags & RTIO_SQE_CHAINED) {
		next_sqe = rtio_spsc_next(r->sq, sqe);

		exc->task_cur[task_id].sqe = next_sqe;
		rtio_iodev_submit(&exc->task_cur[task_id]);
	} else {
		exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
	}

	/* Sweep up unused SQEs and tasks, retry suspended tasks */
	/* TODO Use a try lock here and don't bother doing it if we are already
	 * doing it elsewhere
	 */
	conex_sweep_resume(r, exc);

	k_spin_unlock(&exc->lock, key);
}
/**
 * @brief Callback from an iodev describing error
 */
void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	const struct rtio_sqe *nsqe;
	k_spinlock_key_t key;
	struct rtio *r = iodev_sqe->r;
	const struct rtio_sqe *sqe = iodev_sqe->sqe;
	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;

	/* Another interrupt (and sqe completion) may occur during spsc_acquire,
	 * breaking its contract, so spin around it, effectively preventing
	 * another interrupt on this core, and another core trying to
	 * concurrently work in here.
	 *
	 * This can and should be broken up into a few sections with a try
	 * lock around the sweep and resume.
	 */
	key = k_spin_lock(&exc->lock);

	rtio_cqe_submit(r, result, sqe->userdata);

	/* Determine the task id by memory offset, O(1) */
	uint16_t task_id = conex_task_id(exc, iodev_sqe);

	/* Fail the remaining SQEs in the chain */
	if (sqe->flags & RTIO_SQE_CHAINED) {
		nsqe = rtio_spsc_next(r->sq, sqe);
		while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
			nsqe = rtio_spsc_next(r->sq, nsqe);
		}

		/* The terminal (non-chained) SQE of the chain is canceled too */
		if (nsqe != NULL) {
			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
		}
	}

	/* Task is complete (failed) */
	exc->task_status[task_id] |= CONEX_TASK_COMPLETE;

	conex_sweep_resume(r, exc);

	k_spin_unlock(&exc->lock, key);
}