diff --git a/kernel/pipe.c b/kernel/pipe.c index c4e19e1a450..9adfd270368 100644 --- a/kernel/pipe.c +++ b/kernel/pipe.c @@ -35,7 +35,7 @@ static inline bool pipe_empty(struct k_pipe *pipe) } static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key, - k_timepoint_t time_limit) + k_timepoint_t time_limit, bool *need_resched) { k_timeout_t timeout = sys_timepoint_timeout(time_limit); int rc; @@ -45,6 +45,7 @@ static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key } pipe->waiting++; + *need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout); rc = z_pend_curr(&pipe->lock, *key, waitq, timeout); *key = k_spin_lock(&pipe->lock); @@ -85,7 +86,7 @@ struct pipe_buf_spec { size_t used; }; -static size_t copy_to_pending_readers(struct k_pipe *pipe, +static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched, const uint8_t *data, size_t len) { struct k_thread *reader; @@ -132,6 +133,7 @@ static size_t copy_to_pending_readers(struct k_pipe *pipe, /* rest of thread wake-up outside the scheduler lock */ z_thread_return_value_set_with_data(reader, 0, NULL); z_ready_thread(reader); + *need_resched = true; } } while (reader != NULL && written < len); @@ -144,6 +146,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ size_t written = 0; k_timepoint_t end = sys_timepoint_calc(timeout); k_spinlock_key_t key = k_spin_lock(&pipe->lock); + bool need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout); @@ -160,7 +163,8 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ if (pipe_empty(pipe)) { if (pipe->waiting != 0) { - written += copy_to_pending_readers(pipe, &data[written], + written += copy_to_pending_readers(pipe, &need_resched, + &data[written], len - written); if (written >= len) { rc = written; @@ -179,7 +183,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ break; } - rc = wait_for(&pipe->space, pipe, &key, end); + rc = wait_for(&pipe->space, pipe, &key, end, &need_resched); if (rc != 0) { if (rc == -EAGAIN) { rc = written ? written : -EAGAIN; @@ -189,7 +193,11 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ } exit: SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc); - k_spin_unlock(&pipe->lock, key); + if (need_resched) { + z_reschedule(&pipe->lock, key); + } else { + k_spin_unlock(&pipe->lock, key); + } return rc; } @@ -199,6 +207,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout int rc; k_timepoint_t end = sys_timepoint_calc(timeout); k_spinlock_key_t key = k_spin_lock(&pipe->lock); + bool need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout); @@ -209,7 +218,8 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout for (;;) { if (pipe_full(pipe)) { - z_sched_wake(&pipe->space, 0, NULL); + /* One or more pending writers may exist. */ + need_resched = z_sched_wake_all(&pipe->space, 0, NULL); } buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used); @@ -226,7 +236,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout /* provide our "direct copy" info to potential writers */ _current->base.swap_data = &buf; - rc = wait_for(&pipe->data, pipe, &key, end); + rc = wait_for(&pipe->data, pipe, &key, end, &need_resched); if (rc != 0) { if (rc == -EAGAIN) { rc = buf.used ? buf.used : -EAGAIN; @@ -236,7 +246,11 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout } exit: SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc); - k_spin_unlock(&pipe->lock, key); + if (need_resched) { + z_reschedule(&pipe->lock, key); + } else { + k_spin_unlock(&pipe->lock, key); + } return rc; }