xref: /nrf52832-nimble/rt-thread/components/net/lwip-2.1.0/src/apps/mqtt/mqtt.c (revision 104654410c56c573564690304ae786df310c91fc)
1 /**
2  * @file
3  * MQTT client
4  *
5  * @defgroup mqtt MQTT client
6  * @ingroup apps
7  * @verbinclude mqtt_client.txt
8  */
9 
10 /*
11  * Copyright (c) 2016 Erik Andersson <[email protected]>
12  * All rights reserved.
13  *
14  * Redistribution and use in source and binary forms, with or without modification,
15  * are permitted provided that the following conditions are met:
16  *
17  * 1. Redistributions of source code must retain the above copyright notice,
18  *    this list of conditions and the following disclaimer.
19  * 2. Redistributions in binary form must reproduce the above copyright notice,
20  *    this list of conditions and the following disclaimer in the documentation
21  *    and/or other materials provided with the distribution.
22  * 3. The name of the author may not be used to endorse or promote products
23  *    derived from this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34  * OF SUCH DAMAGE.
35  *
36  * This file is part of the lwIP TCP/IP stack
37  *
38  * Author: Erik Andersson <[email protected]>
39  *
40  *
41  * @todo:
42  * - Handle large outgoing payloads for PUBLISH messages
43  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44  * - Add support for legacy MQTT protocol version
45  *
46  * Please coordinate changes and requests with Erik Andersson
47  * Erik Andersson <[email protected]>
48  *
49  */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/apps/mqtt_priv.h"
52 #include "lwip/timeouts.h"
53 #include "lwip/ip_addr.h"
54 #include "lwip/mem.h"
55 #include "lwip/err.h"
56 #include "lwip/pbuf.h"
57 #include "lwip/altcp.h"
58 #include "lwip/altcp_tcp.h"
59 #include "lwip/altcp_tls.h"
60 #include <string.h>
61 
62 #if LWIP_TCP && LWIP_CALLBACK_API
63 
/**
 * MQTT_DEBUG: Default is off.
 *
 * Base debug flag for this module; it is only defined here when the
 * build configuration has not already defined it.
 */
#if !defined MQTT_DEBUG || defined __DOXYGEN__
#define MQTT_DEBUG                  LWIP_DBG_OFF
#endif

/* Flag combinations passed as first argument to LWIP_DEBUGF() in this file:
 * the module base flag OR'ed with a debug type and/or severity level. */
#define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
76 
77 
78 
/**
 * MQTT client connection states
 *
 * Values stored in the conn_state member of the client.
 */
enum {
  /* No connection; mqtt_close() leaves the client in this state */
  TCP_DISCONNECTED,
  /* NOTE(review): presumably TCP connect in progress; not set in the visible part of this file */
  TCP_CONNECTING,
  /* Waiting for CONNACK; the cyclic timer enforces the connect timeout in this state */
  MQTT_CONNECTING,
  /* Entered when a CONNACK with result MQTT_CONNECT_ACCEPTED has been received */
  MQTT_CONNECTED
};
88 
/**
 * MQTT control message types
 *
 * The numeric value is what MQTT_CTL_PACKET_TYPE() extracts from the upper
 * four bits of the first fixed header byte, and it is also the index into
 * the mqtt_message_type_str[] debug table.
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK = 2,
  MQTT_MSG_TYPE_PUBLISH = 3,
  MQTT_MSG_TYPE_PUBACK = 4,
  MQTT_MSG_TYPE_PUBREC = 5,
  MQTT_MSG_TYPE_PUBREL = 6,
  MQTT_MSG_TYPE_PUBCOMP = 7,
  MQTT_MSG_TYPE_SUBSCRIBE = 8,
  MQTT_MSG_TYPE_SUBACK = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK = 11,
  MQTT_MSG_TYPE_PINGREQ = 12,
  MQTT_MSG_TYPE_PINGRESP = 13,
  MQTT_MSG_TYPE_DISCONNECT = 14
};
108 
/**
 * Helpers to extract control packet type and qos from first byte in fixed header.
 *
 * - MQTT_CTL_PACKET_TYPE: upper four bits, see enum mqtt_message_type
 * - MQTT_CTL_PACKET_QOS:  bits 2..1
 *
 * The argument is parenthesized so that compound expressions
 * (e.g. "a | b") are masked as a whole.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
112 
/**
 * MQTT connect flags, only used in CONNECT message
 *
 * Each enumerator is a single-bit mask; the bit position is given by the
 * shift count.
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,      /* bit 7 */
  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,      /* bit 6 */
  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,   /* bit 5 */
  MQTT_CONNECT_FLAG_WILL = 1 << 2,          /* bit 2 */
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1  /* bit 1 */
};
123 
124 
/* Forward declaration: mqtt_close() references the timer handler before its definition */
static void mqtt_cyclic_timer(void *arg);
126 
#if defined(LWIP_DEBUG)
/** Printable names indexed by enum mqtt_message_type; slot 0 is the fallback */
static const char *const mqtt_message_type_str[15] = {
  "UNDEFINED",
  "CONNECT",
  "CONNACK",
  "PUBLISH",
  "PUBACK",
  "PUBREC",
  "PUBREL",
  "PUBCOMP",
  "SUBSCRIBE",
  "SUBACK",
  "UNSUBSCRIBE",
  "UNSUBACK",
  "PINGREQ",
  "PINGRESP",
  "DISCONNECT"
};

/**
 * Translate a control message type into its name (debug builds only)
 * @param msg_type see enum mqtt_message_type
 *
 * @return Name of the message type, "UNDEFINED" for values outside the table
 */
static const char *
mqtt_msg_type_to_str(u8_t msg_type)
{
  /* Out-of-range values map to entry 0 */
  const u8_t idx = (msg_type < LWIP_ARRAYSIZE(mqtt_message_type_str)) ? msg_type : 0;

  return mqtt_message_type_str[idx];
}

#endif
162 
163 
164 /**
165  * Generate MQTT packet identifier
166  * @param client MQTT client
167  * @return New packet identifier, range 1 to 65535
168  */
169 static u16_t
msg_generate_packet_id(mqtt_client_t * client)170 msg_generate_packet_id(mqtt_client_t *client)
171 {
172   client->pkt_id_seq++;
173   if (client->pkt_id_seq == 0) {
174     client->pkt_id_seq++;
175   }
176   return client->pkt_id_seq;
177 }
178 
179 /*--------------------------------------------------------------------------------------------------------------------- */
180 /* Output ring buffer */
181 
182 /** Add single item to ring buffer */
183 static void
mqtt_ringbuf_put(struct mqtt_ringbuf_t * rb,u8_t item)184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item)
185 {
186   rb->buf[rb->put] = item;
187   rb->put++;
188   if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) {
189     rb->put = 0;
190   }
191 }
192 
193 /** Return pointer to ring buffer get position */
194 static u8_t *
mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t * rb)195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb)
196 {
197   return &rb->buf[rb->get];
198 }
199 
200 static void
mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t * rb,u16_t len)201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len)
202 {
203   LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE);
204 
205   rb->get += len;
206   if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) {
207     rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE;
208   }
209 }
210 
211 /** Return number of bytes in ring buffer */
212 static u16_t
mqtt_ringbuf_len(struct mqtt_ringbuf_t * rb)213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb)
214 {
215   u32_t len = rb->put - rb->get;
216   if (len > 0xFFFF) {
217     len += MQTT_OUTPUT_RINGBUF_SIZE;
218   }
219   return (u16_t)len;
220 }
221 
222 /** Return number of bytes free in ring buffer */
223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
224 
225 /** Return number of bytes possible to read without wrapping around */
226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
227 
228 /**
229  * Try send as many bytes as possible from output ring buffer
230  * @param rb Output ring buffer
231  * @param tpcb TCP connection handle
232  */
233 static void
mqtt_output_send(struct mqtt_ringbuf_t * rb,struct altcp_pcb * tpcb)234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
235 {
236   err_t err;
237   u8_t wrap = 0;
238   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
239   u16_t send_len = altcp_sndbuf(tpcb);
240   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
241 
242   if (send_len == 0 || ringbuf_lin_len == 0) {
243     return;
244   }
245 
246   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
247                                  send_len, ringbuf_lin_len, rb->get, rb->put));
248 
249   if (send_len > ringbuf_lin_len) {
250     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
251     send_len = ringbuf_lin_len;
252     /* Wrap around if more data in ring buffer after linear portion */
253     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
254   }
255   err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
256   if ((err == ERR_OK) && wrap) {
257     mqtt_ringbuf_advance_get_idx(rb, send_len);
258     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
259     send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
260     err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
261   }
262 
263   if (err == ERR_OK) {
264     mqtt_ringbuf_advance_get_idx(rb, send_len);
265     /* Flush */
266     altcp_output(tpcb);
267   } else {
268     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
269   }
270 }
271 
272 
273 
274 /*--------------------------------------------------------------------------------------------------------------------- */
275 /* Request queue */
276 
277 /**
278  * Create request item
279  * @param r_objs Pointer to request objects
280  * @param r_objs_len Number of array entries
281  * @param pkt_id Packet identifier of request
282  * @param cb Packet callback to call when requests lifetime ends
283  * @param arg Parameter following callback
284  * @return Request or NULL if failed to create
285  */
286 static struct mqtt_request_t *
mqtt_create_request(struct mqtt_request_t * r_objs,size_t r_objs_len,u16_t pkt_id,mqtt_request_cb_t cb,void * arg)287 mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
288 {
289   struct mqtt_request_t *r = NULL;
290   u8_t n;
291   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
292   for (n = 0; n < r_objs_len; n++) {
293     /* Item point to itself if not in use */
294     if (r_objs[n].next == &r_objs[n]) {
295       r = &r_objs[n];
296       r->next = NULL;
297       r->cb = cb;
298       r->arg = arg;
299       r->pkt_id = pkt_id;
300       break;
301     }
302   }
303   return r;
304 }
305 
306 
307 /**
308  * Append request to pending request queue
309  * @param tail Pointer to request queue tail pointer
310  * @param r Request to append
311  */
312 static void
mqtt_append_request(struct mqtt_request_t ** tail,struct mqtt_request_t * r)313 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
314 {
315   struct mqtt_request_t *head = NULL;
316   s16_t time_before = 0;
317   struct mqtt_request_t *iter;
318 
319   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
320 
321   /* Iterate trough queue to find head, and count total timeout time */
322   for (iter = *tail; iter != NULL; iter = iter->next) {
323     time_before += iter->timeout_diff;
324     head = iter;
325   }
326 
327   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
328   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
329   if (head == NULL) {
330     *tail = r;
331   } else {
332     head->next = r;
333   }
334 }
335 
336 
337 /**
338  * Delete request item
339  * @param r Request item to delete
340  */
341 static void
mqtt_delete_request(struct mqtt_request_t * r)342 mqtt_delete_request(struct mqtt_request_t *r)
343 {
344   if (r != NULL) {
345     r->next = r;
346   }
347 }
348 
349 /**
350  * Remove a request item with a specific packet identifier from request queue
351  * @param tail Pointer to request queue tail pointer
352  * @param pkt_id Packet identifier of request to take
353  * @return Request item if found, NULL if not
354  */
355 static struct mqtt_request_t *
mqtt_take_request(struct mqtt_request_t ** tail,u16_t pkt_id)356 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
357 {
358   struct mqtt_request_t *iter = NULL, *prev = NULL;
359   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
360   /* Search all request for pkt_id */
361   for (iter = *tail; iter != NULL; iter = iter->next) {
362     if (iter->pkt_id == pkt_id) {
363       break;
364     }
365     prev = iter;
366   }
367 
368   /* If request was found */
369   if (iter != NULL) {
370     /* unchain */
371     if (prev == NULL) {
372       *tail = iter->next;
373     } else {
374       prev->next = iter->next;
375     }
376     /* If exists, add remaining timeout time for the request to next */
377     if (iter->next != NULL) {
378       iter->next->timeout_diff += iter->timeout_diff;
379     }
380     iter->next = NULL;
381   }
382   return iter;
383 }
384 
385 /**
386  * Handle requests timeout
387  * @param tail Pointer to request queue tail pointer
388  * @param t Time since last call in seconds
389  */
390 static void
mqtt_request_time_elapsed(struct mqtt_request_t ** tail,u8_t t)391 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
392 {
393   struct mqtt_request_t *r;
394   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
395   r = *tail;
396   while (t > 0 && r != NULL) {
397     if (t >= r->timeout_diff) {
398       t -= (u8_t)r->timeout_diff;
399       /* Unchain */
400       *tail = r->next;
401       /* Notify upper layer about timeout */
402       if (r->cb != NULL) {
403         r->cb(r->arg, ERR_TIMEOUT);
404       }
405       mqtt_delete_request(r);
406       /* Tail might be be modified in callback, so re-read it in every iteration */
407       r = *(struct mqtt_request_t *const volatile *)tail;
408     } else {
409       r->timeout_diff -= t;
410       t = 0;
411     }
412   }
413 }
414 
415 /**
416  * Free all request items
417  * @param tail Pointer to request queue tail pointer
418  */
419 static void
mqtt_clear_requests(struct mqtt_request_t ** tail)420 mqtt_clear_requests(struct mqtt_request_t **tail)
421 {
422   struct mqtt_request_t *iter, *next;
423   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
424   for (iter = *tail; iter != NULL; iter = next) {
425     next = iter->next;
426     mqtt_delete_request(iter);
427   }
428   *tail = NULL;
429 }
430 /**
431  * Initialize all request items
432  * @param r_objs Pointer to request objects
433  * @param r_objs_len Number of array entries
434  */
435 static void
mqtt_init_requests(struct mqtt_request_t * r_objs,size_t r_objs_len)436 mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len)
437 {
438   u8_t n;
439   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
440   for (n = 0; n < r_objs_len; n++) {
441     /* Item pointing to itself indicates unused */
442     r_objs[n].next = &r_objs[n];
443   }
444 }
445 
446 /*--------------------------------------------------------------------------------------------------------------------- */
447 /* Output message build helpers */
448 
449 
450 static void
mqtt_output_append_u8(struct mqtt_ringbuf_t * rb,u8_t value)451 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
452 {
453   mqtt_ringbuf_put(rb, value);
454 }
455 
456 static
mqtt_output_append_u16(struct mqtt_ringbuf_t * rb,u16_t value)457 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
458 {
459   mqtt_ringbuf_put(rb, value >> 8);
460   mqtt_ringbuf_put(rb, value & 0xff);
461 }
462 
463 static void
mqtt_output_append_buf(struct mqtt_ringbuf_t * rb,const void * data,u16_t length)464 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
465 {
466   u16_t n;
467   for (n = 0; n < length; n++) {
468     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
469   }
470 }
471 
472 static void
mqtt_output_append_string(struct mqtt_ringbuf_t * rb,const char * str,u16_t length)473 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
474 {
475   u16_t n;
476   mqtt_ringbuf_put(rb, length >> 8);
477   mqtt_ringbuf_put(rb, length & 0xff);
478   for (n = 0; n < length; n++) {
479     mqtt_ringbuf_put(rb, str[n]);
480   }
481 }
482 
483 /**
484  * Append fixed header
485  * @param rb Output ring buffer
486  * @param msg_type see enum mqtt_message_type
487  * @param fdup MQTT DUP flag
488  * @param fqos MQTT QoS field
489  * @param fretain MQTT retain flag
490  * @param r_length Remaining length after fixed header
491  */
492 
493 static void
mqtt_output_append_fixed_header(struct mqtt_ringbuf_t * rb,u8_t msg_type,u8_t fdup,u8_t fqos,u8_t fretain,u16_t r_length)494 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup,
495                                 u8_t fqos, u8_t fretain, u16_t r_length)
496 {
497   /* Start with control byte */
498   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
499   /* Encode remaining length field */
500   do {
501     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
502     r_length >>= 7;
503   } while (r_length > 0);
504 }
505 
506 
507 /**
508  * Check output buffer space
509  * @param rb Output ring buffer
510  * @param r_length Remaining length after fixed header
511  * @return 1 if message will fit, 0 if not enough buffer space
512  */
513 static u8_t
mqtt_output_check_space(struct mqtt_ringbuf_t * rb,u16_t r_length)514 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
515 {
516   /* Start with length of type byte + remaining length */
517   u16_t total_len = 1 + r_length;
518 
519   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
520 
521   /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
522   do {
523     total_len++;
524     r_length >>= 7;
525   } while (r_length > 0);
526 
527   return (total_len <= mqtt_ringbuf_free(rb));
528 }
529 
530 
531 /**
532  * Close connection to server
533  * @param client MQTT client
534  * @param reason Reason for disconnection
535  */
536 static void
mqtt_close(mqtt_client_t * client,mqtt_connection_status_t reason)537 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
538 {
539   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
540 
541   /* Bring down TCP connection if not already done */
542   if (client->conn != NULL) {
543     err_t res;
544     altcp_recv(client->conn, NULL);
545     altcp_err(client->conn,  NULL);
546     altcp_sent(client->conn, NULL);
547     res = altcp_close(client->conn);
548     if (res != ERR_OK) {
549       altcp_abort(client->conn);
550       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res)));
551     }
552     client->conn = NULL;
553   }
554 
555   /* Remove all pending requests */
556   mqtt_clear_requests(&client->pend_req_queue);
557   /* Stop cyclic timer */
558   sys_untimeout(mqtt_cyclic_timer, client);
559 
560   /* Notify upper layer of disconnection if changed state */
561   if (client->conn_state != TCP_DISCONNECTED) {
562 
563     client->conn_state = TCP_DISCONNECTED;
564     if (client->connect_cb != NULL) {
565       client->connect_cb(client, client->connect_arg, reason);
566     }
567   }
568 }
569 
570 
571 /**
572  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
573  * @param arg MQTT client
574  */
575 static void
mqtt_cyclic_timer(void * arg)576 mqtt_cyclic_timer(void *arg)
577 {
578   u8_t restart_timer = 1;
579   mqtt_client_t *client = (mqtt_client_t *)arg;
580   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
581 
582   if (client->conn_state == MQTT_CONNECTING) {
583     client->cyclic_tick++;
584     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
585       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
586       /* Disconnect TCP */
587       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
588       restart_timer = 0;
589     }
590   } else if (client->conn_state == MQTT_CONNECTED) {
591     /* Handle timeout for pending requests */
592     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
593 
594     /* keep_alive > 0 means keep alive functionality shall be used */
595     if (client->keep_alive > 0) {
596 
597       client->server_watchdog++;
598       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
599       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) {
600         LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
601         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
602         restart_timer = 0;
603       }
604 
605       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
606       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
607         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
608         if (mqtt_output_check_space(&client->output, 0) != 0) {
609           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
610           client->cyclic_tick = 0;
611         }
612       } else {
613         client->cyclic_tick++;
614       }
615     }
616   } else {
617     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
618     restart_timer = 0;
619   }
620   if (restart_timer) {
621     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg);
622   }
623 }
624 
625 
626 /**
627  * Send PUBACK, PUBREC or PUBREL response message
628  * @param client MQTT client
629  * @param msg PUBACK, PUBREC or PUBREL
630  * @param pkt_id Packet identifier
631  * @param qos QoS value
632  * @return ERR_OK if successful, ERR_MEM if out of memory
633  */
634 static err_t
pub_ack_rec_rel_response(mqtt_client_t * client,u8_t msg,u16_t pkt_id,u8_t qos)635 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
636 {
637   err_t err = ERR_OK;
638   if (mqtt_output_check_space(&client->output, 2)) {
639     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
640     mqtt_output_append_u16(&client->output, pkt_id);
641     mqtt_output_send(&client->output, client->conn);
642   } else {
643     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
644                                    mqtt_msg_type_to_str(msg), pkt_id));
645     err = ERR_MEM;
646   }
647   return err;
648 }
649 
650 /**
651  * Subscribe response from server
652  * @param r Matching request
653  * @param result Result code from server
654  */
655 static void
mqtt_incomming_suback(struct mqtt_request_t * r,u8_t result)656 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
657 {
658   if (r->cb != NULL) {
659     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
660   }
661 }
662 
663 
664 /**
665  * Complete MQTT message received or buffer full
666  * @param client MQTT client
667  * @param fixed_hdr_idx header index
668  * @param length length received part
669  * @param remaining_length Remaining length of complete message
670  */
671 static mqtt_connection_status_t
mqtt_message_received(mqtt_client_t * client,u8_t fixed_hdr_idx,u16_t length,u32_t remaining_length)672 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
673 {
674   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
675 
676   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
677   size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_idx;
678 
679   /* Control packet type */
680   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
681   u16_t pkt_id = 0;
682 
683   LWIP_ASSERT("client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN", client->msg_idx < MQTT_VAR_HEADER_BUFFER_LEN);
684   LWIP_ASSERT("fixed_hdr_idx <= client->msg_idx", fixed_hdr_idx <= client->msg_idx);
685   LWIP_ERROR("buffer length mismatch", fixed_hdr_idx + length <= MQTT_VAR_HEADER_BUFFER_LEN,
686              return MQTT_CONNECT_DISCONNECTED);
687 
688   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
689     if (client->conn_state == MQTT_CONNECTING) {
690       if (length < 2) {
691         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n"));
692         goto out_disconnect;
693       }
694       /* Get result code from CONNACK */
695       res = (mqtt_connection_status_t)var_hdr_payload[1];
696       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res));
697       if (res == MQTT_CONNECT_ACCEPTED) {
698         /* Reset cyclic_tick when changing to connected state */
699         client->cyclic_tick = 0;
700         client->conn_state = MQTT_CONNECTED;
701         /* Notify upper layer */
702         if (client->connect_cb != 0) {
703           client->connect_cb(client, client->connect_arg, res);
704         }
705       }
706     } else {
707       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n"));
708     }
709   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
710     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n"));
711 
712   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
713     u16_t payload_offset = 0;
714     u16_t payload_length = length;
715     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
716 
717     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
718       /* Should have topic and pkt id*/
719       u8_t *topic;
720       u16_t after_topic;
721       u8_t bkp;
722       u16_t topic_len;
723       u16_t qos_len = (qos ? 2U : 0U);
724       if (length < 2 + qos_len) {
725         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n"));
726         goto out_disconnect;
727       }
728       topic_len = var_hdr_payload[0];
729       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
730       if ((topic_len > length - (2 + qos_len)) ||
731           (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) {
732         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n"));
733         goto out_disconnect;
734       }
735 
736       topic = var_hdr_payload + 2;
737       after_topic = 2 + topic_len;
738       /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */
739       if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) {
740         LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
741         goto out_disconnect;
742       }
743 
744       /* id for QoS 1 and 2 */
745       if (qos > 0) {
746         if (length < after_topic + 2U) {
747           LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n"));
748           goto out_disconnect;
749         }
750         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
751         after_topic += 2;
752       } else {
753         client->inpub_pkt_id = 0;
754       }
755       /* Take backup of byte after topic */
756       bkp = topic[topic_len];
757       /* Zero terminate string */
758       topic[topic_len] = 0;
759       /* Payload data remaining in receive buffer */
760       payload_length = length - after_topic;
761       payload_offset = after_topic;
762 
763       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n",
764                                      qos, topic, remaining_length + payload_length));
765       if (client->pub_cb != NULL) {
766         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
767       }
768       /* Restore byte after topic */
769       topic[topic_len] = bkp;
770     }
771     if (payload_length > 0 || remaining_length == 0) {
772       if (length < (size_t)(payload_offset + payload_length)) {
773         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n"));
774         goto out_disconnect;
775       }
776       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
777       /* Reply if QoS > 0 */
778       if (remaining_length == 0 && qos > 0) {
779         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
780         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
781         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
782                                        mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
783         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
784       }
785     }
786   } else {
787     /* Get packet identifier */
788     pkt_id = (u16_t)var_hdr_payload[0] << 8;
789     pkt_id |= (u16_t)var_hdr_payload[1];
790     if (pkt_id == 0) {
791       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
792       goto out_disconnect;
793     }
794     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
795       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id));
796       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
797 
798     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
799       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id));
800       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
801 
802     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
803                pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
804       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
805       if (r != NULL) {
806         LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
807         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
808           if (length < 3) {
809             LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n"));
810             goto out_disconnect;
811           } else {
812             mqtt_incomming_suback(r, var_hdr_payload[2]);
813           }
814         } else if (r->cb != NULL) {
815           r->cb(r->arg, ERR_OK);
816         }
817         mqtt_delete_request(r);
818       } else {
819         LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
820       }
821     } else {
822       LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
823       goto out_disconnect;
824     }
825   }
826   return res;
827 out_disconnect:
828   return MQTT_CONNECT_DISCONNECTED;
829 }
830 
831 
832 /**
833  * MQTT incoming message parser
834  * @param client MQTT client
835  * @param p PBUF chain of received data
836  * @return Connection status
837  */
838 static mqtt_connection_status_t
mqtt_parse_incoming(mqtt_client_t * client,struct pbuf * p)839 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
840 {
841   u16_t in_offset = 0;
842   u32_t msg_rem_len = 0;
843   u8_t fixed_hdr_idx = 0;
844   u8_t b = 0;
845 
846   while (p->tot_len > in_offset) {
847     /* We ALWAYS parse the header here first. Even if the header was not
848        included in this segment, we re-parse it here by buffering it in
849        client->rx_buffer. client->msg_idx keeps track of this. */
850     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
851 
852       if (fixed_hdr_idx < client->msg_idx) {
853         /* parse header from old pbuf (buffered in client->rx_buffer) */
854         b = client->rx_buffer[fixed_hdr_idx];
855       } else {
856         /* parse header from this pbuf and save it in client->rx_buffer in case
857            it comes in segmented */
858         b = pbuf_get_at(p, in_offset++);
859         client->rx_buffer[client->msg_idx++] = b;
860       }
861       fixed_hdr_idx++;
862 
863       if (fixed_hdr_idx >= 2) {
864         /* fixed header contains at least 2 bytes but can contain more, depending on
865            'remaining length'. All bytes but the last of this have 0x80 set to
866            indicate more bytes are coming. */
867         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
868         if ((b & 0x80) == 0) {
869           /* fixed header is done */
870           LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
871           if (msg_rem_len == 0) {
872             /* Complete message with no extra headers of payload received */
873             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
874             client->msg_idx = 0;
875             fixed_hdr_idx = 0;
876           } else {
877             /* Bytes remaining in message (changes remaining length if this is
878                not the first segment of this message) */
879             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
880           }
881         }
882       }
883     } else {
884       /* Fixed header has been parsed, parse variable header */
885       u16_t cpy_len, cpy_start, buffer_space;
886 
887       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
888 
889       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
890       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
891 
892       /* Limit to available space in buffer */
893       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
894       if (cpy_len > buffer_space) {
895         cpy_len = buffer_space;
896       }
897       pbuf_copy_partial(p, client->rx_buffer + cpy_start, cpy_len, in_offset);
898 
899       /* Advance get and put indexes  */
900       client->msg_idx += cpy_len;
901       in_offset += cpy_len;
902       msg_rem_len -= cpy_len;
903 
904       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
905       if ((msg_rem_len == 0) || (cpy_len == buffer_space)) {
906         /* Whole message received or buffer is full */
907         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
908         if (res != MQTT_CONNECT_ACCEPTED) {
909           return res;
910         }
911         if (msg_rem_len == 0) {
912           /* Reset parser state */
913           client->msg_idx = 0;
914           /* msg_tot_len = 0; */
915           fixed_hdr_idx = 0;
916         }
917       }
918     }
919   }
920   return MQTT_CONNECT_ACCEPTED;
921 }
922 
923 
924 /**
925  * TCP received callback function. @see tcp_recv_fn
926  * @param arg MQTT client
927  * @param p PBUF chain of received data
928  * @param err Passed as return value if not ERR_OK
929  * @return ERR_OK or err passed into callback
930  */
931 static err_t
mqtt_tcp_recv_cb(void * arg,struct altcp_pcb * pcb,struct pbuf * p,err_t err)932 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
933 {
934   mqtt_client_t *client = (mqtt_client_t *)arg;
935   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
936   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
937 
938   if (p == NULL) {
939     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
940     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
941   } else {
942     mqtt_connection_status_t res;
943     if (err != ERR_OK) {
944       LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err));
945       pbuf_free(p);
946       return err;
947     }
948 
949     /* Tell remote that data has been received */
950     altcp_recved(pcb, p->tot_len);
951     res = mqtt_parse_incoming(client, p);
952     pbuf_free(p);
953 
954     if (res != MQTT_CONNECT_ACCEPTED) {
955       mqtt_close(client, res);
956     }
957     /* If keep alive functionality is used */
958     if (client->keep_alive != 0) {
959       /* Reset server alive watchdog */
960       client->server_watchdog = 0;
961     }
962 
963   }
964   return ERR_OK;
965 }
966 
967 
968 /**
969  * TCP data sent callback function. @see tcp_sent_fn
970  * @param arg MQTT client
971  * @param tpcb TCP connection handle
972  * @param len Number of bytes sent
973  * @return ERR_OK
974  */
975 static err_t
mqtt_tcp_sent_cb(void * arg,struct altcp_pcb * tpcb,u16_t len)976 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
977 {
978   mqtt_client_t *client = (mqtt_client_t *)arg;
979 
980   LWIP_UNUSED_ARG(tpcb);
981   LWIP_UNUSED_ARG(len);
982 
983   if (client->conn_state == MQTT_CONNECTED) {
984     struct mqtt_request_t *r;
985 
986     /* Reset keep-alive send timer and server watchdog */
987     client->cyclic_tick = 0;
988     client->server_watchdog = 0;
989     /* QoS 0 publish has no response from server, so call its callbacks here */
990     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
991       LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
992       if (r->cb != NULL) {
993         r->cb(r->arg, ERR_OK);
994       }
995       mqtt_delete_request(r);
996     }
997     /* Try send any remaining buffers from output queue */
998     mqtt_output_send(&client->output, client->conn);
999   }
1000   return ERR_OK;
1001 }
1002 
1003 /**
1004  * TCP error callback function. @see tcp_err_fn
1005  * @param arg MQTT client
1006  * @param err Error encountered
1007  */
1008 static void
mqtt_tcp_err_cb(void * arg,err_t err)1009 mqtt_tcp_err_cb(void *arg, err_t err)
1010 {
1011   mqtt_client_t *client = (mqtt_client_t *)arg;
1012   LWIP_UNUSED_ARG(err); /* only used for debug output */
1013   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
1014   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
1015   /* Set conn to null before calling close as pcb is already deallocated*/
1016   client->conn = 0;
1017   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
1018 }
1019 
1020 /**
1021  * TCP poll callback function. @see tcp_poll_fn
1022  * @param arg MQTT client
1023  * @param tpcb TCP connection handle
1024  * @return err ERR_OK
1025  */
1026 static err_t
mqtt_tcp_poll_cb(void * arg,struct altcp_pcb * tpcb)1027 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
1028 {
1029   mqtt_client_t *client = (mqtt_client_t *)arg;
1030   if (client->conn_state == MQTT_CONNECTED) {
1031     /* Try send any remaining buffers from output queue */
1032     mqtt_output_send(&client->output, tpcb);
1033   }
1034   return ERR_OK;
1035 }
1036 
1037 /**
1038  * TCP connect callback function. @see tcp_connected_fn
1039  * @param arg MQTT client
1040  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
1041  * @return ERR_OK
1042  */
1043 static err_t
mqtt_tcp_connect_cb(void * arg,struct altcp_pcb * tpcb,err_t err)1044 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
1045 {
1046   mqtt_client_t *client = (mqtt_client_t *)arg;
1047 
1048   if (err != ERR_OK) {
1049     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
1050     return err;
1051   }
1052 
1053   /* Initiate receiver state */
1054   client->msg_idx = 0;
1055 
1056   /* Setup TCP callbacks */
1057   altcp_recv(tpcb, mqtt_tcp_recv_cb);
1058   altcp_sent(tpcb, mqtt_tcp_sent_cb);
1059   altcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
1060 
1061   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n"));
1062   /* Enter MQTT connect state */
1063   client->conn_state = MQTT_CONNECTING;
1064 
1065   /* Start cyclic timer */
1066   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client);
1067   client->cyclic_tick = 0;
1068 
1069   /* Start transmission from output queue, connect message is the first one out*/
1070   mqtt_output_send(&client->output, client->conn);
1071 
1072   return ERR_OK;
1073 }
1074 
1075 
1076 
1077 /*---------------------------------------------------------------------------------------------------- */
1078 /* Public API */
1079 
1080 
1081 /**
1082  * @ingroup mqtt
1083  * MQTT publish function.
1084  * @param client MQTT client
1085  * @param topic Publish topic string
1086  * @param payload Data to publish (NULL is allowed)
1087  * @param payload_length Length of payload (0 is allowed)
1088  * @param qos Quality of service, 0 1 or 2
1089  * @param retain MQTT retain flag
1090  * @param cb Callback to call when publish is complete or has timed out
1091  * @param arg User supplied argument to publish callback
1092  * @return ERR_OK if successful
1093  *         ERR_CONN if client is disconnected
1094  *         ERR_MEM if short on memory
1095  */
1096 err_t
mqtt_publish(mqtt_client_t * client,const char * topic,const void * payload,u16_t payload_length,u8_t qos,u8_t retain,mqtt_request_cb_t cb,void * arg)1097 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1098              mqtt_request_cb_t cb, void *arg)
1099 {
1100   struct mqtt_request_t *r;
1101   u16_t pkt_id;
1102   size_t topic_strlen;
1103   size_t total_len;
1104   u16_t topic_len;
1105   u16_t remaining_length;
1106 
1107   LWIP_ASSERT_CORE_LOCKED();
1108   LWIP_ASSERT("mqtt_publish: client != NULL", client);
1109   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1110   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1111 
1112   topic_strlen = strlen(topic);
1113   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1114   topic_len = (u16_t)topic_strlen;
1115   total_len = 2 + topic_len + payload_length;
1116 
1117   if (qos > 0) {
1118     total_len += 2;
1119     /* Generate pkt_id id for QoS1 and 2 */
1120     pkt_id = msg_generate_packet_id(client);
1121   } else {
1122     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1123     pkt_id = 0;
1124   }
1125   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1126   remaining_length = (u16_t)total_len;
1127 
1128   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1129 
1130   r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1131   if (r == NULL) {
1132     return ERR_MEM;
1133   }
1134 
1135   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1136     mqtt_delete_request(r);
1137     return ERR_MEM;
1138   }
1139   /* Append fixed header */
1140   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1141 
1142   /* Append Topic */
1143   mqtt_output_append_string(&client->output, topic, topic_len);
1144 
1145   /* Append packet if for QoS 1 and 2*/
1146   if (qos > 0) {
1147     mqtt_output_append_u16(&client->output, pkt_id);
1148   }
1149 
1150   /* Append optional publish payload */
1151   if ((payload != NULL) && (payload_length > 0)) {
1152     mqtt_output_append_buf(&client->output, payload, payload_length);
1153   }
1154 
1155   mqtt_append_request(&client->pend_req_queue, r);
1156   mqtt_output_send(&client->output, client->conn);
1157   return ERR_OK;
1158 }
1159 
1160 
1161 /**
1162  * @ingroup mqtt
1163  * MQTT subscribe/unsubscribe function.
1164  * @param client MQTT client
1165  * @param topic topic to subscribe to
1166  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1167  * @param cb Callback to call when subscribe/unsubscribe reponse is received
1168  * @param arg User supplied argument to publish callback
1169  * @param sub 1 for subscribe, 0 for unsubscribe
1170  * @return ERR_OK if successful, @see err_t enum for other results
1171  */
1172 err_t
mqtt_sub_unsub(mqtt_client_t * client,const char * topic,u8_t qos,mqtt_request_cb_t cb,void * arg,u8_t sub)1173 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1174 {
1175   size_t topic_strlen;
1176   size_t total_len;
1177   u16_t topic_len;
1178   u16_t remaining_length;
1179   u16_t pkt_id;
1180   struct mqtt_request_t *r;
1181 
1182   LWIP_ASSERT_CORE_LOCKED();
1183   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1184   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1185 
1186   topic_strlen = strlen(topic);
1187   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1188   topic_len = (u16_t)topic_strlen;
1189   /* Topic string, pkt_id, qos for subscribe */
1190   total_len =  topic_len + 2 + 2 + (sub != 0);
1191   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1192   remaining_length = (u16_t)total_len;
1193 
1194   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1195   if (client->conn_state == TCP_DISCONNECTED) {
1196     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1197     return ERR_CONN;
1198   }
1199 
1200   pkt_id = msg_generate_packet_id(client);
1201   r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1202   if (r == NULL) {
1203     return ERR_MEM;
1204   }
1205 
1206   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1207     mqtt_delete_request(r);
1208     return ERR_MEM;
1209   }
1210 
1211   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1212 
1213   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1214   /* Packet id */
1215   mqtt_output_append_u16(&client->output, pkt_id);
1216   /* Topic */
1217   mqtt_output_append_string(&client->output, topic, topic_len);
1218   /* QoS */
1219   if (sub != 0) {
1220     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1221   }
1222 
1223   mqtt_append_request(&client->pend_req_queue, r);
1224   mqtt_output_send(&client->output, client->conn);
1225   return ERR_OK;
1226 }
1227 
1228 
1229 /**
1230  * @ingroup mqtt
1231  * Set callback to handle incoming publish requests from server
1232  * @param client MQTT client
1233  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1234  * @param data_cb Callback for each fragment of payload that arrives
1235  * @param arg User supplied argument to both callbacks
1236  */
1237 void
mqtt_set_inpub_callback(mqtt_client_t * client,mqtt_incoming_publish_cb_t pub_cb,mqtt_incoming_data_cb_t data_cb,void * arg)1238 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1239                         mqtt_incoming_data_cb_t data_cb, void *arg)
1240 {
1241   LWIP_ASSERT_CORE_LOCKED();
1242   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1243   client->data_cb = data_cb;
1244   client->pub_cb = pub_cb;
1245   client->inpub_arg = arg;
1246 }
1247 
1248 /**
1249  * @ingroup mqtt
1250  * Create a new MQTT client instance
1251  * @return Pointer to instance on success, NULL otherwise
1252  */
1253 mqtt_client_t *
mqtt_client_new(void)1254 mqtt_client_new(void)
1255 {
1256   LWIP_ASSERT_CORE_LOCKED();
1257   return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t));
1258 }
1259 
1260 /**
1261  * @ingroup mqtt
1262  * Free MQTT client instance
1263  * @param client Pointer to instance to be freed
1264  */
1265 void
mqtt_client_free(mqtt_client_t * client)1266 mqtt_client_free(mqtt_client_t *client)
1267 {
1268   mem_free(client);
1269 }
1270 
1271 /**
1272  * @ingroup mqtt
1273  * Connect to MQTT server
1274  * @param client MQTT client
1275  * @param ip_addr Server IP
1276  * @param port Server port
1277  * @param cb Connection state change callback
1278  * @param arg User supplied argument to connection callback
1279  * @param client_info Client identification and connection options
1280  * @return ERR_OK if successful, @see err_t enum for other results
1281  */
1282 err_t
mqtt_client_connect(mqtt_client_t * client,const ip_addr_t * ip_addr,u16_t port,mqtt_connection_cb_t cb,void * arg,const struct mqtt_connect_client_info_t * client_info)1283 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1284                     const struct mqtt_connect_client_info_t *client_info)
1285 {
1286   err_t err;
1287   size_t len;
1288   u16_t client_id_length;
1289   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1290   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1291   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1292   u16_t client_user_len = 0, client_pass_len = 0;
1293 
1294   LWIP_ASSERT_CORE_LOCKED();
1295   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1296   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1297   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1298   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1299 
1300   if (client->conn_state != TCP_DISCONNECTED) {
1301     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n"));
1302     return ERR_ISCONN;
1303   }
1304 
1305   /* Wipe clean */
1306   memset(client, 0, sizeof(mqtt_client_t));
1307   client->connect_arg = arg;
1308   client->connect_cb = cb;
1309   client->keep_alive = client_info->keep_alive;
1310   mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list));
1311 
1312   /* Build connect message */
1313   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1314     flags |= MQTT_CONNECT_FLAG_WILL;
1315     flags |= (client_info->will_qos & 3) << 3;
1316     if (client_info->will_retain) {
1317       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1318     }
1319     len = strlen(client_info->will_topic);
1320     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1321     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1322     will_topic_len = (u8_t)len;
1323     len = strlen(client_info->will_msg);
1324     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1325     will_msg_len = (u8_t)len;
1326     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1327     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1328     remaining_length = (u16_t)len;
1329   }
1330   if (client_info->client_user != NULL) {
1331     flags |= MQTT_CONNECT_FLAG_USERNAME;
1332     len = strlen(client_info->client_user);
1333     LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL);
1334     LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL);
1335     client_user_len = (u16_t)len;
1336     len = remaining_length + 2 + client_user_len;
1337     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1338     remaining_length = (u16_t)len;
1339   }
1340   if (client_info->client_pass != NULL) {
1341     flags |= MQTT_CONNECT_FLAG_PASSWORD;
1342     len = strlen(client_info->client_pass);
1343     LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL);
1344     LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL);
1345     client_pass_len = (u16_t)len;
1346     len = remaining_length + 2 + client_pass_len;
1347     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1348     remaining_length = (u16_t)len;
1349   }
1350 
1351   /* Don't complicate things, always connect using clean session */
1352   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1353 
1354   len = strlen(client_info->client_id);
1355   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1356   client_id_length = (u16_t)len;
1357   len = remaining_length + 2 + client_id_length;
1358   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1359   remaining_length = (u16_t)len;
1360 
1361   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1362     return ERR_MEM;
1363   }
1364 
1365 #if LWIP_ALTCP && LWIP_ALTCP_TLS
1366   if (client_info->tls_config) {
1367     client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr));
1368   } else
1369 #endif
1370   {
1371     client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr));
1372   }
1373   if (client->conn == NULL) {
1374     return ERR_MEM;
1375   }
1376 
1377   /* Set arg pointer for callbacks */
1378   altcp_arg(client->conn, client);
1379   /* Any local address, pick random local port number */
1380   err = altcp_bind(client->conn, IP_ADDR_ANY, 0);
1381   if (err != ERR_OK) {
1382     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1383     goto tcp_fail;
1384   }
1385   LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1386 
1387   /* Connect to server */
1388   err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1389   if (err != ERR_OK) {
1390     LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1391     goto tcp_fail;
1392   }
1393   /* Set error callback */
1394   altcp_err(client->conn, mqtt_tcp_err_cb);
1395   client->conn_state = TCP_CONNECTING;
1396 
1397   /* Append fixed header */
1398   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1399   /* Append Protocol string */
1400   mqtt_output_append_string(&client->output, "MQTT", 4);
1401   /* Append Protocol level */
1402   mqtt_output_append_u8(&client->output, 4);
1403   /* Append connect flags */
1404   mqtt_output_append_u8(&client->output, flags);
1405   /* Append keep-alive */
1406   mqtt_output_append_u16(&client->output, client_info->keep_alive);
1407   /* Append client id */
1408   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1409   /* Append will message if used */
1410   if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1411     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1412     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1413   }
1414   /* Append user name if given */
1415   if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
1416     mqtt_output_append_string(&client->output, client_info->client_user, client_user_len);
1417   }
1418   /* Append password if given */
1419   if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
1420     mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len);
1421   }
1422   return ERR_OK;
1423 
1424 tcp_fail:
1425   altcp_abort(client->conn);
1426   client->conn = NULL;
1427   return err;
1428 }
1429 
1430 
1431 /**
1432  * @ingroup mqtt
1433  * Disconnect from MQTT server
1434  * @param client MQTT client
1435  */
1436 void
mqtt_disconnect(mqtt_client_t * client)1437 mqtt_disconnect(mqtt_client_t *client)
1438 {
1439   LWIP_ASSERT_CORE_LOCKED();
1440   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1441   /* If connection in not already closed */
1442   if (client->conn_state != TCP_DISCONNECTED) {
1443     /* Set conn_state before calling mqtt_close to prevent callback from being called */
1444     client->conn_state = TCP_DISCONNECTED;
1445     mqtt_close(client, (mqtt_connection_status_t)0);
1446   }
1447 }
1448 
1449 /**
1450  * @ingroup mqtt
1451  * Check connection with server
1452  * @param client MQTT client
1453  * @return 1 if connected to server, 0 otherwise
1454  */
1455 u8_t
mqtt_client_is_connected(mqtt_client_t * client)1456 mqtt_client_is_connected(mqtt_client_t *client)
1457 {
1458   LWIP_ASSERT_CORE_LOCKED();
1459   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1460   return client->conn_state == MQTT_CONNECTED;
1461 }
1462 
1463 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
1464