/*
 * lws-minimal-mqtt-client
 *
 * Written in 2010-2020 by Andy Green <[email protected]>
 *                         Sakthi Kannan <[email protected]>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 */
10*1c60b9acSAndroid Build Coastguard Worker 
#include <libwebsockets.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#if defined(WIN32)
#define HAVE_STRUCT_TIMESPEC
#if defined(pid_t)
#undef pid_t
#endif
#endif
#include <pthread.h>
#include <assert.h>
22*1c60b9acSAndroid Build Coastguard Worker 
23*1c60b9acSAndroid Build Coastguard Worker #define COUNT 8
24*1c60b9acSAndroid Build Coastguard Worker 
25*1c60b9acSAndroid Build Coastguard Worker struct test_item {
26*1c60b9acSAndroid Build Coastguard Worker 	struct lws_context		*context;
27*1c60b9acSAndroid Build Coastguard Worker 	struct lws			*wsi;
28*1c60b9acSAndroid Build Coastguard Worker 	lws_sorted_usec_list_t		sul;
29*1c60b9acSAndroid Build Coastguard Worker } items[COUNT];
30*1c60b9acSAndroid Build Coastguard Worker 
/*
 * Per-connection test sequence.  The callback advances pss->state by
 * incrementing it, so the order of the enumerators below IS the order in
 * which the test steps run.
 */
enum {
	STATE_SUBSCRIBE = 0,	/* subscribe to the topic */
	STATE_WAIT_SUBACK,	/* subscribe sent, waiting for it to be acked */
	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */
	STATE_UNSUBSCRIBE,	/* unsubscribe from the topics again */
	STATE_WAIT_UNSUBACK,	/* unsubscribe sent, waiting for it to be acked */

	STATE_TEST_FINISH	/* sequence complete for this connection */
};
43*1c60b9acSAndroid Build Coastguard Worker 
44*1c60b9acSAndroid Build Coastguard Worker static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45*1c60b9acSAndroid Build Coastguard Worker 	   done, count = COUNT;
46*1c60b9acSAndroid Build Coastguard Worker 
47*1c60b9acSAndroid Build Coastguard Worker static const lws_retry_bo_t retry = {
48*1c60b9acSAndroid Build Coastguard Worker 	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
49*1c60b9acSAndroid Build Coastguard Worker 	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
50*1c60b9acSAndroid Build Coastguard Worker };
51*1c60b9acSAndroid Build Coastguard Worker 
52*1c60b9acSAndroid Build Coastguard Worker static const lws_mqtt_client_connect_param_t client_connect_param = {
53*1c60b9acSAndroid Build Coastguard Worker 	.client_id			= NULL,
54*1c60b9acSAndroid Build Coastguard Worker 	.keep_alive			= 60,
55*1c60b9acSAndroid Build Coastguard Worker 	.clean_start			= 1,
56*1c60b9acSAndroid Build Coastguard Worker 	.client_id_nofree		= 1,
57*1c60b9acSAndroid Build Coastguard Worker 	.username_nofree		= 1,
58*1c60b9acSAndroid Build Coastguard Worker 	.password_nofree		= 1,
59*1c60b9acSAndroid Build Coastguard Worker 	.will_param = {
60*1c60b9acSAndroid Build Coastguard Worker 		.topic			= "good/bye",
61*1c60b9acSAndroid Build Coastguard Worker 		.message		= "sign-off",
62*1c60b9acSAndroid Build Coastguard Worker 		.qos			= 0,
63*1c60b9acSAndroid Build Coastguard Worker 		.retain			= 0,
64*1c60b9acSAndroid Build Coastguard Worker 	},
65*1c60b9acSAndroid Build Coastguard Worker 	.username			= "lwsUser",
66*1c60b9acSAndroid Build Coastguard Worker 	.password			= "mySecretPassword",
67*1c60b9acSAndroid Build Coastguard Worker };
68*1c60b9acSAndroid Build Coastguard Worker 
69*1c60b9acSAndroid Build Coastguard Worker static lws_mqtt_topic_elem_t topics[] = {
70*1c60b9acSAndroid Build Coastguard Worker 	[0] = { .name = "test/topic0", .qos = QOS0 },
71*1c60b9acSAndroid Build Coastguard Worker 	[1] = { .name = "test/topic1", .qos = QOS1 },
72*1c60b9acSAndroid Build Coastguard Worker };
73*1c60b9acSAndroid Build Coastguard Worker 
74*1c60b9acSAndroid Build Coastguard Worker static lws_mqtt_subscribe_param_t sub_param = {
75*1c60b9acSAndroid Build Coastguard Worker 	.topic				= &topics[0],
76*1c60b9acSAndroid Build Coastguard Worker 	.num_topics			= LWS_ARRAY_SIZE(topics),
77*1c60b9acSAndroid Build Coastguard Worker };
78*1c60b9acSAndroid Build Coastguard Worker 
/* Payload published by every connection, sent out in several chunks */
static const char * const test_string =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water.  With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same.  No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable.  It is curious to recall some of the mental habits of "
	"those departed days.  At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us.  And "
	"early in the twentieth century came the great disillusionment. ";

/*
 * Length of test_string in bytes, excluding the terminating NUL.  This is
 * maintained by hand: it must be updated whenever the text above changes.
 */
#define TEST_STRING_LEN 1337
103*1c60b9acSAndroid Build Coastguard Worker 
104*1c60b9acSAndroid Build Coastguard Worker struct pss {
105*1c60b9acSAndroid Build Coastguard Worker 	lws_mqtt_publish_param_t	pub_param;
106*1c60b9acSAndroid Build Coastguard Worker 	int				state;
107*1c60b9acSAndroid Build Coastguard Worker 	size_t				pos;
108*1c60b9acSAndroid Build Coastguard Worker 	int				retries;
109*1c60b9acSAndroid Build Coastguard Worker };
110*1c60b9acSAndroid Build Coastguard Worker 
111*1c60b9acSAndroid Build Coastguard Worker static void
sigint_handler(int sig)112*1c60b9acSAndroid Build Coastguard Worker sigint_handler(int sig)
113*1c60b9acSAndroid Build Coastguard Worker {
114*1c60b9acSAndroid Build Coastguard Worker 	interrupted = 1;
115*1c60b9acSAndroid Build Coastguard Worker }
116*1c60b9acSAndroid Build Coastguard Worker 
117*1c60b9acSAndroid Build Coastguard Worker static int
connect_client(struct lws_context * context,struct test_item * item)118*1c60b9acSAndroid Build Coastguard Worker connect_client(struct lws_context *context, struct test_item *item)
119*1c60b9acSAndroid Build Coastguard Worker {
120*1c60b9acSAndroid Build Coastguard Worker 	struct lws_client_connect_info i;
121*1c60b9acSAndroid Build Coastguard Worker 
122*1c60b9acSAndroid Build Coastguard Worker 	memset(&i, 0, sizeof i);
123*1c60b9acSAndroid Build Coastguard Worker 
124*1c60b9acSAndroid Build Coastguard Worker 	i.mqtt_cp = &client_connect_param;
125*1c60b9acSAndroid Build Coastguard Worker 	i.opaque_user_data = item;
126*1c60b9acSAndroid Build Coastguard Worker 	i.protocol = "test-mqtt";
127*1c60b9acSAndroid Build Coastguard Worker 	i.address = "localhost";
128*1c60b9acSAndroid Build Coastguard Worker 	i.host = "localhost";
129*1c60b9acSAndroid Build Coastguard Worker 	i.pwsi = &item->wsi;
130*1c60b9acSAndroid Build Coastguard Worker 	i.context = context;
131*1c60b9acSAndroid Build Coastguard Worker 	i.method = "MQTT";
132*1c60b9acSAndroid Build Coastguard Worker 	i.alpn = "mqtt";
133*1c60b9acSAndroid Build Coastguard Worker 	i.port = 1883;
134*1c60b9acSAndroid Build Coastguard Worker 
135*1c60b9acSAndroid Build Coastguard Worker 	if (do_ssl) {
136*1c60b9acSAndroid Build Coastguard Worker 		i.ssl_connection = LCCSCF_USE_SSL;
137*1c60b9acSAndroid Build Coastguard Worker 		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138*1c60b9acSAndroid Build Coastguard Worker 		i.port = 8883;
139*1c60b9acSAndroid Build Coastguard Worker 	}
140*1c60b9acSAndroid Build Coastguard Worker 
141*1c60b9acSAndroid Build Coastguard Worker 	if (pipeline)
142*1c60b9acSAndroid Build Coastguard Worker 		i.ssl_connection |= LCCSCF_PIPELINE;
143*1c60b9acSAndroid Build Coastguard Worker 
144*1c60b9acSAndroid Build Coastguard Worker 	if (!lws_client_connect_via_info(&i)) {
145*1c60b9acSAndroid Build Coastguard Worker 		lwsl_err("%s: Client Connect Failed\n", __func__);
146*1c60b9acSAndroid Build Coastguard Worker 
147*1c60b9acSAndroid Build Coastguard Worker 		return 1;
148*1c60b9acSAndroid Build Coastguard Worker 	}
149*1c60b9acSAndroid Build Coastguard Worker 
150*1c60b9acSAndroid Build Coastguard Worker 	return 0;
151*1c60b9acSAndroid Build Coastguard Worker }
152*1c60b9acSAndroid Build Coastguard Worker 
153*1c60b9acSAndroid Build Coastguard Worker static void
start_conn(struct lws_sorted_usec_list * sul)154*1c60b9acSAndroid Build Coastguard Worker start_conn(struct lws_sorted_usec_list *sul)
155*1c60b9acSAndroid Build Coastguard Worker {
156*1c60b9acSAndroid Build Coastguard Worker 	struct test_item *item = lws_container_of(sul, struct test_item, sul);
157*1c60b9acSAndroid Build Coastguard Worker 
158*1c60b9acSAndroid Build Coastguard Worker 	lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159*1c60b9acSAndroid Build Coastguard Worker 
160*1c60b9acSAndroid Build Coastguard Worker 	if (connect_client(item->context, item))
161*1c60b9acSAndroid Build Coastguard Worker 		interrupted = 1;
162*1c60b9acSAndroid Build Coastguard Worker }
163*1c60b9acSAndroid Build Coastguard Worker 
164*1c60b9acSAndroid Build Coastguard Worker 
165*1c60b9acSAndroid Build Coastguard Worker static int
system_notify_cb(lws_state_manager_t * mgr,lws_state_notify_link_t * link,int current,int target)166*1c60b9acSAndroid Build Coastguard Worker system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167*1c60b9acSAndroid Build Coastguard Worker 		 int current, int target)
168*1c60b9acSAndroid Build Coastguard Worker {
169*1c60b9acSAndroid Build Coastguard Worker 	struct lws_context *context = mgr->parent;
170*1c60b9acSAndroid Build Coastguard Worker 	int n;
171*1c60b9acSAndroid Build Coastguard Worker 
172*1c60b9acSAndroid Build Coastguard Worker 	if (current != LWS_SYSTATE_OPERATIONAL ||
173*1c60b9acSAndroid Build Coastguard Worker 	    target != LWS_SYSTATE_OPERATIONAL)
174*1c60b9acSAndroid Build Coastguard Worker 		return 0;
175*1c60b9acSAndroid Build Coastguard Worker 
176*1c60b9acSAndroid Build Coastguard Worker 	/*
177*1c60b9acSAndroid Build Coastguard Worker 	* We delay trying to do the client connection until the protocols have
178*1c60b9acSAndroid Build Coastguard Worker 	* been initialized for each vhost... this happens after we have network
179*1c60b9acSAndroid Build Coastguard Worker 	* and time so we can judge tls cert validity.
180*1c60b9acSAndroid Build Coastguard Worker 	*
181*1c60b9acSAndroid Build Coastguard Worker 	* Stagger the connection attempts so we get some joining before the
182*1c60b9acSAndroid Build Coastguard Worker 	* first has connected and some afterwards
183*1c60b9acSAndroid Build Coastguard Worker 	*/
184*1c60b9acSAndroid Build Coastguard Worker 
185*1c60b9acSAndroid Build Coastguard Worker 	for (n = 0; n < count; n++) {
186*1c60b9acSAndroid Build Coastguard Worker 		items[n].context = context;
187*1c60b9acSAndroid Build Coastguard Worker 		lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188*1c60b9acSAndroid Build Coastguard Worker 				 n * stagger_us);
189*1c60b9acSAndroid Build Coastguard Worker 	}
190*1c60b9acSAndroid Build Coastguard Worker 
191*1c60b9acSAndroid Build Coastguard Worker 	return 0;
192*1c60b9acSAndroid Build Coastguard Worker }
193*1c60b9acSAndroid Build Coastguard Worker 
194*1c60b9acSAndroid Build Coastguard Worker 
195*1c60b9acSAndroid Build Coastguard Worker static int
callback_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)196*1c60b9acSAndroid Build Coastguard Worker callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197*1c60b9acSAndroid Build Coastguard Worker 	      void *user, void *in, size_t len)
198*1c60b9acSAndroid Build Coastguard Worker {
199*1c60b9acSAndroid Build Coastguard Worker 	struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200*1c60b9acSAndroid Build Coastguard Worker 	struct pss *pss = (struct pss *)user;
201*1c60b9acSAndroid Build Coastguard Worker 	lws_mqtt_publish_param_t *pub;
202*1c60b9acSAndroid Build Coastguard Worker 	size_t chunk;
203*1c60b9acSAndroid Build Coastguard Worker 
204*1c60b9acSAndroid Build Coastguard Worker 	switch (reason) {
205*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206*1c60b9acSAndroid Build Coastguard Worker 		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207*1c60b9acSAndroid Build Coastguard Worker 			 in ? (char *)in : "(null)");
208*1c60b9acSAndroid Build Coastguard Worker 
209*1c60b9acSAndroid Build Coastguard Worker 		if (++done == count)
210*1c60b9acSAndroid Build Coastguard Worker 			goto finish_test;
211*1c60b9acSAndroid Build Coastguard Worker 		break;
212*1c60b9acSAndroid Build Coastguard Worker 
213*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215*1c60b9acSAndroid Build Coastguard Worker 
216*1c60b9acSAndroid Build Coastguard Worker 		if (++done == count)
217*1c60b9acSAndroid Build Coastguard Worker 			goto finish_test;
218*1c60b9acSAndroid Build Coastguard Worker 		break;
219*1c60b9acSAndroid Build Coastguard Worker 
220*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222*1c60b9acSAndroid Build Coastguard Worker 		lws_callback_on_writable(wsi);
223*1c60b9acSAndroid Build Coastguard Worker 
224*1c60b9acSAndroid Build Coastguard Worker 		return 0;
225*1c60b9acSAndroid Build Coastguard Worker 
226*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_SUBSCRIBED:
227*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228*1c60b9acSAndroid Build Coastguard Worker 
229*1c60b9acSAndroid Build Coastguard Worker 		/* then we can get on with the actual test part */
230*1c60b9acSAndroid Build Coastguard Worker 
231*1c60b9acSAndroid Build Coastguard Worker 		pss->state++;
232*1c60b9acSAndroid Build Coastguard Worker 		lws_callback_on_writable(wsi);
233*1c60b9acSAndroid Build Coastguard Worker 		break;
234*1c60b9acSAndroid Build Coastguard Worker 
235*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237*1c60b9acSAndroid Build Coastguard Worker 			  __func__, (int)(item - &item[0]), wsi);
238*1c60b9acSAndroid Build Coastguard Worker 		okay++;
239*1c60b9acSAndroid Build Coastguard Worker 
240*1c60b9acSAndroid Build Coastguard Worker 		if (++pss->state == STATE_TEST_FINISH) {
241*1c60b9acSAndroid Build Coastguard Worker 			lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242*1c60b9acSAndroid Build Coastguard Worker 				    __func__, (int)(item - &items[0]), okay, count);
243*1c60b9acSAndroid Build Coastguard Worker 			/* We are done, request to close */
244*1c60b9acSAndroid Build Coastguard Worker 			return -1;
245*1c60b9acSAndroid Build Coastguard Worker 		}
246*1c60b9acSAndroid Build Coastguard Worker 		break;
247*1c60b9acSAndroid Build Coastguard Worker 
248*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249*1c60b9acSAndroid Build Coastguard Worker 
250*1c60b9acSAndroid Build Coastguard Worker 		/*
251*1c60b9acSAndroid Build Coastguard Worker 		 * Extra WRITEABLE may appear here other than ones we asked
252*1c60b9acSAndroid Build Coastguard Worker 		 * for, so we must consult our own state to decide if we want
253*1c60b9acSAndroid Build Coastguard Worker 		 * to make use of the opportunity
254*1c60b9acSAndroid Build Coastguard Worker 		 */
255*1c60b9acSAndroid Build Coastguard Worker 
256*1c60b9acSAndroid Build Coastguard Worker 		switch (pss->state) {
257*1c60b9acSAndroid Build Coastguard Worker 		case STATE_SUBSCRIBE:
258*1c60b9acSAndroid Build Coastguard Worker 			lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259*1c60b9acSAndroid Build Coastguard Worker 
260*1c60b9acSAndroid Build Coastguard Worker 			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261*1c60b9acSAndroid Build Coastguard Worker 				lwsl_notice("%s: subscribe failed\n", __func__);
262*1c60b9acSAndroid Build Coastguard Worker 
263*1c60b9acSAndroid Build Coastguard Worker 				return -1;
264*1c60b9acSAndroid Build Coastguard Worker 			}
265*1c60b9acSAndroid Build Coastguard Worker 			pss->state++;
266*1c60b9acSAndroid Build Coastguard Worker 			break;
267*1c60b9acSAndroid Build Coastguard Worker 
268*1c60b9acSAndroid Build Coastguard Worker 		case STATE_PUBLISH_QOS0:
269*1c60b9acSAndroid Build Coastguard Worker 		case STATE_PUBLISH_QOS1:
270*1c60b9acSAndroid Build Coastguard Worker 
271*1c60b9acSAndroid Build Coastguard Worker 			lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272*1c60b9acSAndroid Build Coastguard Worker 
273*1c60b9acSAndroid Build Coastguard Worker 			pss->pub_param.topic	= pss->state == STATE_PUBLISH_QOS0 ?
274*1c60b9acSAndroid Build Coastguard Worker 						"test/topic0" : "test/topic1";
275*1c60b9acSAndroid Build Coastguard Worker 			pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276*1c60b9acSAndroid Build Coastguard Worker 			pss->pub_param.qos =
277*1c60b9acSAndroid Build Coastguard Worker 				pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278*1c60b9acSAndroid Build Coastguard Worker 			pss->pub_param.payload_len = TEST_STRING_LEN;
279*1c60b9acSAndroid Build Coastguard Worker 
280*1c60b9acSAndroid Build Coastguard Worker 			/* We send the message out 300 bytes or less at at time */
281*1c60b9acSAndroid Build Coastguard Worker 
282*1c60b9acSAndroid Build Coastguard Worker 			chunk = 300;
283*1c60b9acSAndroid Build Coastguard Worker 
284*1c60b9acSAndroid Build Coastguard Worker 			if (chunk > TEST_STRING_LEN - pss->pos)
285*1c60b9acSAndroid Build Coastguard Worker 				chunk = TEST_STRING_LEN - pss->pos;
286*1c60b9acSAndroid Build Coastguard Worker 
287*1c60b9acSAndroid Build Coastguard Worker 			lwsl_notice("%s: sending %d at +%d\n", __func__,
288*1c60b9acSAndroid Build Coastguard Worker 					(int)chunk, (int)pss->pos);
289*1c60b9acSAndroid Build Coastguard Worker 
290*1c60b9acSAndroid Build Coastguard Worker 			if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291*1c60b9acSAndroid Build Coastguard Worker 					test_string + pss->pos, (uint32_t)chunk,
292*1c60b9acSAndroid Build Coastguard Worker 					(pss->pos + chunk == TEST_STRING_LEN))) {
293*1c60b9acSAndroid Build Coastguard Worker 				lwsl_notice("%s: publish failed\n", __func__);
294*1c60b9acSAndroid Build Coastguard Worker 				return -1;
295*1c60b9acSAndroid Build Coastguard Worker 			}
296*1c60b9acSAndroid Build Coastguard Worker 
297*1c60b9acSAndroid Build Coastguard Worker 			pss->pos += chunk;
298*1c60b9acSAndroid Build Coastguard Worker 
299*1c60b9acSAndroid Build Coastguard Worker 			if (pss->pos == TEST_STRING_LEN) {
300*1c60b9acSAndroid Build Coastguard Worker 				lwsl_debug("%s: sent message\n", __func__);
301*1c60b9acSAndroid Build Coastguard Worker 				pss->pos = 0;
302*1c60b9acSAndroid Build Coastguard Worker 				pss->state++;
303*1c60b9acSAndroid Build Coastguard Worker 			}
304*1c60b9acSAndroid Build Coastguard Worker 			break;
305*1c60b9acSAndroid Build Coastguard Worker 
306*1c60b9acSAndroid Build Coastguard Worker 		case STATE_UNSUBSCRIBE:
307*1c60b9acSAndroid Build Coastguard Worker 			lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308*1c60b9acSAndroid Build Coastguard Worker 				  __func__, (int)(item - &item[0]), wsi);
309*1c60b9acSAndroid Build Coastguard Worker 			pss->state++;
310*1c60b9acSAndroid Build Coastguard Worker 			if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311*1c60b9acSAndroid Build Coastguard Worker 				lwsl_notice("%s: subscribe failed\n", __func__);
312*1c60b9acSAndroid Build Coastguard Worker 				return -1;
313*1c60b9acSAndroid Build Coastguard Worker 			}
314*1c60b9acSAndroid Build Coastguard Worker 			break;
315*1c60b9acSAndroid Build Coastguard Worker 		default:
316*1c60b9acSAndroid Build Coastguard Worker 			break;
317*1c60b9acSAndroid Build Coastguard Worker 		}
318*1c60b9acSAndroid Build Coastguard Worker 
319*1c60b9acSAndroid Build Coastguard Worker 		return 0;
320*1c60b9acSAndroid Build Coastguard Worker 
321*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_ACK:
322*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323*1c60b9acSAndroid Build Coastguard Worker 		/*
324*1c60b9acSAndroid Build Coastguard Worker 		 * We can forget about the message we just sent, it's done.
325*1c60b9acSAndroid Build Coastguard Worker 		 *
326*1c60b9acSAndroid Build Coastguard Worker 		 * For our test, that's the indication we can close the wsi.
327*1c60b9acSAndroid Build Coastguard Worker 		 */
328*1c60b9acSAndroid Build Coastguard Worker 
329*1c60b9acSAndroid Build Coastguard Worker 		pss->state++;
330*1c60b9acSAndroid Build Coastguard Worker 		if (pss->state != STATE_TEST_FINISH) {
331*1c60b9acSAndroid Build Coastguard Worker 			lws_callback_on_writable(wsi);
332*1c60b9acSAndroid Build Coastguard Worker 			break;
333*1c60b9acSAndroid Build Coastguard Worker 		}
334*1c60b9acSAndroid Build Coastguard Worker 
335*1c60b9acSAndroid Build Coastguard Worker 		break;
336*1c60b9acSAndroid Build Coastguard Worker 
337*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_RESEND:
338*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: MQTT_RESEND\n", __func__);
339*1c60b9acSAndroid Build Coastguard Worker 		/*
340*1c60b9acSAndroid Build Coastguard Worker 		 * We must resend the packet ID mentioned in len
341*1c60b9acSAndroid Build Coastguard Worker 		 */
342*1c60b9acSAndroid Build Coastguard Worker 		if (++pss->retries == 3) {
343*1c60b9acSAndroid Build Coastguard Worker 			lwsl_notice("%s: too many retries\n", __func__);
344*1c60b9acSAndroid Build Coastguard Worker 			return 1; /* kill the connection */
345*1c60b9acSAndroid Build Coastguard Worker 		}
346*1c60b9acSAndroid Build Coastguard Worker 		pss->state--;
347*1c60b9acSAndroid Build Coastguard Worker 		pss->pos = 0;
348*1c60b9acSAndroid Build Coastguard Worker 		break;
349*1c60b9acSAndroid Build Coastguard Worker 
350*1c60b9acSAndroid Build Coastguard Worker 	case LWS_CALLBACK_MQTT_CLIENT_RX:
351*1c60b9acSAndroid Build Coastguard Worker 		pub = (lws_mqtt_publish_param_t *)in;
352*1c60b9acSAndroid Build Coastguard Worker 		assert(pub);
353*1c60b9acSAndroid Build Coastguard Worker 		lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354*1c60b9acSAndroid Build Coastguard Worker 			  (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355*1c60b9acSAndroid Build Coastguard Worker 			  (int)pub->payload_len, (int)len);
356*1c60b9acSAndroid Build Coastguard Worker 
357*1c60b9acSAndroid Build Coastguard Worker 		//lwsl_hexdump_info(pub->payload, len);
358*1c60b9acSAndroid Build Coastguard Worker 
359*1c60b9acSAndroid Build Coastguard Worker 		return 0;
360*1c60b9acSAndroid Build Coastguard Worker 
361*1c60b9acSAndroid Build Coastguard Worker 	default:
362*1c60b9acSAndroid Build Coastguard Worker 		break;
363*1c60b9acSAndroid Build Coastguard Worker 	}
364*1c60b9acSAndroid Build Coastguard Worker 
365*1c60b9acSAndroid Build Coastguard Worker 	return 0;
366*1c60b9acSAndroid Build Coastguard Worker 
367*1c60b9acSAndroid Build Coastguard Worker finish_test:
368*1c60b9acSAndroid Build Coastguard Worker 	interrupted = 1;
369*1c60b9acSAndroid Build Coastguard Worker 	lws_cancel_service(lws_get_context(wsi));
370*1c60b9acSAndroid Build Coastguard Worker 
371*1c60b9acSAndroid Build Coastguard Worker 	return 0;
372*1c60b9acSAndroid Build Coastguard Worker }
373*1c60b9acSAndroid Build Coastguard Worker 
374*1c60b9acSAndroid Build Coastguard Worker static const struct lws_protocols protocols[] = {
375*1c60b9acSAndroid Build Coastguard Worker 	{
376*1c60b9acSAndroid Build Coastguard Worker 		.name			= "test-mqtt",
377*1c60b9acSAndroid Build Coastguard Worker 		.callback		= callback_mqtt,
378*1c60b9acSAndroid Build Coastguard Worker 		.per_session_data_size	= sizeof(struct pss)
379*1c60b9acSAndroid Build Coastguard Worker 	},
380*1c60b9acSAndroid Build Coastguard Worker 	LWS_PROTOCOL_LIST_TERM
381*1c60b9acSAndroid Build Coastguard Worker };
382*1c60b9acSAndroid Build Coastguard Worker 
main(int argc,const char ** argv)383*1c60b9acSAndroid Build Coastguard Worker int main(int argc, const char **argv)
384*1c60b9acSAndroid Build Coastguard Worker {
385*1c60b9acSAndroid Build Coastguard Worker 	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386*1c60b9acSAndroid Build Coastguard Worker 					     system_notify_cb, "app" };
387*1c60b9acSAndroid Build Coastguard Worker 	lws_state_notify_link_t *na[] = { &notifier, NULL };
388*1c60b9acSAndroid Build Coastguard Worker 	struct lws_context_creation_info info;
389*1c60b9acSAndroid Build Coastguard Worker 	struct lws_context *context;
390*1c60b9acSAndroid Build Coastguard Worker 	const char *p;
391*1c60b9acSAndroid Build Coastguard Worker 	int n = 0;
392*1c60b9acSAndroid Build Coastguard Worker 
393*1c60b9acSAndroid Build Coastguard Worker 	signal(SIGINT, sigint_handler);
394*1c60b9acSAndroid Build Coastguard Worker 	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395*1c60b9acSAndroid Build Coastguard Worker 	lws_cmdline_option_handle_builtin(argc, argv, &info);
396*1c60b9acSAndroid Build Coastguard Worker 
397*1c60b9acSAndroid Build Coastguard Worker 	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398*1c60b9acSAndroid Build Coastguard Worker 	if (do_ssl)
399*1c60b9acSAndroid Build Coastguard Worker 		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400*1c60b9acSAndroid Build Coastguard Worker 
401*1c60b9acSAndroid Build Coastguard Worker 	if (lws_cmdline_option(argc, argv, "-p"))
402*1c60b9acSAndroid Build Coastguard Worker 		pipeline = 1;
403*1c60b9acSAndroid Build Coastguard Worker 
404*1c60b9acSAndroid Build Coastguard Worker 	if ((p = lws_cmdline_option(argc, argv, "-i")))
405*1c60b9acSAndroid Build Coastguard Worker 		stagger_us = atoi(p);
406*1c60b9acSAndroid Build Coastguard Worker 
407*1c60b9acSAndroid Build Coastguard Worker 	if ((p = lws_cmdline_option(argc, argv, "-c")))
408*1c60b9acSAndroid Build Coastguard Worker 		count = atoi(p);
409*1c60b9acSAndroid Build Coastguard Worker 
410*1c60b9acSAndroid Build Coastguard Worker 	if (count > COUNT) {
411*1c60b9acSAndroid Build Coastguard Worker 		count = COUNT;
412*1c60b9acSAndroid Build Coastguard Worker 		lwsl_err("%s: clipped count at max %d\n", __func__, count);
413*1c60b9acSAndroid Build Coastguard Worker 	}
414*1c60b9acSAndroid Build Coastguard Worker 
415*1c60b9acSAndroid Build Coastguard Worker 	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416*1c60b9acSAndroid Build Coastguard Worker 			do_ssl ? "tls enabled": "unencrypted");
417*1c60b9acSAndroid Build Coastguard Worker 
418*1c60b9acSAndroid Build Coastguard Worker 	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419*1c60b9acSAndroid Build Coastguard Worker 	info.protocols = protocols;
420*1c60b9acSAndroid Build Coastguard Worker 	info.register_notifier_list = na;
421*1c60b9acSAndroid Build Coastguard Worker 	info.fd_limit_per_thread = 1 + COUNT + 1;
422*1c60b9acSAndroid Build Coastguard Worker 	info.retry_and_idle_policy = &retry;
423*1c60b9acSAndroid Build Coastguard Worker 
424*1c60b9acSAndroid Build Coastguard Worker #if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425*1c60b9acSAndroid Build Coastguard Worker 	/*
426*1c60b9acSAndroid Build Coastguard Worker 	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
427*1c60b9acSAndroid Build Coastguard Worker 	 * CA to trust explicitly.
428*1c60b9acSAndroid Build Coastguard Worker 	 */
429*1c60b9acSAndroid Build Coastguard Worker 	info.client_ssl_ca_filepath = "./mosq-ca.crt";
430*1c60b9acSAndroid Build Coastguard Worker #endif
431*1c60b9acSAndroid Build Coastguard Worker 
432*1c60b9acSAndroid Build Coastguard Worker 	context = lws_create_context(&info);
433*1c60b9acSAndroid Build Coastguard Worker 	if (!context) {
434*1c60b9acSAndroid Build Coastguard Worker 		lwsl_err("lws init failed\n");
435*1c60b9acSAndroid Build Coastguard Worker 		return 1;
436*1c60b9acSAndroid Build Coastguard Worker 	}
437*1c60b9acSAndroid Build Coastguard Worker 
438*1c60b9acSAndroid Build Coastguard Worker 	/* Event loop */
439*1c60b9acSAndroid Build Coastguard Worker 	while (n >= 0 && !interrupted)
440*1c60b9acSAndroid Build Coastguard Worker 		n = lws_service(context, 0);
441*1c60b9acSAndroid Build Coastguard Worker 
442*1c60b9acSAndroid Build Coastguard Worker 	lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443*1c60b9acSAndroid Build Coastguard Worker 			okay != count ? "failed" : "OK");
444*1c60b9acSAndroid Build Coastguard Worker 	lws_context_destroy(context);
445*1c60b9acSAndroid Build Coastguard Worker 
446*1c60b9acSAndroid Build Coastguard Worker 	return okay != count;
447*1c60b9acSAndroid Build Coastguard Worker }
448