1 /**
2 * @file
3 * MQTT client
4 *
5 * @defgroup mqtt MQTT client
6 * @ingroup apps
7 * @verbinclude mqtt_client.txt
8 */
9
10 /*
11 * Copyright (c) 2016 Erik Andersson <[email protected]>
12 * All rights reserved.
13 *
14 * Redistribution and use in source and binary forms, with or without modification,
15 * are permitted provided that the following conditions are met:
16 *
17 * 1. Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * 3. The name of the author may not be used to endorse or promote products
23 * derived from this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34 * OF SUCH DAMAGE.
35 *
36 * This file is part of the lwIP TCP/IP stack
37 *
38 * Author: Erik Andersson <[email protected]>
39 *
40 *
41 * @todo:
42 * - Handle large outgoing payloads for PUBLISH messages
43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44 * - Add support for legacy MQTT protocol version
45 *
46 * Please coordinate changes and requests with Erik Andersson
47 * Erik Andersson <[email protected]>
48 *
49 */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/apps/mqtt_priv.h"
52 #include "lwip/timeouts.h"
53 #include "lwip/ip_addr.h"
54 #include "lwip/mem.h"
55 #include "lwip/err.h"
56 #include "lwip/pbuf.h"
57 #include "lwip/altcp.h"
58 #include "lwip/altcp_tcp.h"
59 #include "lwip/altcp_tls.h"
60 #include <string.h>
61
62 #if LWIP_TCP && LWIP_CALLBACK_API
63
64 /**
65 * MQTT_DEBUG: Default is off.
66 */
67 #if !defined MQTT_DEBUG || defined __DOXYGEN__
68 #define MQTT_DEBUG LWIP_DBG_OFF
69 #endif
70
71 #define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE)
72 #define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE)
73 #define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
74 #define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
75 #define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
76
77
78
79 /**
80 * MQTT client connection states
81 */
82 enum {
83 TCP_DISCONNECTED,
84 TCP_CONNECTING,
85 MQTT_CONNECTING,
86 MQTT_CONNECTED
87 };
88
89 /**
90 * MQTT control message types
91 */
92 enum mqtt_message_type {
93 MQTT_MSG_TYPE_CONNECT = 1,
94 MQTT_MSG_TYPE_CONNACK = 2,
95 MQTT_MSG_TYPE_PUBLISH = 3,
96 MQTT_MSG_TYPE_PUBACK = 4,
97 MQTT_MSG_TYPE_PUBREC = 5,
98 MQTT_MSG_TYPE_PUBREL = 6,
99 MQTT_MSG_TYPE_PUBCOMP = 7,
100 MQTT_MSG_TYPE_SUBSCRIBE = 8,
101 MQTT_MSG_TYPE_SUBACK = 9,
102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
103 MQTT_MSG_TYPE_UNSUBACK = 11,
104 MQTT_MSG_TYPE_PINGREQ = 12,
105 MQTT_MSG_TYPE_PINGRESP = 13,
106 MQTT_MSG_TYPE_DISCONNECT = 14
107 };
108
109 /** Helpers to extract control packet type and qos from first byte in fixed header */
110 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4)
111 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1)
112
113 /**
114 * MQTT connect flags, only used in CONNECT message
115 */
116 enum mqtt_connect_flag {
117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
120 MQTT_CONNECT_FLAG_WILL = 1 << 2,
121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
122 };
123
124
125 static void mqtt_cyclic_timer(void *arg);
126
127 #if defined(LWIP_DEBUG)
128 static const char *const mqtt_message_type_str[15] = {
129 "UNDEFINED",
130 "CONNECT",
131 "CONNACK",
132 "PUBLISH",
133 "PUBACK",
134 "PUBREC",
135 "PUBREL",
136 "PUBCOMP",
137 "SUBSCRIBE",
138 "SUBACK",
139 "UNSUBSCRIBE",
140 "UNSUBACK",
141 "PINGREQ",
142 "PINGRESP",
143 "DISCONNECT"
144 };
145
146 /**
147 * Message type value to string
148 * @param msg_type see enum mqtt_message_type
149 *
150 * @return Control message type text string
151 */
152 static const char *
mqtt_msg_type_to_str(u8_t msg_type)153 mqtt_msg_type_to_str(u8_t msg_type)
154 {
155 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
156 msg_type = 0;
157 }
158 return mqtt_message_type_str[msg_type];
159 }
160
161 #endif
162
163
164 /**
165 * Generate MQTT packet identifier
166 * @param client MQTT client
167 * @return New packet identifier, range 1 to 65535
168 */
169 static u16_t
msg_generate_packet_id(mqtt_client_t * client)170 msg_generate_packet_id(mqtt_client_t *client)
171 {
172 client->pkt_id_seq++;
173 if (client->pkt_id_seq == 0) {
174 client->pkt_id_seq++;
175 }
176 return client->pkt_id_seq;
177 }
178
179 /*--------------------------------------------------------------------------------------------------------------------- */
180 /* Output ring buffer */
181
182 /** Add single item to ring buffer */
183 static void
mqtt_ringbuf_put(struct mqtt_ringbuf_t * rb,u8_t item)184 mqtt_ringbuf_put(struct mqtt_ringbuf_t *rb, u8_t item)
185 {
186 rb->buf[rb->put] = item;
187 rb->put++;
188 if (rb->put >= MQTT_OUTPUT_RINGBUF_SIZE) {
189 rb->put = 0;
190 }
191 }
192
193 /** Return pointer to ring buffer get position */
194 static u8_t *
mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t * rb)195 mqtt_ringbuf_get_ptr(struct mqtt_ringbuf_t *rb)
196 {
197 return &rb->buf[rb->get];
198 }
199
200 static void
mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t * rb,u16_t len)201 mqtt_ringbuf_advance_get_idx(struct mqtt_ringbuf_t *rb, u16_t len)
202 {
203 LWIP_ASSERT("mqtt_ringbuf_advance_get_idx: len < MQTT_OUTPUT_RINGBUF_SIZE", len < MQTT_OUTPUT_RINGBUF_SIZE);
204
205 rb->get += len;
206 if (rb->get >= MQTT_OUTPUT_RINGBUF_SIZE) {
207 rb->get = rb->get - MQTT_OUTPUT_RINGBUF_SIZE;
208 }
209 }
210
211 /** Return number of bytes in ring buffer */
212 static u16_t
mqtt_ringbuf_len(struct mqtt_ringbuf_t * rb)213 mqtt_ringbuf_len(struct mqtt_ringbuf_t *rb)
214 {
215 u32_t len = rb->put - rb->get;
216 if (len > 0xFFFF) {
217 len += MQTT_OUTPUT_RINGBUF_SIZE;
218 }
219 return (u16_t)len;
220 }
221
222 /** Return number of bytes free in ring buffer */
223 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
224
225 /** Return number of bytes possible to read without wrapping around */
226 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
227
228 /**
229 * Try send as many bytes as possible from output ring buffer
230 * @param rb Output ring buffer
231 * @param tpcb TCP connection handle
232 */
233 static void
mqtt_output_send(struct mqtt_ringbuf_t * rb,struct altcp_pcb * tpcb)234 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct altcp_pcb *tpcb)
235 {
236 err_t err;
237 u8_t wrap = 0;
238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
239 u16_t send_len = altcp_sndbuf(tpcb);
240 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
241
242 if (send_len == 0 || ringbuf_lin_len == 0) {
243 return;
244 }
245
246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
247 send_len, ringbuf_lin_len, rb->get, rb->put));
248
249 if (send_len > ringbuf_lin_len) {
250 /* Space in TCP output buffer is larger than available in ring buffer linear portion */
251 send_len = ringbuf_lin_len;
252 /* Wrap around if more data in ring buffer after linear portion */
253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
254 }
255 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
256 if ((err == ERR_OK) && wrap) {
257 mqtt_ringbuf_advance_get_idx(rb, send_len);
258 /* Use the lesser one of ring buffer linear length and TCP send buffer size */
259 send_len = LWIP_MIN(altcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
260 err = altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
261 }
262
263 if (err == ERR_OK) {
264 mqtt_ringbuf_advance_get_idx(rb, send_len);
265 /* Flush */
266 altcp_output(tpcb);
267 } else {
268 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
269 }
270 }
271
272
273
274 /*--------------------------------------------------------------------------------------------------------------------- */
275 /* Request queue */
276
277 /**
278 * Create request item
279 * @param r_objs Pointer to request objects
280 * @param r_objs_len Number of array entries
281 * @param pkt_id Packet identifier of request
282 * @param cb Packet callback to call when requests lifetime ends
283 * @param arg Parameter following callback
284 * @return Request or NULL if failed to create
285 */
286 static struct mqtt_request_t *
mqtt_create_request(struct mqtt_request_t * r_objs,size_t r_objs_len,u16_t pkt_id,mqtt_request_cb_t cb,void * arg)287 mqtt_create_request(struct mqtt_request_t *r_objs, size_t r_objs_len, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
288 {
289 struct mqtt_request_t *r = NULL;
290 u8_t n;
291 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
292 for (n = 0; n < r_objs_len; n++) {
293 /* Item point to itself if not in use */
294 if (r_objs[n].next == &r_objs[n]) {
295 r = &r_objs[n];
296 r->next = NULL;
297 r->cb = cb;
298 r->arg = arg;
299 r->pkt_id = pkt_id;
300 break;
301 }
302 }
303 return r;
304 }
305
306
307 /**
308 * Append request to pending request queue
309 * @param tail Pointer to request queue tail pointer
310 * @param r Request to append
311 */
312 static void
mqtt_append_request(struct mqtt_request_t ** tail,struct mqtt_request_t * r)313 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
314 {
315 struct mqtt_request_t *head = NULL;
316 s16_t time_before = 0;
317 struct mqtt_request_t *iter;
318
319 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
320
321 /* Iterate trough queue to find head, and count total timeout time */
322 for (iter = *tail; iter != NULL; iter = iter->next) {
323 time_before += iter->timeout_diff;
324 head = iter;
325 }
326
327 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
328 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
329 if (head == NULL) {
330 *tail = r;
331 } else {
332 head->next = r;
333 }
334 }
335
336
337 /**
338 * Delete request item
339 * @param r Request item to delete
340 */
341 static void
mqtt_delete_request(struct mqtt_request_t * r)342 mqtt_delete_request(struct mqtt_request_t *r)
343 {
344 if (r != NULL) {
345 r->next = r;
346 }
347 }
348
349 /**
350 * Remove a request item with a specific packet identifier from request queue
351 * @param tail Pointer to request queue tail pointer
352 * @param pkt_id Packet identifier of request to take
353 * @return Request item if found, NULL if not
354 */
355 static struct mqtt_request_t *
mqtt_take_request(struct mqtt_request_t ** tail,u16_t pkt_id)356 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
357 {
358 struct mqtt_request_t *iter = NULL, *prev = NULL;
359 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
360 /* Search all request for pkt_id */
361 for (iter = *tail; iter != NULL; iter = iter->next) {
362 if (iter->pkt_id == pkt_id) {
363 break;
364 }
365 prev = iter;
366 }
367
368 /* If request was found */
369 if (iter != NULL) {
370 /* unchain */
371 if (prev == NULL) {
372 *tail = iter->next;
373 } else {
374 prev->next = iter->next;
375 }
376 /* If exists, add remaining timeout time for the request to next */
377 if (iter->next != NULL) {
378 iter->next->timeout_diff += iter->timeout_diff;
379 }
380 iter->next = NULL;
381 }
382 return iter;
383 }
384
385 /**
386 * Handle requests timeout
387 * @param tail Pointer to request queue tail pointer
388 * @param t Time since last call in seconds
389 */
390 static void
mqtt_request_time_elapsed(struct mqtt_request_t ** tail,u8_t t)391 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
392 {
393 struct mqtt_request_t *r;
394 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
395 r = *tail;
396 while (t > 0 && r != NULL) {
397 if (t >= r->timeout_diff) {
398 t -= (u8_t)r->timeout_diff;
399 /* Unchain */
400 *tail = r->next;
401 /* Notify upper layer about timeout */
402 if (r->cb != NULL) {
403 r->cb(r->arg, ERR_TIMEOUT);
404 }
405 mqtt_delete_request(r);
406 /* Tail might be be modified in callback, so re-read it in every iteration */
407 r = *(struct mqtt_request_t *const volatile *)tail;
408 } else {
409 r->timeout_diff -= t;
410 t = 0;
411 }
412 }
413 }
414
415 /**
416 * Free all request items
417 * @param tail Pointer to request queue tail pointer
418 */
419 static void
mqtt_clear_requests(struct mqtt_request_t ** tail)420 mqtt_clear_requests(struct mqtt_request_t **tail)
421 {
422 struct mqtt_request_t *iter, *next;
423 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
424 for (iter = *tail; iter != NULL; iter = next) {
425 next = iter->next;
426 mqtt_delete_request(iter);
427 }
428 *tail = NULL;
429 }
430 /**
431 * Initialize all request items
432 * @param r_objs Pointer to request objects
433 * @param r_objs_len Number of array entries
434 */
435 static void
mqtt_init_requests(struct mqtt_request_t * r_objs,size_t r_objs_len)436 mqtt_init_requests(struct mqtt_request_t *r_objs, size_t r_objs_len)
437 {
438 u8_t n;
439 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
440 for (n = 0; n < r_objs_len; n++) {
441 /* Item pointing to itself indicates unused */
442 r_objs[n].next = &r_objs[n];
443 }
444 }
445
446 /*--------------------------------------------------------------------------------------------------------------------- */
447 /* Output message build helpers */
448
449
450 static void
mqtt_output_append_u8(struct mqtt_ringbuf_t * rb,u8_t value)451 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
452 {
453 mqtt_ringbuf_put(rb, value);
454 }
455
456 static
mqtt_output_append_u16(struct mqtt_ringbuf_t * rb,u16_t value)457 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
458 {
459 mqtt_ringbuf_put(rb, value >> 8);
460 mqtt_ringbuf_put(rb, value & 0xff);
461 }
462
463 static void
mqtt_output_append_buf(struct mqtt_ringbuf_t * rb,const void * data,u16_t length)464 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
465 {
466 u16_t n;
467 for (n = 0; n < length; n++) {
468 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
469 }
470 }
471
472 static void
mqtt_output_append_string(struct mqtt_ringbuf_t * rb,const char * str,u16_t length)473 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
474 {
475 u16_t n;
476 mqtt_ringbuf_put(rb, length >> 8);
477 mqtt_ringbuf_put(rb, length & 0xff);
478 for (n = 0; n < length; n++) {
479 mqtt_ringbuf_put(rb, str[n]);
480 }
481 }
482
483 /**
484 * Append fixed header
485 * @param rb Output ring buffer
486 * @param msg_type see enum mqtt_message_type
487 * @param fdup MQTT DUP flag
488 * @param fqos MQTT QoS field
489 * @param fretain MQTT retain flag
490 * @param r_length Remaining length after fixed header
491 */
492
493 static void
mqtt_output_append_fixed_header(struct mqtt_ringbuf_t * rb,u8_t msg_type,u8_t fdup,u8_t fqos,u8_t fretain,u16_t r_length)494 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t fdup,
495 u8_t fqos, u8_t fretain, u16_t r_length)
496 {
497 /* Start with control byte */
498 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
499 /* Encode remaining length field */
500 do {
501 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
502 r_length >>= 7;
503 } while (r_length > 0);
504 }
505
506
507 /**
508 * Check output buffer space
509 * @param rb Output ring buffer
510 * @param r_length Remaining length after fixed header
511 * @return 1 if message will fit, 0 if not enough buffer space
512 */
513 static u8_t
mqtt_output_check_space(struct mqtt_ringbuf_t * rb,u16_t r_length)514 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
515 {
516 /* Start with length of type byte + remaining length */
517 u16_t total_len = 1 + r_length;
518
519 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
520
521 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
522 do {
523 total_len++;
524 r_length >>= 7;
525 } while (r_length > 0);
526
527 return (total_len <= mqtt_ringbuf_free(rb));
528 }
529
530
531 /**
532 * Close connection to server
533 * @param client MQTT client
534 * @param reason Reason for disconnection
535 */
536 static void
mqtt_close(mqtt_client_t * client,mqtt_connection_status_t reason)537 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
538 {
539 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
540
541 /* Bring down TCP connection if not already done */
542 if (client->conn != NULL) {
543 err_t res;
544 altcp_recv(client->conn, NULL);
545 altcp_err(client->conn, NULL);
546 altcp_sent(client->conn, NULL);
547 res = altcp_close(client->conn);
548 if (res != ERR_OK) {
549 altcp_abort(client->conn);
550 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_close: Close err=%s\n", lwip_strerr(res)));
551 }
552 client->conn = NULL;
553 }
554
555 /* Remove all pending requests */
556 mqtt_clear_requests(&client->pend_req_queue);
557 /* Stop cyclic timer */
558 sys_untimeout(mqtt_cyclic_timer, client);
559
560 /* Notify upper layer of disconnection if changed state */
561 if (client->conn_state != TCP_DISCONNECTED) {
562
563 client->conn_state = TCP_DISCONNECTED;
564 if (client->connect_cb != NULL) {
565 client->connect_cb(client, client->connect_arg, reason);
566 }
567 }
568 }
569
570
571 /**
572 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
573 * @param arg MQTT client
574 */
575 static void
mqtt_cyclic_timer(void * arg)576 mqtt_cyclic_timer(void *arg)
577 {
578 u8_t restart_timer = 1;
579 mqtt_client_t *client = (mqtt_client_t *)arg;
580 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
581
582 if (client->conn_state == MQTT_CONNECTING) {
583 client->cyclic_tick++;
584 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
585 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
586 /* Disconnect TCP */
587 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
588 restart_timer = 0;
589 }
590 } else if (client->conn_state == MQTT_CONNECTED) {
591 /* Handle timeout for pending requests */
592 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
593
594 /* keep_alive > 0 means keep alive functionality shall be used */
595 if (client->keep_alive > 0) {
596
597 client->server_watchdog++;
598 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
599 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive / 2)) {
600 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
601 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
602 restart_timer = 0;
603 }
604
605 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
606 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
607 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
608 if (mqtt_output_check_space(&client->output, 0) != 0) {
609 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
610 client->cyclic_tick = 0;
611 }
612 } else {
613 client->cyclic_tick++;
614 }
615 }
616 } else {
617 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
618 restart_timer = 0;
619 }
620 if (restart_timer) {
621 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, arg);
622 }
623 }
624
625
626 /**
627 * Send PUBACK, PUBREC or PUBREL response message
628 * @param client MQTT client
629 * @param msg PUBACK, PUBREC or PUBREL
630 * @param pkt_id Packet identifier
631 * @param qos QoS value
632 * @return ERR_OK if successful, ERR_MEM if out of memory
633 */
634 static err_t
pub_ack_rec_rel_response(mqtt_client_t * client,u8_t msg,u16_t pkt_id,u8_t qos)635 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
636 {
637 err_t err = ERR_OK;
638 if (mqtt_output_check_space(&client->output, 2)) {
639 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
640 mqtt_output_append_u16(&client->output, pkt_id);
641 mqtt_output_send(&client->output, client->conn);
642 } else {
643 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
644 mqtt_msg_type_to_str(msg), pkt_id));
645 err = ERR_MEM;
646 }
647 return err;
648 }
649
650 /**
651 * Subscribe response from server
652 * @param r Matching request
653 * @param result Result code from server
654 */
655 static void
mqtt_incomming_suback(struct mqtt_request_t * r,u8_t result)656 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
657 {
658 if (r->cb != NULL) {
659 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
660 }
661 }
662
663
664 /**
665 * Complete MQTT message received or buffer full
666 * @param client MQTT client
667 * @param fixed_hdr_len length of fixed header
668 * @param length length received part
669 * @param remaining_length Remaining length of complete message
670 */
671 static mqtt_connection_status_t
mqtt_message_received(mqtt_client_t * client,u8_t fixed_hdr_len,u16_t length,u32_t remaining_length,u8_t * var_hdr_payload)672 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_len, u16_t length, u32_t remaining_length,
673 u8_t *var_hdr_payload)
674 {
675 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
676
677 /* Control packet type */
678 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
679 u16_t pkt_id = 0;
680
681 LWIP_ASSERT("fixed_hdr_len <= client->msg_idx", fixed_hdr_len <= client->msg_idx);
682 LWIP_ERROR("buffer length mismatch", fixed_hdr_len + length <= MQTT_VAR_HEADER_BUFFER_LEN,
683 return MQTT_CONNECT_DISCONNECTED);
684
685 if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
686 if (client->conn_state == MQTT_CONNECTING) {
687 if (length < 2) {
688 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short CONNACK message\n"));
689 goto out_disconnect;
690 }
691 /* Get result code from CONNACK */
692 res = (mqtt_connection_status_t)var_hdr_payload[1];
693 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: Connect response code %d\n", res));
694 if (res == MQTT_CONNECT_ACCEPTED) {
695 /* Reset cyclic_tick when changing to connected state */
696 client->cyclic_tick = 0;
697 client->conn_state = MQTT_CONNECTED;
698 /* Notify upper layer */
699 if (client->connect_cb != 0) {
700 client->connect_cb(client, client->connect_arg, res);
701 }
702 }
703 } else {
704 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Received CONNACK in connected state\n"));
705 }
706 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
707 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ( "mqtt_message_received: Received PINGRESP from server\n"));
708
709 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
710 u16_t payload_offset = 0;
711 u16_t payload_length = length;
712 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
713
714 if (client->msg_idx == (u32_t)(fixed_hdr_len + length)) {
715 /* First publish message frame. Should have topic and pkt id*/
716 size_t var_hdr_payload_bufsize = sizeof(client->rx_buffer) - fixed_hdr_len;
717 u8_t *topic;
718 u16_t after_topic;
719 u8_t bkp;
720 u16_t topic_len;
721 u16_t qos_len = (qos ? 2U : 0U);
722 if (length < 2 + qos_len) {
723 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet\n"));
724 goto out_disconnect;
725 }
726 topic_len = var_hdr_payload[0];
727 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
728 if ((topic_len > length - (2 + qos_len)) ||
729 (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) {
730 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (topic)\n"));
731 goto out_disconnect;
732 }
733
734 topic = var_hdr_payload + 2;
735 after_topic = 2 + topic_len;
736 /* Check buffer length, add one byte even for QoS 0 so that zero termination will fit */
737 if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) {
738 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
739 goto out_disconnect;
740 }
741
742 /* id for QoS 1 and 2 */
743 if (qos > 0) {
744 if (length < after_topic + 2U) {
745 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short PUBLISH packet (after_topic)\n"));
746 goto out_disconnect;
747 }
748 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
749 after_topic += 2;
750 } else {
751 client->inpub_pkt_id = 0;
752 }
753 /* Take backup of byte after topic */
754 bkp = topic[topic_len];
755 /* Zero terminate string */
756 topic[topic_len] = 0;
757 /* Payload data remaining in receive buffer */
758 payload_length = length - after_topic;
759 payload_offset = after_topic;
760
761 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n",
762 qos, topic, remaining_length + payload_length));
763 if (client->pub_cb != NULL) {
764 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
765 }
766 /* Restore byte after topic */
767 topic[topic_len] = bkp;
768 }
769 if (payload_length > 0 || remaining_length == 0) {
770 if (length < (size_t)(payload_offset + payload_length)) {
771 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short packet (payload)\n"));
772 goto out_disconnect;
773 }
774 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
775 /* Reply if QoS > 0 */
776 if (remaining_length == 0 && qos > 0) {
777 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
778 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
779 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
780 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
781 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
782 }
783 }
784 } else {
785 if (length < 2) {
786 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received short message\n"));
787 goto out_disconnect;
788 }
789 /* Get packet identifier */
790 pkt_id = (u16_t)var_hdr_payload[0] << 8;
791 pkt_id |= (u16_t)var_hdr_payload[1];
792 if (pkt_id == 0) {
793 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
794 goto out_disconnect;
795 }
796 if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
797 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n", pkt_id));
798 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
799
800 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
801 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n", pkt_id));
802 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
803
804 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
805 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
806 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
807 if (r != NULL) {
808 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
809 if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
810 if (length < 3) {
811 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_message_received: To small SUBACK packet\n"));
812 goto out_disconnect;
813 } else {
814 mqtt_incomming_suback(r, var_hdr_payload[2]);
815 }
816 } else if (r->cb != NULL) {
817 r->cb(r->arg, ERR_OK);
818 }
819 mqtt_delete_request(r);
820 } else {
821 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
822 }
823 } else {
824 LWIP_DEBUGF(MQTT_DEBUG_WARN, ( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
825 goto out_disconnect;
826 }
827 }
828 return res;
829 out_disconnect:
830 return MQTT_CONNECT_DISCONNECTED;
831 }
832
833
834 /**
835 * MQTT incoming message parser
836 * @param client MQTT client
837 * @param p PBUF chain of received data
838 * @return Connection status
839 */
840 static mqtt_connection_status_t
mqtt_parse_incoming(mqtt_client_t * client,struct pbuf * p)841 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
842 {
843 u16_t in_offset = 0;
844 u32_t msg_rem_len = 0;
845 u8_t fixed_hdr_len = 0;
846 u8_t b = 0;
847
848 while (p->tot_len > in_offset) {
849 /* We ALWAYS parse the header here first. Even if the header was not
850 included in this segment, we re-parse it here by buffering it in
851 client->rx_buffer. client->msg_idx keeps track of this. */
852 if ((fixed_hdr_len < 2) || ((b & 0x80) != 0)) {
853
854 if (fixed_hdr_len < client->msg_idx) {
855 /* parse header from old pbuf (buffered in client->rx_buffer) */
856 b = client->rx_buffer[fixed_hdr_len];
857 } else {
858 /* parse header from this pbuf and save it in client->rx_buffer in case
859 it comes in segmented */
860 b = pbuf_get_at(p, in_offset++);
861 client->rx_buffer[client->msg_idx++] = b;
862 }
863 fixed_hdr_len++;
864
865 if (fixed_hdr_len >= 2) {
866 /* fixed header contains at least 2 bytes but can contain more, depending on
867 'remaining length'. All bytes but the last of this have 0x80 set to
868 indicate more bytes are coming. */
869 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_len - 2) * 7);
870 if ((b & 0x80) == 0) {
871 /* fixed header is done */
872 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
873 if (msg_rem_len == 0) {
874 /* Complete message with no extra headers of payload received */
875 mqtt_message_received(client, fixed_hdr_len, 0, 0, NULL);
876 client->msg_idx = 0;
877 fixed_hdr_len = 0;
878 } else {
879 /* Bytes remaining in message (changes remaining length if this is
880 not the first segment of this message) */
881 msg_rem_len = (msg_rem_len + fixed_hdr_len) - client->msg_idx;
882 }
883 }
884 }
885 } else {
886 /* Fixed header has been parsed, parse variable header */
887 u16_t cpy_len, buffer_space;
888 u8_t *var_hdr_payload;
889 mqtt_connection_status_t res;
890
891 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
892 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
893
894 /* Limit to available space in buffer */
895 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_len;
896 if (cpy_len > buffer_space) {
897 cpy_len = buffer_space;
898 }
899 /* Adjust cpy_len to ensure zero-copy operation for remaining parts of current message */
900 if (client->msg_idx >= MQTT_VAR_HEADER_BUFFER_LEN) {
901 if (cpy_len > (p->len - in_offset))
902 cpy_len = p->len - in_offset;
903 }
904 var_hdr_payload = (u8_t*)pbuf_get_contiguous(p, client->rx_buffer + fixed_hdr_len,
905 buffer_space, cpy_len, in_offset);
906
907 /* Advance get and put indexes */
908 client->msg_idx += cpy_len;
909 in_offset += cpy_len;
910 msg_rem_len -= cpy_len;
911
912 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_parse_incoming: msg_idx: %"U32_F", cpy_len: %"U16_F", remaining %"U32_F"\n", client->msg_idx, cpy_len, msg_rem_len));
913 /* Whole or partial message received */
914 res = mqtt_message_received(client, fixed_hdr_len, cpy_len, msg_rem_len, var_hdr_payload);
915 if (res != MQTT_CONNECT_ACCEPTED) {
916 return res;
917 }
918 if (msg_rem_len == 0) {
919 /* Reset parser state */
920 client->msg_idx = 0;
921 /* msg_tot_len = 0; */
922 fixed_hdr_len = 0;
923 }
924 }
925 }
926 return MQTT_CONNECT_ACCEPTED;
927 }
928
929
930 /**
931 * TCP received callback function. @see tcp_recv_fn
932 * @param arg MQTT client
933 * @param p PBUF chain of received data
934 * @param err Passed as return value if not ERR_OK
935 * @return ERR_OK or err passed into callback
936 */
937 static err_t
mqtt_tcp_recv_cb(void * arg,struct altcp_pcb * pcb,struct pbuf * p,err_t err)938 mqtt_tcp_recv_cb(void *arg, struct altcp_pcb *pcb, struct pbuf *p, err_t err)
939 {
940 mqtt_client_t *client = (mqtt_client_t *)arg;
941 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
942 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
943
944 if (p == NULL) {
945 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
946 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
947 } else {
948 mqtt_connection_status_t res;
949 if (err != ERR_OK) {
950 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_recv_cb: Recv err=%d\n", err));
951 pbuf_free(p);
952 return err;
953 }
954
955 /* Tell remote that data has been received */
956 altcp_recved(pcb, p->tot_len);
957 res = mqtt_parse_incoming(client, p);
958 pbuf_free(p);
959
960 if (res != MQTT_CONNECT_ACCEPTED) {
961 mqtt_close(client, res);
962 }
963 /* If keep alive functionality is used */
964 if (client->keep_alive != 0) {
965 /* Reset server alive watchdog */
966 client->server_watchdog = 0;
967 }
968
969 }
970 return ERR_OK;
971 }
972
973
974 /**
975 * TCP data sent callback function. @see tcp_sent_fn
976 * @param arg MQTT client
977 * @param tpcb TCP connection handle
978 * @param len Number of bytes sent
979 * @return ERR_OK
980 */
981 static err_t
mqtt_tcp_sent_cb(void * arg,struct altcp_pcb * tpcb,u16_t len)982 mqtt_tcp_sent_cb(void *arg, struct altcp_pcb *tpcb, u16_t len)
983 {
984 mqtt_client_t *client = (mqtt_client_t *)arg;
985
986 LWIP_UNUSED_ARG(tpcb);
987 LWIP_UNUSED_ARG(len);
988
989 if (client->conn_state == MQTT_CONNECTED) {
990 struct mqtt_request_t *r;
991
992 /* Reset keep-alive send timer and server watchdog */
993 client->cyclic_tick = 0;
994 client->server_watchdog = 0;
995 /* QoS 0 publish has no response from server, so call its callbacks here */
996 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
997 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
998 if (r->cb != NULL) {
999 r->cb(r->arg, ERR_OK);
1000 }
1001 mqtt_delete_request(r);
1002 }
1003 /* Try send any remaining buffers from output queue */
1004 mqtt_output_send(&client->output, client->conn);
1005 }
1006 return ERR_OK;
1007 }
1008
1009 /**
1010 * TCP error callback function. @see tcp_err_fn
1011 * @param arg MQTT client
1012 * @param err Error encountered
1013 */
1014 static void
mqtt_tcp_err_cb(void * arg,err_t err)1015 mqtt_tcp_err_cb(void *arg, err_t err)
1016 {
1017 mqtt_client_t *client = (mqtt_client_t *)arg;
1018 LWIP_UNUSED_ARG(err); /* only used for debug output */
1019 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
1020 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
1021 /* Set conn to null before calling close as pcb is already deallocated*/
1022 client->conn = 0;
1023 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
1024 }
1025
1026 /**
1027 * TCP poll callback function. @see tcp_poll_fn
1028 * @param arg MQTT client
1029 * @param tpcb TCP connection handle
1030 * @return err ERR_OK
1031 */
1032 static err_t
mqtt_tcp_poll_cb(void * arg,struct altcp_pcb * tpcb)1033 mqtt_tcp_poll_cb(void *arg, struct altcp_pcb *tpcb)
1034 {
1035 mqtt_client_t *client = (mqtt_client_t *)arg;
1036 if (client->conn_state == MQTT_CONNECTED) {
1037 /* Try send any remaining buffers from output queue */
1038 mqtt_output_send(&client->output, tpcb);
1039 }
1040 return ERR_OK;
1041 }
1042
1043 /**
1044 * TCP connect callback function. @see tcp_connected_fn
1045 * @param arg MQTT client
1046 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
1047 * @return ERR_OK
1048 */
1049 static err_t
mqtt_tcp_connect_cb(void * arg,struct altcp_pcb * tpcb,err_t err)1050 mqtt_tcp_connect_cb(void *arg, struct altcp_pcb *tpcb, err_t err)
1051 {
1052 mqtt_client_t *client = (mqtt_client_t *)arg;
1053
1054 if (err != ERR_OK) {
1055 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
1056 return err;
1057 }
1058
1059 /* Initiate receiver state */
1060 client->msg_idx = 0;
1061
1062 /* Setup TCP callbacks */
1063 altcp_recv(tpcb, mqtt_tcp_recv_cb);
1064 altcp_sent(tpcb, mqtt_tcp_sent_cb);
1065 altcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
1066
1067 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_tcp_connect_cb: TCP connection established to server\n"));
1068 /* Enter MQTT connect state */
1069 client->conn_state = MQTT_CONNECTING;
1070
1071 /* Start cyclic timer */
1072 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL * 1000, mqtt_cyclic_timer, client);
1073 client->cyclic_tick = 0;
1074
1075 /* Start transmission from output queue, connect message is the first one out*/
1076 mqtt_output_send(&client->output, client->conn);
1077
1078 return ERR_OK;
1079 }
1080
1081
1082
1083 /*---------------------------------------------------------------------------------------------------- */
1084 /* Public API */
1085
1086
1087 /**
1088 * @ingroup mqtt
1089 * MQTT publish function.
1090 * @param client MQTT client
1091 * @param topic Publish topic string
1092 * @param payload Data to publish (NULL is allowed)
1093 * @param payload_length Length of payload (0 is allowed)
1094 * @param qos Quality of service, 0 1 or 2
1095 * @param retain MQTT retain flag
1096 * @param cb Callback to call when publish is complete or has timed out
1097 * @param arg User supplied argument to publish callback
1098 * @return ERR_OK if successful
1099 * ERR_CONN if client is disconnected
1100 * ERR_MEM if short on memory
1101 */
1102 err_t
mqtt_publish(mqtt_client_t * client,const char * topic,const void * payload,u16_t payload_length,u8_t qos,u8_t retain,mqtt_request_cb_t cb,void * arg)1103 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1104 mqtt_request_cb_t cb, void *arg)
1105 {
1106 struct mqtt_request_t *r;
1107 u16_t pkt_id;
1108 size_t topic_strlen;
1109 size_t total_len;
1110 u16_t topic_len;
1111 u16_t remaining_length;
1112
1113 LWIP_ASSERT_CORE_LOCKED();
1114 LWIP_ASSERT("mqtt_publish: client != NULL", client);
1115 LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1116 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1117
1118 topic_strlen = strlen(topic);
1119 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1120 topic_len = (u16_t)topic_strlen;
1121 total_len = 2 + topic_len + payload_length;
1122
1123 if (qos > 0) {
1124 total_len += 2;
1125 /* Generate pkt_id id for QoS1 and 2 */
1126 pkt_id = msg_generate_packet_id(client);
1127 } else {
1128 /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1129 pkt_id = 0;
1130 }
1131 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1132 remaining_length = (u16_t)total_len;
1133
1134 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1135
1136 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1137 if (r == NULL) {
1138 return ERR_MEM;
1139 }
1140
1141 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1142 mqtt_delete_request(r);
1143 return ERR_MEM;
1144 }
1145 /* Append fixed header */
1146 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1147
1148 /* Append Topic */
1149 mqtt_output_append_string(&client->output, topic, topic_len);
1150
1151 /* Append packet if for QoS 1 and 2*/
1152 if (qos > 0) {
1153 mqtt_output_append_u16(&client->output, pkt_id);
1154 }
1155
1156 /* Append optional publish payload */
1157 if ((payload != NULL) && (payload_length > 0)) {
1158 mqtt_output_append_buf(&client->output, payload, payload_length);
1159 }
1160
1161 mqtt_append_request(&client->pend_req_queue, r);
1162 mqtt_output_send(&client->output, client->conn);
1163 return ERR_OK;
1164 }
1165
1166
1167 /**
1168 * @ingroup mqtt
1169 * MQTT subscribe/unsubscribe function.
1170 * @param client MQTT client
1171 * @param topic topic to subscribe to
1172 * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1173 * @param cb Callback to call when subscribe/unsubscribe reponse is received
1174 * @param arg User supplied argument to publish callback
1175 * @param sub 1 for subscribe, 0 for unsubscribe
1176 * @return ERR_OK if successful, @see err_t enum for other results
1177 */
1178 err_t
mqtt_sub_unsub(mqtt_client_t * client,const char * topic,u8_t qos,mqtt_request_cb_t cb,void * arg,u8_t sub)1179 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1180 {
1181 size_t topic_strlen;
1182 size_t total_len;
1183 u16_t topic_len;
1184 u16_t remaining_length;
1185 u16_t pkt_id;
1186 struct mqtt_request_t *r;
1187
1188 LWIP_ASSERT_CORE_LOCKED();
1189 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1190 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1191
1192 topic_strlen = strlen(topic);
1193 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1194 topic_len = (u16_t)topic_strlen;
1195 /* Topic string, pkt_id, qos for subscribe */
1196 total_len = topic_len + 2 + 2 + (sub != 0);
1197 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1198 remaining_length = (u16_t)total_len;
1199
1200 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1201 if (client->conn_state == TCP_DISCONNECTED) {
1202 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1203 return ERR_CONN;
1204 }
1205
1206 pkt_id = msg_generate_packet_id(client);
1207 r = mqtt_create_request(client->req_list, LWIP_ARRAYSIZE(client->req_list), pkt_id, cb, arg);
1208 if (r == NULL) {
1209 return ERR_MEM;
1210 }
1211
1212 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1213 mqtt_delete_request(r);
1214 return ERR_MEM;
1215 }
1216
1217 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1218
1219 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1220 /* Packet id */
1221 mqtt_output_append_u16(&client->output, pkt_id);
1222 /* Topic */
1223 mqtt_output_append_string(&client->output, topic, topic_len);
1224 /* QoS */
1225 if (sub != 0) {
1226 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1227 }
1228
1229 mqtt_append_request(&client->pend_req_queue, r);
1230 mqtt_output_send(&client->output, client->conn);
1231 return ERR_OK;
1232 }
1233
1234
1235 /**
1236 * @ingroup mqtt
1237 * Set callback to handle incoming publish requests from server
1238 * @param client MQTT client
1239 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1240 * @param data_cb Callback for each fragment of payload that arrives
1241 * @param arg User supplied argument to both callbacks
1242 */
1243 void
mqtt_set_inpub_callback(mqtt_client_t * client,mqtt_incoming_publish_cb_t pub_cb,mqtt_incoming_data_cb_t data_cb,void * arg)1244 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1245 mqtt_incoming_data_cb_t data_cb, void *arg)
1246 {
1247 LWIP_ASSERT_CORE_LOCKED();
1248 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1249 client->data_cb = data_cb;
1250 client->pub_cb = pub_cb;
1251 client->inpub_arg = arg;
1252 }
1253
1254 /**
1255 * @ingroup mqtt
1256 * Create a new MQTT client instance
1257 * @return Pointer to instance on success, NULL otherwise
1258 */
1259 mqtt_client_t *
mqtt_client_new(void)1260 mqtt_client_new(void)
1261 {
1262 LWIP_ASSERT_CORE_LOCKED();
1263 return (mqtt_client_t *)mem_calloc(1, sizeof(mqtt_client_t));
1264 }
1265
1266 /**
1267 * @ingroup mqtt
1268 * Free MQTT client instance
1269 * @param client Pointer to instance to be freed
1270 */
1271 void
mqtt_client_free(mqtt_client_t * client)1272 mqtt_client_free(mqtt_client_t *client)
1273 {
1274 mem_free(client);
1275 }
1276
1277 /**
1278 * @ingroup mqtt
1279 * Connect to MQTT server
1280 * @param client MQTT client
1281 * @param ip_addr Server IP
1282 * @param port Server port
1283 * @param cb Connection state change callback
1284 * @param arg User supplied argument to connection callback
1285 * @param client_info Client identification and connection options
1286 * @return ERR_OK if successful, @see err_t enum for other results
1287 */
1288 err_t
mqtt_client_connect(mqtt_client_t * client,const ip_addr_t * ip_addr,u16_t port,mqtt_connection_cb_t cb,void * arg,const struct mqtt_connect_client_info_t * client_info)1289 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1290 const struct mqtt_connect_client_info_t *client_info)
1291 {
1292 err_t err;
1293 size_t len;
1294 u16_t client_id_length;
1295 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1296 u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1297 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1298 u16_t client_user_len = 0, client_pass_len = 0;
1299
1300 LWIP_ASSERT_CORE_LOCKED();
1301 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1302 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1303 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1304 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1305
1306 if (client->conn_state != TCP_DISCONNECTED) {
1307 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Already connected\n"));
1308 return ERR_ISCONN;
1309 }
1310
1311 /* Wipe clean */
1312 memset(client, 0, sizeof(mqtt_client_t));
1313 client->connect_arg = arg;
1314 client->connect_cb = cb;
1315 client->keep_alive = client_info->keep_alive;
1316 mqtt_init_requests(client->req_list, LWIP_ARRAYSIZE(client->req_list));
1317
1318 /* Build connect message */
1319 if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1320 flags |= MQTT_CONNECT_FLAG_WILL;
1321 flags |= (client_info->will_qos & 3) << 3;
1322 if (client_info->will_retain) {
1323 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1324 }
1325 len = strlen(client_info->will_topic);
1326 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1327 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1328 will_topic_len = (u8_t)len;
1329 len = strlen(client_info->will_msg);
1330 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1331 will_msg_len = (u8_t)len;
1332 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1333 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1334 remaining_length = (u16_t)len;
1335 }
1336 if (client_info->client_user != NULL) {
1337 flags |= MQTT_CONNECT_FLAG_USERNAME;
1338 len = strlen(client_info->client_user);
1339 LWIP_ERROR("mqtt_client_connect: client_info->client_user length overflow", len <= 0xFFFF, return ERR_VAL);
1340 LWIP_ERROR("mqtt_client_connect: client_info->client_user length must be > 0", len > 0, return ERR_VAL);
1341 client_user_len = (u16_t)len;
1342 len = remaining_length + 2 + client_user_len;
1343 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1344 remaining_length = (u16_t)len;
1345 }
1346 if (client_info->client_pass != NULL) {
1347 flags |= MQTT_CONNECT_FLAG_PASSWORD;
1348 len = strlen(client_info->client_pass);
1349 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length overflow", len <= 0xFFFF, return ERR_VAL);
1350 LWIP_ERROR("mqtt_client_connect: client_info->client_pass length must be > 0", len > 0, return ERR_VAL);
1351 client_pass_len = (u16_t)len;
1352 len = remaining_length + 2 + client_pass_len;
1353 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1354 remaining_length = (u16_t)len;
1355 }
1356
1357 /* Don't complicate things, always connect using clean session */
1358 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1359
1360 len = strlen(client_info->client_id);
1361 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1362 client_id_length = (u16_t)len;
1363 len = remaining_length + 2 + client_id_length;
1364 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1365 remaining_length = (u16_t)len;
1366
1367 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1368 return ERR_MEM;
1369 }
1370
1371 #if LWIP_ALTCP && LWIP_ALTCP_TLS
1372 if (client_info->tls_config) {
1373 client->conn = altcp_tls_new(client_info->tls_config, IP_GET_TYPE(ip_addr));
1374 } else
1375 #endif
1376 {
1377 client->conn = altcp_tcp_new_ip_type(IP_GET_TYPE(ip_addr));
1378 }
1379 if (client->conn == NULL) {
1380 return ERR_MEM;
1381 }
1382
1383 /* Set arg pointer for callbacks */
1384 altcp_arg(client->conn, client);
1385 /* Any local address, pick random local port number */
1386 err = altcp_bind(client->conn, IP_ADDR_ANY, 0);
1387 if (err != ERR_OK) {
1388 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1389 goto tcp_fail;
1390 }
1391 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1392
1393 /* Connect to server */
1394 err = altcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1395 if (err != ERR_OK) {
1396 LWIP_DEBUGF(MQTT_DEBUG_TRACE, ("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1397 goto tcp_fail;
1398 }
1399 /* Set error callback */
1400 altcp_err(client->conn, mqtt_tcp_err_cb);
1401 client->conn_state = TCP_CONNECTING;
1402
1403 /* Append fixed header */
1404 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1405 /* Append Protocol string */
1406 mqtt_output_append_string(&client->output, "MQTT", 4);
1407 /* Append Protocol level */
1408 mqtt_output_append_u8(&client->output, 4);
1409 /* Append connect flags */
1410 mqtt_output_append_u8(&client->output, flags);
1411 /* Append keep-alive */
1412 mqtt_output_append_u16(&client->output, client_info->keep_alive);
1413 /* Append client id */
1414 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1415 /* Append will message if used */
1416 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1417 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1418 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1419 }
1420 /* Append user name if given */
1421 if ((flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
1422 mqtt_output_append_string(&client->output, client_info->client_user, client_user_len);
1423 }
1424 /* Append password if given */
1425 if ((flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
1426 mqtt_output_append_string(&client->output, client_info->client_pass, client_pass_len);
1427 }
1428 return ERR_OK;
1429
1430 tcp_fail:
1431 altcp_abort(client->conn);
1432 client->conn = NULL;
1433 return err;
1434 }
1435
1436
1437 /**
1438 * @ingroup mqtt
1439 * Disconnect from MQTT server
1440 * @param client MQTT client
1441 */
1442 void
mqtt_disconnect(mqtt_client_t * client)1443 mqtt_disconnect(mqtt_client_t *client)
1444 {
1445 LWIP_ASSERT_CORE_LOCKED();
1446 LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1447 /* If connection in not already closed */
1448 if (client->conn_state != TCP_DISCONNECTED) {
1449 /* Set conn_state before calling mqtt_close to prevent callback from being called */
1450 client->conn_state = TCP_DISCONNECTED;
1451 mqtt_close(client, (mqtt_connection_status_t)0);
1452 }
1453 }
1454
1455 /**
1456 * @ingroup mqtt
1457 * Check connection with server
1458 * @param client MQTT client
1459 * @return 1 if connected to server, 0 otherwise
1460 */
1461 u8_t
mqtt_client_is_connected(mqtt_client_t * client)1462 mqtt_client_is_connected(mqtt_client_t *client)
1463 {
1464 LWIP_ASSERT_CORE_LOCKED();
1465 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1466 return client->conn_state == MQTT_CONNECTED;
1467 }
1468
1469 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
1470