xref: /aosp_15_r20/external/aws-crt-java/src/native/mqtt_connection.c (revision 3c7ae9de214676c52d19f01067dc1a404272dc11)
1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 #include <jni.h>
6 
7 #include <aws/common/atomics.h>
8 #include <aws/common/condition_variable.h>
9 #include <aws/common/logging.h>
10 #include <aws/common/mutex.h>
11 #include <aws/common/string.h>
12 #include <aws/common/thread.h>
13 #include <aws/http/connection.h>
14 #include <aws/http/proxy.h>
15 #include <aws/http/request_response.h>
16 #include <aws/io/channel.h>
17 #include <aws/io/channel_bootstrap.h>
18 #include <aws/io/event_loop.h>
19 #include <aws/io/host_resolver.h>
20 #include <aws/io/socket.h>
21 #include <aws/io/socket_channel_handler.h>
22 #include <aws/io/tls_channel_handler.h>
23 #include <aws/mqtt/client.h>
24 
25 #include <ctype.h>
26 #include <string.h>
27 
28 #include "crt.h"
29 
30 #include "http_request_utils.h"
31 #include "java_class_ids.h"
32 #include "mqtt5_client_jni.h"
33 
34 /*******************************************************************************
35  * mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
36  * async ops, and is used to deliver callbacks. Also hangs on to JNI references
37  * to buffers and strings that need to outlive the request
38  ******************************************************************************/
39 struct mqtt_jni_async_callback {
40     struct mqtt_jni_connection *connection;
41     jobject async_callback;
42     struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
43 };
44 
45 /*******************************************************************************
46  * mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
47  ******************************************************************************/
48 struct mqtt_jni_connection {
49     struct aws_mqtt_client *client; /* Provided to mqtt_connect */
50     struct aws_mqtt_client_connection *client_connection;
51     struct aws_socket_options socket_options;
52     struct aws_tls_connection_options tls_options;
53 
54     JavaVM *jvm;
55     jweak java_mqtt_connection; /* MqttClientConnection instance */
56     struct mqtt_jni_async_callback *on_message;
57 
58     struct aws_atomic_var ref_count;
59 };
60 
61 /*******************************************************************************
62  * mqtt_jni_ws_handshake - Data needed to perform the async websocket handshake
63  * transform operations. Destroyed when transform is complete.
64  ******************************************************************************/
65 struct mqtt_jni_ws_handshake {
66     struct mqtt_jni_connection *connection;
67     struct aws_http_message *http_request;
68     aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn;
69     void *complete_ctx;
70 };
71 
72 static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection);
73 
s_mqtt_jni_connection_acquire(struct mqtt_jni_connection * connection)74 static void s_mqtt_jni_connection_acquire(struct mqtt_jni_connection *connection) {
75     size_t old_value = aws_atomic_fetch_add(&connection->ref_count, 1);
76 
77     AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection acquire, ref count now = %d", (int)old_value + 1);
78 }
79 
80 static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data);
81 
s_mqtt_jni_connection_release(struct mqtt_jni_connection * connection)82 static void s_mqtt_jni_connection_release(struct mqtt_jni_connection *connection) {
83     size_t old_value = aws_atomic_fetch_sub(&connection->ref_count, 1);
84 
85     AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection release, ref count now = %d", (int)old_value - 1);
86 }
87 
88 /* The destroy function is called on Java MqttClientConnection resource release. */
s_mqtt_jni_connection_destroy(struct mqtt_jni_connection * connection)89 static void s_mqtt_jni_connection_destroy(struct mqtt_jni_connection *connection) {
90     /* For mqtt311 client, we have to call aws_mqtt_client_connection_disconnect before releasing the underlying c
91      * connection.*/
92     if (aws_mqtt_client_connection_disconnect(
93             connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) {
94 
95         /*
96          * This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown
97          * time. Log it (in case it was unexpected) and then shutdown the underlying connection manually.
98          */
99         AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Client disconnect failed. Release the client connection.");
100         s_on_shutdown_disconnect_complete(connection->client_connection, NULL);
101     }
102 }
103 
s_mqtt_jni_async_callback_new(struct mqtt_jni_connection * connection,jobject async_callback,JNIEnv * env)104 static struct mqtt_jni_async_callback *s_mqtt_jni_async_callback_new(
105     struct mqtt_jni_connection *connection,
106     jobject async_callback,
107     JNIEnv *env) {
108 
109     if (env == NULL) {
110         return NULL;
111     }
112 
113     struct aws_allocator *allocator = aws_jni_get_allocator();
114     /* allocate cannot fail */
115     struct mqtt_jni_async_callback *callback = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_async_callback));
116     callback->connection = connection;
117     callback->async_callback = async_callback ? (*env)->NewGlobalRef(env, async_callback) : NULL;
118     aws_byte_buf_init(&callback->buffer, aws_jni_get_allocator(), 0);
119 
120     return callback;
121 }
122 
s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback * callback,JNIEnv * env)123 static void s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
124     AWS_FATAL_ASSERT(callback && callback->connection);
125 
126     if (env == NULL) {
127         return;
128     }
129 
130     if (callback->async_callback) {
131         (*env)->DeleteGlobalRef(env, callback->async_callback);
132     }
133 
134     aws_byte_buf_clean_up(&callback->buffer);
135 
136     struct aws_allocator *allocator = aws_jni_get_allocator();
137     aws_mem_release(allocator, callback);
138 }
139 
s_new_mqtt_exception(JNIEnv * env,int error_code)140 static jobject s_new_mqtt_exception(JNIEnv *env, int error_code) {
141     jobject exception = (*env)->NewObject(
142         env, mqtt_exception_properties.jni_mqtt_exception, mqtt_exception_properties.jni_constructor, error_code);
143     return exception;
144 }
145 
146 /* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
147 #if UINTPTR_MAX == 0xffffffff
148 #    if defined(_MSC_VER)
149 #        pragma warning(push)
150 #        pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
151 #    else
152 #        pragma GCC diagnostic push
153 #        pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
154 #        pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
155 #    endif
156 #endif
157 
158 /*******************************************************************************
159  * new
160  ******************************************************************************/
161 static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data);
s_on_connection_complete(struct aws_mqtt_client_connection * client_connection,int error_code,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)162 static void s_on_connection_complete(
163     struct aws_mqtt_client_connection *client_connection,
164     int error_code,
165     enum aws_mqtt_connect_return_code return_code,
166     bool session_present,
167     void *user_data) {
168     (void)client_connection;
169     (void)return_code;
170 
171     struct mqtt_jni_async_callback *connect_callback = user_data;
172     struct mqtt_jni_connection *connection = connect_callback->connection;
173 
174     /********** JNI ENV ACQUIRE **********/
175     JavaVM *jvm = connection->jvm;
176     JNIEnv *env = aws_jni_acquire_thread_env(jvm);
177     if (env == NULL) {
178         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
179         return;
180     }
181 
182     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
183     if (mqtt_connection != NULL) {
184         (*env)->CallVoidMethod(
185             env, mqtt_connection, mqtt_connection_properties.on_connection_complete, error_code, session_present);
186 
187         (*env)->DeleteLocalRef(env, mqtt_connection);
188 
189         if (aws_jni_check_and_clear_exception(env)) {
190             aws_jni_release_thread_env(connection->jvm, env);
191             /********** JNI ENV RELEASE EARLY OUT **********/
192             aws_mqtt_client_connection_disconnect(client_connection, s_on_connection_disconnected, connect_callback);
193             return; /* callback and ref count will be cleaned up in s_on_connection_disconnected */
194         }
195     }
196 
197     s_mqtt_jni_async_callback_destroy(connect_callback, env);
198 
199     aws_jni_release_thread_env(jvm, env);
200     /********** JNI ENV RELEASE **********/
201     s_mqtt_jni_connection_release(connection);
202 }
203 
s_on_connection_interrupted_internal(struct mqtt_jni_connection * connection,int error_code,jobject ack_callback,JNIEnv * env)204 static void s_on_connection_interrupted_internal(
205     struct mqtt_jni_connection *connection,
206     int error_code,
207     jobject ack_callback,
208     JNIEnv *env) {
209 
210     AWS_FATAL_ASSERT(env);
211 
212     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
213     if (mqtt_connection) {
214         (*env)->CallVoidMethod(
215             env, mqtt_connection, mqtt_connection_properties.on_connection_interrupted, error_code, ack_callback);
216 
217         (*env)->DeleteLocalRef(env, mqtt_connection);
218 
219         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
220     }
221 }
222 
s_on_connection_interrupted(struct aws_mqtt_client_connection * client_connection,int error_code,void * user_data)223 static void s_on_connection_interrupted(
224     struct aws_mqtt_client_connection *client_connection,
225     int error_code,
226     void *user_data) {
227     (void)client_connection;
228 
229     struct mqtt_jni_connection *connection = user_data;
230 
231     /********** JNI ENV ACQUIRE **********/
232     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
233     if (env == NULL) {
234         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
235         return;
236     }
237 
238     s_on_connection_interrupted_internal(user_data, error_code, NULL, env);
239 
240     aws_jni_release_thread_env(connection->jvm, env);
241     /********** JNI ENV RELEASE **********/
242 }
243 
s_on_connection_success(struct aws_mqtt_client_connection * client_connection,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)244 static void s_on_connection_success(
245     struct aws_mqtt_client_connection *client_connection,
246     enum aws_mqtt_connect_return_code return_code,
247     bool session_present,
248     void *user_data) {
249     (void)client_connection;
250     (void)return_code;
251 
252     struct mqtt_jni_connection *connection = user_data;
253 
254     /********** JNI ENV ACQUIRE **********/
255     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
256     if (env == NULL) {
257         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
258         return;
259     }
260     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
261     if (mqtt_connection) {
262 
263         (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_success, session_present);
264 
265         (*env)->DeleteLocalRef(env, mqtt_connection);
266 
267         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
268     }
269     aws_jni_release_thread_env(connection->jvm, env);
270     /********** JNI ENV RELEASE **********/
271 }
272 
s_on_connection_failure(struct aws_mqtt_client_connection * client_connection,int error_code,void * user_data)273 static void s_on_connection_failure(
274     struct aws_mqtt_client_connection *client_connection,
275     int error_code,
276     void *user_data) {
277     (void)client_connection;
278 
279     struct mqtt_jni_connection *connection = user_data;
280 
281     /********** JNI ENV ACQUIRE **********/
282     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
283     if (env == NULL) {
284         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
285         return;
286     }
287     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
288     if (mqtt_connection) {
289         (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_failure, error_code);
290 
291         (*env)->DeleteLocalRef(env, mqtt_connection);
292 
293         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
294     }
295     aws_jni_release_thread_env(connection->jvm, env);
296     /********** JNI ENV RELEASE **********/
297 }
298 
s_on_connection_resumed(struct aws_mqtt_client_connection * client_connection,enum aws_mqtt_connect_return_code return_code,bool session_present,void * user_data)299 static void s_on_connection_resumed(
300     struct aws_mqtt_client_connection *client_connection,
301     enum aws_mqtt_connect_return_code return_code,
302     bool session_present,
303     void *user_data) {
304     (void)client_connection;
305     (void)return_code;
306 
307     struct mqtt_jni_connection *connection = user_data;
308 
309     /********** JNI ENV ACQUIRE **********/
310     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
311     if (env == NULL) {
312         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
313         return;
314     }
315 
316     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
317     if (mqtt_connection) {
318 
319         (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_resumed, session_present);
320 
321         (*env)->DeleteLocalRef(env, mqtt_connection);
322 
323         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
324     }
325 
326     aws_jni_release_thread_env(connection->jvm, env);
327     /********** JNI ENV RELEASE **********/
328 }
329 
s_on_connection_disconnected(struct aws_mqtt_client_connection * client_connection,void * user_data)330 static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data) {
331     (void)client_connection;
332 
333     struct mqtt_jni_async_callback *connect_callback = user_data;
334     struct mqtt_jni_connection *jni_connection = connect_callback->connection;
335 
336     /********** JNI ENV ACQUIRE **********/
337     JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
338     if (env == NULL) {
339         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
340         return;
341     }
342 
343     s_on_connection_interrupted_internal(connect_callback->connection, 0, connect_callback->async_callback, env);
344 
345     s_mqtt_jni_async_callback_destroy(connect_callback, env);
346 
347     AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
348 
349     aws_jni_release_thread_env(jni_connection->jvm, env);
350     /********** JNI ENV RELEASE **********/
351 
352     /* Do not call release here: s_on_connection_closed will (normally) be called
353      * right after and so we can call the release there instead. */
354 }
355 
s_on_connection_closed(struct aws_mqtt_client_connection * client_connection,struct on_connection_closed_data * data,void * user_data)356 static void s_on_connection_closed(
357     struct aws_mqtt_client_connection *client_connection,
358     struct on_connection_closed_data *data,
359     void *user_data) {
360     (void)client_connection;
361     (void)data;
362 
363     struct mqtt_jni_connection *connection = user_data;
364 
365     /********** JNI ENV ACQUIRE **********/
366     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
367     if (env == NULL) {
368         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
369         return;
370     }
371 
372     // Make sure the Java object has not been garbage collected
373     if (!(*env)->IsSameObject(env, connection->java_mqtt_connection, NULL)) {
374         jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
375         if (mqtt_connection) {
376             (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_closed);
377             (*env)->DeleteLocalRef(env, mqtt_connection);
378             AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
379         }
380     }
381     aws_jni_release_thread_env(connection->jvm, env);
382     /********** JNI ENV RELEASE **********/
383 }
384 
s_on_connection_terminated(void * user_data)385 static void s_on_connection_terminated(void *user_data) {
386 
387     struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data;
388 
389     /********** JNI ENV ACQUIRE **********/
390     JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
391     if (env == NULL) {
392         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
393         return;
394     }
395 
396     jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection);
397     if (mqtt_connection != NULL) {
398         (*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references);
399 
400         (*env)->DeleteLocalRef(env, mqtt_connection);
401 
402         aws_jni_check_and_clear_exception(env);
403     }
404 
405     JavaVM *jvm = jni_connection->jvm;
406 
407     s_mqtt_connection_destroy(env, jni_connection);
408     aws_jni_release_thread_env(jvm, env);
409     /********** JNI ENV RELEASE **********/
410 }
411 
s_mqtt_connection_new(JNIEnv * env,struct aws_mqtt_client * client3,struct aws_mqtt5_client_java_jni * client5_jni,jobject java_mqtt_connection)412 static struct mqtt_jni_connection *s_mqtt_connection_new(
413     JNIEnv *env,
414     struct aws_mqtt_client *client3,
415     struct aws_mqtt5_client_java_jni *client5_jni,
416     jobject java_mqtt_connection) {
417     struct aws_allocator *allocator = aws_jni_get_allocator();
418 
419     struct mqtt_jni_connection *connection = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_connection));
420     if (!connection) {
421         aws_jni_throw_runtime_exception(
422             env, "MqttClientConnection.mqtt_connect: Out of memory allocating JNI connection");
423         return NULL;
424     }
425 
426     aws_atomic_store_int(&connection->ref_count, 1);
427     connection->java_mqtt_connection = (*env)->NewWeakGlobalRef(env, java_mqtt_connection);
428     jint jvmresult = (*env)->GetJavaVM(env, &connection->jvm);
429     AWS_FATAL_ASSERT(jvmresult == 0);
430 
431     if (client3 != NULL) {
432         connection->client = client3;
433         connection->client_connection = aws_mqtt_client_connection_new(client3);
434     } else if (client5_jni != NULL) {
435         connection->client_connection = aws_mqtt_client_connection_new_from_mqtt5_client(client5_jni->client);
436     }
437 
438     if (!connection->client_connection) {
439         aws_jni_throw_runtime_exception(
440             env,
441             "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to create new "
442             "connection");
443         goto on_error;
444     }
445 
446     if (aws_mqtt_client_connection_set_connection_termination_handler(
447             connection->client_connection, s_on_connection_terminated, connection)) {
448         aws_jni_throw_runtime_exception(
449             env,
450             "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to set termination "
451             "callback");
452         goto on_error;
453     }
454 
455     return connection;
456 
457 on_error:
458 
459     s_mqtt_jni_connection_release(connection);
460 
461     return NULL;
462 }
463 
s_mqtt_connection_destroy(JNIEnv * env,struct mqtt_jni_connection * connection)464 static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection) {
465     if (connection == NULL) {
466         return;
467     }
468 
469     if (connection->on_message) {
470         s_mqtt_jni_async_callback_destroy(connection->on_message, env);
471     }
472 
473     if (connection->java_mqtt_connection) {
474         (*env)->DeleteWeakGlobalRef(env, connection->java_mqtt_connection);
475     }
476 
477     aws_tls_connection_options_clean_up(&connection->tls_options);
478 
479     struct aws_allocator *allocator = aws_jni_get_allocator();
480     aws_mem_release(allocator, connection);
481 }
482 
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(JNIEnv * env,jclass jni_class,jlong jni_client,jobject jni_mqtt_connection)483 JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(
484     JNIEnv *env,
485     jclass jni_class,
486     jlong jni_client,
487     jobject jni_mqtt_connection) {
488     (void)jni_class;
489     aws_cache_jni_ids(env);
490 
491     struct mqtt_jni_connection *connection = NULL;
492     struct aws_mqtt_client *client3 = (struct aws_mqtt_client *)jni_client;
493     if (!client3) {
494         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt3 Client is invalid/null");
495         return (jlong)NULL;
496     }
497 
498     connection = s_mqtt_connection_new(env, client3, NULL, jni_mqtt_connection);
499     if (!connection) {
500         return (jlong)NULL;
501     }
502 
503     aws_mqtt_client_connection_set_connection_result_handlers(
504         connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
505     aws_mqtt_client_connection_set_connection_interruption_handlers(
506         connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
507     aws_mqtt_client_connection_set_connection_closed_handler(
508         connection->client_connection, s_on_connection_closed, connection);
509 
510     return (jlong)connection;
511 }
512 
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(JNIEnv * env,jclass jni_class,jlong jni_client,jobject jni_mqtt_connection)513 JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(
514     JNIEnv *env,
515     jclass jni_class,
516     jlong jni_client,
517     jobject jni_mqtt_connection) {
518     (void)jni_class;
519     aws_cache_jni_ids(env);
520 
521     struct mqtt_jni_connection *connection = NULL;
522     struct aws_mqtt5_client_java_jni *client5_jni = (struct aws_mqtt5_client_java_jni *)jni_client;
523     if (!client5_jni) {
524         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt5 Client is invalid/null");
525         return (jlong)NULL;
526     }
527 
528     connection = s_mqtt_connection_new(env, NULL, client5_jni, jni_mqtt_connection);
529     if (!connection) {
530         return (jlong)NULL;
531     }
532 
533     aws_mqtt_client_connection_set_connection_result_handlers(
534         connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
535     aws_mqtt_client_connection_set_connection_interruption_handlers(
536         connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
537     aws_mqtt_client_connection_set_connection_closed_handler(
538         connection->client_connection, s_on_connection_closed, connection);
539 
540     return (jlong)connection;
541 }
542 
543 /* The disconnect callback called on shutdown. We will release the underlying connection here, which should init the
544 ** client shutdown process. Then on termination callback, we will finally release all jni resources.
545 */
s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection * connection,void * user_data)546 static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) {
547     (void)user_data;
548 
549     AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references");
550 
551     /* Release the underlying mqtt connection */
552     aws_mqtt_client_connection_release(connection);
553 }
554 
555 /*******************************************************************************
556  * clean_up
557  ******************************************************************************/
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(JNIEnv * env,jclass jni_class,jlong jni_connection)558 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(
559     JNIEnv *env,
560     jclass jni_class,
561     jlong jni_connection) {
562     (void)jni_class;
563     (void)env;
564     aws_cache_jni_ids(env);
565 
566     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
567     s_mqtt_jni_connection_destroy(connection);
568 }
569 
570 /*******************************************************************************
571  * connect
572  ******************************************************************************/
573 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_endpoint,jint jni_port,jlong jni_socket_options,jlong jni_tls_ctx,jstring jni_client_id,jboolean jni_clean_session,jint keep_alive_secs,jshort ping_timeout_ms,jint protocol_operation_timeout_ms)574 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(
575     JNIEnv *env,
576     jclass jni_class,
577     jlong jni_connection,
578     jstring jni_endpoint,
579     jint jni_port,
580     jlong jni_socket_options,
581     jlong jni_tls_ctx,
582     jstring jni_client_id,
583     jboolean jni_clean_session,
584     jint keep_alive_secs,
585     jshort ping_timeout_ms,
586     jint protocol_operation_timeout_ms) {
587     (void)jni_class;
588     aws_cache_jni_ids(env);
589 
590     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
591     if (!connection) {
592         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Connection is invalid/null");
593         return;
594     }
595 
596     struct aws_byte_cursor client_id;
597     AWS_ZERO_STRUCT(client_id);
598     struct aws_byte_cursor endpoint = aws_jni_byte_cursor_from_jstring_acquire(env, jni_endpoint);
599     uint32_t port = (uint32_t)jni_port;
600     if (!port) {
601         aws_jni_throw_runtime_exception(
602             env,
603             "MqttClientConnection.mqtt_new: Endpoint should be in the format hostname:port and port must not be 0");
604         goto cleanup;
605     }
606 
607     struct mqtt_jni_async_callback *connect_callback = s_mqtt_jni_async_callback_new(connection, NULL, env);
608     if (connect_callback == NULL) {
609         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Failed to create async callback");
610         goto cleanup;
611     }
612 
613     s_mqtt_jni_connection_acquire(connection);
614 
615     struct aws_socket_options default_socket_options;
616     AWS_ZERO_STRUCT(default_socket_options);
617     default_socket_options.type = AWS_SOCKET_STREAM;
618     default_socket_options.connect_timeout_ms = 3000;
619     struct aws_socket_options *socket_options = &default_socket_options;
620     if (jni_socket_options) {
621         socket_options = (struct aws_socket_options *)jni_socket_options;
622     }
623     memcpy(&connection->socket_options, socket_options, sizeof(struct aws_socket_options));
624 
625     /* if a tls_ctx was provided, initialize tls options */
626     struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_ctx;
627     struct aws_tls_connection_options *tls_options = NULL;
628     if (tls_ctx) {
629         tls_options = &connection->tls_options;
630         aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx);
631         aws_tls_connection_options_set_server_name(tls_options, aws_jni_get_allocator(), &endpoint);
632     }
633 
634     client_id = aws_jni_byte_cursor_from_jstring_acquire(env, jni_client_id);
635     bool clean_session = jni_clean_session != 0;
636 
637     struct aws_mqtt_connection_options connect_options;
638     AWS_ZERO_STRUCT(connect_options);
639     connect_options.host_name = endpoint;
640     connect_options.port = port;
641     connect_options.socket_options = &connection->socket_options;
642     connect_options.tls_options = tls_options;
643     connect_options.client_id = client_id;
644     connect_options.keep_alive_time_secs = (uint16_t)keep_alive_secs;
645     connect_options.ping_timeout_ms = ping_timeout_ms;
646     connect_options.protocol_operation_timeout_ms = protocol_operation_timeout_ms;
647     connect_options.clean_session = clean_session;
648     connect_options.on_connection_complete = s_on_connection_complete;
649     connect_options.user_data = connect_callback;
650 
651     int result = aws_mqtt_client_connection_connect(connection->client_connection, &connect_options);
652     if (result != AWS_OP_SUCCESS) {
653         s_mqtt_jni_connection_release(connection);
654         s_mqtt_jni_async_callback_destroy(connect_callback, env);
655         aws_jni_throw_runtime_exception(
656             env, "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_connect failed");
657     }
658 
659 cleanup:
660     aws_jni_byte_cursor_from_jstring_release(env, jni_endpoint, endpoint);
661     aws_jni_byte_cursor_from_jstring_release(env, jni_client_id, client_id);
662 }
663 
664 /*******************************************************************************
665  * disconnect
666  ******************************************************************************/
667 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(JNIEnv * env,jclass jni_class,jlong jni_connection,jobject jni_ack)668 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(
669     JNIEnv *env,
670     jclass jni_class,
671     jlong jni_connection,
672     jobject jni_ack) {
673     (void)jni_class;
674     aws_cache_jni_ids(env);
675 
676     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
677     if (!connection) {
678         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Invalid connection");
679         return;
680     }
681 
682     struct mqtt_jni_async_callback *disconnect_callback = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
683     if (disconnect_callback == NULL) {
684         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Failed to create async callback");
685         return;
686     }
687 
688     if (aws_mqtt_client_connection_disconnect(
689             connection->client_connection, s_on_connection_disconnected, disconnect_callback) != AWS_OP_SUCCESS) {
690         int error = aws_last_error();
691         /*
692          * Disconnect invoked on a disconnected connection can happen under normal circumstances.  Invoke the callback
693          * manually since it won't get invoked otherwise.
694          */
695         AWS_LOGF_WARN(
696             AWS_LS_MQTT_CLIENT,
697             "MqttClientConnection.mqtt_disconnect: error calling disconnect - %d(%s)",
698             error,
699             aws_error_str(error));
700         s_on_connection_disconnected(connection->client_connection, disconnect_callback);
701     }
702 }
703 
704 /*******************************************************************************
705  * subscribe
706  ******************************************************************************/
707 /* called from any sub, unsub, or pub ack */
s_deliver_ack_success(struct mqtt_jni_async_callback * callback,JNIEnv * env)708 static void s_deliver_ack_success(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
709     AWS_FATAL_ASSERT(callback);
710     AWS_FATAL_ASSERT(callback->connection);
711 
712     if (callback->async_callback) {
713         (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_success);
714         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
715     }
716 }
717 
s_deliver_ack_failure(struct mqtt_jni_async_callback * callback,int error_code,JNIEnv * env)718 static void s_deliver_ack_failure(struct mqtt_jni_async_callback *callback, int error_code, JNIEnv *env) {
719     AWS_FATAL_ASSERT(callback);
720     AWS_FATAL_ASSERT(callback->connection);
721     AWS_FATAL_ASSERT(env);
722 
723     if (callback->async_callback) {
724         jobject jni_reason = s_new_mqtt_exception(env, error_code);
725         (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_failure, jni_reason);
726         (*env)->DeleteLocalRef(env, jni_reason);
727         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
728     }
729 }
730 
s_on_op_complete(struct aws_mqtt_client_connection * connection,uint16_t packet_id,int error_code,void * user_data)731 static void s_on_op_complete(
732     struct aws_mqtt_client_connection *connection,
733     uint16_t packet_id,
734     int error_code,
735     void *user_data) {
736     AWS_FATAL_ASSERT(connection);
737     (void)packet_id;
738 
739     struct mqtt_jni_async_callback *callback = user_data;
740     if (!callback) {
741         return;
742     }
743 
744     /********** JNI ENV ACQUIRE **********/
745     JavaVM *jvm = callback->connection->jvm;
746     JNIEnv *env = aws_jni_acquire_thread_env(jvm);
747     if (env == NULL) {
748         return;
749     }
750 
751     if (error_code) {
752         s_deliver_ack_failure(callback, error_code, env);
753     } else {
754         s_deliver_ack_success(callback, env);
755     }
756 
757     s_mqtt_jni_async_callback_destroy(callback, env);
758 
759     aws_jni_release_thread_env(jvm, env);
760     /********** JNI ENV RELEASE **********/
761 }
762 
s_is_qos_successful(enum aws_mqtt_qos qos)763 static bool s_is_qos_successful(enum aws_mqtt_qos qos) {
764     return qos < 128;
765 }
766 
s_on_ack(struct aws_mqtt_client_connection * connection,uint16_t packet_id,const struct aws_byte_cursor * topic,enum aws_mqtt_qos qos,int error_code,void * user_data)767 static void s_on_ack(
768     struct aws_mqtt_client_connection *connection,
769     uint16_t packet_id,
770     const struct aws_byte_cursor *topic,
771     enum aws_mqtt_qos qos,
772     int error_code,
773     void *user_data) {
774     (void)topic;
775 
776     // Handle a case when the server processed SUBSCRIBE request successfully, but rejected a subscription for some
777     // reason, i.e. error_code is 0 and qos is 0x80.
778     // This mostly applies to mqtt5to3adapter, as MQTT3 client will be disconnected on unsuccessful subscribe.
779     if (error_code == 0 && !s_is_qos_successful(qos)) {
780         error_code = AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE;
781     }
782 
783     s_on_op_complete(connection, packet_id, error_code, user_data);
784 }
785 
s_cleanup_handler(void * user_data)786 static void s_cleanup_handler(void *user_data) {
787     struct mqtt_jni_async_callback *handler = user_data;
788 
789     /********** JNI ENV ACQUIRE **********/
790     JavaVM *jvm = handler->connection->jvm;
791     JNIEnv *env = aws_jni_acquire_thread_env(jvm);
792     if (env == NULL) {
793         return;
794     }
795 
796     s_mqtt_jni_async_callback_destroy(handler, env);
797 
798     aws_jni_release_thread_env(jvm, env);
799     /********** JNI ENV RELEASE **********/
800 }
801 
s_on_subscription_delivered(struct aws_mqtt_client_connection * connection,const struct aws_byte_cursor * topic,const struct aws_byte_cursor * payload,bool dup,enum aws_mqtt_qos qos,bool retain,void * user_data)802 static void s_on_subscription_delivered(
803     struct aws_mqtt_client_connection *connection,
804     const struct aws_byte_cursor *topic,
805     const struct aws_byte_cursor *payload,
806     bool dup,
807     enum aws_mqtt_qos qos,
808     bool retain,
809     void *user_data) {
810 
811     AWS_FATAL_ASSERT(connection);
812     AWS_FATAL_ASSERT(topic);
813     AWS_FATAL_ASSERT(payload);
814     AWS_FATAL_ASSERT(user_data);
815 
816     struct mqtt_jni_async_callback *callback = user_data;
817     if (!callback->async_callback) {
818         return;
819     }
820 
821     /********** JNI ENV ACQUIRE **********/
822     JNIEnv *env = aws_jni_acquire_thread_env(callback->connection->jvm);
823     if (env == NULL) {
824         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
825         return;
826     }
827 
828     jbyteArray jni_payload = (*env)->NewByteArray(env, (jsize)payload->len);
829     (*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)payload->len, (const signed char *)payload->ptr);
830 
831     jstring jni_topic = aws_jni_string_from_cursor(env, topic);
832 
833     (*env)->CallVoidMethod(
834         env, callback->async_callback, message_handler_properties.deliver, jni_topic, jni_payload, dup, qos, retain);
835 
836     (*env)->DeleteLocalRef(env, jni_payload);
837     (*env)->DeleteLocalRef(env, jni_topic);
838 
839     AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
840 
841     aws_jni_release_thread_env(callback->connection->jvm, env);
842     /********** JNI ENV RELEASE **********/
843 }
844 
845 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jobject jni_handler,jobject jni_ack)846 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(
847     JNIEnv *env,
848     jclass jni_class,
849     jlong jni_connection,
850     jstring jni_topic,
851     jint jni_qos,
852     jobject jni_handler,
853     jobject jni_ack) {
854     (void)jni_class;
855     aws_cache_jni_ids(env);
856 
857     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
858     if (!connection) {
859         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Invalid connection");
860         return 0;
861     }
862 
863     struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
864     if (!handler) {
865         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate handler");
866         return 0;
867     }
868 
869     /* from here, any failure requires error_cleanup */
870     struct mqtt_jni_async_callback *sub_ack = NULL;
871     if (jni_ack) {
872         sub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
873         if (!sub_ack) {
874             aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate sub ack");
875             goto error_cleanup;
876         }
877     }
878 
879     struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
880     enum aws_mqtt_qos qos = jni_qos;
881 
882     uint16_t msg_id = aws_mqtt_client_connection_subscribe(
883         connection->client_connection,
884         &topic,
885         qos,
886         s_on_subscription_delivered,
887         handler,
888         s_cleanup_handler,
889         s_on_ack,
890         sub_ack);
891     aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
892     if (msg_id == 0) {
893         aws_jni_throw_runtime_exception(
894             env, "MqttClientConnection.mqtt_subscribe: aws_mqtt_client_connection_subscribe failed");
895         goto error_cleanup;
896     }
897 
898     return msg_id;
899 
900 error_cleanup:
901     if (handler) {
902         s_mqtt_jni_async_callback_destroy(handler, env);
903     }
904 
905     if (sub_ack) {
906         s_mqtt_jni_async_callback_destroy(sub_ack, env);
907     }
908 
909     return 0;
910 }
911 
912 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(JNIEnv * env,jclass jni_class,jlong jni_connection,jobject jni_handler)913 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(
914     JNIEnv *env,
915     jclass jni_class,
916     jlong jni_connection,
917     jobject jni_handler) {
918     (void)jni_class;
919     aws_cache_jni_ids(env);
920 
921     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
922     if (!connection) {
923         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid connection");
924         return;
925     }
926 
927     if (!jni_handler) {
928         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid handler");
929         return;
930     }
931 
932     struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
933     if (!handler) {
934         aws_jni_throw_runtime_exception(
935             env, "MqttClientConnection.mqttClientConnectionOnMessage: Unable to allocate handler");
936         return;
937     }
938 
939     if (aws_mqtt_client_connection_set_on_any_publish_handler(
940             connection->client_connection, s_on_subscription_delivered, handler)) {
941         aws_jni_throw_runtime_exception(
942             env, "MqttClientConnection.mqttClientConnectionOnMessage: Failed to install on_any_publish_handler");
943         goto error_cleanup;
944     }
945 
946     if (connection->on_message) {
947         s_mqtt_jni_async_callback_destroy(connection->on_message, env);
948     }
949 
950     connection->on_message = handler;
951 
952     return;
953 
954 error_cleanup:
955     if (handler) {
956         s_mqtt_jni_async_callback_destroy(handler, env);
957     }
958 }
959 
960 /*******************************************************************************
961  * unsubscribe
962  ******************************************************************************/
963 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jobject jni_ack)964 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(
965     JNIEnv *env,
966     jclass jni_class,
967     jlong jni_connection,
968     jstring jni_topic,
969     jobject jni_ack) {
970     (void)jni_class;
971     aws_cache_jni_ids(env);
972 
973     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
974     if (!connection) {
975         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Invalid connection");
976         return 0;
977     }
978 
979     struct mqtt_jni_async_callback *unsub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
980     if (!unsub_ack) {
981         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Unable to allocate unsub ack");
982         goto error_cleanup;
983     }
984 
985     struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
986     uint16_t msg_id =
987         aws_mqtt_client_connection_unsubscribe(connection->client_connection, &topic, s_on_op_complete, unsub_ack);
988     aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
989     if (msg_id == 0) {
990         aws_jni_throw_runtime_exception(
991             env, "MqttClientConnection.mqtt_unsubscribe: aws_mqtt_client_connection_unsubscribe failed");
992         goto error_cleanup;
993     }
994 
995     return msg_id;
996 
997 error_cleanup:
998     if (unsub_ack) {
999         s_mqtt_jni_async_callback_destroy(unsub_ack, env);
1000     }
1001     return 0;
1002 }
1003 
1004 /*******************************************************************************
1005  * publish
1006  ******************************************************************************/
1007 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jboolean jni_retain,jbyteArray jni_payload,jobject jni_ack)1008 jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(
1009     JNIEnv *env,
1010     jclass jni_class,
1011     jlong jni_connection,
1012     jstring jni_topic,
1013     jint jni_qos,
1014     jboolean jni_retain,
1015     jbyteArray jni_payload,
1016     jobject jni_ack) {
1017     (void)jni_class;
1018     aws_cache_jni_ids(env);
1019 
1020     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1021     if (!connection) {
1022         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid connection");
1023         return 0;
1024     }
1025 
1026     if (!jni_topic) {
1027         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid/null topic");
1028         return 0;
1029     }
1030 
1031     struct mqtt_jni_async_callback *pub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
1032     if (!pub_ack) {
1033         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Unable to allocate pub ack");
1034         goto error_cleanup;
1035     }
1036 
1037     struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
1038 
1039     struct aws_byte_cursor payload;
1040     AWS_ZERO_STRUCT(payload);
1041     if (jni_payload != NULL) {
1042         payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
1043     }
1044 
1045     enum aws_mqtt_qos qos = jni_qos;
1046     bool retain = jni_retain != 0;
1047 
1048     uint16_t msg_id = aws_mqtt_client_connection_publish(
1049         connection->client_connection, &topic, qos, retain, &payload, s_on_op_complete, pub_ack);
1050     aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
1051 
1052     if (jni_payload != NULL) {
1053         aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
1054     }
1055 
1056     if (msg_id == 0) {
1057         aws_jni_throw_runtime_exception(
1058             env, "MqttClientConnection.mqtt_publish: aws_mqtt_client_connection_publish failed");
1059         goto error_cleanup;
1060     }
1061 
1062     return msg_id;
1063 
1064 error_cleanup:
1065     if (pub_ack) {
1066         s_mqtt_jni_async_callback_destroy(pub_ack, env);
1067     }
1068 
1069     return 0;
1070 }
1071 
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_topic,jint jni_qos,jboolean jni_retain,jbyteArray jni_payload)1072 JNIEXPORT jboolean JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(
1073     JNIEnv *env,
1074     jclass jni_class,
1075     jlong jni_connection,
1076     jstring jni_topic,
1077     jint jni_qos,
1078     jboolean jni_retain,
1079     jbyteArray jni_payload) {
1080     (void)jni_class;
1081     aws_cache_jni_ids(env);
1082 
1083     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1084     if (!connection) {
1085         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Invalid connection");
1086         return false;
1087     }
1088 
1089     if (jni_topic == NULL) {
1090         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Topic must be non-null");
1091         return false;
1092     }
1093     struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
1094 
1095     struct aws_byte_cursor payload;
1096     AWS_ZERO_STRUCT(payload);
1097     if (jni_payload != NULL) {
1098         payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
1099     }
1100 
1101     enum aws_mqtt_qos qos = jni_qos;
1102     bool retain = jni_retain != 0;
1103 
1104     int result = aws_mqtt_client_connection_set_will(connection->client_connection, &topic, qos, retain, &payload);
1105     aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
1106 
1107     if (jni_payload != NULL) {
1108         aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
1109     }
1110 
1111     return (result == AWS_OP_SUCCESS);
1112 }
1113 
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(JNIEnv * env,jclass jni_class,jlong jni_connection,jstring jni_user,jstring jni_pass)1114 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(
1115     JNIEnv *env,
1116     jclass jni_class,
1117     jlong jni_connection,
1118     jstring jni_user,
1119     jstring jni_pass) {
1120     (void)jni_class;
1121     aws_cache_jni_ids(env);
1122 
1123     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1124     if (!connection) {
1125         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Invalid connection");
1126         return;
1127     }
1128 
1129     struct aws_byte_cursor username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_user);
1130     struct aws_byte_cursor password;
1131     struct aws_byte_cursor *password_ptr = NULL;
1132     AWS_ZERO_STRUCT(password);
1133     if (jni_pass != NULL) {
1134         password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_pass);
1135         password_ptr = &password;
1136     }
1137 
1138     if (aws_mqtt_client_connection_set_login(connection->client_connection, &username, password_ptr)) {
1139         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Failed to set login");
1140     }
1141 
1142     aws_jni_byte_cursor_from_jstring_release(env, jni_user, username);
1143 
1144     if (password.len > 0) {
1145         aws_jni_byte_cursor_from_jstring_release(env, jni_pass, password);
1146     }
1147 }
1148 
1149 JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(JNIEnv * env,jclass jni_class,jlong jni_connection,jlong jni_min_timeout,jlong jni_max_timeout)1150     Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(
1151         JNIEnv *env,
1152         jclass jni_class,
1153         jlong jni_connection,
1154         jlong jni_min_timeout,
1155         jlong jni_max_timeout) {
1156     (void)jni_class;
1157     aws_cache_jni_ids(env);
1158 
1159     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1160     if (!connection) {
1161         aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_reconnect_timeout: Invalid connection");
1162         return;
1163     }
1164 
1165     if (aws_mqtt_client_connection_set_reconnect_timeout(
1166             connection->client_connection, jni_min_timeout, jni_max_timeout)) {
1167         aws_jni_throw_runtime_exception(
1168             env, "MqttClientConnection.mqtt_reconnect_timeout: Failed to set reconnect timeout");
1169     }
1170 }
1171 
1172 ///////
s_ws_handshake_destroy(struct mqtt_jni_ws_handshake * ws_handshake)1173 static void s_ws_handshake_destroy(struct mqtt_jni_ws_handshake *ws_handshake) {
1174     if (!ws_handshake) {
1175         return;
1176     }
1177 
1178     s_mqtt_jni_connection_release(ws_handshake->connection);
1179     aws_mem_release(aws_jni_get_allocator(), ws_handshake);
1180 }
1181 
s_ws_handshake_transform(struct aws_http_message * request,void * user_data,aws_mqtt_transform_websocket_handshake_complete_fn * complete_fn,void * complete_ctx)1182 static void s_ws_handshake_transform(
1183     struct aws_http_message *request,
1184     void *user_data,
1185     aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
1186     void *complete_ctx) {
1187 
1188     struct mqtt_jni_connection *connection = user_data;
1189 
1190     /********** JNI ENV ACQUIRE **********/
1191     JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
1192     if (env == NULL) {
1193         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
1194         complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
1195         return;
1196     }
1197 
1198     struct aws_allocator *alloc = aws_jni_get_allocator();
1199 
1200     struct mqtt_jni_ws_handshake *ws_handshake = aws_mem_calloc(alloc, 1, sizeof(struct mqtt_jni_ws_handshake));
1201     if (!ws_handshake) {
1202         goto error;
1203     }
1204 
1205     ws_handshake->connection = connection;
1206     s_mqtt_jni_connection_acquire(ws_handshake->connection);
1207 
1208     ws_handshake->complete_ctx = complete_ctx;
1209     ws_handshake->complete_fn = complete_fn;
1210     ws_handshake->http_request = request;
1211 
1212     jobject java_http_request = aws_java_http_request_from_native(env, request, NULL);
1213     if (!java_http_request) {
1214         aws_raise_error(AWS_ERROR_UNKNOWN); /* TODO: given java exception, choose appropriate aws error code */
1215         goto error;
1216     }
1217 
1218     jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
1219     if (mqtt_connection != NULL) {
1220         (*env)->CallVoidMethod(
1221             env, mqtt_connection, mqtt_connection_properties.on_websocket_handshake, java_http_request, ws_handshake);
1222 
1223         (*env)->DeleteLocalRef(env, mqtt_connection);
1224 
1225         AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
1226     }
1227 
1228     (*env)->DeleteLocalRef(env, java_http_request);
1229     aws_jni_release_thread_env(connection->jvm, env);
1230     /********** JNI ENV RELEASE SUCCESS PATH **********/
1231 
1232     return;
1233 
1234 error:;
1235 
1236     int error_code = aws_last_error();
1237     s_ws_handshake_destroy(ws_handshake);
1238     complete_fn(request, error_code, complete_ctx);
1239     aws_jni_release_thread_env(connection->jvm, env);
1240     /********** JNI ENV RELEASE FAILURE PATH **********/
1241 }
1242 
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(JNIEnv * env,jclass jni_class,jlong jni_connection)1243 JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(
1244     JNIEnv *env,
1245     jclass jni_class,
1246     jlong jni_connection) {
1247     (void)jni_class;
1248     aws_cache_jni_ids(env);
1249 
1250     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1251     if (!connection) {
1252         aws_raise_error(AWS_ERROR_INVALID_STATE);
1253         aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Invalid connection");
1254         return;
1255     }
1256 
1257     if (aws_mqtt_client_connection_use_websockets(
1258             connection->client_connection, s_ws_handshake_transform, connection, NULL, NULL)) {
1259         aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Failed to use websockets");
1260         return;
1261     }
1262 }
1263 
1264 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(JNIEnv * env,jclass jni_class,jlong jni_connection,jbyteArray jni_marshalled_request,jobject jni_throwable,jlong jni_user_data)1265 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(
1266     JNIEnv *env,
1267     jclass jni_class,
1268     jlong jni_connection,
1269     jbyteArray jni_marshalled_request,
1270     jobject jni_throwable,
1271     jlong jni_user_data) {
1272     (void)jni_class;
1273     (void)jni_connection;
1274     aws_cache_jni_ids(env);
1275 
1276     struct mqtt_jni_ws_handshake *ws_handshake = (void *)jni_user_data;
1277     int error_code = AWS_ERROR_SUCCESS;
1278 
1279     if (jni_throwable != NULL) {
1280         if ((*env)->IsInstanceOf(env, jni_throwable, crt_runtime_exception_properties.crt_runtime_exception_class)) {
1281             error_code = (*env)->GetIntField(env, jni_throwable, crt_runtime_exception_properties.error_code_field_id);
1282         }
1283 
1284         if (error_code == AWS_ERROR_SUCCESS) {
1285             error_code = AWS_ERROR_UNKNOWN; /* is there anything more that could be done here? */
1286         }
1287 
1288         goto done;
1289     }
1290 
1291     if (aws_apply_java_http_request_changes_to_native_request(
1292             env, jni_marshalled_request, NULL, ws_handshake->http_request)) {
1293         error_code = aws_last_error();
1294         goto done;
1295     }
1296 
1297 done:
1298     ws_handshake->complete_fn(ws_handshake->http_request, error_code, ws_handshake->complete_ctx);
1299     s_ws_handshake_destroy(ws_handshake);
1300 }
1301 
1302 JNIEXPORT
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(JNIEnv * env,jclass jni_class,jlong jni_connection,jint jni_proxy_connection_type,jstring jni_proxy_host,jint jni_proxy_port,jlong jni_proxy_tls_context,jint jni_proxy_authorization_type,jstring jni_proxy_authorization_username,jstring jni_proxy_authorization_password)1303 void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(
1304     JNIEnv *env,
1305     jclass jni_class,
1306     jlong jni_connection,
1307     jint jni_proxy_connection_type,
1308     jstring jni_proxy_host,
1309     jint jni_proxy_port,
1310     jlong jni_proxy_tls_context,
1311     jint jni_proxy_authorization_type,
1312     jstring jni_proxy_authorization_username,
1313     jstring jni_proxy_authorization_password) {
1314 
1315     (void)jni_class;
1316     aws_cache_jni_ids(env);
1317 
1318     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1319 
1320     struct aws_http_proxy_options proxy_options;
1321     AWS_ZERO_STRUCT(proxy_options);
1322 
1323     if (!jni_proxy_host) {
1324         aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: proxyHost must not be null.");
1325         return;
1326     }
1327 
1328     proxy_options.connection_type = (enum aws_http_proxy_connection_type)jni_proxy_connection_type;
1329 
1330     proxy_options.host = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_host);
1331     proxy_options.port = (uint32_t)jni_proxy_port;
1332 
1333     proxy_options.auth_type = (enum aws_http_proxy_authentication_type)jni_proxy_authorization_type;
1334 
1335     if (jni_proxy_authorization_username) {
1336         proxy_options.auth_username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_username);
1337     }
1338 
1339     if (jni_proxy_authorization_password) {
1340         proxy_options.auth_password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_password);
1341     }
1342 
1343     struct aws_tls_connection_options proxy_tls_conn_options;
1344     AWS_ZERO_STRUCT(proxy_tls_conn_options);
1345 
1346     if (jni_proxy_tls_context != 0) {
1347         struct aws_tls_ctx *proxy_tls_ctx = (struct aws_tls_ctx *)jni_proxy_tls_context;
1348         aws_tls_connection_options_init_from_ctx(&proxy_tls_conn_options, proxy_tls_ctx);
1349         aws_tls_connection_options_set_server_name(
1350             &proxy_tls_conn_options, aws_jni_get_allocator(), &proxy_options.host);
1351         proxy_options.tls_options = &proxy_tls_conn_options;
1352     }
1353 
1354     if (aws_mqtt_client_connection_set_http_proxy_options(connection->client_connection, &proxy_options)) {
1355         aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: Failed to set proxy options");
1356     }
1357 
1358     if (jni_proxy_authorization_password) {
1359         aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_password, proxy_options.auth_password);
1360     }
1361 
1362     if (jni_proxy_authorization_username) {
1363         aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_username, proxy_options.auth_username);
1364     }
1365 
1366     aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_host, proxy_options.host);
1367     aws_tls_connection_options_clean_up(&proxy_tls_conn_options);
1368 }
1369 
1370 JNIEXPORT jobject JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(JNIEnv * env,jclass jni_class,jlong jni_connection)1371     Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(
1372         JNIEnv *env,
1373         jclass jni_class,
1374         jlong jni_connection) {
1375     (void)jni_class;
1376     aws_cache_jni_ids(env);
1377 
1378     struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
1379     if (!connection) {
1380         aws_raise_error(AWS_ERROR_INVALID_STATE);
1381         aws_jni_throw_runtime_exception(env, "MqttClientConnection.getOperationStatistics: Invalid connection");
1382         return NULL;
1383     }
1384 
1385     /* Construct Java object */
1386     jobject jni_operation_statistics = (*env)->NewObject(
1387         env,
1388         mqtt_connection_operation_statistics_properties.statistics_class,
1389         mqtt_connection_operation_statistics_properties.statistics_constructor_id);
1390     if (jni_operation_statistics == NULL) {
1391         aws_raise_error(AWS_ERROR_INVALID_STATE);
1392         aws_jni_throw_runtime_exception(
1393             env, "MqttClientConnection.getOperationStatistics: Could not create operation statistics object");
1394         return NULL;
1395     }
1396 
1397     struct aws_mqtt_connection_operation_statistics connection_stats;
1398     aws_mqtt_client_connection_get_stats(connection->client_connection, &connection_stats);
1399 
1400     (*env)->SetLongField(
1401         env,
1402         jni_operation_statistics,
1403         mqtt_connection_operation_statistics_properties.incomplete_operation_count_field_id,
1404         (jlong)connection_stats.incomplete_operation_count);
1405     if (aws_jni_check_and_clear_exception(env)) {
1406         aws_raise_error(AWS_ERROR_INVALID_STATE);
1407         aws_jni_throw_runtime_exception(
1408             env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation count");
1409         return NULL;
1410     }
1411 
1412     (*env)->SetLongField(
1413         env,
1414         jni_operation_statistics,
1415         mqtt_connection_operation_statistics_properties.incomplete_operation_size_field_id,
1416         (jlong)connection_stats.incomplete_operation_size);
1417     if (aws_jni_check_and_clear_exception(env)) {
1418         aws_raise_error(AWS_ERROR_INVALID_STATE);
1419         aws_jni_throw_runtime_exception(
1420             env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation size");
1421         return NULL;
1422     }
1423 
1424     (*env)->SetLongField(
1425         env,
1426         jni_operation_statistics,
1427         mqtt_connection_operation_statistics_properties.unacked_operation_count_field_id,
1428         (jlong)connection_stats.unacked_operation_count);
1429     if (aws_jni_check_and_clear_exception(env)) {
1430         aws_raise_error(AWS_ERROR_INVALID_STATE);
1431         aws_jni_throw_runtime_exception(
1432             env, "MqttClientConnection.getOperationStatistics: could not create unacked operation count");
1433         return NULL;
1434     }
1435 
1436     (*env)->SetLongField(
1437         env,
1438         jni_operation_statistics,
1439         mqtt_connection_operation_statistics_properties.unacked_operation_size_field_id,
1440         (jlong)connection_stats.unacked_operation_size);
1441     if (aws_jni_check_and_clear_exception(env)) {
1442         aws_raise_error(AWS_ERROR_INVALID_STATE);
1443         aws_jni_throw_runtime_exception(
1444             env, "MqttClientConnection.getOperationStatistics: could not create unacked operation size");
1445         return NULL;
1446     }
1447 
1448     return jni_operation_statistics;
1449 }
1450 
1451 #if UINTPTR_MAX == 0xffffffff
1452 #    if defined(_MSC_VER)
1453 #        pragma warning(pop)
1454 #    else
1455 #        pragma GCC diagnostic pop
1456 #    endif
1457 #endif
1458