net: lib: coap: CoAP client, multiple request handling
Use only single thread for handling polling of the sockets. Each client will have only 1 active socket which to poll. Each client can have multiple simultaneous requests ongoing. The client only has one buffer for receiving and one buffer for sending. Therefore the messages are reformed when resending. Signed-off-by: Jarno Lämsä <jarno.lamsa@nordicsemi.no>
This commit is contained in:
parent
bbec614b78
commit
419fa3ca6a
@ -77,27 +77,29 @@ struct coap_client_option {
|
||||
};
|
||||
|
||||
/** @cond INTERNAL_HIDDEN */
|
||||
struct coap_client_internal_request {
|
||||
uint8_t request_token[COAP_TOKEN_MAX_LEN];
|
||||
uint32_t offset;
|
||||
uint32_t last_id;
|
||||
uint8_t request_tkl;
|
||||
uint8_t retry_count;
|
||||
bool request_ongoing;
|
||||
struct coap_block_context recv_blk_ctx;
|
||||
struct coap_block_context send_blk_ctx;
|
||||
struct coap_pending pending;
|
||||
struct coap_client_request coap_request;
|
||||
struct coap_packet request;
|
||||
};
|
||||
|
||||
struct coap_client {
|
||||
int fd;
|
||||
struct sockaddr address;
|
||||
socklen_t socklen;
|
||||
bool response_ready;
|
||||
struct k_mutex send_mutex;
|
||||
uint8_t send_buf[MAX_COAP_MSG_LEN];
|
||||
uint8_t recv_buf[MAX_COAP_MSG_LEN];
|
||||
uint8_t request_token[COAP_TOKEN_MAX_LEN];
|
||||
int request_tkl;
|
||||
int offset;
|
||||
int retry_count;
|
||||
struct coap_block_context recv_blk_ctx;
|
||||
struct coap_block_context send_blk_ctx;
|
||||
struct coap_pending pending;
|
||||
struct coap_client_request *coap_request;
|
||||
struct coap_packet request;
|
||||
k_tid_t tid;
|
||||
struct k_thread thread;
|
||||
struct k_sem coap_client_recv_sem;
|
||||
atomic_t coap_client_recv_active;
|
||||
|
||||
K_THREAD_STACK_MEMBER(coap_thread_stack, CONFIG_COAP_CLIENT_STACK_SIZE);
|
||||
struct coap_client_internal_request requests[CONFIG_COAP_CLIENT_MAX_REQUESTS];
|
||||
};
|
||||
/** @endcond */
|
||||
|
||||
@ -123,7 +125,7 @@ int coap_client_init(struct coap_client *client, const char *info);
|
||||
*
|
||||
* @param client Client instance.
|
||||
* @param sock Open socket file descriptor.
|
||||
* @param addr the destination address of the request.
|
||||
* @param addr the destination address of the request, NULL if socket is already connected.
|
||||
* @param req CoAP request structure
|
||||
* @param retries How many times to retry or -1 to use default.
|
||||
* @return zero when operation started successfully or negative error code otherwise.
|
||||
|
||||
@ -133,6 +133,18 @@ config COAP_CLIENT_STACK_SIZE
|
||||
int "Stack size of the CoAP client thread"
|
||||
default 1024
|
||||
|
||||
config COAP_CLIENT_MAX_INSTANCES
|
||||
int "Maximum number of CoAP clients"
|
||||
default 2
|
||||
help
|
||||
Maximum number of CoAP clients
|
||||
|
||||
config COAP_CLIENT_MAX_REQUESTS
|
||||
int "Maximum number of simultaneous requests per client"
|
||||
default 2
|
||||
help
|
||||
Maximum number of CoAP requests a single client can handle at a time
|
||||
|
||||
endif # COAP_CLIENT
|
||||
|
||||
module = COAP
|
||||
|
||||
@ -18,21 +18,15 @@ LOG_MODULE_DECLARE(net_coap, CONFIG_COAP_LOG_LEVEL);
|
||||
#define COAP_PATH_ELEM_QUERY '?'
|
||||
#define COAP_PATH_ELEM_AMP '&'
|
||||
#define COAP_SEPARATE_TIMEOUT 6000
|
||||
#define COAP_PERIODIC_TIMEOUT 500
|
||||
#define DEFAULT_RETRY_AMOUNT 5
|
||||
#define BLOCK1_OPTION_SIZE 4
|
||||
#define PAYLOAD_MARKER_SIZE 1
|
||||
|
||||
static int coap_client_schedule_poll(struct coap_client *client, int sock,
|
||||
struct coap_client_request *req)
|
||||
{
|
||||
client->fd = sock;
|
||||
client->coap_request = req;
|
||||
|
||||
k_sem_give(&client->coap_client_recv_sem);
|
||||
atomic_set(&client->coap_client_recv_active, 1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
static struct coap_client *clients[CONFIG_COAP_CLIENT_MAX_INSTANCES];
|
||||
static int num_clients;
|
||||
static K_SEM_DEFINE(coap_client_recv_sem, 0, 1);
|
||||
static atomic_t coap_client_recv_active;
|
||||
|
||||
static int send_request(int sock, const void *buf, size_t len, int flags,
|
||||
const struct sockaddr *dest_addr, socklen_t addrlen)
|
||||
@ -54,15 +48,72 @@ static int receive(int sock, void *buf, size_t max_len, int flags,
|
||||
}
|
||||
}
|
||||
|
||||
static void reset_block_contexts(struct coap_client *client)
|
||||
static void reset_block_contexts(struct coap_client_internal_request *request)
|
||||
{
|
||||
client->recv_blk_ctx.block_size = 0;
|
||||
client->recv_blk_ctx.total_size = 0;
|
||||
client->recv_blk_ctx.current = 0;
|
||||
request->recv_blk_ctx.block_size = 0;
|
||||
request->recv_blk_ctx.total_size = 0;
|
||||
request->recv_blk_ctx.current = 0;
|
||||
|
||||
client->send_blk_ctx.block_size = 0;
|
||||
client->send_blk_ctx.total_size = 0;
|
||||
client->send_blk_ctx.current = 0;
|
||||
request->send_blk_ctx.block_size = 0;
|
||||
request->send_blk_ctx.total_size = 0;
|
||||
request->send_blk_ctx.current = 0;
|
||||
}
|
||||
|
||||
static void reset_internal_request(struct coap_client_internal_request *request)
|
||||
{
|
||||
request->offset = 0;
|
||||
request->last_id = 0;
|
||||
request->retry_count = 0;
|
||||
reset_block_contexts(request);
|
||||
}
|
||||
|
||||
static int coap_client_schedule_poll(struct coap_client *client, int sock,
|
||||
struct coap_client_request *req,
|
||||
struct coap_client_internal_request *internal_req)
|
||||
{
|
||||
client->fd = sock;
|
||||
memcpy(&internal_req->coap_request, req, sizeof(struct coap_client_request));
|
||||
internal_req->request_ongoing = true;
|
||||
|
||||
if (!coap_client_recv_active) {
|
||||
k_sem_give(&coap_client_recv_sem);
|
||||
}
|
||||
atomic_set(&coap_client_recv_active, 1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool has_ongoing_request(struct coap_client *client)
|
||||
{
|
||||
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
|
||||
if (client->requests[i].request_ongoing == true) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
struct coap_client_internal_request *get_free_request(struct coap_client *client)
|
||||
{
|
||||
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
|
||||
if (client->requests[i].request_ongoing == false) {
|
||||
return &client->requests[i];
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool has_ongoing_requests(void)
|
||||
{
|
||||
bool has_requests = false;
|
||||
|
||||
for (int i = 0; i < num_clients; i++) {
|
||||
has_requests |= has_ongoing_request(clients[i]);
|
||||
}
|
||||
|
||||
return has_requests;
|
||||
}
|
||||
|
||||
static int coap_client_init_path_options(struct coap_packet *pckt, const char *path)
|
||||
@ -190,39 +241,55 @@ static enum coap_block_size coap_client_default_block_size(void)
|
||||
}
|
||||
|
||||
static int coap_client_init_request(struct coap_client *client,
|
||||
struct coap_client_request *req)
|
||||
struct coap_client_request *req,
|
||||
struct coap_client_internal_request *internal_req,
|
||||
bool reconstruct)
|
||||
{
|
||||
int ret = 0;
|
||||
int i;
|
||||
|
||||
memset(client->send_buf, 0, sizeof(client->send_buf));
|
||||
ret = coap_packet_init(&client->request, client->send_buf, MAX_COAP_MSG_LEN, 1,
|
||||
req->confirmable ? COAP_TYPE_CON : COAP_TYPE_NON_CON,
|
||||
COAP_TOKEN_MAX_LEN, coap_next_token(), req->method,
|
||||
coap_next_id());
|
||||
|
||||
if (!reconstruct) {
|
||||
uint8_t *token = coap_next_token();
|
||||
|
||||
internal_req->last_id = coap_next_id();
|
||||
internal_req->request_tkl = COAP_TOKEN_MAX_LEN & 0xf;
|
||||
memcpy(internal_req->request_token, token, internal_req->request_tkl);
|
||||
}
|
||||
|
||||
ret = coap_packet_init(&internal_req->request, client->send_buf, MAX_COAP_MSG_LEN,
|
||||
1, req->confirmable ? COAP_TYPE_CON : COAP_TYPE_NON_CON,
|
||||
COAP_TOKEN_MAX_LEN, internal_req->request_token, req->method,
|
||||
internal_req->last_id);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to init CoAP message %d", ret);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = coap_client_init_path_options(&client->request, req->path);
|
||||
ret = coap_client_init_path_options(&internal_req->request, req->path);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to parse path to options %d", ret);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = coap_append_option_int(&client->request, COAP_OPTION_CONTENT_FORMAT, req->fmt);
|
||||
/* Add content format option only if there is a payload */
|
||||
if (req->payload) {
|
||||
ret = coap_append_option_int(&internal_req->request,
|
||||
COAP_OPTION_CONTENT_FORMAT, req->fmt);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to append content format option");
|
||||
goto out;
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to append content format option");
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
/* Blockwise receive ongoing, request next block. */
|
||||
if (client->recv_blk_ctx.current > 0) {
|
||||
ret = coap_append_block2_option(&client->request, &client->recv_blk_ctx);
|
||||
if (internal_req->recv_blk_ctx.current > 0) {
|
||||
ret = coap_append_block2_option(&internal_req->request,
|
||||
&internal_req->recv_blk_ctx);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to append block 2 option");
|
||||
@ -232,7 +299,7 @@ static int coap_client_init_request(struct coap_client *client,
|
||||
|
||||
/* Add extra options if any */
|
||||
for (i = 0; i < req->num_options; i++) {
|
||||
ret = coap_packet_append_option(&client->request, req->options[i].code,
|
||||
ret = coap_packet_append_option(&internal_req->request, req->options[i].code,
|
||||
req->options[i].value, req->options[i].len);
|
||||
|
||||
if (ret < 0) {
|
||||
@ -246,15 +313,16 @@ static int coap_client_init_request(struct coap_client *client,
|
||||
uint16_t offset;
|
||||
|
||||
/* Blockwise send ongoing, add block1 */
|
||||
if (client->send_blk_ctx.total_size > 0 ||
|
||||
if (internal_req->send_blk_ctx.total_size > 0 ||
|
||||
(req->len > CONFIG_COAP_CLIENT_MESSAGE_SIZE)) {
|
||||
|
||||
if (client->send_blk_ctx.total_size == 0) {
|
||||
coap_block_transfer_init(&client->send_blk_ctx,
|
||||
if (internal_req->send_blk_ctx.total_size == 0) {
|
||||
coap_block_transfer_init(&internal_req->send_blk_ctx,
|
||||
coap_client_default_block_size(),
|
||||
req->len);
|
||||
}
|
||||
ret = coap_append_block1_option(&client->request, &client->send_blk_ctx);
|
||||
ret = coap_append_block1_option(&internal_req->request,
|
||||
&internal_req->send_blk_ctx);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to append block1 option");
|
||||
@ -262,29 +330,29 @@ static int coap_client_init_request(struct coap_client *client,
|
||||
}
|
||||
}
|
||||
|
||||
ret = coap_packet_append_payload_marker(&client->request);
|
||||
ret = coap_packet_append_payload_marker(&internal_req->request);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to append payload marker to CoAP message");
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (client->send_blk_ctx.total_size > 0) {
|
||||
if (internal_req->send_blk_ctx.total_size > 0) {
|
||||
uint16_t block_in_bytes =
|
||||
coap_block_size_to_bytes(client->send_blk_ctx.block_size);
|
||||
coap_block_size_to_bytes(internal_req->send_blk_ctx.block_size);
|
||||
|
||||
payload_len = client->send_blk_ctx.total_size -
|
||||
client->send_blk_ctx.current;
|
||||
payload_len = internal_req->send_blk_ctx.total_size -
|
||||
internal_req->send_blk_ctx.current;
|
||||
if (payload_len > block_in_bytes) {
|
||||
payload_len = block_in_bytes;
|
||||
}
|
||||
offset = client->send_blk_ctx.current;
|
||||
offset = internal_req->send_blk_ctx.current;
|
||||
} else {
|
||||
payload_len = req->len;
|
||||
offset = 0;
|
||||
}
|
||||
|
||||
ret = coap_packet_append_payload(&client->request, req->payload + offset,
|
||||
ret = coap_packet_append_payload(&internal_req->request, req->payload + offset,
|
||||
payload_len);
|
||||
|
||||
if (ret < 0) {
|
||||
@ -292,22 +360,22 @@ static int coap_client_init_request(struct coap_client *client,
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (client->send_blk_ctx.total_size > 0) {
|
||||
coap_next_block(&client->request, &client->send_blk_ctx);
|
||||
if (internal_req->send_blk_ctx.total_size > 0) {
|
||||
coap_next_block(&internal_req->request, &internal_req->send_blk_ctx);
|
||||
}
|
||||
}
|
||||
client->request_tkl = coap_header_get_token(&client->request, client->request_token);
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int coap_client_req(struct coap_client *client, int sock, const struct sockaddr *addr,
|
||||
struct coap_client_request *req, int retries)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (client->coap_client_recv_active) {
|
||||
struct coap_client_internal_request *internal_req = get_free_request(client);
|
||||
|
||||
if (internal_req == NULL) {
|
||||
return -EAGAIN;
|
||||
}
|
||||
|
||||
@ -315,44 +383,75 @@ int coap_client_req(struct coap_client *client, int sock, const struct sockaddr
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
if (addr != NULL) {
|
||||
memcpy(&client->address, addr, sizeof(*addr));
|
||||
client->socklen = sizeof(client->address);
|
||||
} else {
|
||||
memset(&client->address, 0, sizeof(client->address));
|
||||
client->socklen = 0;
|
||||
/* Don't allow changing to a different socket if there is already request ongoing. */
|
||||
if (client->fd != sock && has_ongoing_request(client)) {
|
||||
return -EALREADY;
|
||||
}
|
||||
|
||||
/* Don't allow changing to a different address if there is already request ongoing. */
|
||||
if (addr != NULL) {
|
||||
if (memcmp(&client->address, addr, sizeof(*addr)) != 0) {
|
||||
if (has_ongoing_request(client)) {
|
||||
LOG_WRN("Can't change to a different socket, request ongoing.");
|
||||
return -EALREADY;
|
||||
}
|
||||
|
||||
memcpy(&client->address, addr, sizeof(*addr));
|
||||
client->socklen = sizeof(client->address);
|
||||
}
|
||||
} else {
|
||||
if (client->socklen != 0) {
|
||||
if (has_ongoing_request(client)) {
|
||||
LOG_WRN("Can't change to a different socket, request ongoing.");
|
||||
return -EALREADY;
|
||||
}
|
||||
|
||||
memset(&client->address, 0, sizeof(client->address));
|
||||
client->socklen = 0;
|
||||
}
|
||||
}
|
||||
|
||||
reset_internal_request(internal_req);
|
||||
|
||||
if (retries == -1) {
|
||||
client->retry_count = DEFAULT_RETRY_AMOUNT;
|
||||
internal_req->retry_count = DEFAULT_RETRY_AMOUNT;
|
||||
} else {
|
||||
client->retry_count = retries;
|
||||
internal_req->retry_count = retries;
|
||||
}
|
||||
|
||||
ret = coap_client_init_request(client, req);
|
||||
if (k_mutex_lock(&client->send_mutex, K_NO_WAIT)) {
|
||||
return -EAGAIN;
|
||||
}
|
||||
|
||||
ret = coap_client_init_request(client, req, internal_req, false);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to initialize coap request");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = coap_client_schedule_poll(client, sock, req);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to schedule polling");
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = coap_pending_init(&client->pending, &client->request, &client->address,
|
||||
client->retry_count);
|
||||
ret = coap_client_schedule_poll(client, sock, req, internal_req);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to schedule polling");
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = coap_pending_init(&internal_req->pending, &internal_req->request, &client->address,
|
||||
internal_req->retry_count);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to initialize pending struct");
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
goto out;
|
||||
}
|
||||
|
||||
coap_pending_cycle(&client->pending);
|
||||
coap_pending_cycle(&internal_req->pending);
|
||||
|
||||
ret = send_request(sock, client->request.data, client->request.offset, 0, &client->address,
|
||||
client->socklen);
|
||||
ret = send_request(sock, internal_req->request.data, internal_req->request.offset, 0,
|
||||
&client->address, client->socklen);
|
||||
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Transmission failed: %d", errno);
|
||||
@ -364,89 +463,144 @@ out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int handle_poll(struct coap_client *client)
|
||||
static void report_callback_error(struct coap_client_internal_request *internal_req, int error_code)
|
||||
{
|
||||
if (internal_req->coap_request.cb) {
|
||||
internal_req->coap_request.cb(error_code, 0, NULL, 0, true,
|
||||
internal_req->coap_request.user_data);
|
||||
}
|
||||
}
|
||||
|
||||
static bool timeout_expired(struct coap_client_internal_request *internal_req)
|
||||
{
|
||||
return (internal_req->request_ongoing &&
|
||||
internal_req->pending.timeout <= k_uptime_get_32());
|
||||
}
|
||||
|
||||
static int resend_request(struct coap_client *client,
|
||||
struct coap_client_internal_request *internal_req)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
while (1) {
|
||||
struct zsock_pollfd fds;
|
||||
if (internal_req->pending.timeout != 0 && coap_pending_cycle(&internal_req->pending)) {
|
||||
LOG_ERR("Timeout in poll, retrying send");
|
||||
|
||||
fds.fd = client->fd;
|
||||
fds.events = ZSOCK_POLLIN;
|
||||
fds.revents = 0;
|
||||
/* rfc7252#section-5.2.2, use separate timeout value for a separate response */
|
||||
if (client->pending.timeout != 0) {
|
||||
ret = zsock_poll(&fds, 1, client->pending.timeout);
|
||||
} else {
|
||||
ret = zsock_poll(&fds, 1, COAP_SEPARATE_TIMEOUT);
|
||||
/* Reset send block context as it was updated in previous init from packet */
|
||||
if (internal_req->send_blk_ctx.total_size > 0) {
|
||||
internal_req->send_blk_ctx.current = internal_req->offset;
|
||||
}
|
||||
|
||||
k_mutex_lock(&client->send_mutex, K_FOREVER);
|
||||
ret = coap_client_init_request(client, &internal_req->coap_request,
|
||||
internal_req, true);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error in poll:%d", errno);
|
||||
errno = 0;
|
||||
return ret;
|
||||
} else if (ret == 0) {
|
||||
if (client->pending.timeout != 0 && coap_pending_cycle(&client->pending)) {
|
||||
LOG_ERR("Timeout in poll, retrying send");
|
||||
ret = send_request(client->fd, client->request.data,
|
||||
client->request.offset, 0, &client->address,
|
||||
client->socklen);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Transmission failed: %d", errno);
|
||||
ret = -errno;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
/* No more retries left, don't retry */
|
||||
LOG_ERR("Timeout in poll, no more retries");
|
||||
ret = -EFAULT;
|
||||
break;
|
||||
}
|
||||
LOG_ERR("Error re-creating CoAP request");
|
||||
} else {
|
||||
if (fds.revents & ZSOCK_POLLERR) {
|
||||
LOG_ERR("Error in poll");
|
||||
ret = -EIO;
|
||||
break;
|
||||
ret = send_request(client->fd, internal_req->request.data,
|
||||
internal_req->request.offset, 0, &client->address,
|
||||
client->socklen);
|
||||
if (ret > 0) {
|
||||
ret = 0;
|
||||
} else {
|
||||
LOG_ERR("Failed to resend request, %d", ret);
|
||||
}
|
||||
}
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
} else {
|
||||
LOG_ERR("Timeout in poll, no more retries left");
|
||||
ret = -ETIMEDOUT;
|
||||
report_callback_error(internal_req, ret);
|
||||
internal_req->request_ongoing = false;
|
||||
}
|
||||
|
||||
if (fds.revents & ZSOCK_POLLHUP) {
|
||||
LOG_ERR("Error in poll: POLLHUP");
|
||||
ret = -ECONNRESET;
|
||||
break;
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int coap_client_resend_handler(void)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
for (int i = 0; i < num_clients; i++) {
|
||||
for (int j = 0; j < CONFIG_COAP_CLIENT_MAX_REQUESTS; j++) {
|
||||
if (timeout_expired(&clients[i]->requests[j])) {
|
||||
ret = resend_request(clients[i], &clients[i]->requests[j]);
|
||||
}
|
||||
|
||||
if (fds.revents & ZSOCK_POLLNVAL) {
|
||||
LOG_ERR("Error in poll: POLLNVAL - fd not open");
|
||||
ret = -EINVAL;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!(fds.revents & ZSOCK_POLLIN)) {
|
||||
LOG_ERR("Unknown poll error");
|
||||
ret = -EINVAL;
|
||||
break;
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool token_compare(struct coap_client *client, const struct coap_packet *resp)
|
||||
static int handle_poll(void)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
while (1) {
|
||||
struct zsock_pollfd fds[CONFIG_COAP_CLIENT_MAX_INSTANCES] = {0};
|
||||
int nfds = 0;
|
||||
|
||||
/* Use periodic timeouts */
|
||||
for (int i = 0; i < num_clients; i++) {
|
||||
fds[i].fd = clients[i]->fd;
|
||||
fds[i].events = ZSOCK_POLLIN;
|
||||
fds[i].revents = 0;
|
||||
nfds++;
|
||||
}
|
||||
|
||||
ret = zsock_poll(fds, nfds, COAP_PERIODIC_TIMEOUT);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error in poll:%d", errno);
|
||||
errno = 0;
|
||||
return ret;
|
||||
} else if (ret == 0) {
|
||||
/* Resend all the expired pending messages */
|
||||
ret = coap_client_resend_handler();
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error resending request: %d", ret);
|
||||
}
|
||||
|
||||
if (!has_ongoing_requests()) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
} else {
|
||||
for (int i = 0; i < nfds; i++) {
|
||||
if (fds[i].revents & ZSOCK_POLLERR) {
|
||||
LOG_ERR("Error in poll for socket %d", fds[i].fd);
|
||||
}
|
||||
if (fds[i].revents & ZSOCK_POLLHUP) {
|
||||
LOG_ERR("Error in poll: POLLHUP for socket %d", fds[i].fd);
|
||||
}
|
||||
if (fds[i].revents & ZSOCK_POLLNVAL) {
|
||||
LOG_ERR("Error in poll: POLLNVAL - fd %d not open",
|
||||
fds[i].fd);
|
||||
}
|
||||
if (fds[i].revents & ZSOCK_POLLIN) {
|
||||
clients[i]->response_ready = true;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool token_compare(struct coap_client_internal_request *internal_req,
|
||||
const struct coap_packet *resp)
|
||||
{
|
||||
uint8_t response_token[COAP_TOKEN_MAX_LEN];
|
||||
uint8_t response_tkl;
|
||||
|
||||
response_tkl = coap_header_get_token(resp, response_token);
|
||||
|
||||
if (client->request_tkl != response_tkl) {
|
||||
if (internal_req->request_tkl != response_tkl) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return memcmp(&client->request_token, &response_token, response_tkl) == 0;
|
||||
return memcmp(&internal_req->request_token, &response_token, response_tkl) == 0;
|
||||
}
|
||||
|
||||
static int recv_response(struct coap_client *client, struct coap_packet *response)
|
||||
@ -477,28 +631,19 @@ static int recv_response(struct coap_client *client, struct coap_packet *respons
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void report_callback_error(struct coap_client *client, int error_code)
|
||||
{
|
||||
if (client->coap_request->cb) {
|
||||
client->coap_request->cb(error_code, 0, NULL, 0, true,
|
||||
client->coap_request->user_data);
|
||||
}
|
||||
}
|
||||
|
||||
static int send_ack(struct coap_client *client, const struct coap_packet *req,
|
||||
uint8_t response_code)
|
||||
{
|
||||
int ret;
|
||||
struct coap_packet ack;
|
||||
|
||||
ret = coap_ack_init(&client->request, req, client->send_buf, MAX_COAP_MSG_LEN,
|
||||
response_code);
|
||||
ret = coap_ack_init(&ack, req, client->send_buf, MAX_COAP_MSG_LEN, response_code);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Failed to initialize CoAP ACK-message");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = send_request(client->fd, client->request.data, client->request.offset, 0,
|
||||
&client->address, client->socklen);
|
||||
ret = send_request(client->fd, ack.data, ack.offset, 0, &client->address, client->socklen);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error sending a CoAP ACK-message");
|
||||
return ret;
|
||||
@ -514,10 +659,11 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req,
|
||||
uint16_t id;
|
||||
uint8_t token[COAP_TOKEN_MAX_LEN];
|
||||
uint8_t tkl;
|
||||
struct coap_packet reset;
|
||||
|
||||
id = coap_header_get_id(req);
|
||||
tkl = response_code ? coap_header_get_token(req, token) : 0;
|
||||
ret = coap_packet_init(&client->request, client->send_buf, MAX_COAP_MSG_LEN, COAP_VERSION,
|
||||
ret = coap_packet_init(&reset, client->send_buf, MAX_COAP_MSG_LEN, COAP_VERSION,
|
||||
COAP_TYPE_RESET, tkl, token, response_code, id);
|
||||
|
||||
if (ret < 0) {
|
||||
@ -525,7 +671,7 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req,
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = send_request(client->fd, client->request.data, client->request.offset, 0,
|
||||
ret = send_request(client->fd, reset.data, reset.offset, 0,
|
||||
&client->address, client->socklen);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error sending CoAP reset message");
|
||||
@ -535,6 +681,43 @@ static int send_reset(struct coap_client *client, const struct coap_packet *req,
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct coap_client_internal_request *get_request_with_id(struct coap_client *client,
|
||||
uint16_t message_id)
|
||||
{
|
||||
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
|
||||
if (client->requests[i].request_ongoing == true &&
|
||||
client->requests[i].pending.id == message_id) {
|
||||
return &client->requests[i];
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct coap_client_internal_request *get_request_with_token(struct coap_client *client,
|
||||
const struct coap_packet *resp)
|
||||
{
|
||||
|
||||
uint8_t response_token[COAP_TOKEN_MAX_LEN];
|
||||
uint8_t response_tkl;
|
||||
|
||||
response_tkl = coap_header_get_token(resp, response_token);
|
||||
|
||||
for (int i = 0; i < CONFIG_COAP_CLIENT_MAX_REQUESTS; i++) {
|
||||
if (client->requests[i].request_ongoing) {
|
||||
if (client->requests[i].request_tkl != response_tkl) {
|
||||
continue;
|
||||
}
|
||||
if (memcmp(&client->requests[i].request_token, &response_token,
|
||||
response_tkl) == 0) {
|
||||
return &client->requests[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int handle_response(struct coap_client *client, const struct coap_packet *response)
|
||||
{
|
||||
int ret = 0;
|
||||
@ -543,6 +726,7 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
int block_num;
|
||||
bool blockwise_transfer = false;
|
||||
bool last_block = false;
|
||||
struct coap_client_internal_request *internal_req;
|
||||
|
||||
/* Handle different types, ACK might be separate or piggybacked
|
||||
* CON and NCON contains a separate response, CON needs an empty response
|
||||
@ -552,13 +736,14 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
*/
|
||||
response_type = coap_header_get_type(response);
|
||||
|
||||
internal_req = get_request_with_id(client, coap_header_get_id(response));
|
||||
/* Reset and Ack need to match the message ID with request */
|
||||
if ((response_type == COAP_TYPE_ACK || response_type == COAP_TYPE_RESET) &&
|
||||
coap_header_get_id(response) != client->pending.id) {
|
||||
internal_req == NULL) {
|
||||
LOG_ERR("Unexpected ACK or Reset");
|
||||
return -EFAULT;
|
||||
} else if (response_type == COAP_TYPE_RESET) {
|
||||
coap_pending_clear(&client->pending);
|
||||
coap_pending_clear(&internal_req->pending);
|
||||
}
|
||||
|
||||
/* CON, NON_CON and piggybacked ACK need to match the token with original request */
|
||||
@ -566,16 +751,24 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
uint8_t response_code = coap_header_get_code(response);
|
||||
const uint8_t *payload = coap_packet_get_payload(response, &payload_len);
|
||||
|
||||
/* Separate response */
|
||||
/* Separate response coming */
|
||||
if (payload_len == 0 && response_type == COAP_TYPE_ACK &&
|
||||
response_code == COAP_CODE_EMPTY) {
|
||||
/* Clear the pending, poll uses now the separate timeout for the response. */
|
||||
coap_pending_clear(&client->pending);
|
||||
internal_req->pending.t0 = k_uptime_get_32();
|
||||
internal_req->pending.timeout = internal_req->pending.t0 + COAP_SEPARATE_TIMEOUT;
|
||||
internal_req->pending.retries = 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Check for tokens */
|
||||
if (!token_compare(client, response)) {
|
||||
/* Check for tokens
|
||||
* Separate response doesn't match with message ID,
|
||||
* check if there is a separate request waiting with matching token
|
||||
*/
|
||||
if (internal_req == NULL) {
|
||||
internal_req = get_request_with_token(client, response);
|
||||
}
|
||||
|
||||
if (internal_req == NULL || !token_compare(internal_req, response)) {
|
||||
LOG_ERR("Not matching tokens, respond with reset");
|
||||
ret = send_reset(client, response, COAP_RESPONSE_CODE_NOT_FOUND);
|
||||
return 1;
|
||||
@ -590,8 +783,8 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
}
|
||||
}
|
||||
|
||||
if (client->pending.timeout != 0) {
|
||||
coap_pending_clear(&client->pending);
|
||||
if (internal_req->pending.timeout != 0) {
|
||||
coap_pending_clear(&internal_req->pending);
|
||||
}
|
||||
|
||||
/* Check if block2 exists */
|
||||
@ -602,26 +795,27 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
block_num = GET_BLOCK_NUM(block_option);
|
||||
|
||||
if (block_num == 0) {
|
||||
coap_block_transfer_init(&client->recv_blk_ctx,
|
||||
coap_block_transfer_init(&internal_req->recv_blk_ctx,
|
||||
coap_client_default_block_size(),
|
||||
0);
|
||||
client->offset = 0;
|
||||
internal_req->offset = 0;
|
||||
}
|
||||
|
||||
ret = coap_update_from_block(response, &client->recv_blk_ctx);
|
||||
ret = coap_update_from_block(response, &internal_req->recv_blk_ctx);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error updating block context");
|
||||
}
|
||||
coap_next_block(response, &client->recv_blk_ctx);
|
||||
coap_next_block(response, &internal_req->recv_blk_ctx);
|
||||
} else {
|
||||
client->offset = 0;
|
||||
internal_req->offset = 0;
|
||||
last_block = true;
|
||||
}
|
||||
|
||||
/* Check if this was a response to last blockwise send */
|
||||
if (client->send_blk_ctx.total_size > 0) {
|
||||
if (internal_req->send_blk_ctx.total_size > 0) {
|
||||
blockwise_transfer = true;
|
||||
if (client->send_blk_ctx.total_size == client->send_blk_ctx.current) {
|
||||
internal_req->offset = internal_req->send_blk_ctx.current;
|
||||
if (internal_req->send_blk_ctx.total_size == internal_req->send_blk_ctx.current) {
|
||||
last_block = true;
|
||||
} else {
|
||||
last_block = false;
|
||||
@ -629,40 +823,43 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
}
|
||||
|
||||
/* Call user callback */
|
||||
if (client->coap_request->cb) {
|
||||
client->coap_request->cb(response_code, client->offset, payload, payload_len,
|
||||
last_block, client->coap_request->user_data);
|
||||
if (internal_req->coap_request.cb) {
|
||||
internal_req->coap_request.cb(response_code, internal_req->offset, payload,
|
||||
payload_len, last_block,
|
||||
internal_req->coap_request.user_data);
|
||||
|
||||
/* Update the offset for next callback in a blockwise transfer */
|
||||
if (blockwise_transfer) {
|
||||
client->offset += payload_len;
|
||||
internal_req->offset += payload_len;
|
||||
}
|
||||
}
|
||||
|
||||
/* If this wasn't last block, send the next request */
|
||||
if (blockwise_transfer && !last_block) {
|
||||
ret = coap_client_init_request(client, client->coap_request);
|
||||
k_mutex_lock(&client->send_mutex, K_FOREVER);
|
||||
ret = coap_client_init_request(client, &internal_req->coap_request, internal_req,
|
||||
false);
|
||||
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error creating a CoAP request");
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (client->pending.timeout != 0) {
|
||||
LOG_ERR("Previous pending hasn't arrived");
|
||||
goto fail;
|
||||
}
|
||||
|
||||
ret = coap_pending_init(&client->pending, &client->request, &client->address,
|
||||
client->retry_count);
|
||||
ret = coap_pending_init(&internal_req->pending, &internal_req->request,
|
||||
&client->address, internal_req->retry_count);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error creating pending");
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
goto fail;
|
||||
}
|
||||
coap_pending_cycle(&client->pending);
|
||||
coap_pending_cycle(&internal_req->pending);
|
||||
|
||||
ret = send_request(client->fd, internal_req->request.data,
|
||||
internal_req->request.offset, 0, &client->address,
|
||||
client->socklen);
|
||||
k_mutex_unlock(&client->send_mutex);
|
||||
|
||||
ret = send_request(client->fd, client->request.data, client->request.offset, 0,
|
||||
&client->address, client->socklen);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error sending a CoAP request");
|
||||
goto fail;
|
||||
@ -671,51 +868,52 @@ static int handle_response(struct coap_client *client, const struct coap_packet
|
||||
}
|
||||
}
|
||||
fail:
|
||||
client->response_ready = false;
|
||||
internal_req->request_ongoing = false;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void coap_client_recv(void *coap_cl, void *a, void *b)
|
||||
{
|
||||
int ret;
|
||||
struct coap_client *const client = coap_cl;
|
||||
|
||||
reset_block_contexts(client);
|
||||
k_sem_take(&client->coap_client_recv_sem, K_FOREVER);
|
||||
k_sem_take(&coap_client_recv_sem, K_FOREVER);
|
||||
while (true) {
|
||||
struct coap_packet response;
|
||||
|
||||
atomic_set(&client->coap_client_recv_active, 1);
|
||||
ret = handle_poll(client);
|
||||
atomic_set(&coap_client_recv_active, 1);
|
||||
ret = handle_poll();
|
||||
if (ret < 0) {
|
||||
/* Error in polling, clear pending. */
|
||||
/* Error in polling */
|
||||
LOG_ERR("Error in poll");
|
||||
coap_pending_clear(&client->pending);
|
||||
report_callback_error(client, ret);
|
||||
goto idle;
|
||||
}
|
||||
|
||||
ret = recv_response(client, &response);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error receiving response");
|
||||
report_callback_error(client, ret);
|
||||
goto idle;
|
||||
for (int i = 0; i < num_clients; i++) {
|
||||
if (clients[i]->response_ready) {
|
||||
struct coap_packet response;
|
||||
|
||||
ret = recv_response(clients[i], &response);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error receiving response");
|
||||
clients[i]->response_ready = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = handle_response(clients[i], &response);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error handling respnse");
|
||||
}
|
||||
|
||||
clients[i]->response_ready = false;
|
||||
}
|
||||
}
|
||||
|
||||
ret = handle_response(client, &response);
|
||||
if (ret < 0) {
|
||||
LOG_ERR("Error handling respnse");
|
||||
report_callback_error(client, ret);
|
||||
goto idle;
|
||||
}
|
||||
|
||||
/* There are more messages coming for the original request */
|
||||
if (ret > 0) {
|
||||
/* There are more messages coming */
|
||||
if (has_ongoing_requests()) {
|
||||
continue;
|
||||
} else {
|
||||
idle:
|
||||
reset_block_contexts(client);
|
||||
atomic_set(&client->coap_client_recv_active, 0);
|
||||
k_sem_take(&client->coap_client_recv_sem, K_FOREVER);
|
||||
atomic_set(&coap_client_recv_active, 0);
|
||||
k_sem_take(&coap_client_recv_sem, K_FOREVER);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -726,22 +924,19 @@ int coap_client_init(struct coap_client *client, const char *info)
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
client->fd = -1;
|
||||
k_sem_init(&client->coap_client_recv_sem, 0, 1);
|
||||
|
||||
client->tid =
|
||||
k_thread_create(&client->thread, client->coap_thread_stack,
|
||||
K_THREAD_STACK_SIZEOF(client->coap_thread_stack),
|
||||
coap_client_recv, client, NULL, NULL,
|
||||
CONFIG_COAP_CLIENT_THREAD_PRIORITY, 0, K_NO_WAIT);
|
||||
|
||||
if (IS_ENABLED(CONFIG_THREAD_NAME)) {
|
||||
if (info != NULL) {
|
||||
k_thread_name_set(client->tid, info);
|
||||
} else {
|
||||
k_thread_name_set(client->tid, "coap_client");
|
||||
}
|
||||
if (num_clients >= CONFIG_COAP_CLIENT_MAX_INSTANCES) {
|
||||
return -ENOSPC;
|
||||
}
|
||||
|
||||
k_mutex_init(&client->send_mutex);
|
||||
|
||||
clients[num_clients] = client;
|
||||
num_clients++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
K_THREAD_DEFINE(coap_client_recv_thread, CONFIG_COAP_CLIENT_STACK_SIZE,
|
||||
coap_client_recv, NULL, NULL, NULL,
|
||||
CONFIG_COAP_CLIENT_THREAD_PRIORITY, 0, 0);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user