1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5 #include <jni.h>
6
7 #include <aws/common/atomics.h>
8 #include <aws/common/condition_variable.h>
9 #include <aws/common/logging.h>
10 #include <aws/common/mutex.h>
11 #include <aws/common/string.h>
12 #include <aws/common/thread.h>
13 #include <aws/http/connection.h>
14 #include <aws/http/proxy.h>
15 #include <aws/http/request_response.h>
16 #include <aws/io/channel.h>
17 #include <aws/io/channel_bootstrap.h>
18 #include <aws/io/event_loop.h>
19 #include <aws/io/host_resolver.h>
20 #include <aws/io/socket.h>
21 #include <aws/io/socket_channel_handler.h>
22 #include <aws/io/tls_channel_handler.h>
23 #include <aws/mqtt/client.h>
24
25 #include <ctype.h>
26 #include <string.h>
27
28 #include "crt.h"
29
30 #include "http_request_utils.h"
31 #include "java_class_ids.h"
32 #include "mqtt5_client_jni.h"
33
34 /*******************************************************************************
35 * mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
36 * async ops, and is used to deliver callbacks. Also hangs on to JNI references
37 * to buffers and strings that need to outlive the request
38 ******************************************************************************/
39 struct mqtt_jni_async_callback {
40 struct mqtt_jni_connection *connection;
41 jobject async_callback;
42 struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
43 };
44
45 /*******************************************************************************
46 * mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
47 ******************************************************************************/
48 struct mqtt_jni_connection {
49 struct aws_mqtt_client *client; /* Provided to mqtt_connect */
50 struct aws_mqtt_client_connection *client_connection;
51 struct aws_socket_options socket_options;
52 struct aws_tls_connection_options tls_options;
53
54 JavaVM *jvm;
55 jweak java_mqtt_connection; /* MqttClientConnection instance */
56 struct mqtt_jni_async_callback *on_message;
57
58 struct aws_atomic_var ref_count;
59 };
60
61 /*******************************************************************************
62 * mqtt_jni_ws_handshake - Data needed to perform the async websocket handshake
63 * transform operations. Destroyed when transform is complete.
64 ******************************************************************************/
65 struct mqtt_jni_ws_handshake {
66 struct mqtt_jni_connection *connection;
67 struct aws_http_message *http_request;
68 aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn;
69 void *complete_ctx;
70 };
71
72 static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection);
73
s_mqtt_jni_connection_acquire(struct mqtt_jni_connection * connection)74 static void s_mqtt_jni_connection_acquire(struct mqtt_jni_connection *connection) {
75 size_t old_value = aws_atomic_fetch_add(&connection->ref_count, 1);
76
77 AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection acquire, ref count now = %d", (int)old_value + 1);
78 }
79
80 static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data);
81
s_mqtt_jni_connection_release(struct mqtt_jni_connection * connection)82 static void s_mqtt_jni_connection_release(struct mqtt_jni_connection *connection) {
83 size_t old_value = aws_atomic_fetch_sub(&connection->ref_count, 1);
84
85 AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection release, ref count now = %d", (int)old_value - 1);
86 }
87
88 /* The destroy function is called on Java MqttClientConnection resource release. */
s_mqtt_jni_connection_destroy(struct mqtt_jni_connection * connection)89 static void s_mqtt_jni_connection_destroy(struct mqtt_jni_connection *connection) {
90 /* For mqtt311 client, we have to call aws_mqtt_client_connection_disconnect before releasing the underlying c
91 * connection.*/
92 if (aws_mqtt_client_connection_disconnect(
93 connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) {
94
95 /*
96 * This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown
97 * time. Log it (in case it was unexpected) and then shutdown the underlying connection manually.
98 */
99 AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Client disconnect failed. Release the client connection.");
100 s_on_shutdown_disconnect_complete(connection->client_connection, NULL);
101 }
102 }
103
s_mqtt_jni_async_callback_new(struct mqtt_jni_connection * connection,jobject async_callback,JNIEnv * env)104 static struct mqtt_jni_async_callback *s_mqtt_jni_async_callback_new(
105 struct mqtt_jni_connection *connection,
106 jobject async_callback,
107 JNIEnv *env) {
108
109 if (env == NULL) {
110 return NULL;
111 }
112
113 struct aws_allocator *allocator = aws_jni_get_allocator();
114 /* allocate cannot fail */
115 struct mqtt_jni_async_callback *callback = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_async_callback));
116 callback->connection = connection;
117 callback->async_callback = async_callback ? (*env)->NewGlobalRef(env, async_callback) : NULL;
118 aws_byte_buf_init(&callback->buffer, aws_jni_get_allocator(), 0);
119
120 return callback;
121 }
122
s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback * callback,JNIEnv * env)123 static void s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
124 AWS_FATAL_ASSERT(callback && callback->connection);
125
126 if (env == NULL) {
127 return;
128 }
129
130 if (callback->async_callback) {
131 (*env)->DeleteGlobalRef(env, callback->async_callback);
132 }
133
134 aws_byte_buf_clean_up(&callback->buffer);
135
136 struct aws_allocator *allocator = aws_jni_get_allocator();
137 aws_mem_release(allocator, callback);
138 }
139
s_new_mqtt_exception(JNIEnv * env,int error_code)140 static jobject s_new_mqtt_exception(JNIEnv *env, int error_code) {
141 jobject exception = (*env)->NewObject(
142 env, mqtt_exception_properties.jni_mqtt_exception, mqtt_exception_properties.jni_constructor, error_code);
143 return exception;
144 }
145
146 /* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
147 #if UINTPTR_MAX == 0xffffffff
148 # if defined(_MSC_VER)
149 # pragma warning(push)
150 # pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
151 # else
152 # pragma GCC diagnostic push
153 # pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
154 # pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
155 # endif
156 #endif
157
158 /*******************************************************************************
159 * new
160 ******************************************************************************/
161 static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data);
s_on_connection_complete(struct aws_mqtt_client_connection * client_connection,int error_code,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)162 static void s_on_connection_complete(
163 struct aws_mqtt_client_connection *client_connection,
164 int error_code,
165 enum aws_mqtt_connect_return_code return_code,
166 bool session_present,
167 void *user_data) {
168 (void)client_connection;
169 (void)return_code;
170
171 struct mqtt_jni_async_callback *connect_callback = user_data;
172 struct mqtt_jni_connection *connection = connect_callback->connection;
173
174 /********** JNI ENV ACQUIRE **********/
175 JavaVM *jvm = connection->jvm;
176 JNIEnv *env = aws_jni_acquire_thread_env(jvm);
177 if (env == NULL) {
178 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
179 return;
180 }
181
182 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
183 if (mqtt_connection != NULL) {
184 (*env)->CallVoidMethod(
185 env, mqtt_connection, mqtt_connection_properties.on_connection_complete, error_code, session_present);
186
187 (*env)->DeleteLocalRef(env, mqtt_connection);
188
189 if (aws_jni_check_and_clear_exception(env)) {
190 aws_jni_release_thread_env(connection->jvm, env);
191 /********** JNI ENV RELEASE EARLY OUT **********/
192 aws_mqtt_client_connection_disconnect(client_connection, s_on_connection_disconnected, connect_callback);
193 return; /* callback and ref count will be cleaned up in s_on_connection_disconnected */
194 }
195 }
196
197 s_mqtt_jni_async_callback_destroy(connect_callback, env);
198
199 aws_jni_release_thread_env(jvm, env);
200 /********** JNI ENV RELEASE **********/
201 s_mqtt_jni_connection_release(connection);
202 }
203
s_on_connection_interrupted_internal(struct mqtt_jni_connection * connection,int error_code,jobject ack_callback,JNIEnv * env)204 static void s_on_connection_interrupted_internal(
205 struct mqtt_jni_connection *connection,
206 int error_code,
207 jobject ack_callback,
208 JNIEnv *env) {
209
210 AWS_FATAL_ASSERT(env);
211
212 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
213 if (mqtt_connection) {
214 (*env)->CallVoidMethod(
215 env, mqtt_connection, mqtt_connection_properties.on_connection_interrupted, error_code, ack_callback);
216
217 (*env)->DeleteLocalRef(env, mqtt_connection);
218
219 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
220 }
221 }
222
s_on_connection_interrupted(struct aws_mqtt_client_connection * client_connection,int error_code,void * user_data)223 static void s_on_connection_interrupted(
224 struct aws_mqtt_client_connection *client_connection,
225 int error_code,
226 void *user_data) {
227 (void)client_connection;
228
229 struct mqtt_jni_connection *connection = user_data;
230
231 /********** JNI ENV ACQUIRE **********/
232 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
233 if (env == NULL) {
234 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
235 return;
236 }
237
238 s_on_connection_interrupted_internal(user_data, error_code, NULL, env);
239
240 aws_jni_release_thread_env(connection->jvm, env);
241 /********** JNI ENV RELEASE **********/
242 }
243
s_on_connection_success(struct aws_mqtt_client_connection * client_connection,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)244 static void s_on_connection_success(
245 struct aws_mqtt_client_connection *client_connection,
246 enum aws_mqtt_connect_return_code return_code,
247 bool session_present,
248 void *user_data) {
249 (void)client_connection;
250 (void)return_code;
251
252 struct mqtt_jni_connection *connection = user_data;
253
254 /********** JNI ENV ACQUIRE **********/
255 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
256 if (env == NULL) {
257 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
258 return;
259 }
260 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
261 if (mqtt_connection) {
262
263 (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_success, session_present);
264
265 (*env)->DeleteLocalRef(env, mqtt_connection);
266
267 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
268 }
269 aws_jni_release_thread_env(connection->jvm, env);
270 /********** JNI ENV RELEASE **********/
271 }
272
s_on_connection_failure(struct aws_mqtt_client_connection * client_connection,int error_code,void * user_data)273 static void s_on_connection_failure(
274 struct aws_mqtt_client_connection *client_connection,
275 int error_code,
276 void *user_data) {
277 (void)client_connection;
278
279 struct mqtt_jni_connection *connection = user_data;
280
281 /********** JNI ENV ACQUIRE **********/
282 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
283 if (env == NULL) {
284 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
285 return;
286 }
287 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
288 if (mqtt_connection) {
289 (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_failure, error_code);
290
291 (*env)->DeleteLocalRef(env, mqtt_connection);
292
293 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
294 }
295 aws_jni_release_thread_env(connection->jvm, env);
296 /********** JNI ENV RELEASE **********/
297 }
298
s_on_connection_resumed(struct aws_mqtt_client_connection * client_connection,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)299 static void s_on_connection_resumed(
300 struct aws_mqtt_client_connection *client_connection,
301 enum aws_mqtt_connect_return_code return_code,
302 bool session_present,
303 void *user_data) {
304 (void)client_connection;
305 (void)return_code;
306
307 struct mqtt_jni_connection *connection = user_data;
308
309 /********** JNI ENV ACQUIRE **********/
310 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
311 if (env == NULL) {
312 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
313 return;
314 }
315
316 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
317 if (mqtt_connection) {
318
319 (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_resumed, session_present);
320
321 (*env)->DeleteLocalRef(env, mqtt_connection);
322
323 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
324 }
325
326 aws_jni_release_thread_env(connection->jvm, env);
327 /********** JNI ENV RELEASE **********/
328 }
329
s_on_connection_disconnected(struct aws_mqtt_client_connection * client_connection,void * user_data)330 static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data) {
331 (void)client_connection;
332
333 struct mqtt_jni_async_callback *connect_callback = user_data;
334 struct mqtt_jni_connection *jni_connection = connect_callback->connection;
335
336 /********** JNI ENV ACQUIRE **********/
337 JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
338 if (env == NULL) {
339 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
340 return;
341 }
342
343 s_on_connection_interrupted_internal(connect_callback->connection, 0, connect_callback->async_callback, env);
344
345 s_mqtt_jni_async_callback_destroy(connect_callback, env);
346
347 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
348
349 aws_jni_release_thread_env(jni_connection->jvm, env);
350 /********** JNI ENV RELEASE **********/
351
352 /* Do not call release here: s_on_connection_closed will (normally) be called
353 * right after and so we can call the release there instead. */
354 }
355
s_on_connection_closed(struct aws_mqtt_client_connection * client_connection,struct on_connection_closed_data * data,void * user_data)356 static void s_on_connection_closed(
357 struct aws_mqtt_client_connection *client_connection,
358 struct on_connection_closed_data *data,
359 void *user_data) {
360 (void)client_connection;
361 (void)data;
362
363 struct mqtt_jni_connection *connection = user_data;
364
365 /********** JNI ENV ACQUIRE **********/
366 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
367 if (env == NULL) {
368 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
369 return;
370 }
371
372 // Make sure the Java object has not been garbage collected
373 if (!(*env)->IsSameObject(env, connection->java_mqtt_connection, NULL)) {
374 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
375 if (mqtt_connection) {
376 (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_closed);
377 (*env)->DeleteLocalRef(env, mqtt_connection);
378 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
379 }
380 }
381 aws_jni_release_thread_env(connection->jvm, env);
382 /********** JNI ENV RELEASE **********/
383 }
384
s_on_connection_terminated(void * user_data)385 static void s_on_connection_terminated(void *user_data) {
386
387 struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data;
388
389 /********** JNI ENV ACQUIRE **********/
390 JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
391 if (env == NULL) {
392 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
393 return;
394 }
395
396 jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection);
397 if (mqtt_connection != NULL) {
398 (*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references);
399
400 (*env)->DeleteLocalRef(env, mqtt_connection);
401
402 aws_jni_check_and_clear_exception(env);
403 }
404
405 JavaVM *jvm = jni_connection->jvm;
406
407 s_mqtt_connection_destroy(env, jni_connection);
408 aws_jni_release_thread_env(jvm, env);
409 /********** JNI ENV RELEASE **********/
410 }
411
s_mqtt_connection_new(JNIEnv * env,struct aws_mqtt_client * client3,struct aws_mqtt5_client_java_jni * client5_jni,jobject java_mqtt_connection)412 static struct mqtt_jni_connection *s_mqtt_connection_new(
413 JNIEnv *env,
414 struct aws_mqtt_client *client3,
415 struct aws_mqtt5_client_java_jni *client5_jni,
416 jobject java_mqtt_connection) {
417 struct aws_allocator *allocator = aws_jni_get_allocator();
418
419 struct mqtt_jni_connection *connection = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_connection));
420 if (!connection) {
421 aws_jni_throw_runtime_exception(
422 env, "MqttClientConnection.mqtt_connect: Out of memory allocating JNI connection");
423 return NULL;
424 }
425
426 aws_atomic_store_int(&connection->ref_count, 1);
427 connection->java_mqtt_connection = (*env)->NewWeakGlobalRef(env, java_mqtt_connection);
428 jint jvmresult = (*env)->GetJavaVM(env, &connection->jvm);
429 AWS_FATAL_ASSERT(jvmresult == 0);
430
431 if (client3 != NULL) {
432 connection->client = client3;
433 connection->client_connection = aws_mqtt_client_connection_new(client3);
434 } else if (client5_jni != NULL) {
435 connection->client_connection = aws_mqtt_client_connection_new_from_mqtt5_client(client5_jni->client);
436 }
437
438 if (!connection->client_connection) {
439 aws_jni_throw_runtime_exception(
440 env,
441 "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to create new "
442 "connection");
443 goto on_error;
444 }
445
446 if (aws_mqtt_client_connection_set_connection_termination_handler(
447 connection->client_connection, s_on_connection_terminated, connection)) {
448 aws_jni_throw_runtime_exception(
449 env,
450 "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to set termination "
451 "callback");
452 goto on_error;
453 }
454
455 return connection;
456
457 on_error:
458
459 s_mqtt_jni_connection_release(connection);
460
461 return NULL;
462 }
463
s_mqtt_connection_destroy(JNIEnv * env,struct mqtt_jni_connection * connection)464 static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection) {
465 if (connection == NULL) {
466 return;
467 }
468
469 if (connection->on_message) {
470 s_mqtt_jni_async_callback_destroy(connection->on_message, env);
471 }
472
473 if (connection->java_mqtt_connection) {
474 (*env)->DeleteWeakGlobalRef(env, connection->java_mqtt_connection);
475 }
476
477 aws_tls_connection_options_clean_up(&connection->tls_options);
478
479 struct aws_allocator *allocator = aws_jni_get_allocator();
480 aws_mem_release(allocator, connection);
481 }
482
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(JNIEnv * env,jclass jni_class,jlong jni_client,jobject jni_mqtt_connection)483 JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(
484 JNIEnv *env,
485 jclass jni_class,
486 jlong jni_client,
487 jobject jni_mqtt_connection) {
488 (void)jni_class;
489 aws_cache_jni_ids(env);
490
491 struct mqtt_jni_connection *connection = NULL;
492 struct aws_mqtt_client *client3 = (struct aws_mqtt_client *)jni_client;
493 if (!client3) {
494 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt3 Client is invalid/null");
495 return (jlong)NULL;
496 }
497
498 connection = s_mqtt_connection_new(env, client3, NULL, jni_mqtt_connection);
499 if (!connection) {
500 return (jlong)NULL;
501 }
502
503 aws_mqtt_client_connection_set_connection_result_handlers(
504 connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
505 aws_mqtt_client_connection_set_connection_interruption_handlers(
506 connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
507 aws_mqtt_client_connection_set_connection_closed_handler(
508 connection->client_connection, s_on_connection_closed, connection);
509
510 return (jlong)connection;
511 }
512
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(JNIEnv * env,jclass jni_class,jlong jni_client,jobject jni_mqtt_connection)513 JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(
514 JNIEnv *env,
515 jclass jni_class,
516 jlong jni_client,
517 jobject jni_mqtt_connection) {
518 (void)jni_class;
519 aws_cache_jni_ids(env);
520
521 struct mqtt_jni_connection *connection = NULL;
522 struct aws_mqtt5_client_java_jni *client5_jni = (struct aws_mqtt5_client_java_jni *)jni_client;
523 if (!client5_jni) {
524 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt5 Client is invalid/null");
525 return (jlong)NULL;
526 }
527
528 connection = s_mqtt_connection_new(env, NULL, client5_jni, jni_mqtt_connection);
529 if (!connection) {
530 return (jlong)NULL;
531 }
532
533 aws_mqtt_client_connection_set_connection_result_handlers(
534 connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
535 aws_mqtt_client_connection_set_connection_interruption_handlers(
536 connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
537 aws_mqtt_client_connection_set_connection_closed_handler(
538 connection->client_connection, s_on_connection_closed, connection);
539
540 return (jlong)connection;
541 }
542
543 /* The disconnect callback called on shutdown. We will release the underlying connection here, which should init the
544 ** client shutdown process. Then on termination callback, we will finally release all jni resources.
545 */
s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection * connection,void * user_data)546 static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) {
547 (void)user_data;
548
549 AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references");
550
551 /* Release the underlying mqtt connection */
552 aws_mqtt_client_connection_release(connection);
553 }
554
555 /*******************************************************************************
556 * clean_up
557 ******************************************************************************/
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(JNIEnv * env,jclass jni_class,jlong jni_connection)558 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(
559 JNIEnv *env,
560 jclass jni_class,
561 jlong jni_connection) {
562 (void)jni_class;
563 (void)env;
564 aws_cache_jni_ids(env);
565
566 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
567 s_mqtt_jni_connection_destroy(connection);
568 }
569
570 /*******************************************************************************
571 * connect
572 ******************************************************************************/
573 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_endpoint,jint jni_port,jlong jni_socket_options,jlong jni_tls_ctx,jstring jni_client_id,jboolean jni_clean_session,jint keep_alive_secs,jshort ping_timeout_ms,jint protocol_operation_timeout_ms)574 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(
575 JNIEnv *env,
576 jclass jni_class,
577 jlong jni_connection,
578 jstring jni_endpoint,
579 jint jni_port,
580 jlong jni_socket_options,
581 jlong jni_tls_ctx,
582 jstring jni_client_id,
583 jboolean jni_clean_session,
584 jint keep_alive_secs,
585 jshort ping_timeout_ms,
586 jint protocol_operation_timeout_ms) {
587 (void)jni_class;
588 aws_cache_jni_ids(env);
589
590 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
591 if (!connection) {
592 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Connection is invalid/null");
593 return;
594 }
595
596 struct aws_byte_cursor client_id;
597 AWS_ZERO_STRUCT(client_id);
598 struct aws_byte_cursor endpoint = aws_jni_byte_cursor_from_jstring_acquire(env, jni_endpoint);
599 uint32_t port = (uint32_t)jni_port;
600 if (!port) {
601 aws_jni_throw_runtime_exception(
602 env,
603 "MqttClientConnection.mqtt_new: Endpoint should be in the format hostname:port and port must not be 0");
604 goto cleanup;
605 }
606
607 struct mqtt_jni_async_callback *connect_callback = s_mqtt_jni_async_callback_new(connection, NULL, env);
608 if (connect_callback == NULL) {
609 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Failed to create async callback");
610 goto cleanup;
611 }
612
613 s_mqtt_jni_connection_acquire(connection);
614
615 struct aws_socket_options default_socket_options;
616 AWS_ZERO_STRUCT(default_socket_options);
617 default_socket_options.type = AWS_SOCKET_STREAM;
618 default_socket_options.connect_timeout_ms = 3000;
619 struct aws_socket_options *socket_options = &default_socket_options;
620 if (jni_socket_options) {
621 socket_options = (struct aws_socket_options *)jni_socket_options;
622 }
623 memcpy(&connection->socket_options, socket_options, sizeof(struct aws_socket_options));
624
625 /* if a tls_ctx was provided, initialize tls options */
626 struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_ctx;
627 struct aws_tls_connection_options *tls_options = NULL;
628 if (tls_ctx) {
629 tls_options = &connection->tls_options;
630 aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx);
631 aws_tls_connection_options_set_server_name(tls_options, aws_jni_get_allocator(), &endpoint);
632 }
633
634 client_id = aws_jni_byte_cursor_from_jstring_acquire(env, jni_client_id);
635 bool clean_session = jni_clean_session != 0;
636
637 struct aws_mqtt_connection_options connect_options;
638 AWS_ZERO_STRUCT(connect_options);
639 connect_options.host_name = endpoint;
640 connect_options.port = port;
641 connect_options.socket_options = &connection->socket_options;
642 connect_options.tls_options = tls_options;
643 connect_options.client_id = client_id;
644 connect_options.keep_alive_time_secs = (uint16_t)keep_alive_secs;
645 connect_options.ping_timeout_ms = ping_timeout_ms;
646 connect_options.protocol_operation_timeout_ms = protocol_operation_timeout_ms;
647 connect_options.clean_session = clean_session;
648 connect_options.on_connection_complete = s_on_connection_complete;
649 connect_options.user_data = connect_callback;
650
651 int result = aws_mqtt_client_connection_connect(connection->client_connection, &connect_options);
652 if (result != AWS_OP_SUCCESS) {
653 s_mqtt_jni_connection_release(connection);
654 s_mqtt_jni_async_callback_destroy(connect_callback, env);
655 aws_jni_throw_runtime_exception(
656 env, "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_connect failed");
657 }
658
659 cleanup:
660 aws_jni_byte_cursor_from_jstring_release(env, jni_endpoint, endpoint);
661 aws_jni_byte_cursor_from_jstring_release(env, jni_client_id, client_id);
662 }
663
664 /*******************************************************************************
665 * disconnect
666 ******************************************************************************/
667 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(JNIEnv * env,jclass jni_class,jlong jni_connection,jobject jni_ack)668 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(
669 JNIEnv *env,
670 jclass jni_class,
671 jlong jni_connection,
672 jobject jni_ack) {
673 (void)jni_class;
674 aws_cache_jni_ids(env);
675
676 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
677 if (!connection) {
678 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Invalid connection");
679 return;
680 }
681
682 struct mqtt_jni_async_callback *disconnect_callback = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
683 if (disconnect_callback == NULL) {
684 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Failed to create async callback");
685 return;
686 }
687
688 if (aws_mqtt_client_connection_disconnect(
689 connection->client_connection, s_on_connection_disconnected, disconnect_callback) != AWS_OP_SUCCESS) {
690 int error = aws_last_error();
691 /*
692 * Disconnect invoked on a disconnected connection can happen under normal circumstances. Invoke the callback
693 * manually since it won't get invoked otherwise.
694 */
695 AWS_LOGF_WARN(
696 AWS_LS_MQTT_CLIENT,
697 "MqttClientConnection.mqtt_disconnect: error calling disconnect - %d(%s)",
698 error,
699 aws_error_str(error));
700 s_on_connection_disconnected(connection->client_connection, disconnect_callback);
701 }
702 }
703
704 /*******************************************************************************
705 * subscribe
706 ******************************************************************************/
707 /* called from any sub, unsub, or pub ack */
s_deliver_ack_success(struct mqtt_jni_async_callback * callback,JNIEnv * env)708 static void s_deliver_ack_success(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
709 AWS_FATAL_ASSERT(callback);
710 AWS_FATAL_ASSERT(callback->connection);
711
712 if (callback->async_callback) {
713 (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_success);
714 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
715 }
716 }
717
s_deliver_ack_failure(struct mqtt_jni_async_callback * callback,int error_code,JNIEnv * env)718 static void s_deliver_ack_failure(struct mqtt_jni_async_callback *callback, int error_code, JNIEnv *env) {
719 AWS_FATAL_ASSERT(callback);
720 AWS_FATAL_ASSERT(callback->connection);
721 AWS_FATAL_ASSERT(env);
722
723 if (callback->async_callback) {
724 jobject jni_reason = s_new_mqtt_exception(env, error_code);
725 (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_failure, jni_reason);
726 (*env)->DeleteLocalRef(env, jni_reason);
727 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
728 }
729 }
730
s_on_op_complete(struct aws_mqtt_client_connection * connection,uint16_t packet_id,int error_code,void * user_data)731 static void s_on_op_complete(
732 struct aws_mqtt_client_connection *connection,
733 uint16_t packet_id,
734 int error_code,
735 void *user_data) {
736 AWS_FATAL_ASSERT(connection);
737 (void)packet_id;
738
739 struct mqtt_jni_async_callback *callback = user_data;
740 if (!callback) {
741 return;
742 }
743
744 /********** JNI ENV ACQUIRE **********/
745 JavaVM *jvm = callback->connection->jvm;
746 JNIEnv *env = aws_jni_acquire_thread_env(jvm);
747 if (env == NULL) {
748 return;
749 }
750
751 if (error_code) {
752 s_deliver_ack_failure(callback, error_code, env);
753 } else {
754 s_deliver_ack_success(callback, env);
755 }
756
757 s_mqtt_jni_async_callback_destroy(callback, env);
758
759 aws_jni_release_thread_env(jvm, env);
760 /********** JNI ENV RELEASE **********/
761 }
762
s_is_qos_successful(enum aws_mqtt_qos qos)763 static bool s_is_qos_successful(enum aws_mqtt_qos qos) {
764 return qos < 128;
765 }
766
s_on_ack(struct aws_mqtt_client_connection * connection,uint16_t packet_id,const struct aws_byte_cursor * topic,enum aws_mqtt_qos qos,int error_code,void * user_data)767 static void s_on_ack(
768 struct aws_mqtt_client_connection *connection,
769 uint16_t packet_id,
770 const struct aws_byte_cursor *topic,
771 enum aws_mqtt_qos qos,
772 int error_code,
773 void *user_data) {
774 (void)topic;
775
776 // Handle a case when the server processed SUBSCRIBE request successfully, but rejected a subscription for some
777 // reason, i.e. error_code is 0 and qos is 0x80.
778 // This mostly applies to mqtt5to3adapter, as MQTT3 client will be disconnected on unsuccessful subscribe.
779 if (error_code == 0 && !s_is_qos_successful(qos)) {
780 error_code = AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE;
781 }
782
783 s_on_op_complete(connection, packet_id, error_code, user_data);
784 }
785
s_cleanup_handler(void * user_data)786 static void s_cleanup_handler(void *user_data) {
787 struct mqtt_jni_async_callback *handler = user_data;
788
789 /********** JNI ENV ACQUIRE **********/
790 JavaVM *jvm = handler->connection->jvm;
791 JNIEnv *env = aws_jni_acquire_thread_env(jvm);
792 if (env == NULL) {
793 return;
794 }
795
796 s_mqtt_jni_async_callback_destroy(handler, env);
797
798 aws_jni_release_thread_env(jvm, env);
799 /********** JNI ENV RELEASE **********/
800 }
801
s_on_subscription_delivered(struct aws_mqtt_client_connection * connection,const struct aws_byte_cursor * topic,const struct aws_byte_cursor * payload,bool dup,enum aws_mqtt_qos qos,bool retain,void * user_data)802 static void s_on_subscription_delivered(
803 struct aws_mqtt_client_connection *connection,
804 const struct aws_byte_cursor *topic,
805 const struct aws_byte_cursor *payload,
806 bool dup,
807 enum aws_mqtt_qos qos,
808 bool retain,
809 void *user_data) {
810
811 AWS_FATAL_ASSERT(connection);
812 AWS_FATAL_ASSERT(topic);
813 AWS_FATAL_ASSERT(payload);
814 AWS_FATAL_ASSERT(user_data);
815
816 struct mqtt_jni_async_callback *callback = user_data;
817 if (!callback->async_callback) {
818 return;
819 }
820
821 /********** JNI ENV ACQUIRE **********/
822 JNIEnv *env = aws_jni_acquire_thread_env(callback->connection->jvm);
823 if (env == NULL) {
824 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
825 return;
826 }
827
828 jbyteArray jni_payload = (*env)->NewByteArray(env, (jsize)payload->len);
829 (*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)payload->len, (const signed char *)payload->ptr);
830
831 jstring jni_topic = aws_jni_string_from_cursor(env, topic);
832
833 (*env)->CallVoidMethod(
834 env, callback->async_callback, message_handler_properties.deliver, jni_topic, jni_payload, dup, qos, retain);
835
836 (*env)->DeleteLocalRef(env, jni_payload);
837 (*env)->DeleteLocalRef(env, jni_topic);
838
839 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
840
841 aws_jni_release_thread_env(callback->connection->jvm, env);
842 /********** JNI ENV RELEASE **********/
843 }
844
845 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jobject jni_handler,jobject jni_ack)846 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(
847 JNIEnv *env,
848 jclass jni_class,
849 jlong jni_connection,
850 jstring jni_topic,
851 jint jni_qos,
852 jobject jni_handler,
853 jobject jni_ack) {
854 (void)jni_class;
855 aws_cache_jni_ids(env);
856
857 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
858 if (!connection) {
859 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Invalid connection");
860 return 0;
861 }
862
863 struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
864 if (!handler) {
865 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate handler");
866 return 0;
867 }
868
869 /* from here, any failure requires error_cleanup */
870 struct mqtt_jni_async_callback *sub_ack = NULL;
871 if (jni_ack) {
872 sub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
873 if (!sub_ack) {
874 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate sub ack");
875 goto error_cleanup;
876 }
877 }
878
879 struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
880 enum aws_mqtt_qos qos = jni_qos;
881
882 uint16_t msg_id = aws_mqtt_client_connection_subscribe(
883 connection->client_connection,
884 &topic,
885 qos,
886 s_on_subscription_delivered,
887 handler,
888 s_cleanup_handler,
889 s_on_ack,
890 sub_ack);
891 aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
892 if (msg_id == 0) {
893 aws_jni_throw_runtime_exception(
894 env, "MqttClientConnection.mqtt_subscribe: aws_mqtt_client_connection_subscribe failed");
895 goto error_cleanup;
896 }
897
898 return msg_id;
899
900 error_cleanup:
901 if (handler) {
902 s_mqtt_jni_async_callback_destroy(handler, env);
903 }
904
905 if (sub_ack) {
906 s_mqtt_jni_async_callback_destroy(sub_ack, env);
907 }
908
909 return 0;
910 }
911
912 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(JNIEnv * env,jclass jni_class,jlong jni_connection,jobject jni_handler)913 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(
914 JNIEnv *env,
915 jclass jni_class,
916 jlong jni_connection,
917 jobject jni_handler) {
918 (void)jni_class;
919 aws_cache_jni_ids(env);
920
921 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
922 if (!connection) {
923 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid connection");
924 return;
925 }
926
927 if (!jni_handler) {
928 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid handler");
929 return;
930 }
931
932 struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
933 if (!handler) {
934 aws_jni_throw_runtime_exception(
935 env, "MqttClientConnection.mqttClientConnectionOnMessage: Unable to allocate handler");
936 return;
937 }
938
939 if (aws_mqtt_client_connection_set_on_any_publish_handler(
940 connection->client_connection, s_on_subscription_delivered, handler)) {
941 aws_jni_throw_runtime_exception(
942 env, "MqttClientConnection.mqttClientConnectionOnMessage: Failed to install on_any_publish_handler");
943 goto error_cleanup;
944 }
945
946 if (connection->on_message) {
947 s_mqtt_jni_async_callback_destroy(connection->on_message, env);
948 }
949
950 connection->on_message = handler;
951
952 return;
953
954 error_cleanup:
955 if (handler) {
956 s_mqtt_jni_async_callback_destroy(handler, env);
957 }
958 }
959
960 /*******************************************************************************
961 * unsubscribe
962 ******************************************************************************/
963 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jobject jni_ack)964 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(
965 JNIEnv *env,
966 jclass jni_class,
967 jlong jni_connection,
968 jstring jni_topic,
969 jobject jni_ack) {
970 (void)jni_class;
971 aws_cache_jni_ids(env);
972
973 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
974 if (!connection) {
975 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Invalid connection");
976 return 0;
977 }
978
979 struct mqtt_jni_async_callback *unsub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
980 if (!unsub_ack) {
981 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Unable to allocate unsub ack");
982 goto error_cleanup;
983 }
984
985 struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
986 uint16_t msg_id =
987 aws_mqtt_client_connection_unsubscribe(connection->client_connection, &topic, s_on_op_complete, unsub_ack);
988 aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
989 if (msg_id == 0) {
990 aws_jni_throw_runtime_exception(
991 env, "MqttClientConnection.mqtt_unsubscribe: aws_mqtt_client_connection_unsubscribe failed");
992 goto error_cleanup;
993 }
994
995 return msg_id;
996
997 error_cleanup:
998 if (unsub_ack) {
999 s_mqtt_jni_async_callback_destroy(unsub_ack, env);
1000 }
1001 return 0;
1002 }
1003
1004 /*******************************************************************************
1005 * publish
1006 ******************************************************************************/
1007 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jboolean jni_retain,jbyteArray jni_payload,jobject jni_ack)1008 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(
1009 JNIEnv *env,
1010 jclass jni_class,
1011 jlong jni_connection,
1012 jstring jni_topic,
1013 jint jni_qos,
1014 jboolean jni_retain,
1015 jbyteArray jni_payload,
1016 jobject jni_ack) {
1017 (void)jni_class;
1018 aws_cache_jni_ids(env);
1019
1020 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1021 if (!connection) {
1022 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid connection");
1023 return 0;
1024 }
1025
1026 if (!jni_topic) {
1027 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid/null topic");
1028 return 0;
1029 }
1030
1031 struct mqtt_jni_async_callback *pub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
1032 if (!pub_ack) {
1033 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Unable to allocate pub ack");
1034 goto error_cleanup;
1035 }
1036
1037 struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
1038
1039 struct aws_byte_cursor payload;
1040 AWS_ZERO_STRUCT(payload);
1041 if (jni_payload != NULL) {
1042 payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
1043 }
1044
1045 enum aws_mqtt_qos qos = jni_qos;
1046 bool retain = jni_retain != 0;
1047
1048 uint16_t msg_id = aws_mqtt_client_connection_publish(
1049 connection->client_connection, &topic, qos, retain, &payload, s_on_op_complete, pub_ack);
1050 aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
1051
1052 if (jni_payload != NULL) {
1053 aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
1054 }
1055
1056 if (msg_id == 0) {
1057 aws_jni_throw_runtime_exception(
1058 env, "MqttClientConnection.mqtt_publish: aws_mqtt_client_connection_publish failed");
1059 goto error_cleanup;
1060 }
1061
1062 return msg_id;
1063
1064 error_cleanup:
1065 if (pub_ack) {
1066 s_mqtt_jni_async_callback_destroy(pub_ack, env);
1067 }
1068
1069 return 0;
1070 }
1071
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jboolean jni_retain,jbyteArray jni_payload)1072 JNIEXPORT jboolean JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(
1073 JNIEnv *env,
1074 jclass jni_class,
1075 jlong jni_connection,
1076 jstring jni_topic,
1077 jint jni_qos,
1078 jboolean jni_retain,
1079 jbyteArray jni_payload) {
1080 (void)jni_class;
1081 aws_cache_jni_ids(env);
1082
1083 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1084 if (!connection) {
1085 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Invalid connection");
1086 return false;
1087 }
1088
1089 if (jni_topic == NULL) {
1090 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Topic must be non-null");
1091 return false;
1092 }
1093 struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
1094
1095 struct aws_byte_cursor payload;
1096 AWS_ZERO_STRUCT(payload);
1097 if (jni_payload != NULL) {
1098 payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
1099 }
1100
1101 enum aws_mqtt_qos qos = jni_qos;
1102 bool retain = jni_retain != 0;
1103
1104 int result = aws_mqtt_client_connection_set_will(connection->client_connection, &topic, qos, retain, &payload);
1105 aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
1106
1107 if (jni_payload != NULL) {
1108 aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
1109 }
1110
1111 return (result == AWS_OP_SUCCESS);
1112 }
1113
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_user,jstring jni_pass)1114 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(
1115 JNIEnv *env,
1116 jclass jni_class,
1117 jlong jni_connection,
1118 jstring jni_user,
1119 jstring jni_pass) {
1120 (void)jni_class;
1121 aws_cache_jni_ids(env);
1122
1123 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1124 if (!connection) {
1125 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Invalid connection");
1126 return;
1127 }
1128
1129 struct aws_byte_cursor username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_user);
1130 struct aws_byte_cursor password;
1131 struct aws_byte_cursor *password_ptr = NULL;
1132 AWS_ZERO_STRUCT(password);
1133 if (jni_pass != NULL) {
1134 password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_pass);
1135 password_ptr = &password;
1136 }
1137
1138 if (aws_mqtt_client_connection_set_login(connection->client_connection, &username, password_ptr)) {
1139 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Failed to set login");
1140 }
1141
1142 aws_jni_byte_cursor_from_jstring_release(env, jni_user, username);
1143
1144 if (password.len > 0) {
1145 aws_jni_byte_cursor_from_jstring_release(env, jni_pass, password);
1146 }
1147 }
1148
1149 JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(JNIEnv * env,jclass jni_class,jlong jni_connection,jlong jni_min_timeout,jlong jni_max_timeout)1150 Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(
1151 JNIEnv *env,
1152 jclass jni_class,
1153 jlong jni_connection,
1154 jlong jni_min_timeout,
1155 jlong jni_max_timeout) {
1156 (void)jni_class;
1157 aws_cache_jni_ids(env);
1158
1159 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1160 if (!connection) {
1161 aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_reconnect_timeout: Invalid connection");
1162 return;
1163 }
1164
1165 if (aws_mqtt_client_connection_set_reconnect_timeout(
1166 connection->client_connection, jni_min_timeout, jni_max_timeout)) {
1167 aws_jni_throw_runtime_exception(
1168 env, "MqttClientConnection.mqtt_reconnect_timeout: Failed to set reconnect timeout");
1169 }
1170 }
1171
1172 ///////
s_ws_handshake_destroy(struct mqtt_jni_ws_handshake * ws_handshake)1173 static void s_ws_handshake_destroy(struct mqtt_jni_ws_handshake *ws_handshake) {
1174 if (!ws_handshake) {
1175 return;
1176 }
1177
1178 s_mqtt_jni_connection_release(ws_handshake->connection);
1179 aws_mem_release(aws_jni_get_allocator(), ws_handshake);
1180 }
1181
s_ws_handshake_transform(struct aws_http_message * request,void * user_data,aws_mqtt_transform_websocket_handshake_complete_fn * complete_fn,void * complete_ctx)1182 static void s_ws_handshake_transform(
1183 struct aws_http_message *request,
1184 void *user_data,
1185 aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
1186 void *complete_ctx) {
1187
1188 struct mqtt_jni_connection *connection = user_data;
1189
1190 /********** JNI ENV ACQUIRE **********/
1191 JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
1192 if (env == NULL) {
1193 /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
1194 complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
1195 return;
1196 }
1197
1198 struct aws_allocator *alloc = aws_jni_get_allocator();
1199
1200 struct mqtt_jni_ws_handshake *ws_handshake = aws_mem_calloc(alloc, 1, sizeof(struct mqtt_jni_ws_handshake));
1201 if (!ws_handshake) {
1202 goto error;
1203 }
1204
1205 ws_handshake->connection = connection;
1206 s_mqtt_jni_connection_acquire(ws_handshake->connection);
1207
1208 ws_handshake->complete_ctx = complete_ctx;
1209 ws_handshake->complete_fn = complete_fn;
1210 ws_handshake->http_request = request;
1211
1212 jobject java_http_request = aws_java_http_request_from_native(env, request, NULL);
1213 if (!java_http_request) {
1214 aws_raise_error(AWS_ERROR_UNKNOWN); /* TODO: given java exception, choose appropriate aws error code */
1215 goto error;
1216 }
1217
1218 jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
1219 if (mqtt_connection != NULL) {
1220 (*env)->CallVoidMethod(
1221 env, mqtt_connection, mqtt_connection_properties.on_websocket_handshake, java_http_request, ws_handshake);
1222
1223 (*env)->DeleteLocalRef(env, mqtt_connection);
1224
1225 AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
1226 }
1227
1228 (*env)->DeleteLocalRef(env, java_http_request);
1229 aws_jni_release_thread_env(connection->jvm, env);
1230 /********** JNI ENV RELEASE SUCCESS PATH **********/
1231
1232 return;
1233
1234 error:;
1235
1236 int error_code = aws_last_error();
1237 s_ws_handshake_destroy(ws_handshake);
1238 complete_fn(request, error_code, complete_ctx);
1239 aws_jni_release_thread_env(connection->jvm, env);
1240 /********** JNI ENV RELEASE FAILURE PATH **********/
1241 }
1242
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(JNIEnv * env,jclass jni_class,jlong jni_connection)1243 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(
1244 JNIEnv *env,
1245 jclass jni_class,
1246 jlong jni_connection) {
1247 (void)jni_class;
1248 aws_cache_jni_ids(env);
1249
1250 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1251 if (!connection) {
1252 aws_raise_error(AWS_ERROR_INVALID_STATE);
1253 aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Invalid connection");
1254 return;
1255 }
1256
1257 if (aws_mqtt_client_connection_use_websockets(
1258 connection->client_connection, s_ws_handshake_transform, connection, NULL, NULL)) {
1259 aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Failed to use websockets");
1260 return;
1261 }
1262 }
1263
1264 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(JNIEnv * env,jclass jni_class,jlong jni_connection,jbyteArray jni_marshalled_request,jobject jni_throwable,jlong jni_user_data)1265 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(
1266 JNIEnv *env,
1267 jclass jni_class,
1268 jlong jni_connection,
1269 jbyteArray jni_marshalled_request,
1270 jobject jni_throwable,
1271 jlong jni_user_data) {
1272 (void)jni_class;
1273 (void)jni_connection;
1274 aws_cache_jni_ids(env);
1275
1276 struct mqtt_jni_ws_handshake *ws_handshake = (void *)jni_user_data;
1277 int error_code = AWS_ERROR_SUCCESS;
1278
1279 if (jni_throwable != NULL) {
1280 if ((*env)->IsInstanceOf(env, jni_throwable, crt_runtime_exception_properties.crt_runtime_exception_class)) {
1281 error_code = (*env)->GetIntField(env, jni_throwable, crt_runtime_exception_properties.error_code_field_id);
1282 }
1283
1284 if (error_code == AWS_ERROR_SUCCESS) {
1285 error_code = AWS_ERROR_UNKNOWN; /* is there anything more that could be done here? */
1286 }
1287
1288 goto done;
1289 }
1290
1291 if (aws_apply_java_http_request_changes_to_native_request(
1292 env, jni_marshalled_request, NULL, ws_handshake->http_request)) {
1293 error_code = aws_last_error();
1294 goto done;
1295 }
1296
1297 done:
1298 ws_handshake->complete_fn(ws_handshake->http_request, error_code, ws_handshake->complete_ctx);
1299 s_ws_handshake_destroy(ws_handshake);
1300 }
1301
1302 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(JNIEnv * env,jclass jni_class,jlong jni_connection,jint jni_proxy_connection_type,jstring jni_proxy_host,jint jni_proxy_port,jlong jni_proxy_tls_context,jint jni_proxy_authorization_type,jstring jni_proxy_authorization_username,jstring jni_proxy_authorization_password)1303 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(
1304 JNIEnv *env,
1305 jclass jni_class,
1306 jlong jni_connection,
1307 jint jni_proxy_connection_type,
1308 jstring jni_proxy_host,
1309 jint jni_proxy_port,
1310 jlong jni_proxy_tls_context,
1311 jint jni_proxy_authorization_type,
1312 jstring jni_proxy_authorization_username,
1313 jstring jni_proxy_authorization_password) {
1314
1315 (void)jni_class;
1316 aws_cache_jni_ids(env);
1317
1318 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1319
1320 struct aws_http_proxy_options proxy_options;
1321 AWS_ZERO_STRUCT(proxy_options);
1322
1323 if (!jni_proxy_host) {
1324 aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: proxyHost must not be null.");
1325 return;
1326 }
1327
1328 proxy_options.connection_type = (enum aws_http_proxy_connection_type)jni_proxy_connection_type;
1329
1330 proxy_options.host = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_host);
1331 proxy_options.port = (uint32_t)jni_proxy_port;
1332
1333 proxy_options.auth_type = (enum aws_http_proxy_authentication_type)jni_proxy_authorization_type;
1334
1335 if (jni_proxy_authorization_username) {
1336 proxy_options.auth_username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_username);
1337 }
1338
1339 if (jni_proxy_authorization_password) {
1340 proxy_options.auth_password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_password);
1341 }
1342
1343 struct aws_tls_connection_options proxy_tls_conn_options;
1344 AWS_ZERO_STRUCT(proxy_tls_conn_options);
1345
1346 if (jni_proxy_tls_context != 0) {
1347 struct aws_tls_ctx *proxy_tls_ctx = (struct aws_tls_ctx *)jni_proxy_tls_context;
1348 aws_tls_connection_options_init_from_ctx(&proxy_tls_conn_options, proxy_tls_ctx);
1349 aws_tls_connection_options_set_server_name(
1350 &proxy_tls_conn_options, aws_jni_get_allocator(), &proxy_options.host);
1351 proxy_options.tls_options = &proxy_tls_conn_options;
1352 }
1353
1354 if (aws_mqtt_client_connection_set_http_proxy_options(connection->client_connection, &proxy_options)) {
1355 aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: Failed to set proxy options");
1356 }
1357
1358 if (jni_proxy_authorization_password) {
1359 aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_password, proxy_options.auth_password);
1360 }
1361
1362 if (jni_proxy_authorization_username) {
1363 aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_username, proxy_options.auth_username);
1364 }
1365
1366 aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_host, proxy_options.host);
1367 aws_tls_connection_options_clean_up(&proxy_tls_conn_options);
1368 }
1369
1370 JNIEXPORT jobject JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(JNIEnv * env,jclass jni_class,jlong jni_connection)1371 Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(
1372 JNIEnv *env,
1373 jclass jni_class,
1374 jlong jni_connection) {
1375 (void)jni_class;
1376 aws_cache_jni_ids(env);
1377
1378 struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1379 if (!connection) {
1380 aws_raise_error(AWS_ERROR_INVALID_STATE);
1381 aws_jni_throw_runtime_exception(env, "MqttClientConnection.getOperationStatistics: Invalid connection");
1382 return NULL;
1383 }
1384
1385 /* Construct Java object */
1386 jobject jni_operation_statistics = (*env)->NewObject(
1387 env,
1388 mqtt_connection_operation_statistics_properties.statistics_class,
1389 mqtt_connection_operation_statistics_properties.statistics_constructor_id);
1390 if (jni_operation_statistics == NULL) {
1391 aws_raise_error(AWS_ERROR_INVALID_STATE);
1392 aws_jni_throw_runtime_exception(
1393 env, "MqttClientConnection.getOperationStatistics: Could not create operation statistics object");
1394 return NULL;
1395 }
1396
1397 struct aws_mqtt_connection_operation_statistics connection_stats;
1398 aws_mqtt_client_connection_get_stats(connection->client_connection, &connection_stats);
1399
1400 (*env)->SetLongField(
1401 env,
1402 jni_operation_statistics,
1403 mqtt_connection_operation_statistics_properties.incomplete_operation_count_field_id,
1404 (jlong)connection_stats.incomplete_operation_count);
1405 if (aws_jni_check_and_clear_exception(env)) {
1406 aws_raise_error(AWS_ERROR_INVALID_STATE);
1407 aws_jni_throw_runtime_exception(
1408 env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation count");
1409 return NULL;
1410 }
1411
1412 (*env)->SetLongField(
1413 env,
1414 jni_operation_statistics,
1415 mqtt_connection_operation_statistics_properties.incomplete_operation_size_field_id,
1416 (jlong)connection_stats.incomplete_operation_size);
1417 if (aws_jni_check_and_clear_exception(env)) {
1418 aws_raise_error(AWS_ERROR_INVALID_STATE);
1419 aws_jni_throw_runtime_exception(
1420 env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation size");
1421 return NULL;
1422 }
1423
1424 (*env)->SetLongField(
1425 env,
1426 jni_operation_statistics,
1427 mqtt_connection_operation_statistics_properties.unacked_operation_count_field_id,
1428 (jlong)connection_stats.unacked_operation_count);
1429 if (aws_jni_check_and_clear_exception(env)) {
1430 aws_raise_error(AWS_ERROR_INVALID_STATE);
1431 aws_jni_throw_runtime_exception(
1432 env, "MqttClientConnection.getOperationStatistics: could not create unacked operation count");
1433 return NULL;
1434 }
1435
1436 (*env)->SetLongField(
1437 env,
1438 jni_operation_statistics,
1439 mqtt_connection_operation_statistics_properties.unacked_operation_size_field_id,
1440 (jlong)connection_stats.unacked_operation_size);
1441 if (aws_jni_check_and_clear_exception(env)) {
1442 aws_raise_error(AWS_ERROR_INVALID_STATE);
1443 aws_jni_throw_runtime_exception(
1444 env, "MqttClientConnection.getOperationStatistics: could not create unacked operation size");
1445 return NULL;
1446 }
1447
1448 return jni_operation_statistics;
1449 }
1450
1451 #if UINTPTR_MAX == 0xffffffff
1452 # if defined(_MSC_VER)
1453 # pragma warning(pop)
1454 # else
1455 # pragma GCC diagnostic pop
1456 # endif
1457 #endif
1458