From b0caaed43b0e8dabb46a9f0d673b152da7e520b2 Mon Sep 17 00:00:00 2001 From: Luis Ubieda Date: Sat, 5 Jul 2025 10:59:27 -0400 Subject: [PATCH] rtio: workq: Restructure workqueue as a threads pool with a queue Based on a discussion around P4WQ limitations for our application, it was determined that the RTIO workqueue required the ability to use additional threads from the pool in spite of work items blocked. Since this is not covered, nor desired for the P4WQ, then remove this dependency and re-implement it in a way that covers also this use-case. Signed-off-by: Luis Ubieda --- subsys/rtio/Kconfig.workq | 10 ++-- subsys/rtio/rtio_workq.c | 98 +++++++++++++++----------------- tests/subsys/rtio/workq/prj.conf | 1 - 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/subsys/rtio/Kconfig.workq b/subsys/rtio/Kconfig.workq index de9106c91eb..296854c7e3b 100644 --- a/subsys/rtio/Kconfig.workq +++ b/subsys/rtio/Kconfig.workq @@ -3,8 +3,6 @@ config RTIO_WORKQ bool "RTIO Work-queues service to process Sync operations" - select SCHED_DEADLINE - select P4WQ_INIT_STAGE_EARLY select RTIO_CONSUME_SEM help Enable RTIO Work-queues to allow processing synchronous operations @@ -12,10 +10,14 @@ config RTIO_WORKQ if RTIO_WORKQ -config RTIO_WORKQ_PRIO_MED - int "Medium Thread priority of RTIO Work-queues" +config RTIO_WORKQ_THREADS_POOL_PRIO + int "Priority of RTIO Workqueue Threads Pool" default MAIN_THREAD_PRIORITY +config RTIO_WORKQ_THREADS_POOL_STACK_SIZE + int "Priority of RTIO Workqueue Threads Pool" + default 1024 + config RTIO_WORKQ_STACK_SIZE int "Thread stack-size of RTIO Workqueues" default 2048 diff --git a/subsys/rtio/rtio_workq.c b/subsys/rtio/rtio_workq.c index f04acb46525..7c17ef5491c 100644 --- a/subsys/rtio/rtio_workq.c +++ b/subsys/rtio/rtio_workq.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2024 Croxel Inc. + * Copyright (c) 2025 Croxel Inc. * * SPDX-License-Identifier: Apache-2.0 */ @@ -7,37 +8,15 @@ #include #include -#define RTIO_WORKQ_PRIO_MED CONFIG_RTIO_WORKQ_PRIO_MED -#define RTIO_WORKQ_PRIO_HIGH RTIO_WORKQ_PRIO_MED - 1 -#define RTIO_WORKQ_PRIO_LOW RTIO_WORKQ_PRIO_MED + 1 - K_MEM_SLAB_DEFINE_STATIC(rtio_work_items_slab, sizeof(struct rtio_work_req), CONFIG_RTIO_WORKQ_POOL_ITEMS, 4); - -static void rtio_work_req_done_handler(struct k_p4wq_work *work) -{ - struct rtio_work_req *req = CONTAINER_OF(work, - struct rtio_work_req, - work); - k_mem_slab_free(&rtio_work_items_slab, req); -} - -K_P4WQ_DEFINE_WITH_DONE_HANDLER(rtio_workq, - CONFIG_RTIO_WORKQ_THREADS_POOL, - CONFIG_RTIO_WORKQ_STACK_SIZE, - rtio_work_req_done_handler); - -static void rtio_work_handler(struct k_p4wq_work *work) -{ - struct rtio_work_req *req = CONTAINER_OF(work, - struct rtio_work_req, - work); - struct rtio_iodev_sqe *iodev_sqe = req->iodev_sqe; - - req->handler(iodev_sqe); -} +static K_THREAD_STACK_ARRAY_DEFINE(rtio_workq_threads_stack, + CONFIG_RTIO_WORKQ_THREADS_POOL, + CONFIG_RTIO_WORKQ_THREADS_POOL_STACK_SIZE); +static struct k_thread rtio_work_threads[CONFIG_RTIO_WORKQ_THREADS_POOL]; +static K_QUEUE_DEFINE(rtio_workq); struct rtio_work_req *rtio_work_req_alloc(void) { @@ -49,12 +28,6 @@ struct rtio_work_req *rtio_work_req_alloc(void) return NULL; } - /** Initialize work item before using it as it comes - * from a Memory slab (no-init region). - */ - req->work.thread = NULL; - (void)k_sem_init(&req->work.done_sem, 1, 1); - return req; } @@ -71,31 +44,52 @@ void rtio_work_req_submit(struct rtio_work_req *req, return; } - struct k_p4wq_work *work = &req->work; - struct rtio_sqe *sqe = &iodev_sqe->sqe; - - /** Link the relevant info so that we can get it on the k_p4wq_work work item. - */ req->iodev_sqe = iodev_sqe; req->handler = handler; - /** Set the required information to handle the action */ - work->handler = rtio_work_handler; - work->deadline = 0; - - if (sqe->prio == RTIO_PRIO_LOW) { - work->priority = RTIO_WORKQ_PRIO_LOW; - } else if (sqe->prio == RTIO_PRIO_HIGH) { - work->priority = RTIO_WORKQ_PRIO_HIGH; - } else { - work->priority = RTIO_WORKQ_PRIO_MED; - } - - /** Decoupling action: Let the P4WQ execute the action. */ - k_p4wq_submit(&rtio_workq, work); + /** For now we're simply treating this as a FIFO queue. It may be + * desirable to expand this to handle queue ordering based on RTIO + * SQE priority. + */ + k_queue_append(&rtio_workq, req); } uint32_t rtio_work_req_used_count_get(void) { return k_mem_slab_num_used_get(&rtio_work_items_slab); } + +static void rtio_workq_thread_fn(void *arg1, void *arg2, void *arg3) +{ + ARG_UNUSED(arg1); + ARG_UNUSED(arg2); + ARG_UNUSED(arg3); + + while (true) { + struct rtio_work_req *req = k_queue_get(&rtio_workq, K_FOREVER); + + if (req != NULL) { + req->handler(req->iodev_sqe); + + k_mem_slab_free(&rtio_work_items_slab, req); + } + } +} + +static int static_init(void) +{ + for (size_t i = 0 ; i < ARRAY_SIZE(rtio_work_threads) ; i++) { + k_thread_create(&rtio_work_threads[i], + rtio_workq_threads_stack[i], + CONFIG_RTIO_WORKQ_THREADS_POOL_STACK_SIZE, + rtio_workq_thread_fn, + NULL, NULL, NULL, + CONFIG_RTIO_WORKQ_THREADS_POOL_PRIO, + 0, + K_NO_WAIT); + } + + return 0; +} + +SYS_INIT(static_init, POST_KERNEL, 1); diff --git a/tests/subsys/rtio/workq/prj.conf b/tests/subsys/rtio/workq/prj.conf index 39ef2dd6233..0f6cbf3d898 100644 --- a/tests/subsys/rtio/workq/prj.conf +++ b/tests/subsys/rtio/workq/prj.conf @@ -1,7 +1,6 @@ CONFIG_RTIO=y CONFIG_RTIO_WORKQ=y CONFIG_RTIO_WORKQ_THREADS_POOL=3 -CONFIG_RTIO_WORKQ_PRIO_MED=3 CONFIG_ZTEST=y CONFIG_ZTEST_THREAD_PRIORITY=8 CONFIG_MP_MAX_NUM_CPUS=1