kernel/pipe: add missing calls to z_reschedule()
We are waking up threads but failed to let them run if they are higher priority. Add missing calls to z_reschedule(). Also wake up all pending writers as we don't know how many there might be. It is more efficient to wake them all when the ring buffer is full before reading from it rather than waking them one by one whenever there is more room in it. Thanks to Peter Mitsis for noticing those issues. Signed-off-by: Nicolas Pitre <npitre@baylibre.com>
This commit is contained in:
parent
99c2057bb6
commit
3da90f9d49
@ -35,7 +35,7 @@ static inline bool pipe_empty(struct k_pipe *pipe)
|
||||
}
|
||||
|
||||
static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
|
||||
k_timepoint_t time_limit)
|
||||
k_timepoint_t time_limit, bool *need_resched)
|
||||
{
|
||||
k_timeout_t timeout = sys_timepoint_timeout(time_limit);
|
||||
int rc;
|
||||
@ -45,6 +45,7 @@ static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key
|
||||
}
|
||||
|
||||
pipe->waiting++;
|
||||
*need_resched = false;
|
||||
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
|
||||
rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
|
||||
*key = k_spin_lock(&pipe->lock);
|
||||
@ -85,7 +86,7 @@ struct pipe_buf_spec {
|
||||
size_t used;
|
||||
};
|
||||
|
||||
static size_t copy_to_pending_readers(struct k_pipe *pipe,
|
||||
static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
|
||||
const uint8_t *data, size_t len)
|
||||
{
|
||||
struct k_thread *reader;
|
||||
@ -132,6 +133,7 @@ static size_t copy_to_pending_readers(struct k_pipe *pipe,
|
||||
/* rest of thread wake-up outside the scheduler lock */
|
||||
z_thread_return_value_set_with_data(reader, 0, NULL);
|
||||
z_ready_thread(reader);
|
||||
*need_resched = true;
|
||||
}
|
||||
} while (reader != NULL && written < len);
|
||||
|
||||
@ -144,6 +146,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
|
||||
size_t written = 0;
|
||||
k_timepoint_t end = sys_timepoint_calc(timeout);
|
||||
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
|
||||
bool need_resched = false;
|
||||
|
||||
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);
|
||||
|
||||
@ -160,7 +163,8 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
|
||||
|
||||
if (pipe_empty(pipe)) {
|
||||
if (pipe->waiting != 0) {
|
||||
written += copy_to_pending_readers(pipe, &data[written],
|
||||
written += copy_to_pending_readers(pipe, &need_resched,
|
||||
&data[written],
|
||||
len - written);
|
||||
if (written >= len) {
|
||||
rc = written;
|
||||
@ -179,7 +183,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
|
||||
break;
|
||||
}
|
||||
|
||||
rc = wait_for(&pipe->space, pipe, &key, end);
|
||||
rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
|
||||
if (rc != 0) {
|
||||
if (rc == -EAGAIN) {
|
||||
rc = written ? written : -EAGAIN;
|
||||
@ -189,7 +193,11 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
|
||||
}
|
||||
exit:
|
||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
|
||||
k_spin_unlock(&pipe->lock, key);
|
||||
if (need_resched) {
|
||||
z_reschedule(&pipe->lock, key);
|
||||
} else {
|
||||
k_spin_unlock(&pipe->lock, key);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -199,6 +207,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
|
||||
int rc;
|
||||
k_timepoint_t end = sys_timepoint_calc(timeout);
|
||||
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
|
||||
bool need_resched = false;
|
||||
|
||||
SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);
|
||||
|
||||
@ -209,7 +218,8 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
|
||||
|
||||
for (;;) {
|
||||
if (pipe_full(pipe)) {
|
||||
z_sched_wake(&pipe->space, 0, NULL);
|
||||
/* One or more pending writers may exist. */
|
||||
need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
|
||||
}
|
||||
|
||||
buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
|
||||
@ -226,7 +236,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
|
||||
/* provide our "direct copy" info to potential writers */
|
||||
_current->base.swap_data = &buf;
|
||||
|
||||
rc = wait_for(&pipe->data, pipe, &key, end);
|
||||
rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
|
||||
if (rc != 0) {
|
||||
if (rc == -EAGAIN) {
|
||||
rc = buf.used ? buf.used : -EAGAIN;
|
||||
@ -236,7 +246,11 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
|
||||
}
|
||||
exit:
|
||||
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
|
||||
k_spin_unlock(&pipe->lock, key);
|
||||
if (need_resched) {
|
||||
z_reschedule(&pipe->lock, key);
|
||||
} else {
|
||||
k_spin_unlock(&pipe->lock, key);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user