1*1c60b9acSAndroid Build Coastguard Worker /*
2*1c60b9acSAndroid Build Coastguard Worker * lws-minimal-mqtt-client
3*1c60b9acSAndroid Build Coastguard Worker *
4*1c60b9acSAndroid Build Coastguard Worker * Written in 2010-2020 by Andy Green <[email protected]>
5*1c60b9acSAndroid Build Coastguard Worker * Sakthi Kannan <[email protected]>
6*1c60b9acSAndroid Build Coastguard Worker *
7*1c60b9acSAndroid Build Coastguard Worker * This file is made available under the Creative Commons CC0 1.0
8*1c60b9acSAndroid Build Coastguard Worker * Universal Public Domain Dedication.
9*1c60b9acSAndroid Build Coastguard Worker */
10*1c60b9acSAndroid Build Coastguard Worker
11*1c60b9acSAndroid Build Coastguard Worker #include <libwebsockets.h>
12*1c60b9acSAndroid Build Coastguard Worker #include <string.h>
13*1c60b9acSAndroid Build Coastguard Worker #include <signal.h>
14*1c60b9acSAndroid Build Coastguard Worker #if defined(WIN32)
15*1c60b9acSAndroid Build Coastguard Worker #define HAVE_STRUCT_TIMESPEC
16*1c60b9acSAndroid Build Coastguard Worker #if defined(pid_t)
17*1c60b9acSAndroid Build Coastguard Worker #undef pid_t
18*1c60b9acSAndroid Build Coastguard Worker #endif
19*1c60b9acSAndroid Build Coastguard Worker #endif
20*1c60b9acSAndroid Build Coastguard Worker #include <pthread.h>
21*1c60b9acSAndroid Build Coastguard Worker #include <assert.h>
22*1c60b9acSAndroid Build Coastguard Worker
23*1c60b9acSAndroid Build Coastguard Worker #define COUNT 8
24*1c60b9acSAndroid Build Coastguard Worker
25*1c60b9acSAndroid Build Coastguard Worker struct test_item {
26*1c60b9acSAndroid Build Coastguard Worker struct lws_context *context;
27*1c60b9acSAndroid Build Coastguard Worker struct lws *wsi;
28*1c60b9acSAndroid Build Coastguard Worker lws_sorted_usec_list_t sul;
29*1c60b9acSAndroid Build Coastguard Worker } items[COUNT];
30*1c60b9acSAndroid Build Coastguard Worker
/*
 * Per-connection test sequence.  The states are visited in numeric order
 * (the callback advances with state++), so the order here matters.
 */
enum {
	STATE_SUBSCRIBE = 0,	/* subscribe to the topic */
	STATE_WAIT_SUBACK,	/* subscribe sent, waiting for it to be acked */
	STATE_PUBLISH_QOS0,	/* send the message in QoS0 */
	STATE_WAIT_ACK0,	/* wait for the synthetic "ack" */
	STATE_PUBLISH_QOS1,	/* send the message in QoS1 */
	STATE_WAIT_ACK1,	/* wait for the real ack (or timeout + retry) */
	STATE_UNSUBSCRIBE,	/* send the unsubscribe */
	STATE_WAIT_UNSUBACK,	/* unsubscribe sent, waiting for the unsuback */

	STATE_TEST_FINISH	/* sequence complete */
};
43*1c60b9acSAndroid Build Coastguard Worker
44*1c60b9acSAndroid Build Coastguard Worker static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45*1c60b9acSAndroid Build Coastguard Worker done, count = COUNT;
46*1c60b9acSAndroid Build Coastguard Worker
47*1c60b9acSAndroid Build Coastguard Worker static const lws_retry_bo_t retry = {
48*1c60b9acSAndroid Build Coastguard Worker .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
49*1c60b9acSAndroid Build Coastguard Worker .secs_since_valid_hangup = 25, /* hangup if still idle secs */
50*1c60b9acSAndroid Build Coastguard Worker };
51*1c60b9acSAndroid Build Coastguard Worker
52*1c60b9acSAndroid Build Coastguard Worker static const lws_mqtt_client_connect_param_t client_connect_param = {
53*1c60b9acSAndroid Build Coastguard Worker .client_id = NULL,
54*1c60b9acSAndroid Build Coastguard Worker .keep_alive = 60,
55*1c60b9acSAndroid Build Coastguard Worker .clean_start = 1,
56*1c60b9acSAndroid Build Coastguard Worker .client_id_nofree = 1,
57*1c60b9acSAndroid Build Coastguard Worker .username_nofree = 1,
58*1c60b9acSAndroid Build Coastguard Worker .password_nofree = 1,
59*1c60b9acSAndroid Build Coastguard Worker .will_param = {
60*1c60b9acSAndroid Build Coastguard Worker .topic = "good/bye",
61*1c60b9acSAndroid Build Coastguard Worker .message = "sign-off",
62*1c60b9acSAndroid Build Coastguard Worker .qos = 0,
63*1c60b9acSAndroid Build Coastguard Worker .retain = 0,
64*1c60b9acSAndroid Build Coastguard Worker },
65*1c60b9acSAndroid Build Coastguard Worker .username = "lwsUser",
66*1c60b9acSAndroid Build Coastguard Worker .password = "mySecretPassword",
67*1c60b9acSAndroid Build Coastguard Worker };
68*1c60b9acSAndroid Build Coastguard Worker
69*1c60b9acSAndroid Build Coastguard Worker static lws_mqtt_topic_elem_t topics[] = {
70*1c60b9acSAndroid Build Coastguard Worker [0] = { .name = "test/topic0", .qos = QOS0 },
71*1c60b9acSAndroid Build Coastguard Worker [1] = { .name = "test/topic1", .qos = QOS1 },
72*1c60b9acSAndroid Build Coastguard Worker };
73*1c60b9acSAndroid Build Coastguard Worker
74*1c60b9acSAndroid Build Coastguard Worker static lws_mqtt_subscribe_param_t sub_param = {
75*1c60b9acSAndroid Build Coastguard Worker .topic = &topics[0],
76*1c60b9acSAndroid Build Coastguard Worker .num_topics = LWS_ARRAY_SIZE(topics),
77*1c60b9acSAndroid Build Coastguard Worker };
78*1c60b9acSAndroid Build Coastguard Worker
/*
 * Payload that gets published.  This is an array rather than a pointer so
 * that its length can be taken with sizeof below.
 */
static const char test_string[] =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water. With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same. No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable. It is curious to recall some of the mental habits of "
	"those departed days. At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us. And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * Length of the string above, not counting the terminating NUL.  It is
 * derived from the array so that it cannot drift from the text, which would
 * make the publish code read beyond the end of the string.
 */
#define TEST_STRING_LEN (sizeof(test_string) - 1)
103*1c60b9acSAndroid Build Coastguard Worker
104*1c60b9acSAndroid Build Coastguard Worker struct pss {
105*1c60b9acSAndroid Build Coastguard Worker lws_mqtt_publish_param_t pub_param;
106*1c60b9acSAndroid Build Coastguard Worker int state;
107*1c60b9acSAndroid Build Coastguard Worker size_t pos;
108*1c60b9acSAndroid Build Coastguard Worker int retries;
109*1c60b9acSAndroid Build Coastguard Worker };
110*1c60b9acSAndroid Build Coastguard Worker
111*1c60b9acSAndroid Build Coastguard Worker static void
sigint_handler(int sig)112*1c60b9acSAndroid Build Coastguard Worker sigint_handler(int sig)
113*1c60b9acSAndroid Build Coastguard Worker {
114*1c60b9acSAndroid Build Coastguard Worker interrupted = 1;
115*1c60b9acSAndroid Build Coastguard Worker }
116*1c60b9acSAndroid Build Coastguard Worker
117*1c60b9acSAndroid Build Coastguard Worker static int
connect_client(struct lws_context * context,struct test_item * item)118*1c60b9acSAndroid Build Coastguard Worker connect_client(struct lws_context *context, struct test_item *item)
119*1c60b9acSAndroid Build Coastguard Worker {
120*1c60b9acSAndroid Build Coastguard Worker struct lws_client_connect_info i;
121*1c60b9acSAndroid Build Coastguard Worker
122*1c60b9acSAndroid Build Coastguard Worker memset(&i, 0, sizeof i);
123*1c60b9acSAndroid Build Coastguard Worker
124*1c60b9acSAndroid Build Coastguard Worker i.mqtt_cp = &client_connect_param;
125*1c60b9acSAndroid Build Coastguard Worker i.opaque_user_data = item;
126*1c60b9acSAndroid Build Coastguard Worker i.protocol = "test-mqtt";
127*1c60b9acSAndroid Build Coastguard Worker i.address = "localhost";
128*1c60b9acSAndroid Build Coastguard Worker i.host = "localhost";
129*1c60b9acSAndroid Build Coastguard Worker i.pwsi = &item->wsi;
130*1c60b9acSAndroid Build Coastguard Worker i.context = context;
131*1c60b9acSAndroid Build Coastguard Worker i.method = "MQTT";
132*1c60b9acSAndroid Build Coastguard Worker i.alpn = "mqtt";
133*1c60b9acSAndroid Build Coastguard Worker i.port = 1883;
134*1c60b9acSAndroid Build Coastguard Worker
135*1c60b9acSAndroid Build Coastguard Worker if (do_ssl) {
136*1c60b9acSAndroid Build Coastguard Worker i.ssl_connection = LCCSCF_USE_SSL;
137*1c60b9acSAndroid Build Coastguard Worker i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138*1c60b9acSAndroid Build Coastguard Worker i.port = 8883;
139*1c60b9acSAndroid Build Coastguard Worker }
140*1c60b9acSAndroid Build Coastguard Worker
141*1c60b9acSAndroid Build Coastguard Worker if (pipeline)
142*1c60b9acSAndroid Build Coastguard Worker i.ssl_connection |= LCCSCF_PIPELINE;
143*1c60b9acSAndroid Build Coastguard Worker
144*1c60b9acSAndroid Build Coastguard Worker if (!lws_client_connect_via_info(&i)) {
145*1c60b9acSAndroid Build Coastguard Worker lwsl_err("%s: Client Connect Failed\n", __func__);
146*1c60b9acSAndroid Build Coastguard Worker
147*1c60b9acSAndroid Build Coastguard Worker return 1;
148*1c60b9acSAndroid Build Coastguard Worker }
149*1c60b9acSAndroid Build Coastguard Worker
150*1c60b9acSAndroid Build Coastguard Worker return 0;
151*1c60b9acSAndroid Build Coastguard Worker }
152*1c60b9acSAndroid Build Coastguard Worker
153*1c60b9acSAndroid Build Coastguard Worker static void
start_conn(struct lws_sorted_usec_list * sul)154*1c60b9acSAndroid Build Coastguard Worker start_conn(struct lws_sorted_usec_list *sul)
155*1c60b9acSAndroid Build Coastguard Worker {
156*1c60b9acSAndroid Build Coastguard Worker struct test_item *item = lws_container_of(sul, struct test_item, sul);
157*1c60b9acSAndroid Build Coastguard Worker
158*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159*1c60b9acSAndroid Build Coastguard Worker
160*1c60b9acSAndroid Build Coastguard Worker if (connect_client(item->context, item))
161*1c60b9acSAndroid Build Coastguard Worker interrupted = 1;
162*1c60b9acSAndroid Build Coastguard Worker }
163*1c60b9acSAndroid Build Coastguard Worker
164*1c60b9acSAndroid Build Coastguard Worker
165*1c60b9acSAndroid Build Coastguard Worker static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)166*1c60b9acSAndroid Build Coastguard Worker system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167*1c60b9acSAndroid Build Coastguard Worker int current, int target)
168*1c60b9acSAndroid Build Coastguard Worker {
169*1c60b9acSAndroid Build Coastguard Worker struct lws_context *context = mgr->parent;
170*1c60b9acSAndroid Build Coastguard Worker int n;
171*1c60b9acSAndroid Build Coastguard Worker
172*1c60b9acSAndroid Build Coastguard Worker if (current != LWS_SYSTATE_OPERATIONAL ||
173*1c60b9acSAndroid Build Coastguard Worker target != LWS_SYSTATE_OPERATIONAL)
174*1c60b9acSAndroid Build Coastguard Worker return 0;
175*1c60b9acSAndroid Build Coastguard Worker
176*1c60b9acSAndroid Build Coastguard Worker /*
177*1c60b9acSAndroid Build Coastguard Worker * We delay trying to do the client connection until the protocols have
178*1c60b9acSAndroid Build Coastguard Worker * been initialized for each vhost... this happens after we have network
179*1c60b9acSAndroid Build Coastguard Worker * and time so we can judge tls cert validity.
180*1c60b9acSAndroid Build Coastguard Worker *
181*1c60b9acSAndroid Build Coastguard Worker * Stagger the connection attempts so we get some joining before the
182*1c60b9acSAndroid Build Coastguard Worker * first has connected and some afterwards
183*1c60b9acSAndroid Build Coastguard Worker */
184*1c60b9acSAndroid Build Coastguard Worker
185*1c60b9acSAndroid Build Coastguard Worker for (n = 0; n < count; n++) {
186*1c60b9acSAndroid Build Coastguard Worker items[n].context = context;
187*1c60b9acSAndroid Build Coastguard Worker lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188*1c60b9acSAndroid Build Coastguard Worker n * stagger_us);
189*1c60b9acSAndroid Build Coastguard Worker }
190*1c60b9acSAndroid Build Coastguard Worker
191*1c60b9acSAndroid Build Coastguard Worker return 0;
192*1c60b9acSAndroid Build Coastguard Worker }
193*1c60b9acSAndroid Build Coastguard Worker
194*1c60b9acSAndroid Build Coastguard Worker
195*1c60b9acSAndroid Build Coastguard Worker static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)196*1c60b9acSAndroid Build Coastguard Worker callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197*1c60b9acSAndroid Build Coastguard Worker void *user, void *in, size_t len)
198*1c60b9acSAndroid Build Coastguard Worker {
199*1c60b9acSAndroid Build Coastguard Worker struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200*1c60b9acSAndroid Build Coastguard Worker struct pss *pss = (struct pss *)user;
201*1c60b9acSAndroid Build Coastguard Worker lws_mqtt_publish_param_t *pub;
202*1c60b9acSAndroid Build Coastguard Worker size_t chunk;
203*1c60b9acSAndroid Build Coastguard Worker
204*1c60b9acSAndroid Build Coastguard Worker switch (reason) {
205*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206*1c60b9acSAndroid Build Coastguard Worker lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207*1c60b9acSAndroid Build Coastguard Worker in ? (char *)in : "(null)");
208*1c60b9acSAndroid Build Coastguard Worker
209*1c60b9acSAndroid Build Coastguard Worker if (++done == count)
210*1c60b9acSAndroid Build Coastguard Worker goto finish_test;
211*1c60b9acSAndroid Build Coastguard Worker break;
212*1c60b9acSAndroid Build Coastguard Worker
213*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215*1c60b9acSAndroid Build Coastguard Worker
216*1c60b9acSAndroid Build Coastguard Worker if (++done == count)
217*1c60b9acSAndroid Build Coastguard Worker goto finish_test;
218*1c60b9acSAndroid Build Coastguard Worker break;
219*1c60b9acSAndroid Build Coastguard Worker
220*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222*1c60b9acSAndroid Build Coastguard Worker lws_callback_on_writable(wsi);
223*1c60b9acSAndroid Build Coastguard Worker
224*1c60b9acSAndroid Build Coastguard Worker return 0;
225*1c60b9acSAndroid Build Coastguard Worker
226*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_SUBSCRIBED:
227*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228*1c60b9acSAndroid Build Coastguard Worker
229*1c60b9acSAndroid Build Coastguard Worker /* then we can get on with the actual test part */
230*1c60b9acSAndroid Build Coastguard Worker
231*1c60b9acSAndroid Build Coastguard Worker pss->state++;
232*1c60b9acSAndroid Build Coastguard Worker lws_callback_on_writable(wsi);
233*1c60b9acSAndroid Build Coastguard Worker break;
234*1c60b9acSAndroid Build Coastguard Worker
235*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237*1c60b9acSAndroid Build Coastguard Worker __func__, (int)(item - &item[0]), wsi);
238*1c60b9acSAndroid Build Coastguard Worker okay++;
239*1c60b9acSAndroid Build Coastguard Worker
240*1c60b9acSAndroid Build Coastguard Worker if (++pss->state == STATE_TEST_FINISH) {
241*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242*1c60b9acSAndroid Build Coastguard Worker __func__, (int)(item - &items[0]), okay, count);
243*1c60b9acSAndroid Build Coastguard Worker /* We are done, request to close */
244*1c60b9acSAndroid Build Coastguard Worker return -1;
245*1c60b9acSAndroid Build Coastguard Worker }
246*1c60b9acSAndroid Build Coastguard Worker break;
247*1c60b9acSAndroid Build Coastguard Worker
248*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249*1c60b9acSAndroid Build Coastguard Worker
250*1c60b9acSAndroid Build Coastguard Worker /*
251*1c60b9acSAndroid Build Coastguard Worker * Extra WRITEABLE may appear here other than ones we asked
252*1c60b9acSAndroid Build Coastguard Worker * for, so we must consult our own state to decide if we want
253*1c60b9acSAndroid Build Coastguard Worker * to make use of the opportunity
254*1c60b9acSAndroid Build Coastguard Worker */
255*1c60b9acSAndroid Build Coastguard Worker
256*1c60b9acSAndroid Build Coastguard Worker switch (pss->state) {
257*1c60b9acSAndroid Build Coastguard Worker case STATE_SUBSCRIBE:
258*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259*1c60b9acSAndroid Build Coastguard Worker
260*1c60b9acSAndroid Build Coastguard Worker if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: subscribe failed\n", __func__);
262*1c60b9acSAndroid Build Coastguard Worker
263*1c60b9acSAndroid Build Coastguard Worker return -1;
264*1c60b9acSAndroid Build Coastguard Worker }
265*1c60b9acSAndroid Build Coastguard Worker pss->state++;
266*1c60b9acSAndroid Build Coastguard Worker break;
267*1c60b9acSAndroid Build Coastguard Worker
268*1c60b9acSAndroid Build Coastguard Worker case STATE_PUBLISH_QOS0:
269*1c60b9acSAndroid Build Coastguard Worker case STATE_PUBLISH_QOS1:
270*1c60b9acSAndroid Build Coastguard Worker
271*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272*1c60b9acSAndroid Build Coastguard Worker
273*1c60b9acSAndroid Build Coastguard Worker pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
274*1c60b9acSAndroid Build Coastguard Worker "test/topic0" : "test/topic1";
275*1c60b9acSAndroid Build Coastguard Worker pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276*1c60b9acSAndroid Build Coastguard Worker pss->pub_param.qos =
277*1c60b9acSAndroid Build Coastguard Worker pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278*1c60b9acSAndroid Build Coastguard Worker pss->pub_param.payload_len = TEST_STRING_LEN;
279*1c60b9acSAndroid Build Coastguard Worker
280*1c60b9acSAndroid Build Coastguard Worker /* We send the message out 300 bytes or less at at time */
281*1c60b9acSAndroid Build Coastguard Worker
282*1c60b9acSAndroid Build Coastguard Worker chunk = 300;
283*1c60b9acSAndroid Build Coastguard Worker
284*1c60b9acSAndroid Build Coastguard Worker if (chunk > TEST_STRING_LEN - pss->pos)
285*1c60b9acSAndroid Build Coastguard Worker chunk = TEST_STRING_LEN - pss->pos;
286*1c60b9acSAndroid Build Coastguard Worker
287*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: sending %d at +%d\n", __func__,
288*1c60b9acSAndroid Build Coastguard Worker (int)chunk, (int)pss->pos);
289*1c60b9acSAndroid Build Coastguard Worker
290*1c60b9acSAndroid Build Coastguard Worker if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291*1c60b9acSAndroid Build Coastguard Worker test_string + pss->pos, (uint32_t)chunk,
292*1c60b9acSAndroid Build Coastguard Worker (pss->pos + chunk == TEST_STRING_LEN))) {
293*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: publish failed\n", __func__);
294*1c60b9acSAndroid Build Coastguard Worker return -1;
295*1c60b9acSAndroid Build Coastguard Worker }
296*1c60b9acSAndroid Build Coastguard Worker
297*1c60b9acSAndroid Build Coastguard Worker pss->pos += chunk;
298*1c60b9acSAndroid Build Coastguard Worker
299*1c60b9acSAndroid Build Coastguard Worker if (pss->pos == TEST_STRING_LEN) {
300*1c60b9acSAndroid Build Coastguard Worker lwsl_debug("%s: sent message\n", __func__);
301*1c60b9acSAndroid Build Coastguard Worker pss->pos = 0;
302*1c60b9acSAndroid Build Coastguard Worker pss->state++;
303*1c60b9acSAndroid Build Coastguard Worker }
304*1c60b9acSAndroid Build Coastguard Worker break;
305*1c60b9acSAndroid Build Coastguard Worker
306*1c60b9acSAndroid Build Coastguard Worker case STATE_UNSUBSCRIBE:
307*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308*1c60b9acSAndroid Build Coastguard Worker __func__, (int)(item - &item[0]), wsi);
309*1c60b9acSAndroid Build Coastguard Worker pss->state++;
310*1c60b9acSAndroid Build Coastguard Worker if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: subscribe failed\n", __func__);
312*1c60b9acSAndroid Build Coastguard Worker return -1;
313*1c60b9acSAndroid Build Coastguard Worker }
314*1c60b9acSAndroid Build Coastguard Worker break;
315*1c60b9acSAndroid Build Coastguard Worker default:
316*1c60b9acSAndroid Build Coastguard Worker break;
317*1c60b9acSAndroid Build Coastguard Worker }
318*1c60b9acSAndroid Build Coastguard Worker
319*1c60b9acSAndroid Build Coastguard Worker return 0;
320*1c60b9acSAndroid Build Coastguard Worker
321*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_ACK:
322*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323*1c60b9acSAndroid Build Coastguard Worker /*
324*1c60b9acSAndroid Build Coastguard Worker * We can forget about the message we just sent, it's done.
325*1c60b9acSAndroid Build Coastguard Worker *
326*1c60b9acSAndroid Build Coastguard Worker * For our test, that's the indication we can close the wsi.
327*1c60b9acSAndroid Build Coastguard Worker */
328*1c60b9acSAndroid Build Coastguard Worker
329*1c60b9acSAndroid Build Coastguard Worker pss->state++;
330*1c60b9acSAndroid Build Coastguard Worker if (pss->state != STATE_TEST_FINISH) {
331*1c60b9acSAndroid Build Coastguard Worker lws_callback_on_writable(wsi);
332*1c60b9acSAndroid Build Coastguard Worker break;
333*1c60b9acSAndroid Build Coastguard Worker }
334*1c60b9acSAndroid Build Coastguard Worker
335*1c60b9acSAndroid Build Coastguard Worker break;
336*1c60b9acSAndroid Build Coastguard Worker
337*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_RESEND:
338*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: MQTT_RESEND\n", __func__);
339*1c60b9acSAndroid Build Coastguard Worker /*
340*1c60b9acSAndroid Build Coastguard Worker * We must resend the packet ID mentioned in len
341*1c60b9acSAndroid Build Coastguard Worker */
342*1c60b9acSAndroid Build Coastguard Worker if (++pss->retries == 3) {
343*1c60b9acSAndroid Build Coastguard Worker lwsl_notice("%s: too many retries\n", __func__);
344*1c60b9acSAndroid Build Coastguard Worker return 1; /* kill the connection */
345*1c60b9acSAndroid Build Coastguard Worker }
346*1c60b9acSAndroid Build Coastguard Worker pss->state--;
347*1c60b9acSAndroid Build Coastguard Worker pss->pos = 0;
348*1c60b9acSAndroid Build Coastguard Worker break;
349*1c60b9acSAndroid Build Coastguard Worker
350*1c60b9acSAndroid Build Coastguard Worker case LWS_CALLBACK_MQTT_CLIENT_RX:
351*1c60b9acSAndroid Build Coastguard Worker pub = (lws_mqtt_publish_param_t *)in;
352*1c60b9acSAndroid Build Coastguard Worker assert(pub);
353*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354*1c60b9acSAndroid Build Coastguard Worker (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355*1c60b9acSAndroid Build Coastguard Worker (int)pub->payload_len, (int)len);
356*1c60b9acSAndroid Build Coastguard Worker
357*1c60b9acSAndroid Build Coastguard Worker //lwsl_hexdump_info(pub->payload, len);
358*1c60b9acSAndroid Build Coastguard Worker
359*1c60b9acSAndroid Build Coastguard Worker return 0;
360*1c60b9acSAndroid Build Coastguard Worker
361*1c60b9acSAndroid Build Coastguard Worker default:
362*1c60b9acSAndroid Build Coastguard Worker break;
363*1c60b9acSAndroid Build Coastguard Worker }
364*1c60b9acSAndroid Build Coastguard Worker
365*1c60b9acSAndroid Build Coastguard Worker return 0;
366*1c60b9acSAndroid Build Coastguard Worker
367*1c60b9acSAndroid Build Coastguard Worker finish_test:
368*1c60b9acSAndroid Build Coastguard Worker interrupted = 1;
369*1c60b9acSAndroid Build Coastguard Worker lws_cancel_service(lws_get_context(wsi));
370*1c60b9acSAndroid Build Coastguard Worker
371*1c60b9acSAndroid Build Coastguard Worker return 0;
372*1c60b9acSAndroid Build Coastguard Worker }
373*1c60b9acSAndroid Build Coastguard Worker
374*1c60b9acSAndroid Build Coastguard Worker static const struct lws_protocols protocols[] = {
375*1c60b9acSAndroid Build Coastguard Worker {
376*1c60b9acSAndroid Build Coastguard Worker .name = "test-mqtt",
377*1c60b9acSAndroid Build Coastguard Worker .callback = callback_mqtt,
378*1c60b9acSAndroid Build Coastguard Worker .per_session_data_size = sizeof(struct pss)
379*1c60b9acSAndroid Build Coastguard Worker },
380*1c60b9acSAndroid Build Coastguard Worker LWS_PROTOCOL_LIST_TERM
381*1c60b9acSAndroid Build Coastguard Worker };
382*1c60b9acSAndroid Build Coastguard Worker
main(int argc,const char ** argv)383*1c60b9acSAndroid Build Coastguard Worker int main(int argc, const char **argv)
384*1c60b9acSAndroid Build Coastguard Worker {
385*1c60b9acSAndroid Build Coastguard Worker lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386*1c60b9acSAndroid Build Coastguard Worker system_notify_cb, "app" };
387*1c60b9acSAndroid Build Coastguard Worker lws_state_notify_link_t *na[] = { ¬ifier, NULL };
388*1c60b9acSAndroid Build Coastguard Worker struct lws_context_creation_info info;
389*1c60b9acSAndroid Build Coastguard Worker struct lws_context *context;
390*1c60b9acSAndroid Build Coastguard Worker const char *p;
391*1c60b9acSAndroid Build Coastguard Worker int n = 0;
392*1c60b9acSAndroid Build Coastguard Worker
393*1c60b9acSAndroid Build Coastguard Worker signal(SIGINT, sigint_handler);
394*1c60b9acSAndroid Build Coastguard Worker memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395*1c60b9acSAndroid Build Coastguard Worker lws_cmdline_option_handle_builtin(argc, argv, &info);
396*1c60b9acSAndroid Build Coastguard Worker
397*1c60b9acSAndroid Build Coastguard Worker do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398*1c60b9acSAndroid Build Coastguard Worker if (do_ssl)
399*1c60b9acSAndroid Build Coastguard Worker info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400*1c60b9acSAndroid Build Coastguard Worker
401*1c60b9acSAndroid Build Coastguard Worker if (lws_cmdline_option(argc, argv, "-p"))
402*1c60b9acSAndroid Build Coastguard Worker pipeline = 1;
403*1c60b9acSAndroid Build Coastguard Worker
404*1c60b9acSAndroid Build Coastguard Worker if ((p = lws_cmdline_option(argc, argv, "-i")))
405*1c60b9acSAndroid Build Coastguard Worker stagger_us = atoi(p);
406*1c60b9acSAndroid Build Coastguard Worker
407*1c60b9acSAndroid Build Coastguard Worker if ((p = lws_cmdline_option(argc, argv, "-c")))
408*1c60b9acSAndroid Build Coastguard Worker count = atoi(p);
409*1c60b9acSAndroid Build Coastguard Worker
410*1c60b9acSAndroid Build Coastguard Worker if (count > COUNT) {
411*1c60b9acSAndroid Build Coastguard Worker count = COUNT;
412*1c60b9acSAndroid Build Coastguard Worker lwsl_err("%s: clipped count at max %d\n", __func__, count);
413*1c60b9acSAndroid Build Coastguard Worker }
414*1c60b9acSAndroid Build Coastguard Worker
415*1c60b9acSAndroid Build Coastguard Worker lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416*1c60b9acSAndroid Build Coastguard Worker do_ssl ? "tls enabled": "unencrypted");
417*1c60b9acSAndroid Build Coastguard Worker
418*1c60b9acSAndroid Build Coastguard Worker info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419*1c60b9acSAndroid Build Coastguard Worker info.protocols = protocols;
420*1c60b9acSAndroid Build Coastguard Worker info.register_notifier_list = na;
421*1c60b9acSAndroid Build Coastguard Worker info.fd_limit_per_thread = 1 + COUNT + 1;
422*1c60b9acSAndroid Build Coastguard Worker info.retry_and_idle_policy = &retry;
423*1c60b9acSAndroid Build Coastguard Worker
424*1c60b9acSAndroid Build Coastguard Worker #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425*1c60b9acSAndroid Build Coastguard Worker /*
426*1c60b9acSAndroid Build Coastguard Worker * OpenSSL uses the system trust store. mbedTLS has to be told which
427*1c60b9acSAndroid Build Coastguard Worker * CA to trust explicitly.
428*1c60b9acSAndroid Build Coastguard Worker */
429*1c60b9acSAndroid Build Coastguard Worker info.client_ssl_ca_filepath = "./mosq-ca.crt";
430*1c60b9acSAndroid Build Coastguard Worker #endif
431*1c60b9acSAndroid Build Coastguard Worker
432*1c60b9acSAndroid Build Coastguard Worker context = lws_create_context(&info);
433*1c60b9acSAndroid Build Coastguard Worker if (!context) {
434*1c60b9acSAndroid Build Coastguard Worker lwsl_err("lws init failed\n");
435*1c60b9acSAndroid Build Coastguard Worker return 1;
436*1c60b9acSAndroid Build Coastguard Worker }
437*1c60b9acSAndroid Build Coastguard Worker
438*1c60b9acSAndroid Build Coastguard Worker /* Event loop */
439*1c60b9acSAndroid Build Coastguard Worker while (n >= 0 && !interrupted)
440*1c60b9acSAndroid Build Coastguard Worker n = lws_service(context, 0);
441*1c60b9acSAndroid Build Coastguard Worker
442*1c60b9acSAndroid Build Coastguard Worker lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443*1c60b9acSAndroid Build Coastguard Worker okay != count ? "failed" : "OK");
444*1c60b9acSAndroid Build Coastguard Worker lws_context_destroy(context);
445*1c60b9acSAndroid Build Coastguard Worker
446*1c60b9acSAndroid Build Coastguard Worker return okay != count;
447*1c60b9acSAndroid Build Coastguard Worker }
448