Add support for DICONNECT message specified in MQTT 5.0. As with MQTT 5.0, the disconnect can now also be initiated by the broker, it was needed to add decoder support for the message. Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
845 lines
23 KiB
C
845 lines
23 KiB
C
/*
|
|
* Copyright (c) 2022 G-Technologies Sdn. Bhd.
|
|
*
|
|
* SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
#include <zephyr/kernel.h>
|
|
#include <zephyr/shell/shell_mqtt.h>
|
|
#include <zephyr/init.h>
|
|
#include <zephyr/logging/log.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <zephyr/drivers/hwinfo.h>
|
|
|
|
SHELL_MQTT_DEFINE(shell_transport_mqtt);
|
|
SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
|
|
CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
|
|
CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);
|
|
|
|
LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
|
|
|
|
#define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
|
|
#define CONNECT_TIMEOUT_MS 2000
|
|
#define LISTEN_TIMEOUT_MS 500
|
|
#define MQTT_SEND_DELAY_MS K_MSEC(100)
|
|
#define PROCESS_INTERVAL K_SECONDS(2)
|
|
#define SHELL_MQTT_WORKQ_STACK_SIZE 2048
|
|
|
|
#ifdef CONFIG_SHELL_MQTT_SERVER_USERNAME
|
|
#define MQTT_USERNAME CONFIG_SHELL_MQTT_SERVER_USERNAME
|
|
#else
|
|
#define MQTT_USERNAME NULL
|
|
#endif /* CONFIG_SHELL_MQTT_SERVER_USERNAME */
|
|
|
|
#ifdef CONFIG_SHELL_MQTT_SERVER_PASSWORD
|
|
#define MQTT_PASSWORD CONFIG_SHELL_MQTT_SERVER_PASSWORD
|
|
#else
|
|
#define MQTT_PASSWORD NULL
|
|
#endif /*SHELL_MQTT_SERVER_PASSWORD */
|
|
|
|
struct shell_mqtt *sh_mqtt;
|
|
K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);
|
|
|
|
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
|
|
|
|
static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
|
|
{
|
|
return k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay);
|
|
}
|
|
|
|
static inline int sh_mqtt_work_submit(struct k_work *work)
|
|
{
|
|
return k_work_submit_to_queue(&sh_mqtt->workq, work);
|
|
}
|
|
|
|
/* Lock the context of the shell mqtt */
|
|
static inline int sh_mqtt_context_lock(k_timeout_t timeout)
|
|
{
|
|
return k_mutex_lock(&sh_mqtt->lock, timeout);
|
|
}
|
|
|
|
/* Unlock the context of the shell mqtt */
|
|
static inline void sh_mqtt_context_unlock(void)
|
|
{
|
|
(void)k_mutex_unlock(&sh_mqtt->lock);
|
|
}
|
|
|
|
static void sh_mqtt_rx_rb_flush(void)
|
|
{
|
|
uint8_t c;
|
|
uint32_t size = ring_buf_size_get(&sh_mqtt->rx_rb);
|
|
|
|
while (size > 0) {
|
|
size = ring_buf_get(&sh_mqtt->rx_rb, &c, 1U);
|
|
}
|
|
}
|
|
|
|
bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
|
|
{
|
|
uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
|
|
ssize_t length;
|
|
|
|
length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
|
|
if (length <= 0) {
|
|
return false;
|
|
}
|
|
|
|
(void)memset(id, 0, id_max_len);
|
|
length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len);
|
|
|
|
return length > 0;
|
|
}
|
|
|
|
static void prepare_fds(void)
|
|
{
|
|
if (sh_mqtt->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
|
|
sh_mqtt->fds[0].fd = sh_mqtt->mqtt_cli.transport.tcp.sock;
|
|
}
|
|
|
|
sh_mqtt->fds[0].events = ZSOCK_POLLIN;
|
|
sh_mqtt->nfds = 1;
|
|
}
|
|
|
|
static void clear_fds(void)
|
|
{
|
|
sh_mqtt->nfds = 0;
|
|
}
|
|
|
|
/*
|
|
* Upon successful completion, poll() shall return a non-negative value. A positive value indicates
|
|
* the total number of pollfd structures that have selected events (that is, those for which the
|
|
* revents member is non-zero). A value of 0 indicates that the call timed out and no file
|
|
* descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate
|
|
* the error.
|
|
*/
|
|
static int wait(int timeout)
|
|
{
|
|
int rc = 0;
|
|
|
|
if (sh_mqtt->nfds > 0) {
|
|
rc = zsock_poll(sh_mqtt->fds, sh_mqtt->nfds, timeout);
|
|
if (rc < 0) {
|
|
LOG_ERR("poll error: %d", errno);
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
/* Query IP address for the broker URL */
|
|
static int get_mqtt_broker_addrinfo(void)
|
|
{
|
|
int rc;
|
|
struct zsock_addrinfo hints = { .ai_family = AF_INET,
|
|
.ai_socktype = SOCK_STREAM,
|
|
.ai_protocol = 0 };
|
|
|
|
if (sh_mqtt->haddr != NULL) {
|
|
zsock_freeaddrinfo(sh_mqtt->haddr);
|
|
}
|
|
|
|
rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
|
|
STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh_mqtt->haddr);
|
|
if (rc == 0) {
|
|
LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
|
|
CONFIG_SHELL_MQTT_SERVER_PORT);
|
|
|
|
return 0;
|
|
}
|
|
|
|
LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
|
|
CONFIG_SHELL_MQTT_SERVER_PORT);
|
|
|
|
return rc;
|
|
}
|
|
|
|
/* Close MQTT connection properly and cleanup socket */
|
|
static void sh_mqtt_close_and_cleanup(void)
|
|
{
|
|
/* Initialize to negative value so that the mqtt_abort case can run */
|
|
int rc = -1;
|
|
|
|
/* If both network & mqtt connected, mqtt_disconnect will send a
|
|
* disconnection packet to the broker, it will invoke
|
|
* mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
|
|
*/
|
|
if ((sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
|
|
(sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
|
|
rc = mqtt_disconnect(&sh_mqtt->mqtt_cli, NULL);
|
|
}
|
|
|
|
/* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
|
|
if (rc < 0) {
|
|
/* mqtt_abort doesn't send disconnection packet to the broker, but it
|
|
* makes sure that the MQTT connection is aborted locally and will
|
|
* always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
|
|
*/
|
|
(void)mqtt_abort(&sh_mqtt->mqtt_cli);
|
|
}
|
|
|
|
/* Cleanup socket */
|
|
clear_fds();
|
|
}
|
|
|
|
static void broker_init(void)
|
|
{
|
|
struct sockaddr_in *broker4 = (struct sockaddr_in *)&sh_mqtt->broker;
|
|
|
|
broker4->sin_family = AF_INET;
|
|
broker4->sin_port = htons(CONFIG_SHELL_MQTT_SERVER_PORT);
|
|
|
|
net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh_mqtt->haddr->ai_addr)->sin_addr);
|
|
}
|
|
|
|
static void client_init(void)
|
|
{
|
|
static struct mqtt_utf8 password;
|
|
static struct mqtt_utf8 username;
|
|
|
|
password.utf8 = (uint8_t *)MQTT_PASSWORD;
|
|
password.size = strlen(MQTT_PASSWORD);
|
|
username.utf8 = (uint8_t *)MQTT_USERNAME;
|
|
username.size = strlen(MQTT_USERNAME);
|
|
|
|
mqtt_client_init(&sh_mqtt->mqtt_cli);
|
|
|
|
/* MQTT client configuration */
|
|
sh_mqtt->mqtt_cli.broker = &sh_mqtt->broker;
|
|
sh_mqtt->mqtt_cli.evt_cb = mqtt_evt_handler;
|
|
sh_mqtt->mqtt_cli.client_id.utf8 = (uint8_t *)sh_mqtt->device_id;
|
|
sh_mqtt->mqtt_cli.client_id.size = strlen(sh_mqtt->device_id);
|
|
sh_mqtt->mqtt_cli.password = &password;
|
|
sh_mqtt->mqtt_cli.user_name = &username;
|
|
sh_mqtt->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
|
|
|
|
/* MQTT buffers configuration */
|
|
sh_mqtt->mqtt_cli.rx_buf = sh_mqtt->buf.rx;
|
|
sh_mqtt->mqtt_cli.rx_buf_size = sizeof(sh_mqtt->buf.rx);
|
|
sh_mqtt->mqtt_cli.tx_buf = sh_mqtt->buf.tx;
|
|
sh_mqtt->mqtt_cli.tx_buf_size = sizeof(sh_mqtt->buf.tx);
|
|
|
|
/* MQTT transport configuration */
|
|
sh_mqtt->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
|
|
}
|
|
|
|
/* Work routine to process MQTT packet and keep alive MQTT connection */
|
|
static void sh_mqtt_process_handler(struct k_work *work)
|
|
{
|
|
ARG_UNUSED(work);
|
|
int rc;
|
|
int64_t remaining = LISTEN_TIMEOUT_MS;
|
|
int64_t start_time = k_uptime_get();
|
|
|
|
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
|
LOG_DBG("%s_work while %s", "process", "network disconnected");
|
|
return;
|
|
}
|
|
|
|
/* If context can't be locked, that means net conn cb locked it */
|
|
if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
|
|
/* In that case we should simply return */
|
|
LOG_DBG("%s_work unable to lock context", "process");
|
|
return;
|
|
}
|
|
|
|
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
|
LOG_DBG("MQTT %s", "not connected");
|
|
goto process_error;
|
|
}
|
|
|
|
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
|
LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
|
|
goto process_error;
|
|
}
|
|
|
|
LOG_DBG("MQTT %s", "Processing");
|
|
/* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
|
|
while ((remaining > 0) && (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
|
|
(sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
|
|
(sh_mqtt->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
|
|
LOG_DBG("Listening to socket");
|
|
rc = wait(remaining);
|
|
if (rc > 0) {
|
|
LOG_DBG("Process socket for MQTT packet");
|
|
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
|
if (rc != 0) {
|
|
LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
|
|
goto process_error;
|
|
}
|
|
} else if (rc < 0) {
|
|
goto process_error;
|
|
}
|
|
|
|
LOG_DBG("MQTT %s", "Keepalive");
|
|
rc = mqtt_live(&sh_mqtt->mqtt_cli);
|
|
if ((rc != 0) && (rc != -EAGAIN)) {
|
|
LOG_ERR("%s error: %d", "mqtt_live", rc);
|
|
goto process_error;
|
|
}
|
|
|
|
remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
|
|
}
|
|
|
|
/* Reschedule the process work */
|
|
LOG_DBG("Scheduling %s work", "process");
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, K_SECONDS(2));
|
|
sh_mqtt_context_unlock();
|
|
return;
|
|
|
|
process_error:
|
|
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
|
|
sh_mqtt_close_and_cleanup();
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
|
sh_mqtt_context_unlock();
|
|
}
|
|
|
|
static void sh_mqtt_subscribe_handler(struct k_work *work)
|
|
{
|
|
ARG_UNUSED(work);
|
|
/* Subscribe config information */
|
|
struct mqtt_topic subs_topic = { .topic = { .utf8 = sh_mqtt->sub_topic,
|
|
.size = strlen(sh_mqtt->sub_topic) },
|
|
.qos = MQTT_QOS_1_AT_LEAST_ONCE };
|
|
const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
|
|
.list_count = 1U,
|
|
.message_id = 1U };
|
|
int rc;
|
|
|
|
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
|
LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
|
|
return;
|
|
}
|
|
|
|
/* If context can't be locked, that means net conn cb locked it */
|
|
if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
|
|
/* In that case we should simply return */
|
|
LOG_DBG("%s_work unable to lock context", "subscribe");
|
|
return;
|
|
}
|
|
|
|
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
|
LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
|
|
goto subscribe_error;
|
|
}
|
|
|
|
rc = mqtt_subscribe(&sh_mqtt->mqtt_cli, &subs_list);
|
|
if (rc == 0) {
|
|
/* Wait for mqtt's connack */
|
|
LOG_DBG("Listening to socket");
|
|
rc = wait(CONNECT_TIMEOUT_MS);
|
|
if (rc > 0) {
|
|
LOG_DBG("Process socket for MQTT packet");
|
|
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
|
if (rc != 0) {
|
|
LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
|
|
goto subscribe_error;
|
|
}
|
|
} else if (rc < 0) {
|
|
goto subscribe_error;
|
|
}
|
|
|
|
/* No suback, fail */
|
|
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
|
goto subscribe_error;
|
|
}
|
|
|
|
LOG_DBG("Scheduling MQTT process work");
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, PROCESS_INTERVAL);
|
|
sh_mqtt_context_unlock();
|
|
|
|
LOG_INF("Logs will be published to: %s", sh_mqtt->pub_topic);
|
|
LOG_INF("Subscribing shell cmds from: %s", sh_mqtt->sub_topic);
|
|
|
|
return;
|
|
}
|
|
|
|
subscribe_error:
|
|
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
|
|
sh_mqtt_close_and_cleanup();
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
|
sh_mqtt_context_unlock();
|
|
}
|
|
|
|
/* Work routine to connect to MQTT */
|
|
static void sh_mqtt_connect_handler(struct k_work *work)
|
|
{
|
|
ARG_UNUSED(work);
|
|
int rc;
|
|
|
|
if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
|
|
LOG_DBG("%s_work while %s", "connect", "network disconnected");
|
|
return;
|
|
}
|
|
|
|
/* If context can't be locked, that means net conn cb locked it */
|
|
if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
|
|
/* In that case we should simply return */
|
|
LOG_DBG("%s_work unable to lock context", "connect");
|
|
return;
|
|
}
|
|
|
|
if (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
|
|
__ASSERT(0, "MQTT shouldn't be already connected");
|
|
LOG_ERR("MQTT shouldn't be already connected");
|
|
goto connect_error;
|
|
}
|
|
|
|
/* Resolve the broker URL */
|
|
LOG_DBG("Resolving DNS");
|
|
rc = get_mqtt_broker_addrinfo();
|
|
if (rc != 0) {
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
|
sh_mqtt_context_unlock();
|
|
return;
|
|
}
|
|
|
|
LOG_DBG("Initializing MQTT client");
|
|
broker_init();
|
|
client_init();
|
|
|
|
/* Try to connect to mqtt */
|
|
LOG_DBG("Connecting to MQTT broker");
|
|
rc = mqtt_connect(&sh_mqtt->mqtt_cli);
|
|
if (rc != 0) {
|
|
LOG_ERR("%s error: %d", "mqtt_connect", rc);
|
|
goto connect_error;
|
|
}
|
|
|
|
/* Prepare port config */
|
|
LOG_DBG("Preparing socket");
|
|
prepare_fds();
|
|
|
|
/* Wait for mqtt's connack */
|
|
LOG_DBG("Listening to socket");
|
|
rc = wait(CONNECT_TIMEOUT_MS);
|
|
if (rc > 0) {
|
|
LOG_DBG("Process socket for MQTT packet");
|
|
rc = mqtt_input(&sh_mqtt->mqtt_cli);
|
|
if (rc != 0) {
|
|
LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
|
|
goto connect_error;
|
|
}
|
|
} else if (rc < 0) {
|
|
goto connect_error;
|
|
}
|
|
|
|
/* No connack, fail */
|
|
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
|
goto connect_error;
|
|
}
|
|
|
|
LOG_DBG("Scheduling %s work", "subscribe");
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->subscribe_dwork, K_SECONDS(2));
|
|
sh_mqtt_context_unlock();
|
|
return;
|
|
|
|
connect_error:
|
|
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
|
|
sh_mqtt_close_and_cleanup();
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
|
sh_mqtt_context_unlock();
|
|
}
|
|
|
|
static int sh_mqtt_publish(uint8_t *data, uint32_t len)
|
|
{
|
|
sh_mqtt->pub_data.message.payload.data = data;
|
|
sh_mqtt->pub_data.message.payload.len = len;
|
|
sh_mqtt->pub_data.message_id++;
|
|
|
|
return mqtt_publish(&sh_mqtt->mqtt_cli, &sh_mqtt->pub_data);
|
|
}
|
|
|
|
static int sh_mqtt_publish_tx_buf(bool is_work)
|
|
{
|
|
int rc;
|
|
|
|
rc = sh_mqtt_publish(&sh_mqtt->tx_buf.buf[0], sh_mqtt->tx_buf.len);
|
|
memset(&sh_mqtt->tx_buf, 0, sizeof(sh_mqtt->tx_buf));
|
|
if (rc != 0) {
|
|
LOG_ERR("MQTT publish error: %d", rc);
|
|
return rc;
|
|
}
|
|
|
|
/* Arbitrary delay to not kill the session */
|
|
if (!is_work) {
|
|
k_sleep(MQTT_SEND_DELAY_MS);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
static void sh_mqtt_publish_handler(struct k_work *work)
|
|
{
|
|
ARG_UNUSED(work);
|
|
int rc;
|
|
|
|
(void)sh_mqtt_context_lock(K_FOREVER);
|
|
|
|
rc = sh_mqtt_publish_tx_buf(true);
|
|
if (rc != 0) {
|
|
LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
|
|
sh_mqtt_close_and_cleanup();
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
|
|
}
|
|
|
|
sh_mqtt_context_unlock();
|
|
}
|
|
|
|
static void cancel_dworks_and_cleanup(void)
|
|
{
|
|
(void)k_work_cancel_delayable(&sh_mqtt->connect_dwork);
|
|
(void)k_work_cancel_delayable(&sh_mqtt->subscribe_dwork);
|
|
(void)k_work_cancel_delayable(&sh_mqtt->process_dwork);
|
|
(void)k_work_cancel_delayable(&sh_mqtt->publish_dwork);
|
|
sh_mqtt_close_and_cleanup();
|
|
}
|
|
|
|
static void net_disconnect_handler(struct k_work *work)
|
|
{
|
|
ARG_UNUSED(work);
|
|
|
|
LOG_WRN("Network %s", "disconnected");
|
|
sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
|
|
|
|
/* Stop all possible work */
|
|
(void)sh_mqtt_context_lock(K_FOREVER);
|
|
cancel_dworks_and_cleanup();
|
|
sh_mqtt_context_unlock();
|
|
/* If the transport was requested, the connect work will be rescheduled
|
|
* when internet is connected again
|
|
*/
|
|
}
|
|
|
|
/* Network connection event handler */
|
|
static void network_evt_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event,
|
|
struct net_if *iface)
|
|
{
|
|
if ((mgmt_event == NET_EVENT_L4_CONNECTED) &&
|
|
(sh_mqtt->network_state == SHELL_MQTT_NETWORK_DISCONNECTED)) {
|
|
LOG_WRN("Network %s", "connected");
|
|
sh_mqtt->network_state = SHELL_MQTT_NETWORK_CONNECTED;
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
|
|
} else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) &&
|
|
(sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED)) {
|
|
(void)sh_mqtt_work_submit(&sh_mqtt->net_disconnected_work);
|
|
}
|
|
}
|
|
|
|
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
|
|
{
|
|
switch (evt->type) {
|
|
case MQTT_EVT_CONNACK:
|
|
if (evt->result != 0) {
|
|
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
|
LOG_ERR("MQTT %s %d", "connect failed", evt->result);
|
|
break;
|
|
}
|
|
|
|
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
|
|
LOG_WRN("MQTT %s", "client connected!");
|
|
|
|
break;
|
|
case MQTT_EVT_SUBACK:
|
|
if (evt->result != 0) {
|
|
LOG_ERR("MQTT subscribe: %s", "error");
|
|
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
|
break;
|
|
}
|
|
|
|
LOG_WRN("MQTT subscribe: %s", "ok");
|
|
sh_mqtt->subscribe_state = SHELL_MQTT_SUBSCRIBED;
|
|
break;
|
|
|
|
case MQTT_EVT_UNSUBACK:
|
|
LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
|
|
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
|
break;
|
|
|
|
case MQTT_EVT_DISCONNECT:
|
|
LOG_WRN("MQTT disconnected: %d", evt->result);
|
|
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
|
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
|
break;
|
|
|
|
case MQTT_EVT_PUBLISH: {
|
|
const struct mqtt_publish_param *pub = &evt->param.publish;
|
|
uint32_t payload_left;
|
|
size_t size;
|
|
int rc;
|
|
|
|
payload_left = pub->message.payload.len;
|
|
|
|
LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
|
|
LOG_DBG(" id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
|
|
LOG_DBG(" item: %s", pub->message.topic.topic.utf8);
|
|
|
|
/* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
|
|
if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
|
|
struct mqtt_puback_param puback = { .message_id = pub->message_id };
|
|
|
|
(void)mqtt_publish_qos1_ack(client, &puback);
|
|
}
|
|
|
|
while (payload_left > 0) {
|
|
/* Attempt to claim `payload_left` bytes of buffer in rb */
|
|
size = (size_t)ring_buf_put_claim(&sh_mqtt->rx_rb, &sh_mqtt->rx_rb_ptr,
|
|
payload_left);
|
|
/* Read `size` bytes of payload from mqtt */
|
|
rc = mqtt_read_publish_payload_blocking(client, sh_mqtt->rx_rb_ptr, size);
|
|
|
|
/* errno value, return */
|
|
if (rc < 0) {
|
|
(void)ring_buf_put_finish(&sh_mqtt->rx_rb, 0U);
|
|
sh_mqtt_rx_rb_flush();
|
|
return;
|
|
}
|
|
|
|
size = (size_t)rc;
|
|
/* Indicate that `size` bytes of payload has been written into rb */
|
|
(void)ring_buf_put_finish(&sh_mqtt->rx_rb, size);
|
|
/* Update `payload_left` */
|
|
payload_left -= size;
|
|
/* Tells the shell that we have new data for it */
|
|
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
|
/* Arbitrary sleep for the shell to do its thing */
|
|
(void)k_msleep(100);
|
|
}
|
|
|
|
/* Shell won't execute the cmds without \r\n */
|
|
while (true) {
|
|
/* Check if rb's free space is enough to fit in \r\n */
|
|
size = ring_buf_space_get(&sh_mqtt->rx_rb);
|
|
if (size >= sizeof("\r\n")) {
|
|
(void)ring_buf_put(&sh_mqtt->rx_rb, "\r\n", sizeof("\r\n"));
|
|
break;
|
|
}
|
|
/* Arbitrary sleep for the shell to do its thing */
|
|
(void)k_msleep(100);
|
|
}
|
|
|
|
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
|
break;
|
|
}
|
|
|
|
case MQTT_EVT_PUBACK:
|
|
if (evt->result != 0) {
|
|
LOG_ERR("MQTT PUBACK error %d", evt->result);
|
|
break;
|
|
}
|
|
|
|
LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
|
|
break;
|
|
|
|
case MQTT_EVT_PINGRESP:
|
|
LOG_DBG("PINGRESP packet");
|
|
break;
|
|
|
|
default:
|
|
LOG_DBG("MQTT event received %d", evt->type);
|
|
break;
|
|
}
|
|
}
|
|
|
|
static int init(const struct shell_transport *transport, const void *config,
|
|
shell_transport_handler_t evt_handler, void *context)
|
|
{
|
|
sh_mqtt = (struct shell_mqtt *)transport->ctx;
|
|
|
|
(void)memset(sh_mqtt, 0, sizeof(struct shell_mqtt));
|
|
|
|
(void)k_mutex_init(&sh_mqtt->lock);
|
|
|
|
if (!shell_mqtt_get_devid(sh_mqtt->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
|
|
LOG_ERR("Unable to get device identity, using dummy value");
|
|
(void)snprintf(sh_mqtt->device_id, sizeof("dummy"), "dummy");
|
|
}
|
|
|
|
LOG_DBG("Client ID is %s", sh_mqtt->device_id);
|
|
|
|
(void)snprintf(sh_mqtt->pub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_tx", sh_mqtt->device_id);
|
|
(void)snprintf(sh_mqtt->sub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_rx", sh_mqtt->device_id);
|
|
|
|
ring_buf_init(&sh_mqtt->rx_rb, RX_RB_SIZE, sh_mqtt->rx_rb_buf);
|
|
|
|
LOG_DBG("Initializing shell MQTT backend");
|
|
|
|
sh_mqtt->shell_handler = evt_handler;
|
|
sh_mqtt->shell_context = context;
|
|
|
|
sh_mqtt->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
|
|
sh_mqtt->pub_data.message.topic.topic.utf8 = (uint8_t *)sh_mqtt->pub_topic;
|
|
sh_mqtt->pub_data.message.topic.topic.size =
|
|
strlen(sh_mqtt->pub_data.message.topic.topic.utf8);
|
|
sh_mqtt->pub_data.dup_flag = 0U;
|
|
sh_mqtt->pub_data.retain_flag = 0U;
|
|
|
|
/* Initialize the work queue */
|
|
k_work_queue_init(&sh_mqtt->workq);
|
|
k_work_queue_start(&sh_mqtt->workq, sh_mqtt_workq_stack,
|
|
K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
|
|
(void)k_thread_name_set(&sh_mqtt->workq.thread, "sh_mqtt_workq");
|
|
k_work_init(&sh_mqtt->net_disconnected_work, net_disconnect_handler);
|
|
k_work_init_delayable(&sh_mqtt->connect_dwork, sh_mqtt_connect_handler);
|
|
k_work_init_delayable(&sh_mqtt->subscribe_dwork, sh_mqtt_subscribe_handler);
|
|
k_work_init_delayable(&sh_mqtt->process_dwork, sh_mqtt_process_handler);
|
|
k_work_init_delayable(&sh_mqtt->publish_dwork, sh_mqtt_publish_handler);
|
|
|
|
LOG_DBG("Initializing listener for network");
|
|
net_mgmt_init_event_callback(&sh_mqtt->mgmt_cb, network_evt_handler, NET_EVENT_MASK);
|
|
|
|
sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
|
|
sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
|
|
sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int uninit(const struct shell_transport *transport)
|
|
{
|
|
ARG_UNUSED(transport);
|
|
|
|
/* Not initialized yet */
|
|
if (sh_mqtt == NULL) {
|
|
return -ENODEV;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int enable(const struct shell_transport *transport, bool blocking)
|
|
{
|
|
ARG_UNUSED(transport);
|
|
ARG_UNUSED(blocking);
|
|
|
|
/* Not initialized yet */
|
|
if (sh_mqtt == NULL) {
|
|
return -ENODEV;
|
|
}
|
|
|
|
/* Listen for network connection status */
|
|
net_mgmt_add_event_callback(&sh_mqtt->mgmt_cb);
|
|
conn_mgr_mon_resend_status();
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int write_data(const struct shell_transport *transport, const void *data, size_t length,
|
|
size_t *cnt)
|
|
{
|
|
ARG_UNUSED(transport);
|
|
int rc = 0;
|
|
struct k_work_sync ws;
|
|
size_t copy_len;
|
|
|
|
*cnt = 0;
|
|
|
|
/* Not initialized yet */
|
|
if (sh_mqtt == NULL) {
|
|
return -ENODEV;
|
|
}
|
|
|
|
/* Not connected to broker */
|
|
if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
|
|
goto out;
|
|
}
|
|
|
|
(void)k_work_cancel_delayable_sync(&sh_mqtt->publish_dwork, &ws);
|
|
|
|
do {
|
|
if ((sh_mqtt->tx_buf.len + length - *cnt) > TX_BUF_SIZE) {
|
|
copy_len = TX_BUF_SIZE - sh_mqtt->tx_buf.len;
|
|
} else {
|
|
copy_len = length - *cnt;
|
|
}
|
|
|
|
memcpy(sh_mqtt->tx_buf.buf + sh_mqtt->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
|
|
sh_mqtt->tx_buf.len += copy_len;
|
|
|
|
/* Send the data immediately if the buffer is full */
|
|
if (sh_mqtt->tx_buf.len == TX_BUF_SIZE) {
|
|
rc = sh_mqtt_publish_tx_buf(false);
|
|
if (rc != 0) {
|
|
sh_mqtt_close_and_cleanup();
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork,
|
|
K_SECONDS(2));
|
|
*cnt = length;
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
*cnt += copy_len;
|
|
} while (*cnt < length);
|
|
|
|
if (sh_mqtt->tx_buf.len > 0) {
|
|
(void)sh_mqtt_work_reschedule(&sh_mqtt->publish_dwork, MQTT_SEND_DELAY_MS);
|
|
}
|
|
|
|
/* Inform shell that it is ready for next TX */
|
|
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh_mqtt->shell_context);
|
|
|
|
out:
|
|
/* We will always assume that we sent everything */
|
|
*cnt = length;
|
|
return rc;
|
|
}
|
|
|
|
static int read_data(const struct shell_transport *transport, void *data, size_t length,
|
|
size_t *cnt)
|
|
{
|
|
ARG_UNUSED(transport);
|
|
|
|
/* Not initialized yet */
|
|
if (sh_mqtt == NULL) {
|
|
return -ENODEV;
|
|
}
|
|
|
|
/* Not subscribed yet */
|
|
if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
|
|
*cnt = 0;
|
|
return 0;
|
|
}
|
|
|
|
*cnt = ring_buf_get(&sh_mqtt->rx_rb, data, length);
|
|
|
|
/* Inform the shell if there are still data in the rb */
|
|
if (ring_buf_size_get(&sh_mqtt->rx_rb) > 0) {
|
|
sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
|
|
.uninit = uninit,
|
|
.enable = enable,
|
|
.write = write_data,
|
|
.read = read_data };
|
|
|
|
static int enable_shell_mqtt(void)
|
|
{
|
|
|
|
bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
|
|
uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
|
|
CONFIG_LOG_MAX_LEVEL :
|
|
CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
|
|
static const struct shell_backend_config_flags cfg_flags = {
|
|
.insert_mode = 0,
|
|
.echo = 0,
|
|
.obscure = 0,
|
|
.mode_delete = 0,
|
|
.use_colors = 0,
|
|
.use_vt100 = 0,
|
|
};
|
|
|
|
return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
|
|
}
|
|
|
|
/* Function is used for testing purposes */
|
|
const struct shell *shell_backend_mqtt_get_ptr(void)
|
|
{
|
|
return &shell_mqtt;
|
|
}
|
|
|
|
SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
|