diff --git a/include/zephyr/net/coap_client.h b/include/zephyr/net/coap_client.h index dc5d008c1b7..86c4911de5b 100644 --- a/include/zephyr/net/coap_client.h +++ b/include/zephyr/net/coap_client.h @@ -77,27 +77,29 @@ struct coap_client_option { }; /** @cond INTERNAL_HIDDEN */ +struct coap_client_internal_request { + uint8_t request_token[COAP_TOKEN_MAX_LEN]; + uint32_t offset; + uint32_t last_id; + uint8_t request_tkl; + uint8_t retry_count; + bool request_ongoing; + struct coap_block_context recv_blk_ctx; + struct coap_block_context send_blk_ctx; + struct coap_pending pending; + struct coap_client_request coap_request; + struct coap_packet request; +}; + struct coap_client { int fd; struct sockaddr address; socklen_t socklen; + bool response_ready; + struct k_mutex send_mutex; uint8_t send_buf[MAX_COAP_MSG_LEN]; uint8_t recv_buf[MAX_COAP_MSG_LEN]; - uint8_t request_token[COAP_TOKEN_MAX_LEN]; - int request_tkl; - int offset; - int retry_count; - struct coap_block_context recv_blk_ctx; - struct coap_block_context send_blk_ctx; - struct coap_pending pending; - struct coap_client_request *coap_request; - struct coap_packet request; - k_tid_t tid; - struct k_thread thread; - struct k_sem coap_client_recv_sem; - atomic_t coap_client_recv_active; - - K_THREAD_STACK_MEMBER(coap_thread_stack, CONFIG_COAP_CLIENT_STACK_SIZE); + struct coap_client_internal_request requests[CONFIG_COAP_CLIENT_MAX_REQUESTS]; }; /** @endcond */ @@ -123,7 +125,7 @@ int coap_client_init(struct coap_client *client, const char *info); * * @param client Client instance. * @param sock Open socket file descriptor. - * @param addr the destination address of the request. + * @param addr the destination address of the request, NULL if socket is already connected. * @param req CoAP request structure * @param retries How many times to retry or -1 to use default. * @return zero when operation started successfully or negative error code otherwise. diff --git a/subsys/net/lib/coap/Kconfig b/subsys/net/lib/coap/Kconfig index 12a7c0c9b00..e7587ab21ac 100644 --- a/subsys/net/lib/coap/Kconfig +++ b/subsys/net/lib/coap/Kconfig @@ -133,6 +133,18 @@ config COAP_CLIENT_STACK_SIZE int "Stack size of the CoAP client thread" default 1024 +config COAP_CLIENT_MAX_INSTANCES + int "Maximum number of CoAP clients" + default 2 + help + Maximum number of CoAP clients + +config COAP_CLIENT_MAX_REQUESTS + int "Maximum number of simultaneous requests per client" + default 2 + help + Maximum number of CoAP requests a single client can handle at a time + endif # COAP_CLIENT module = COAP diff --git a/subsys/net/lib/coap/coap_client.c b/subsys/net/lib/coap/coap_client.c index ec0c4d1f7fb..7c11d0502c6 100644 --- a/subsys/net/lib/coap/coap_client.c +++ b/subsys/net/lib/coap/coap_client.c @@ -18,21 +18,15 @@ LOG_MODULE_DECLARE(net_coap, CONFIG_COAP_LOG_LEVEL); #define COAP_PATH_ELEM_QUERY '?' #define COAP_PATH_ELEM_AMP '&' #define COAP_SEPARATE_TIMEOUT 6000 +#define COAP_PERIODIC_TIMEOUT 500 #define DEFAULT_RETRY_AMOUNT 5 #define BLOCK1_OPTION_SIZE 4 #define PAYLOAD_MARKER_SIZE 1 -static int coap_client_schedule_poll(struct coap_client *client, int sock, - struct coap_client_request *req) -{ - client->fd = sock; - client->coap_request = req; - - k_sem_give(&client->coap_client_recv_sem); - atomic_set(&client->coap_client_recv_active, 1); - - return 0; -} +static struct coap_client *clients[CONFIG_COAP_CLIENT_MAX_INSTANCES]; +static int num_clients; +static K_SEM_DEFINE(coap_client_recv_sem, 0, 1); +static atomic_t coap_client_recv_active; static int send_request(int sock, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen) @@ -54,15 +48,72 @@ static int receive(int sock, void *buf, size_t max_len, int flags, } } -static void reset_block_contexts(struct coap_client *client) +static void reset_block_contexts(struct coap_client_internal_request *request) { - client->recv_blk_ctx.block_size = 0; - client->recv_blk_ctx.total_size = 0; - client->recv_blk_ctx.current = 0; + request->recv_blk_ctx.block_size = 0; + request->recv_blk_ctx.total_size = 0; + request->recv_blk_ctx.current = 0; - client->send_blk_ctx.block_size = 0; - client->send_blk_ctx.total_size = 0; - client->send_blk_ctx.current = 0; + request->send_blk_ctx.block_size = 0; + request->send_blk_ctx.total_size = 0; + request->send_blk_ctx.current = 0; +} + +static void reset_internal_request(struct coap_client_internal_request *request) +{ + request->offset = 0; + request->last_id = 0; + request->retry_count = 0; + reset_block_contexts(request); +} + +static int coap_client_schedule_poll(struct coap_client *client, int sock, + struct coap_client_request *req, + struct coap_client_internal_request *internal_req) +{ + client->fd = sock; + memcpy(&internal_req->coap_request, req, sizeof(struct coap_client_request)); + internal_req->request_ongoing = true; + + if (!coap_client_recv_active) { + k_sem_give(&coap_client_recv_sem); + } + atomic_set(&coap_client_recv_active, 1); + + return 0; +} + +bool has_ongoing_request(struct coap_client *client) +{ + for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) { + if (client->requests[i].request_ongoing == true) { + return true; + } + } + + return false; +} + +struct coap_client_internal_request *get_free_request(struct coap_client *client) +{ + for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) { + if (client->requests[i].request_ongoing == false) { + return &client->requests[i]; + } + } + + return NULL; +} + +static bool has_ongoing_requests(void) +{ + bool has_requests = false; + + for (int i = 0; i < num_clients; i++) { + has_requests |= has_ongoing_request(clients[i]); + } + + return has_requests; } static int coap_client_init_path_options(struct coap_packet *pckt, const char *path) @@ -190,39 +241,55 @@ static enum coap_block_size coap_client_default_block_size(void) } static int coap_client_init_request(struct coap_client *client, - struct coap_client_request *req) + struct coap_client_request *req, + struct coap_client_internal_request *internal_req, + bool reconstruct) { int ret = 0; int i; memset(client->send_buf, 0, sizeof(client->send_buf)); - ret = coap_packet_init(&client->request, client->send_buf, MAX_COAP_MSG_LEN, 1, - req->confirmable ? COAP_TYPE_CON : COAP_TYPE_NON_CON, - COAP_TOKEN_MAX_LEN, coap_next_token(), req->method, - coap_next_id()); + + if (!reconstruct) { + uint8_t *token = coap_next_token(); + + internal_req->last_id = coap_next_id(); + internal_req->request_tkl = COAP_TOKEN_MAX_LEN & 0xf; + memcpy(internal_req->request_token, token, internal_req->request_tkl); + } + + ret = coap_packet_init(&internal_req->request, client->send_buf, MAX_COAP_MSG_LEN, + 1, req->confirmable ? COAP_TYPE_CON : COAP_TYPE_NON_CON, + COAP_TOKEN_MAX_LEN, internal_req->request_token, req->method, + internal_req->last_id); if (ret < 0) { LOG_ERR("Failed to init CoAP message %d", ret); goto out; } - ret = coap_client_init_path_options(&client->request, req->path); + ret = coap_client_init_path_options(&internal_req->request, req->path); if (ret < 0) { LOG_ERR("Failed to parse path to options %d", ret); goto out; } - ret = coap_append_option_int(&client->request, COAP_OPTION_CONTENT_FORMAT, req->fmt); + /* Add content format option only if there is a payload */ + if (req->payload) { + ret = coap_append_option_int(&internal_req->request, + COAP_OPTION_CONTENT_FORMAT, req->fmt); - if (ret < 0) { - LOG_ERR("Failed to append content format option"); - goto out; + if (ret < 0) { + LOG_ERR("Failed to append content format option"); + goto out; + } } /* Blockwise receive ongoing, request next block. */ - if (client->recv_blk_ctx.current > 0) { - ret = coap_append_block2_option(&client->request, &client->recv_blk_ctx); + if (internal_req->recv_blk_ctx.current > 0) { + ret = coap_append_block2_option(&internal_req->request, + &internal_req->recv_blk_ctx); if (ret < 0) { LOG_ERR("Failed to append block 2 option"); @@ -232,7 +299,7 @@ static int coap_client_init_request(struct coap_client *client, /* Add extra options if any */ for (i = 0; i < req->num_options; i++) { - ret = coap_packet_append_option(&client->request, req->options[i].code, + ret = coap_packet_append_option(&internal_req->request, req->options[i].code, req->options[i].value, req->options[i].len); if (ret < 0) { @@ -246,15 +313,16 @@ static int coap_client_init_request(struct coap_client *client, uint16_t offset; /* Blockwise send ongoing, add block1 */ - if (client->send_blk_ctx.total_size > 0 || + if (internal_req->send_blk_ctx.total_size > 0 || (req->len > CONFIG_COAP_CLIENT_MESSAGE_SIZE)) { - if (client->send_blk_ctx.total_size == 0) { - coap_block_transfer_init(&client->send_blk_ctx, + if (internal_req->send_blk_ctx.total_size == 0) { + coap_block_transfer_init(&internal_req->send_blk_ctx, coap_client_default_block_size(), req->len); } - ret = coap_append_block1_option(&client->request, &client->send_blk_ctx); + ret = coap_append_block1_option(&internal_req->request, + &internal_req->send_blk_ctx); if (ret < 0) { LOG_ERR("Failed to append block1 option"); @@ -262,29 +330,29 @@ static int coap_client_init_request(struct coap_client *client, } } - ret = coap_packet_append_payload_marker(&client->request); + ret = coap_packet_append_payload_marker(&internal_req->request); if (ret < 0) { LOG_ERR("Failed to append payload marker to CoAP message"); goto out; } - if (client->send_blk_ctx.total_size > 0) { + if (internal_req->send_blk_ctx.total_size > 0) { uint16_t block_in_bytes = - coap_block_size_to_bytes(client->send_blk_ctx.block_size); + coap_block_size_to_bytes(internal_req->send_blk_ctx.block_size); - payload_len = client->send_blk_ctx.total_size - - client->send_blk_ctx.current; + payload_len = internal_req->send_blk_ctx.total_size - + internal_req->send_blk_ctx.current; if (payload_len > block_in_bytes) { payload_len = block_in_bytes; } - offset = client->send_blk_ctx.current; + offset = internal_req->send_blk_ctx.current; } else { payload_len = req->len; offset = 0; } - ret = coap_packet_append_payload(&client->request, req->payload + offset, + ret = coap_packet_append_payload(&internal_req->request, req->payload + offset, payload_len); if (ret < 0) { @@ -292,22 +360,22 @@ static int coap_client_init_request(struct coap_client *client, goto out; } - if (client->send_blk_ctx.total_size > 0) { - coap_next_block(&client->request, &client->send_blk_ctx); + if (internal_req->send_blk_ctx.total_size > 0) { + coap_next_block(&internal_req->request, &internal_req->send_blk_ctx); } } - client->request_tkl = coap_header_get_token(&client->request, client->request_token); out: return ret; } - int coap_client_req(struct coap_client *client, int sock, const struct sockaddr *addr, struct coap_client_request *req, int retries) { int ret; - if (client->coap_client_recv_active) { + struct coap_client_internal_request *internal_req = get_free_request(client); + + if (internal_req == NULL) { return -EAGAIN; } @@ -315,44 +383,75 @@ int coap_client_req(struct coap_client *client, int sock, const struct sockaddr return -EINVAL; } - if (addr != NULL) { - memcpy(&client->address, addr, sizeof(*addr)); - client->socklen = sizeof(client->address); - } else { - memset(&client->address, 0, sizeof(client->address)); - client->socklen = 0; + /* Don't allow changing to a different socket if there is already request ongoing. */ + if (client->fd != sock && has_ongoing_request(client)) { + return -EALREADY; } + /* Don't allow changing to a different address if there is already request ongoing. */ + if (addr != NULL) { + if (memcmp(&client->address, addr, sizeof(*addr)) != 0) { + if (has_ongoing_request(client)) { + LOG_WRN("Can't change to a different socket, request ongoing."); + return -EALREADY; + } + + memcpy(&client->address, addr, sizeof(*addr)); + client->socklen = sizeof(client->address); + } + } else { + if (client->socklen != 0) { + if (has_ongoing_request(client)) { + LOG_WRN("Can't change to a different socket, request ongoing."); + return -EALREADY; + } + + memset(&client->address, 0, sizeof(client->address)); + client->socklen = 0; + } + } + + reset_internal_request(internal_req); + if (retries == -1) { - client->retry_count = DEFAULT_RETRY_AMOUNT; + internal_req->retry_count = DEFAULT_RETRY_AMOUNT; } else { - client->retry_count = retries; + internal_req->retry_count = retries; } - ret = coap_client_init_request(client, req); + if (k_mutex_lock(&client->send_mutex, K_NO_WAIT)) { + return -EAGAIN; + } + + ret = coap_client_init_request(client, req, internal_req, false); if (ret < 0) { LOG_ERR("Failed to initialize coap request"); - return ret; - } - - ret = coap_client_schedule_poll(client, sock, req); - if (ret < 0) { - LOG_ERR("Failed to schedule polling"); + k_mutex_unlock(&client->send_mutex); goto out; } - ret = coap_pending_init(&client->pending, &client->request, &client->address, - client->retry_count); + ret = coap_client_schedule_poll(client, sock, req, internal_req); + if (ret < 0) { + LOG_ERR("Failed to schedule polling"); + k_mutex_unlock(&client->send_mutex); + goto out; + } + + ret = coap_pending_init(&internal_req->pending, &internal_req->request, &client->address, + internal_req->retry_count); if (ret < 0) { LOG_ERR("Failed to initialize pending struct"); + k_mutex_unlock(&client->send_mutex); goto out; } - coap_pending_cycle(&client->pending); + coap_pending_cycle(&internal_req->pending); - ret = send_request(sock, client->request.data, client->request.offset, 0, &client->address, - client->socklen); + ret = send_request(sock, internal_req->request.data, internal_req->request.offset, 0, + &client->address, client->socklen); + + k_mutex_unlock(&client->send_mutex); if (ret < 0) { LOG_ERR("Transmission failed: %d", errno); @@ -364,89 +463,144 @@ out: return ret; } -static int handle_poll(struct coap_client *client) +static void report_callback_error(struct coap_client_internal_request *internal_req, int error_code) +{ + if (internal_req->coap_request.cb) { + internal_req->coap_request.cb(error_code, 0, NULL, 0, true, + internal_req->coap_request.user_data); + } +} + +static bool timeout_expired(struct coap_client_internal_request *internal_req) +{ + return (internal_req->request_ongoing && + internal_req->pending.timeout <= k_uptime_get_32()); +} + +static int resend_request(struct coap_client *client, + struct coap_client_internal_request *internal_req) { int ret = 0; - while (1) { - struct zsock_pollfd fds; + if (internal_req->pending.timeout != 0 && coap_pending_cycle(&internal_req->pending)) { + LOG_ERR("Timeout in poll, retrying send"); - fds.fd = client->fd; - fds.events = ZSOCK_POLLIN; - fds.revents = 0; - /* rfc7252#section-5.2.2, use separate timeout value for a separate response */ - if (client->pending.timeout != 0) { - ret = zsock_poll(&fds, 1, client->pending.timeout); - } else { - ret = zsock_poll(&fds, 1, COAP_SEPARATE_TIMEOUT); + /* Reset send block context as it was updated in previous init from packet */ + if (internal_req->send_blk_ctx.total_size > 0) { + internal_req->send_blk_ctx.current = internal_req->offset; } - + k_mutex_lock(&client->send_mutex, K_FOREVER); + ret = coap_client_init_request(client, &internal_req->coap_request, + internal_req, true); if (ret < 0) { - LOG_ERR("Error in poll:%d", errno); - errno = 0; - return ret; - } else if (ret == 0) { - if (client->pending.timeout != 0 && coap_pending_cycle(&client->pending)) { - LOG_ERR("Timeout in poll, retrying send"); - ret = send_request(client->fd, client->request.data, - client->request.offset, 0, &client->address, - client->socklen); - if (ret < 0) { - LOG_ERR("Transmission failed: %d", errno); - ret = -errno; - break; - } - } else { - /* No more retries left, don't retry */ - LOG_ERR("Timeout in poll, no more retries"); - ret = -EFAULT; - break; - } + LOG_ERR("Error re-creating CoAP request"); } else { - if (fds.revents & ZSOCK_POLLERR) { - LOG_ERR("Error in poll"); - ret = -EIO; - break; + ret = send_request(client->fd, internal_req->request.data, + internal_req->request.offset, 0, &client->address, + client->socklen); + if (ret > 0) { + ret = 0; + } else { + LOG_ERR("Failed to resend request, %d", ret); } + } + k_mutex_unlock(&client->send_mutex); + } else { + LOG_ERR("Timeout in poll, no more retries left"); + ret = -ETIMEDOUT; + report_callback_error(internal_req, ret); + internal_req->request_ongoing = false; + } - if (fds.revents & ZSOCK_POLLHUP) { - LOG_ERR("Error in poll: POLLHUP"); - ret = -ECONNRESET; - break; + return ret; +} + +static int coap_client_resend_handler(void) +{ + int ret = 0; + + for (int i = 0; i < num_clients; i++) { + for (int j = 0; j < CONFIG_COAP_CLIENT_MAX_REQUESTS; j++) { + if (timeout_expired(&clients[i]->requests[j])) { + ret = resend_request(clients[i], &clients[i]->requests[j]); } - - if (fds.revents & ZSOCK_POLLNVAL) { - LOG_ERR("Error in poll: POLLNVAL - fd not open"); - ret = -EINVAL; - break; - } - - if (!(fds.revents & ZSOCK_POLLIN)) { - LOG_ERR("Unknown poll error"); - ret = -EINVAL; - break; - } - - ret = 0; - break; } } return ret; } -static bool token_compare(struct coap_client *client, const struct coap_packet *resp) +static int handle_poll(void) +{ + int ret = 0; + + while (1) { + struct zsock_pollfd fds[CONFIG_COAP_CLIENT_MAX_INSTANCES] = {0}; + int nfds = 0; + + /* Use periodic timeouts */ + for (int i = 0; i < num_clients; i++) { + fds[i].fd = clients[i]->fd; + fds[i].events = ZSOCK_POLLIN; + fds[i].revents = 0; + nfds++; + } + + ret = zsock_poll(fds, nfds, COAP_PERIODIC_TIMEOUT); + + if (ret < 0) { + LOG_ERR("Error in poll:%d", errno); + errno = 0; + return ret; + } else if (ret == 0) { + /* Resend all the expired pending messages */ + ret = coap_client_resend_handler(); + + if (ret < 0) { + LOG_ERR("Error resending request: %d", ret); + } + + if (!has_ongoing_requests()) { + return ret; + } + + } else { + for (int i = 0; i < nfds; i++) { + if (fds[i].revents & ZSOCK_POLLERR) { + LOG_ERR("Error in poll for socket %d", fds[i].fd); + } + if (fds[i].revents & ZSOCK_POLLHUP) { + LOG_ERR("Error in poll: POLLHUP for socket %d", fds[i].fd); + } + if (fds[i].revents & ZSOCK_POLLNVAL) { + LOG_ERR("Error in poll: POLLNVAL - fd %d not open", + fds[i].fd); + } + if (fds[i].revents & ZSOCK_POLLIN) { + clients[i]->response_ready = true; + } + } + + return 0; + } + } + + return ret; +} + +static bool token_compare(struct coap_client_internal_request *internal_req, + const struct coap_packet *resp) { uint8_t response_token[COAP_TOKEN_MAX_LEN]; uint8_t response_tkl; response_tkl = coap_header_get_token(resp, response_token); - if (client->request_tkl != response_tkl) { + if (internal_req->request_tkl != response_tkl) { return false; } - return memcmp(&client->request_token, &response_token, response_tkl) == 0; + return memcmp(&internal_req->request_token, &response_token, response_tkl) == 0; } static int recv_response(struct coap_client *client, struct coap_packet *response) @@ -477,28 +631,19 @@ static int recv_response(struct coap_client *client, struct coap_packet *respons return ret; } -static void report_callback_error(struct coap_client *client, int error_code) -{ - if (client->coap_request->cb) { - client->coap_request->cb(error_code, 0, NULL, 0, true, - client->coap_request->user_data); - } -} - static int send_ack(struct coap_client *client, const struct coap_packet *req, uint8_t response_code) { int ret; + struct coap_packet ack; - ret = coap_ack_init(&client->request, req, client->send_buf, MAX_COAP_MSG_LEN, - response_code); + ret = coap_ack_init(&ack, req, client->send_buf, MAX_COAP_MSG_LEN, response_code); if (ret < 0) { LOG_ERR("Failed to initialize CoAP ACK-message"); return ret; } - ret = send_request(client->fd, client->request.data, client->request.offset, 0, - &client->address, client->socklen); + ret = send_request(client->fd, ack.data, ack.offset, 0, &client->address, client->socklen); if (ret < 0) { LOG_ERR("Error sending a CoAP ACK-message"); return ret; @@ -514,10 +659,11 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req, uint16_t id; uint8_t token[COAP_TOKEN_MAX_LEN]; uint8_t tkl; + struct coap_packet reset; id = coap_header_get_id(req); tkl = response_code ? coap_header_get_token(req, token) : 0; - ret = coap_packet_init(&client->request, client->send_buf, MAX_COAP_MSG_LEN, COAP_VERSION, + ret = coap_packet_init(&reset, client->send_buf, MAX_COAP_MSG_LEN, COAP_VERSION, COAP_TYPE_RESET, tkl, token, response_code, id); if (ret < 0) { @@ -525,7 +671,7 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req, return ret; } - ret = send_request(client->fd, client->request.data, client->request.offset, 0, + ret = send_request(client->fd, reset.data, reset.offset, 0, &client->address, client->socklen); if (ret < 0) { LOG_ERR("Error sending CoAP reset message"); @@ -535,6 +681,43 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req, return 0; } +struct coap_client_internal_request *get_request_with_id(struct coap_client *client, + uint16_t message_id) +{ + for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) { + if (client->requests[i].request_ongoing == true && + client->requests[i].pending.id == message_id) { + return &client->requests[i]; + } + } + + return NULL; +} + +struct coap_client_internal_request *get_request_with_token(struct coap_client *client, + const struct coap_packet *resp) +{ + + uint8_t response_token[COAP_TOKEN_MAX_LEN]; + uint8_t response_tkl; + + response_tkl = coap_header_get_token(resp, response_token); + + for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) { + if (client->requests[i].request_ongoing) { + if (client->requests[i].request_tkl != response_tkl) { + continue; + } + if (memcmp(&client->requests[i].request_token, &response_token, + response_tkl) == 0) { + return &client->requests[i]; + } + } + } + + return NULL; +} + static int handle_response(struct coap_client *client, const struct coap_packet *response) { int ret = 0; @@ -543,6 +726,7 @@ static int handle_response(struct coap_client *client, const struct coap_packet int block_num; bool blockwise_transfer = false; bool last_block = false; + struct coap_client_internal_request *internal_req; /* Handle different types, ACK might be separate or piggybacked * CON and NCON contains a separate response, CON needs an empty response @@ -552,13 +736,14 @@ static int handle_response(struct coap_client *client, const struct coap_packet */ response_type = coap_header_get_type(response); + internal_req = get_request_with_id(client, coap_header_get_id(response)); /* Reset and Ack need to match the message ID with request */ if ((response_type == COAP_TYPE_ACK || response_type == COAP_TYPE_RESET) && - coap_header_get_id(response) != client->pending.id) { + internal_req == NULL) { LOG_ERR("Unexpected ACK or Reset"); return -EFAULT; } else if (response_type == COAP_TYPE_RESET) { - coap_pending_clear(&client->pending); + coap_pending_clear(&internal_req->pending); } /* CON, NON_CON and piggybacked ACK need to match the token with original request */ @@ -566,16 +751,24 @@ static int handle_response(struct coap_client *client, const struct coap_packet uint8_t response_code = coap_header_get_code(response); const uint8_t *payload = coap_packet_get_payload(response, &payload_len); - /* Separate response */ + /* Separate response coming */ if (payload_len == 0 && response_type == COAP_TYPE_ACK && response_code == COAP_CODE_EMPTY) { - /* Clear the pending, poll uses now the separate timeout for the response. */ - coap_pending_clear(&client->pending); + internal_req->pending.t0 = k_uptime_get_32(); + internal_req->pending.timeout = internal_req->pending.t0 + COAP_SEPARATE_TIMEOUT; + internal_req->pending.retries = 0; return 1; } - /* Check for tokens */ - if (!token_compare(client, response)) { + /* Check for tokens + * Separate response doesn't match with message ID, + * check if there is a separate request waiting with matching token + */ + if (internal_req == NULL) { + internal_req = get_request_with_token(client, response); + } + + if (internal_req == NULL || !token_compare(internal_req, response)) { LOG_ERR("Not matching tokens, respond with reset"); ret = send_reset(client, response, COAP_RESPONSE_CODE_NOT_FOUND); return 1; @@ -590,8 +783,8 @@ static int handle_response(struct coap_client *client, const struct coap_packet } } - if (client->pending.timeout != 0) { - coap_pending_clear(&client->pending); + if (internal_req->pending.timeout != 0) { + coap_pending_clear(&internal_req->pending); } /* Check if block2 exists */ @@ -602,26 +795,27 @@ static int handle_response(struct coap_client *client, const struct coap_packet block_num = GET_BLOCK_NUM(block_option); if (block_num == 0) { - coap_block_transfer_init(&client->recv_blk_ctx, + coap_block_transfer_init(&internal_req->recv_blk_ctx, coap_client_default_block_size(), 0); - client->offset = 0; + internal_req->offset = 0; } - ret = coap_update_from_block(response, &client->recv_blk_ctx); + ret = coap_update_from_block(response, &internal_req->recv_blk_ctx); if (ret < 0) { LOG_ERR("Error updating block context"); } - coap_next_block(response, &client->recv_blk_ctx); + coap_next_block(response, &internal_req->recv_blk_ctx); } else { - client->offset = 0; + internal_req->offset = 0; last_block = true; } /* Check if this was a response to last blockwise send */ - if (client->send_blk_ctx.total_size > 0) { + if (internal_req->send_blk_ctx.total_size > 0) { blockwise_transfer = true; - if (client->send_blk_ctx.total_size == client->send_blk_ctx.current) { + internal_req->offset = internal_req->send_blk_ctx.current; + if (internal_req->send_blk_ctx.total_size == internal_req->send_blk_ctx.current) { last_block = true; } else { last_block = false; @@ -629,40 +823,43 @@ static int handle_response(struct coap_client *client, const struct coap_packet } /* Call user callback */ - if (client->coap_request->cb) { - client->coap_request->cb(response_code, client->offset, payload, payload_len, - last_block, client->coap_request->user_data); + if (internal_req->coap_request.cb) { + internal_req->coap_request.cb(response_code, internal_req->offset, payload, + payload_len, last_block, + internal_req->coap_request.user_data); /* Update the offset for next callback in a blockwise transfer */ if (blockwise_transfer) { - client->offset += payload_len; + internal_req->offset += payload_len; } } /* If this wasn't last block, send the next request */ if (blockwise_transfer && !last_block) { - ret = coap_client_init_request(client, client->coap_request); + k_mutex_lock(&client->send_mutex, K_FOREVER); + ret = coap_client_init_request(client, &internal_req->coap_request, internal_req, + false); if (ret < 0) { LOG_ERR("Error creating a CoAP request"); + k_mutex_unlock(&client->send_mutex); goto fail; } - if (client->pending.timeout != 0) { - LOG_ERR("Previous pending hasn't arrived"); - goto fail; - } - - ret = coap_pending_init(&client->pending, &client->request, &client->address, - client->retry_count); + ret = coap_pending_init(&internal_req->pending, &internal_req->request, + &client->address, internal_req->retry_count); if (ret < 0) { LOG_ERR("Error creating pending"); + k_mutex_unlock(&client->send_mutex); goto fail; } - coap_pending_cycle(&client->pending); + coap_pending_cycle(&internal_req->pending); + + ret = send_request(client->fd, internal_req->request.data, + internal_req->request.offset, 0, &client->address, + client->socklen); + k_mutex_unlock(&client->send_mutex); - ret = send_request(client->fd, client->request.data, client->request.offset, 0, - &client->address, client->socklen); if (ret < 0) { LOG_ERR("Error sending a CoAP request"); goto fail; @@ -671,51 +868,52 @@ static int handle_response(struct coap_client *client, const struct coap_packet } } fail: + client->response_ready = false; + internal_req->request_ongoing = false; return ret; } void coap_client_recv(void *coap_cl, void *a, void *b) { int ret; - struct coap_client *const client = coap_cl; - reset_block_contexts(client); - k_sem_take(&client->coap_client_recv_sem, K_FOREVER); + k_sem_take(&coap_client_recv_sem, K_FOREVER); while (true) { - struct coap_packet response; - - atomic_set(&client->coap_client_recv_active, 1); - ret = handle_poll(client); + atomic_set(&coap_client_recv_active, 1); + ret = handle_poll(); if (ret < 0) { - /* Error in polling, clear pending. */ + /* Error in polling */ LOG_ERR("Error in poll"); - coap_pending_clear(&client->pending); - report_callback_error(client, ret); goto idle; } - ret = recv_response(client, &response); - if (ret < 0) { - LOG_ERR("Error receiving response"); - report_callback_error(client, ret); - goto idle; + for (int i = 0; i < num_clients; i++) { + if (clients[i]->response_ready) { + struct coap_packet response; + + ret = recv_response(clients[i], &response); + if (ret < 0) { + LOG_ERR("Error receiving response"); + clients[i]->response_ready = false; + continue; + } + + ret = handle_response(clients[i], &response); + if (ret < 0) { + LOG_ERR("Error handling respnse"); + } + + clients[i]->response_ready = false; + } } - ret = handle_response(client, &response); - if (ret < 0) { - LOG_ERR("Error handling respnse"); - report_callback_error(client, ret); - goto idle; - } - - /* There are more messages coming for the original request */ - if (ret > 0) { + /* There are more messages coming */ + if (has_ongoing_requests()) { continue; } else { idle: - reset_block_contexts(client); - atomic_set(&client->coap_client_recv_active, 0); - k_sem_take(&client->coap_client_recv_sem, K_FOREVER); + atomic_set(&coap_client_recv_active, 0); + k_sem_take(&coap_client_recv_sem, K_FOREVER); } } } @@ -726,22 +924,19 @@ int coap_client_init(struct coap_client *client, const char *info) return -EINVAL; } - client->fd = -1; - k_sem_init(&client->coap_client_recv_sem, 0, 1); - - client->tid = - k_thread_create(&client->thread, client->coap_thread_stack, - K_THREAD_STACK_SIZEOF(client->coap_thread_stack), - coap_client_recv, client, NULL, NULL, - CONFIG_COAP_CLIENT_THREAD_PRIORITY, 0, K_NO_WAIT); - - if (IS_ENABLED(CONFIG_THREAD_NAME)) { - if (info != NULL) { - k_thread_name_set(client->tid, info); - } else { - k_thread_name_set(client->tid, "coap_client"); - } + if (num_clients >= CONFIG_COAP_CLIENT_MAX_INSTANCES) { + return -ENOSPC; } + k_mutex_init(&client->send_mutex); + + clients[num_clients] = client; + num_clients++; + return 0; } + + +K_THREAD_DEFINE(coap_client_recv_thread, CONFIG_COAP_CLIENT_STACK_SIZE, + coap_client_recv, NULL, NULL, NULL, + CONFIG_COAP_CLIENT_THREAD_PRIORITY, 0, 0);