1 /**
2 * @file
3 * MQTT client
4 *
5 * @defgroup mqtt MQTT client
6 * @ingroup apps
7 * @verbinclude mqtt_client.txt
8 */
9
10 /*
11 * Copyright (c) 2016 Erik Andersson <[email protected]>
12 * All rights reserved.
13 *
14 * Redistribution and use in source and binary forms, with or without modification,
15 * are permitted provided that the following conditions are met:
16 *
17 * 1. Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * 3. The name of the author may not be used to endorse or promote products
23 * derived from this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34 * OF SUCH DAMAGE.
35 *
36 * This file is part of the lwIP TCP/IP stack
37 *
38 * Author: Erik Andersson <[email protected]>
39 *
40 *
41 * @todo:
42 * - Handle large outgoing payloads for PUBLISH messages
43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44 * - Add support for legacy MQTT protocol version
45 *
46 * Please coordinate changes and requests with Erik Andersson
47 * Erik Andersson <[email protected]>
48 *
49 */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/timeouts.h"
52 #include "lwip/ip_addr.h"
53 #include "lwip/mem.h"
54 #include "lwip/err.h"
55 #include "lwip/pbuf.h"
56 #include "lwip/tcp.h"
57 #include <string.h>
58
59 #if LWIP_TCP && LWIP_CALLBACK_API
60
/**
 * MQTT_DEBUG: Default is off.
 * The default below is only applied when MQTT_DEBUG has not been defined
 * before this point.
 */
#if !defined MQTT_DEBUG || defined __DOXYGEN__
#define MQTT_DEBUG LWIP_DBG_OFF
#endif

/* Flag sets handed to LWIP_DEBUGF() in this file: MQTT_DEBUG OR-ed with an
 * lwIP debug type (trace/state) and/or severity level flag. */
#define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
73
74 static void mqtt_cyclic_timer(void *arg);
75
/**
 * MQTT client connection states (stored in client->conn_state)
 */
enum {
  /* No connection; mqtt_close() always leaves the client in this state */
  TCP_DISCONNECTED,
  /* presumably: TCP connect in progress -- the state is assigned outside the
   * visible part of this file, TODO confirm */
  TCP_CONNECTING,
  /* Waiting for CONNACK; the cyclic timer enforces the connect timeout */
  MQTT_CONNECTING,
  /* CONNACK accepted; request timeouts, keep-alive and output are serviced */
  MQTT_CONNECTED
};
85
/**
 * MQTT control message types.
 * The value is carried in the upper four bits of the first fixed header byte
 * (see MQTT_CTL_PACKET_TYPE) and is also used as index into
 * mqtt_message_type_str when debug output is enabled.
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK = 2,
  MQTT_MSG_TYPE_PUBLISH = 3,
  MQTT_MSG_TYPE_PUBACK = 4,
  MQTT_MSG_TYPE_PUBREC = 5,
  MQTT_MSG_TYPE_PUBREL = 6,
  MQTT_MSG_TYPE_PUBCOMP = 7,
  MQTT_MSG_TYPE_SUBSCRIBE = 8,
  MQTT_MSG_TYPE_SUBACK = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK = 11,
  MQTT_MSG_TYPE_PINGREQ = 12,
  MQTT_MSG_TYPE_PINGRESP = 13,
  MQTT_MSG_TYPE_DISCONNECT = 14
};
105
/** Helpers to extract control packet type and qos from first byte in fixed header.
 * The packet type is held in bits 7..4 and the QoS level in bits 2..1.
 * The argument is parenthesized so that an arbitrary expression (e.g. `a | b`)
 * can be passed without operator precedence changing the result. */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
109
/**
 * MQTT connect flags, only used in CONNECT message.
 * Each enumerator is a single-bit mask; the bit position is given by the
 * shift count.
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,      /* bit 7 */
  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,      /* bit 6 */
  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,   /* bit 5 */
  MQTT_CONNECT_FLAG_WILL = 1 << 2,          /* bit 2 */
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1  /* bit 1 */
};
120
121
122 #if defined(LWIP_DEBUG)
/* Printable names for debug output, indexed by the enum mqtt_message_type
 * value. Entry 0 is the fallback that mqtt_msg_type_to_str() returns for any
 * value outside the table. */
static const char * const mqtt_message_type_str[15] =
{
  "UNDEFINED",
  "CONNECT",
  "CONNACK",
  "PUBLISH",
  "PUBACK",
  "PUBREC",
  "PUBREL",
  "PUBCOMP",
  "SUBSCRIBE",
  "SUBACK",
  "UNSUBSCRIBE",
  "UNSUBACK",
  "PINGREQ",
  "PINGRESP",
  "DISCONNECT"
};
141
142 /**
143 * Message type value to string
144 * @param msg_type see enum mqtt_message_type
145 *
146 * @return Control message type text string
147 */
148 static const char *
mqtt_msg_type_to_str(u8_t msg_type)149 mqtt_msg_type_to_str(u8_t msg_type)
150 {
151 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
152 msg_type = 0;
153 }
154 return mqtt_message_type_str[msg_type];
155 }
156
157 #endif
158
159
160 /**
161 * Generate MQTT packet identifier
162 * @param client MQTT client
163 * @return New packet identifier, range 1 to 65535
164 */
165 static u16_t
msg_generate_packet_id(mqtt_client_t * client)166 msg_generate_packet_id(mqtt_client_t *client)
167 {
168 client->pkt_id_seq++;
169 if (client->pkt_id_seq == 0) {
170 client->pkt_id_seq++;
171 }
172 return client->pkt_id_seq;
173 }
174
175 /*--------------------------------------------------------------------------------------------------------------------- */
176 /* Output ring buffer */
177
178
/* The put/get indexes are free running counters that are reduced to a buffer
 * position by masking with (size - 1). This only yields valid positions when
 * MQTT_OUTPUT_RINGBUF_SIZE is a power of two.
 * All macros evaluate their rb argument more than once, so it must be free of
 * side effects. Every expansion is fully parenthesized so that the macros can
 * be used inside larger expressions. */
#define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)

/** Add single item to ring buffer (no check for free space is made) */
#define mqtt_ringbuf_put(rb, item) (((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item))

/** Return number of bytes in ring buffer */
#define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))

/** Return number of bytes free in ring buffer */
#define mqtt_ringbuf_free(rb) ((MQTT_OUTPUT_RINGBUF_SIZE) - mqtt_ringbuf_len(rb))

/** Return number of bytes possible to read without wrapping around */
#define mqtt_ringbuf_linear_read_length(rb) (LWIP_MIN(mqtt_ringbuf_len(rb), ((MQTT_OUTPUT_RINGBUF_SIZE) - ((rb)->get & MQTT_RINGBUF_IDX_MASK))))

/** Return pointer to ring buffer get position */
#define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])

/** Mark len bytes, starting at the get position, as consumed */
#define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
197
198
199 /**
200 * Try send as many bytes as possible from output ring buffer
201 * @param rb Output ring buffer
202 * @param tpcb TCP connection handle
203 */
204 static void
mqtt_output_send(struct mqtt_ringbuf_t * rb,struct tcp_pcb * tpcb)205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
206 {
207 err_t err;
208 u8_t wrap = 0;
209 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
210 u16_t send_len = tcp_sndbuf(tpcb);
211 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
212
213 if (send_len == 0 || ringbuf_lin_len == 0) {
214 return;
215 }
216
217 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
218 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
219
220 if (send_len > ringbuf_lin_len) {
221 /* Space in TCP output buffer is larger than available in ring buffer linear portion */
222 send_len = ringbuf_lin_len;
223 /* Wrap around if more data in ring buffer after linear portion */
224 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
225 }
226 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
227 if ((err == ERR_OK) && wrap) {
228 mqtt_ringbuf_advance_get_idx(rb, send_len);
229 /* Use the lesser one of ring buffer linear length and TCP send buffer size */
230 send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
231 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
232 }
233
234 if (err == ERR_OK) {
235 mqtt_ringbuf_advance_get_idx(rb, send_len);
236 /* Flush */
237 tcp_output(tpcb);
238 } else {
239 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
240 }
241 }
242
243
244
245 /*--------------------------------------------------------------------------------------------------------------------- */
246 /* Request queue */
247
248 /**
249 * Create request item
250 * @param r_objs Pointer to request objects
251 * @param pkt_id Packet identifier of request
252 * @param cb Packet callback to call when requests lifetime ends
253 * @param arg Parameter following callback
254 * @return Request or NULL if failed to create
255 */
256 static struct mqtt_request_t *
mqtt_create_request(struct mqtt_request_t * r_objs,u16_t pkt_id,mqtt_request_cb_t cb,void * arg)257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
258 {
259 struct mqtt_request_t *r = NULL;
260 u8_t n;
261 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
262 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
263 /* Item point to itself if not in use */
264 if (r_objs[n].next == &r_objs[n]) {
265 r = &r_objs[n];
266 r->next = NULL;
267 r->cb = cb;
268 r->arg = arg;
269 r->pkt_id = pkt_id;
270 break;
271 }
272 }
273 return r;
274 }
275
276
277 /**
278 * Append request to pending request queue
279 * @param tail Pointer to request queue tail pointer
280 * @param r Request to append
281 */
282 static void
mqtt_append_request(struct mqtt_request_t ** tail,struct mqtt_request_t * r)283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
284 {
285 struct mqtt_request_t *head = NULL;
286 s16_t time_before = 0;
287 struct mqtt_request_t *iter;
288
289 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
290
291 /* Iterate trough queue to find head, and count total timeout time */
292 for (iter = *tail; iter != NULL; iter = iter->next) {
293 time_before += iter->timeout_diff;
294 head = iter;
295 }
296
297 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
298 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
299 if (head == NULL) {
300 *tail = r;
301 } else {
302 head->next = r;
303 }
304 }
305
306
307 /**
308 * Delete request item
309 * @param r Request item to delete
310 */
311 static void
mqtt_delete_request(struct mqtt_request_t * r)312 mqtt_delete_request(struct mqtt_request_t *r)
313 {
314 if (r != NULL) {
315 r->next = r;
316 }
317 }
318
319 /**
320 * Remove a request item with a specific packet identifier from request queue
321 * @param tail Pointer to request queue tail pointer
322 * @param pkt_id Packet identifier of request to take
323 * @return Request item if found, NULL if not
324 */
325 static struct mqtt_request_t *
mqtt_take_request(struct mqtt_request_t ** tail,u16_t pkt_id)326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
327 {
328 struct mqtt_request_t *iter = NULL, *prev = NULL;
329 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
330 /* Search all request for pkt_id */
331 for (iter = *tail; iter != NULL; iter = iter->next) {
332 if (iter->pkt_id == pkt_id) {
333 break;
334 }
335 prev = iter;
336 }
337
338 /* If request was found */
339 if (iter != NULL) {
340 /* unchain */
341 if (prev == NULL) {
342 *tail= iter->next;
343 } else {
344 prev->next = iter->next;
345 }
346 /* If exists, add remaining timeout time for the request to next */
347 if (iter->next != NULL) {
348 iter->next->timeout_diff += iter->timeout_diff;
349 }
350 iter->next = NULL;
351 }
352 return iter;
353 }
354
355 /**
356 * Handle requests timeout
357 * @param tail Pointer to request queue tail pointer
358 * @param t Time since last call in seconds
359 */
360 static void
mqtt_request_time_elapsed(struct mqtt_request_t ** tail,u8_t t)361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
362 {
363 struct mqtt_request_t *r = *tail;
364 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
365 while (t > 0 && r != NULL) {
366 if (t >= r->timeout_diff) {
367 t -= (u8_t)r->timeout_diff;
368 /* Unchain */
369 *tail = r->next;
370 /* Notify upper layer about timeout */
371 if (r->cb != NULL) {
372 r->cb(r->arg, ERR_TIMEOUT);
373 }
374 mqtt_delete_request(r);
375 /* Tail might be be modified in callback, so re-read it in every iteration */
376 r = *(struct mqtt_request_t * const volatile *)tail;
377 } else {
378 r->timeout_diff -= t;
379 t = 0;
380 }
381 }
382 }
383
384 /**
385 * Free all request items
386 * @param tail Pointer to request queue tail pointer
387 */
388 static void
mqtt_clear_requests(struct mqtt_request_t ** tail)389 mqtt_clear_requests(struct mqtt_request_t **tail)
390 {
391 struct mqtt_request_t *iter, *next;
392 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
393 for (iter = *tail; iter != NULL; iter = next) {
394 next = iter->next;
395 mqtt_delete_request(iter);
396 }
397 *tail = NULL;
398 }
399 /**
400 * Initialize all request items
401 * @param r_objs Pointer to request objects
402 */
403 static void
mqtt_init_requests(struct mqtt_request_t * r_objs)404 mqtt_init_requests(struct mqtt_request_t *r_objs)
405 {
406 u8_t n;
407 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
408 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
409 /* Item pointing to itself indicates unused */
410 r_objs[n].next = &r_objs[n];
411 }
412 }
413
414 /*--------------------------------------------------------------------------------------------------------------------- */
415 /* Output message build helpers */
416
417
418 static void
mqtt_output_append_u8(struct mqtt_ringbuf_t * rb,u8_t value)419 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
420 {
421 mqtt_ringbuf_put(rb, value);
422 }
423
424 static
mqtt_output_append_u16(struct mqtt_ringbuf_t * rb,u16_t value)425 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
426 {
427 mqtt_ringbuf_put(rb, value >> 8);
428 mqtt_ringbuf_put(rb, value & 0xff);
429 }
430
431 static void
mqtt_output_append_buf(struct mqtt_ringbuf_t * rb,const void * data,u16_t length)432 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
433 {
434 u16_t n;
435 for (n = 0; n < length; n++) {
436 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
437 }
438 }
439
440 static void
mqtt_output_append_string(struct mqtt_ringbuf_t * rb,const char * str,u16_t length)441 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
442 {
443 u16_t n;
444 mqtt_ringbuf_put(rb, length >> 8);
445 mqtt_ringbuf_put(rb, length & 0xff);
446 for (n = 0; n < length; n++) {
447 mqtt_ringbuf_put(rb, str[n]);
448 }
449 }
450
451 /**
452 * Append fixed header
453 * @param rb Output ring buffer
454 * @param msg_type see enum mqtt_message_type
455 * @param dup MQTT DUP flag
456 * @param qos MQTT QoS field
457 * @param retain MQTT retain flag
458 * @param r_length Remaining length after fixed header
459 */
460
461 static void
mqtt_output_append_fixed_header(struct mqtt_ringbuf_t * rb,u8_t msg_type,u8_t dup,u8_t qos,u8_t retain,u16_t r_length)462 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
463 u8_t qos, u8_t retain, u16_t r_length)
464 {
465 /* Start with control byte */
466 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
467 /* Encode remaining length field */
468 do {
469 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
470 r_length >>= 7;
471 } while (r_length > 0);
472 }
473
474
475 /**
476 * Check output buffer space
477 * @param rb Output ring buffer
478 * @param r_length Remaining length after fixed header
479 * @return 1 if message will fit, 0 if not enough buffer space
480 */
481 static u8_t
mqtt_output_check_space(struct mqtt_ringbuf_t * rb,u16_t r_length)482 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
483 {
484 /* Start with length of type byte + remaining length */
485 u16_t total_len = 1 + r_length;
486
487 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
488
489 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
490 do {
491 total_len++;
492 r_length >>= 7;
493 } while (r_length > 0);
494
495 return (total_len <= mqtt_ringbuf_free(rb));
496 }
497
498
499 /**
500 * Close connection to server
501 * @param client MQTT client
502 * @param reason Reason for disconnection
503 */
504 static void
mqtt_close(mqtt_client_t * client,mqtt_connection_status_t reason)505 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
506 {
507 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
508
509 /* Bring down TCP connection if not already done */
510 if (client->conn != NULL) {
511 err_t res;
512 tcp_recv(client->conn, NULL);
513 tcp_err(client->conn, NULL);
514 tcp_sent(client->conn, NULL);
515 res = tcp_close(client->conn);
516 if (res != ERR_OK) {
517 tcp_abort(client->conn);
518 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
519 }
520 client->conn = NULL;
521 }
522
523 /* Remove all pending requests */
524 mqtt_clear_requests(&client->pend_req_queue);
525 /* Stop cyclic timer */
526 sys_untimeout(mqtt_cyclic_timer, client);
527
528 /* Notify upper layer of disconnection if changed state */
529 if (client->conn_state != TCP_DISCONNECTED) {
530
531 client->conn_state = TCP_DISCONNECTED;
532 if (client->connect_cb != NULL) {
533 client->connect_cb(client, client->connect_arg, reason);
534 }
535 }
536 }
537
538
539 /**
540 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
541 * @param arg MQTT client
542 */
543 static void
mqtt_cyclic_timer(void * arg)544 mqtt_cyclic_timer(void *arg)
545 {
546 u8_t restart_timer = 1;
547 mqtt_client_t *client = (mqtt_client_t *)arg;
548 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
549
550 if (client->conn_state == MQTT_CONNECTING) {
551 client->cyclic_tick++;
552 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
553 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
554 /* Disconnect TCP */
555 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
556 restart_timer = 0;
557 }
558 } else if (client->conn_state == MQTT_CONNECTED) {
559 /* Handle timeout for pending requests */
560 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
561
562 /* keep_alive > 0 means keep alive functionality shall be used */
563 if (client->keep_alive > 0) {
564
565 client->server_watchdog++;
566 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
567 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
568 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
569 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
570 restart_timer = 0;
571 }
572
573 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
574 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
575 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
576 if (mqtt_output_check_space(&client->output, 0) != 0) {
577 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
578 client->cyclic_tick = 0;
579 }
580 } else {
581 client->cyclic_tick++;
582 }
583 }
584 } else {
585 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
586 restart_timer = 0;
587 }
588 if (restart_timer) {
589 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
590 }
591 }
592
593
594 /**
595 * Send PUBACK, PUBREC or PUBREL response message
596 * @param client MQTT client
597 * @param msg PUBACK, PUBREC or PUBREL
598 * @param pkt_id Packet identifier
599 * @param qos QoS value
600 * @return ERR_OK if successful, ERR_MEM if out of memory
601 */
602 static err_t
pub_ack_rec_rel_response(mqtt_client_t * client,u8_t msg,u16_t pkt_id,u8_t qos)603 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
604 {
605 err_t err = ERR_OK;
606 if (mqtt_output_check_space(&client->output, 2)) {
607 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
608 mqtt_output_append_u16(&client->output, pkt_id);
609 mqtt_output_send(&client->output, client->conn);
610 } else {
611 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
612 mqtt_msg_type_to_str(msg), pkt_id));
613 err = ERR_MEM;
614 }
615 return err;
616 }
617
618 /**
619 * Subscribe response from server
620 * @param r Matching request
621 * @param result Result code from server
622 */
623 static void
mqtt_incomming_suback(struct mqtt_request_t * r,u8_t result)624 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
625 {
626 if (r->cb != NULL) {
627 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
628 }
629 }
630
631
632 /**
633 * Complete MQTT message received or buffer full
634 * @param client MQTT client
635 * @param fixed_hdr_idx header index
636 * @param length length received part
637 * @param remaining_length Remaining length of complete message
638 */
639 static mqtt_connection_status_t
mqtt_message_received(mqtt_client_t * client,u8_t fixed_hdr_idx,u16_t length,u32_t remaining_length)640 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
641 {
642 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
643
644 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
645
646 /* Control packet type */
647 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
648 u16_t pkt_id = 0;
649
650 if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
651 if (client->conn_state == MQTT_CONNECTING) {
652 /* Get result code from CONNACK */
653 res = (mqtt_connection_status_t)var_hdr_payload[1];
654 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
655 if (res == MQTT_CONNECT_ACCEPTED) {
656 /* Reset cyclic_tick when changing to connected state */
657 client->cyclic_tick = 0;
658 client->conn_state = MQTT_CONNECTED;
659 /* Notify upper layer */
660 if (client->connect_cb != 0) {
661 client->connect_cb(client, client->connect_arg, res);
662 }
663 }
664 } else {
665 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
666 }
667 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
668 LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
669
670 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
671 u16_t payload_offset = 0;
672 u16_t payload_length = length;
673 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
674
675 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
676 /* Should have topic and pkt id*/
677 uint8_t *topic;
678 uint16_t after_topic;
679 u8_t bkp;
680 u16_t topic_len = var_hdr_payload[0];
681 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
682
683 topic = var_hdr_payload + 2;
684 after_topic = 2 + topic_len;
685 /* Check length, add one byte even for QoS 0 so that zero termination will fit */
686 if ((after_topic + (qos? 2 : 1)) > length) {
687 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
688 goto out_disconnect;
689 }
690
691 /* id for QoS 1 and 2 */
692 if (qos > 0) {
693 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
694 after_topic += 2;
695 } else {
696 client->inpub_pkt_id = 0;
697 }
698 /* Take backup of byte after topic */
699 bkp = topic[topic_len];
700 /* Zero terminate string */
701 topic[topic_len] = 0;
702 /* Payload data remaining in receive buffer */
703 payload_length = length - after_topic;
704 payload_offset = after_topic;
705
706 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
707 qos, topic, remaining_length + payload_length));
708 if (client->pub_cb != NULL) {
709 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
710 }
711 /* Restore byte after topic */
712 topic[topic_len] = bkp;
713 }
714 if (payload_length > 0 || remaining_length == 0) {
715 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
716 /* Reply if QoS > 0 */
717 if (remaining_length == 0 && qos > 0) {
718 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
719 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
720 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
721 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
722 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
723 }
724 }
725 } else {
726 /* Get packet identifier */
727 pkt_id = (u16_t)var_hdr_payload[0] << 8;
728 pkt_id |= (u16_t)var_hdr_payload[1];
729 if (pkt_id == 0) {
730 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
731 goto out_disconnect;
732 }
733 if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
734 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
735 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
736
737 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
738 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
739 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
740
741 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
742 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
743 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
744 if (r != NULL) {
745 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
746 if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
747 if (length < 3) {
748 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
749 goto out_disconnect;
750 } else {
751 mqtt_incomming_suback(r, var_hdr_payload[2]);
752 }
753 } else if (r->cb != NULL) {
754 r->cb(r->arg, ERR_OK);
755 }
756 mqtt_delete_request(r);
757 } else {
758 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
759 }
760 } else {
761 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
762 goto out_disconnect;
763 }
764 }
765 return res;
766 out_disconnect:
767 return MQTT_CONNECT_DISCONNECTED;
768 }
769
770
771 /**
772 * MQTT incoming message parser
773 * @param client MQTT client
774 * @param p PBUF chain of received data
775 * @return Connection status
776 */
777 static mqtt_connection_status_t
mqtt_parse_incoming(mqtt_client_t * client,struct pbuf * p)778 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
779 {
780 u16_t in_offset = 0;
781 u32_t msg_rem_len = 0;
782 u8_t fixed_hdr_idx = 0;
783 u8_t b = 0;
784
785 while (p->tot_len > in_offset) {
786 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
787
788 if (fixed_hdr_idx < client->msg_idx) {
789 b = client->rx_buffer[fixed_hdr_idx];
790 } else {
791 b = pbuf_get_at(p, in_offset++);
792 client->rx_buffer[client->msg_idx++] = b;
793 }
794 fixed_hdr_idx++;
795
796 if (fixed_hdr_idx >= 2) {
797 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
798 if ((b & 0x80) == 0) {
799 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
800 if (msg_rem_len == 0) {
801 /* Complete message with no extra headers of payload received */
802 mqtt_message_received(client, fixed_hdr_idx, 0, 0);
803 client->msg_idx = 0;
804 fixed_hdr_idx = 0;
805 } else {
806 /* Bytes remaining in message */
807 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
808 }
809 }
810 }
811 } else {
812 u16_t cpy_len, cpy_start, buffer_space;
813
814 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
815
816 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
817 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
818
819 /* Limit to available space in buffer */
820 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
821 if (cpy_len > buffer_space) {
822 cpy_len = buffer_space;
823 }
824 pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
825
826 /* Advance get and put indexes */
827 client->msg_idx += cpy_len;
828 in_offset += cpy_len;
829 msg_rem_len -= cpy_len;
830
831 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
832 if (msg_rem_len == 0 || cpy_len == buffer_space) {
833 /* Whole message received or buffer is full */
834 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
835 if (res != MQTT_CONNECT_ACCEPTED) {
836 return res;
837 }
838 if (msg_rem_len == 0) {
839 /* Reset parser state */
840 client->msg_idx = 0;
841 /* msg_tot_len = 0; */
842 fixed_hdr_idx = 0;
843 }
844 }
845 }
846 }
847 return MQTT_CONNECT_ACCEPTED;
848 }
849
850
851 /**
852 * TCP received callback function. @see tcp_recv_fn
853 * @param arg MQTT client
854 * @param p PBUF chain of received data
855 * @param err Passed as return value if not ERR_OK
856 * @return ERR_OK or err passed into callback
857 */
858 static err_t
mqtt_tcp_recv_cb(void * arg,struct tcp_pcb * pcb,struct pbuf * p,err_t err)859 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
860 {
861 mqtt_client_t *client = (mqtt_client_t *)arg;
862 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
863 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
864
865 if (p == NULL) {
866 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
867 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
868 } else {
869 mqtt_connection_status_t res;
870 if (err != ERR_OK) {
871 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
872 pbuf_free(p);
873 return err;
874 }
875
876 /* Tell remote that data has been received */
877 tcp_recved(pcb, p->tot_len);
878 res = mqtt_parse_incoming(client, p);
879 pbuf_free(p);
880
881 if (res != MQTT_CONNECT_ACCEPTED) {
882 mqtt_close(client, res);
883 }
884 /* If keep alive functionality is used */
885 if (client->keep_alive != 0) {
886 /* Reset server alive watchdog */
887 client->server_watchdog = 0;
888 }
889
890 }
891 return ERR_OK;
892 }
893
894
895 /**
896 * TCP data sent callback function. @see tcp_sent_fn
897 * @param arg MQTT client
898 * @param tpcb TCP connection handle
899 * @param len Number of bytes sent
900 * @return ERR_OK
901 */
902 static err_t
mqtt_tcp_sent_cb(void * arg,struct tcp_pcb * tpcb,u16_t len)903 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
904 {
905 mqtt_client_t *client = (mqtt_client_t *)arg;
906
907 LWIP_UNUSED_ARG(tpcb);
908 LWIP_UNUSED_ARG(len);
909
910 if (client->conn_state == MQTT_CONNECTED) {
911 struct mqtt_request_t *r;
912
913 /* Reset keep-alive send timer and server watchdog */
914 client->cyclic_tick = 0;
915 client->server_watchdog = 0;
916 /* QoS 0 publish has no response from server, so call its callbacks here */
917 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
918 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
919 if (r->cb != NULL) {
920 r->cb(r->arg, ERR_OK);
921 }
922 mqtt_delete_request(r);
923 }
924 /* Try send any remaining buffers from output queue */
925 mqtt_output_send(&client->output, client->conn);
926 }
927 return ERR_OK;
928 }
929
930 /**
931 * TCP error callback function. @see tcp_err_fn
932 * @param arg MQTT client
933 * @param err Error encountered
934 */
935 static void
mqtt_tcp_err_cb(void * arg,err_t err)936 mqtt_tcp_err_cb(void *arg, err_t err)
937 {
938 mqtt_client_t *client = (mqtt_client_t *)arg;
939 LWIP_UNUSED_ARG(err); /* only used for debug output */
940 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
941 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
942 /* Set conn to null before calling close as pcb is already deallocated*/
943 client->conn = 0;
944 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
945 }
946
947 /**
948 * TCP poll callback function. @see tcp_poll_fn
949 * @param arg MQTT client
950 * @param tpcb TCP connection handle
951 * @return err ERR_OK
952 */
953 static err_t
mqtt_tcp_poll_cb(void * arg,struct tcp_pcb * tpcb)954 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
955 {
956 mqtt_client_t *client = (mqtt_client_t *)arg;
957 if (client->conn_state == MQTT_CONNECTED) {
958 /* Try send any remaining buffers from output queue */
959 mqtt_output_send(&client->output, tpcb);
960 }
961 return ERR_OK;
962 }
963
964 /**
965 * TCP connect callback function. @see tcp_connected_fn
966 * @param arg MQTT client
967 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
968 * @return ERR_OK
969 */
970 static err_t
mqtt_tcp_connect_cb(void * arg,struct tcp_pcb * tpcb,err_t err)971 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
972 {
973 mqtt_client_t* client = (mqtt_client_t *)arg;
974
975 if (err != ERR_OK) {
976 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
977 return err;
978 }
979
980 /* Initiate receiver state */
981 client->msg_idx = 0;
982
983 /* Setup TCP callbacks */
984 tcp_recv(tpcb, mqtt_tcp_recv_cb);
985 tcp_sent(tpcb, mqtt_tcp_sent_cb);
986 tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
987
988 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
989 /* Enter MQTT connect state */
990 client->conn_state = MQTT_CONNECTING;
991
992 /* Start cyclic timer */
993 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
994 client->cyclic_tick = 0;
995
996 /* Start transmission from output queue, connect message is the first one out*/
997 mqtt_output_send(&client->output, client->conn);
998
999 return ERR_OK;
1000 }
1001
1002
1003
1004 /*---------------------------------------------------------------------------------------------------- */
1005 /* Public API */
1006
1007
1008 /**
1009 * @ingroup mqtt
1010 * MQTT publish function.
1011 * @param client MQTT client
1012 * @param topic Publish topic string
1013 * @param payload Data to publish (NULL is allowed)
1014 * @param payload_length: Length of payload (0 is allowed)
1015 * @param qos Quality of service, 0 1 or 2
1016 * @param retain MQTT retain flag
1017 * @param cb Callback to call when publish is complete or has timed out
1018 * @param arg User supplied argument to publish callback
1019 * @return ERR_OK if successful
1020 * ERR_CONN if client is disconnected
1021 * ERR_MEM if short on memory
1022 */
1023 err_t
mqtt_publish(mqtt_client_t * client,const char * topic,const void * payload,u16_t payload_length,u8_t qos,u8_t retain,mqtt_request_cb_t cb,void * arg)1024 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1025 mqtt_request_cb_t cb, void *arg)
1026 {
1027 struct mqtt_request_t *r;
1028 u16_t pkt_id;
1029 size_t topic_strlen;
1030 size_t total_len;
1031 u16_t topic_len;
1032 u16_t remaining_length;
1033
1034 LWIP_ASSERT("mqtt_publish: client != NULL", client);
1035 LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1036 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1037
1038 topic_strlen = strlen(topic);
1039 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1040 topic_len = (u16_t)topic_strlen;
1041 total_len = 2 + topic_len + payload_length;
1042 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1043 remaining_length = (u16_t)total_len;
1044
1045 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1046
1047 if (qos > 0) {
1048 remaining_length += 2;
1049 /* Generate pkt_id id for QoS1 and 2 */
1050 pkt_id = msg_generate_packet_id(client);
1051 } else {
1052 /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1053 pkt_id = 0;
1054 }
1055
1056 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1057 if (r == NULL) {
1058 return ERR_MEM;
1059 }
1060
1061 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1062 mqtt_delete_request(r);
1063 return ERR_MEM;
1064 }
1065 /* Append fixed header */
1066 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1067
1068 /* Append Topic */
1069 mqtt_output_append_string(&client->output, topic, topic_len);
1070
1071 /* Append packet if for QoS 1 and 2*/
1072 if (qos > 0) {
1073 mqtt_output_append_u16(&client->output, pkt_id);
1074 }
1075
1076 /* Append optional publish payload */
1077 if ((payload != NULL) && (payload_length > 0)) {
1078 mqtt_output_append_buf(&client->output, payload, payload_length);
1079 }
1080
1081 mqtt_append_request(&client->pend_req_queue, r);
1082 mqtt_output_send(&client->output, client->conn);
1083 return ERR_OK;
1084 }
1085
1086
1087 /**
1088 * @ingroup mqtt
1089 * MQTT subscribe/unsubscribe function.
1090 * @param client MQTT client
1091 * @param topic topic to subscribe to
1092 * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1093 * @param cb Callback to call when subscribe/unsubscribe reponse is received
1094 * @param arg User supplied argument to publish callback
1095 * @param sub 1 for subscribe, 0 for unsubscribe
1096 * @return ERR_OK if successful, @see err_t enum for other results
1097 */
1098 err_t
mqtt_sub_unsub(mqtt_client_t * client,const char * topic,u8_t qos,mqtt_request_cb_t cb,void * arg,u8_t sub)1099 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1100 {
1101 size_t topic_strlen;
1102 size_t total_len;
1103 u16_t topic_len;
1104 u16_t remaining_length;
1105 u16_t pkt_id;
1106 struct mqtt_request_t *r;
1107
1108 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1109 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1110
1111 topic_strlen = strlen(topic);
1112 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1113 topic_len = (u16_t)topic_strlen;
1114 /* Topic string, pkt_id, qos for subscribe */
1115 total_len = topic_len + 2 + 2 + (sub != 0);
1116 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1117 remaining_length = (u16_t)total_len;
1118
1119 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1120 if (client->conn_state == TCP_DISCONNECTED) {
1121 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1122 return ERR_CONN;
1123 }
1124
1125 pkt_id = msg_generate_packet_id(client);
1126 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1127 if (r == NULL) {
1128 return ERR_MEM;
1129 }
1130
1131 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1132 mqtt_delete_request(r);
1133 return ERR_MEM;
1134 }
1135
1136 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1137
1138 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1139 /* Packet id */
1140 mqtt_output_append_u16(&client->output, pkt_id);
1141 /* Topic */
1142 mqtt_output_append_string(&client->output, topic, topic_len);
1143 /* QoS */
1144 if (sub != 0) {
1145 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1146 }
1147
1148 mqtt_append_request(&client->pend_req_queue, r);
1149 mqtt_output_send(&client->output, client->conn);
1150 return ERR_OK;
1151 }
1152
1153
1154 /**
1155 * @ingroup mqtt
1156 * Set callback to handle incoming publish requests from server
1157 * @param client MQTT client
1158 * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1159 * @param data_cb Callback for each fragment of payload that arrives
1160 * @param arg User supplied argument to both callbacks
1161 */
1162 void
mqtt_set_inpub_callback(mqtt_client_t * client,mqtt_incoming_publish_cb_t pub_cb,mqtt_incoming_data_cb_t data_cb,void * arg)1163 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1164 mqtt_incoming_data_cb_t data_cb, void *arg)
1165 {
1166 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1167 client->data_cb = data_cb;
1168 client->pub_cb = pub_cb;
1169 client->inpub_arg = arg;
1170 }
1171
1172 /**
1173 * @ingroup mqtt
1174 * Create a new MQTT client instance
1175 * @return Pointer to instance on success, NULL otherwise
1176 */
1177 mqtt_client_t *
mqtt_client_new(void)1178 mqtt_client_new(void)
1179 {
1180 mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
1181 if (client != NULL) {
1182 memset(client, 0, sizeof(mqtt_client_t));
1183 }
1184 return client;
1185 }
1186
1187
1188 /**
1189 * @ingroup mqtt
1190 * Connect to MQTT server
1191 * @param client MQTT client
1192 * @param ip_addr Server IP
1193 * @param port Server port
1194 * @param cb Connection state change callback
1195 * @param arg User supplied argument to connection callback
1196 * @param client_info Client identification and connection options
1197 * @return ERR_OK if successful, @see err_t enum for other results
1198 */
1199 err_t
mqtt_client_connect(mqtt_client_t * client,const ip_addr_t * ip_addr,u16_t port,mqtt_connection_cb_t cb,void * arg,const struct mqtt_connect_client_info_t * client_info)1200 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1201 const struct mqtt_connect_client_info_t *client_info)
1202 {
1203 err_t err;
1204 size_t len;
1205 u16_t client_id_length;
1206 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1207 u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1208 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1209
1210 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1211 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1212 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1213 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1214
1215 if (client->conn_state != TCP_DISCONNECTED) {
1216 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
1217 return ERR_ISCONN;
1218 }
1219
1220 /* Wipe clean */
1221 memset(client, 0, sizeof(mqtt_client_t));
1222 client->connect_arg = arg;
1223 client->connect_cb = cb;
1224 client->keep_alive = client_info->keep_alive;
1225 mqtt_init_requests(client->req_list);
1226
1227 /* Build connect message */
1228 if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1229 flags |= MQTT_CONNECT_FLAG_WILL;
1230 flags |= (client_info->will_qos & 3) << 3;
1231 if (client_info->will_retain) {
1232 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1233 }
1234 len = strlen(client_info->will_topic);
1235 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1236 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1237 will_topic_len = (u8_t)len;
1238 len = strlen(client_info->will_msg);
1239 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1240 will_msg_len = (u8_t)len;
1241 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1242 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1243 remaining_length = (u16_t)len;
1244 }
1245
1246 /* Don't complicate things, always connect using clean session */
1247 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1248
1249 len = strlen(client_info->client_id);
1250 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1251 client_id_length = (u16_t)len;
1252 len = remaining_length + 2 + client_id_length;
1253 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1254 remaining_length = (u16_t)len;
1255
1256 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1257 return ERR_MEM;
1258 }
1259
1260 client->conn = tcp_new();
1261 if (client->conn == NULL) {
1262 return ERR_MEM;
1263 }
1264
1265 /* Set arg pointer for callbacks */
1266 tcp_arg(client->conn, client);
1267 /* Any local address, pick random local port number */
1268 err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
1269 if (err != ERR_OK) {
1270 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1271 goto tcp_fail;
1272 }
1273 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1274
1275 /* Connect to server */
1276 err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1277 if (err != ERR_OK) {
1278 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1279 goto tcp_fail;
1280 }
1281 /* Set error callback */
1282 tcp_err(client->conn, mqtt_tcp_err_cb);
1283 client->conn_state = TCP_CONNECTING;
1284
1285 /* Append fixed header */
1286 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1287 /* Append Protocol string */
1288 mqtt_output_append_string(&client->output, "MQTT", 4);
1289 /* Append Protocol level */
1290 mqtt_output_append_u8(&client->output, 4);
1291 /* Append connect flags */
1292 mqtt_output_append_u8(&client->output, flags);
1293 /* Append keep-alive */
1294 mqtt_output_append_u16(&client->output, client_info->keep_alive);
1295 /* Append client id */
1296 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1297 /* Append will message if used */
1298 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1299 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1300 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1301 }
1302 return ERR_OK;
1303
1304 tcp_fail:
1305 tcp_abort(client->conn);
1306 client->conn = NULL;
1307 return err;
1308 }
1309
1310
1311 /**
1312 * @ingroup mqtt
1313 * Disconnect from MQTT server
1314 * @param client MQTT client
1315 */
1316 void
mqtt_disconnect(mqtt_client_t * client)1317 mqtt_disconnect(mqtt_client_t *client)
1318 {
1319 LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1320 /* If connection in not already closed */
1321 if (client->conn_state != TCP_DISCONNECTED) {
1322 /* Set conn_state before calling mqtt_close to prevent callback from being called */
1323 client->conn_state = TCP_DISCONNECTED;
1324 mqtt_close(client, (mqtt_connection_status_t)0);
1325 }
1326 }
1327
1328 /**
1329 * @ingroup mqtt
1330 * Check connection with server
1331 * @param client MQTT client
1332 * @return 1 if connected to server, 0 otherwise
1333 */
1334 u8_t
mqtt_client_is_connected(mqtt_client_t * client)1335 mqtt_client_is_connected(mqtt_client_t *client)
1336 {
1337 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1338 return client->conn_state == MQTT_CONNECTED;
1339 }
1340
1341 #endif /* LWIP_TCP && LWIP_CALLBACK_API */
1342