xref: /aosp_15_r20/external/aws-crt-java/src/native/event_stream_rpc_client.c (revision 3c7ae9de214676c52d19f01067dc1a404272dc11)
1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <jni.h>
7 
8 #include <aws/event-stream/event_stream_rpc_client.h>
9 
10 #include <aws/common/string.h>
11 #include <aws/io/tls_channel_handler.h>
12 
13 #include "crt.h"
14 #include "event_stream_message.h"
15 #include "java_class_ids.h"
16 
17 #if defined(_MSC_VER)
18 #    pragma warning(disable : 4204) /* non-constant aggregate initializer */
19 #endif
20 
21 /* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
22 #if UINTPTR_MAX == 0xffffffff
23 #    if defined(_MSC_VER)
24 #        pragma warning(push)
25 #        pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
26 #    else
27 #        pragma GCC diagnostic push
28 #        pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
29 #        pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
30 #    endif
31 #endif
32 
33 struct connection_callback_data {
34     JavaVM *jvm;
35     jobject java_connection_handler;
36 };
37 
s_destroy_connection_callback_data(struct connection_callback_data * callback_data,JNIEnv * env)38 static void s_destroy_connection_callback_data(struct connection_callback_data *callback_data, JNIEnv *env) {
39     if (callback_data == NULL || env == NULL) {
40         return;
41     }
42 
43     if (callback_data->java_connection_handler) {
44         (*env)->DeleteGlobalRef(env, callback_data->java_connection_handler);
45     }
46 
47     aws_mem_release(aws_jni_get_allocator(), callback_data);
48 }
49 
s_on_connection_setup(struct aws_event_stream_rpc_client_connection * connection,int error_code,void * user_data)50 static void s_on_connection_setup(
51     struct aws_event_stream_rpc_client_connection *connection,
52     int error_code,
53     void *user_data) {
54     (void)connection;
55     struct connection_callback_data *callback_data = user_data;
56 
57     /********** JNI ENV ACQUIRE **********/
58     JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
59     if (env == NULL) {
60         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
61         return;
62     }
63 
64     (*env)->CallVoidMethod(
65         env,
66         callback_data->java_connection_handler,
67         event_stream_client_connection_handler_properties.onSetup,
68         (jlong)connection,
69         error_code);
70 
71     if (aws_jni_check_and_clear_exception(env) && !error_code) {
72         aws_event_stream_rpc_client_connection_close(connection, AWS_ERROR_UNKNOWN);
73     }
74 
75     JavaVM *jvm = callback_data->jvm;
76     if (error_code) {
77         s_destroy_connection_callback_data(callback_data, env);
78     }
79 
80     aws_jni_release_thread_env(jvm, env);
81     /********** JNI ENV RELEASE **********/
82 }
83 
s_on_connection_shutdown(struct aws_event_stream_rpc_client_connection * connection,int error_code,void * user_data)84 static void s_on_connection_shutdown(
85     struct aws_event_stream_rpc_client_connection *connection,
86     int error_code,
87     void *user_data) {
88     (void)connection;
89 
90     struct connection_callback_data *callback_data = user_data;
91 
92     /********** JNI ENV ACQUIRE **********/
93     JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
94     if (env == NULL) {
95         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
96         return;
97     }
98 
99     (*env)->CallVoidMethod(
100         env,
101         callback_data->java_connection_handler,
102         event_stream_client_connection_handler_properties.onClosed,
103         error_code);
104     aws_jni_check_and_clear_exception(env);
105 
106     JavaVM *jvm = callback_data->jvm;
107     s_destroy_connection_callback_data(callback_data, env);
108 
109     aws_jni_release_thread_env(jvm, env);
110     /********** JNI ENV RELEASE **********/
111 }
112 
s_connection_protocol_message(struct aws_event_stream_rpc_client_connection * connection,const struct aws_event_stream_rpc_message_args * message_args,void * user_data)113 static void s_connection_protocol_message(
114     struct aws_event_stream_rpc_client_connection *connection,
115     const struct aws_event_stream_rpc_message_args *message_args,
116     void *user_data) {
117     (void)connection;
118 
119     struct connection_callback_data *callback_data = user_data;
120 
121     /********** JNI ENV ACQUIRE **********/
122     JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
123     if (env == NULL) {
124         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
125         return;
126     }
127 
128     jbyteArray headers_array = aws_event_stream_rpc_marshall_headers_to_byteArray(
129         aws_jni_get_allocator(), env, message_args->headers, message_args->headers_count);
130 
131     struct aws_byte_cursor payload_cur = aws_byte_cursor_from_buf(message_args->payload);
132     jbyteArray payload_byte_array = aws_jni_byte_array_from_cursor(env, &payload_cur);
133 
134     (*env)->CallVoidMethod(
135         env,
136         callback_data->java_connection_handler,
137         event_stream_client_connection_handler_properties.onProtocolMessage,
138         headers_array,
139         payload_byte_array,
140         (jint)message_args->message_type,
141         (jint)message_args->message_flags);
142 
143     (*env)->DeleteLocalRef(env, payload_byte_array);
144     (*env)->DeleteLocalRef(env, headers_array);
145     aws_jni_check_and_clear_exception(env);
146 
147     aws_jni_release_thread_env(callback_data->jvm, env);
148     /********** JNI ENV RELEASE **********/
149 }
150 
151 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_clientConnect(JNIEnv * env,jclass jni_class,jbyteArray jni_host_name,jint port,jlong jni_socket_options,jlong jni_tls_ctx,jlong jni_client_bootstrap,jobject jni_client_connection_handler)152 jint JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_clientConnect(
153     JNIEnv *env,
154     jclass jni_class,
155     jbyteArray jni_host_name,
156     jint port,
157     jlong jni_socket_options,
158     jlong jni_tls_ctx,
159     jlong jni_client_bootstrap,
160     jobject jni_client_connection_handler) {
161     (void)jni_class;
162     aws_cache_jni_ids(env);
163 
164     struct aws_client_bootstrap *client_bootstrap = (struct aws_client_bootstrap *)jni_client_bootstrap;
165     struct aws_socket_options *socket_options = (struct aws_socket_options *)jni_socket_options;
166     struct aws_tls_ctx *tls_context = (struct aws_tls_ctx *)jni_tls_ctx;
167 
168     if (!client_bootstrap) {
169         aws_jni_throw_runtime_exception(env, "ClientConnection.clientConnect: Invalid ClientBootstrap");
170         return AWS_OP_ERR;
171     }
172 
173     if (!socket_options) {
174         aws_jni_throw_runtime_exception(env, "ClientConnection.clientConnect: Invalid SocketOptions");
175         return AWS_OP_ERR;
176     }
177 
178     struct aws_tls_connection_options connection_options;
179     AWS_ZERO_STRUCT(connection_options);
180     struct aws_tls_connection_options *conn_options_ptr = NULL;
181     struct aws_string *host_name_str = NULL;
182 
183     if (tls_context) {
184         aws_tls_connection_options_init_from_ctx(&connection_options, tls_context);
185         conn_options_ptr = &connection_options;
186     }
187 
188     struct aws_allocator *allocator = aws_jni_get_allocator();
189     struct connection_callback_data *callback_data =
190         aws_mem_calloc(allocator, 1, sizeof(struct connection_callback_data));
191 
192     if (!callback_data) {
193         goto error;
194     }
195 
196     jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm);
197     if (jvmresult != 0) {
198         aws_jni_throw_runtime_exception(env, "ClientConnection.clientConnect: Unable to get JVM");
199         goto error;
200     }
201 
202     callback_data->java_connection_handler = (*env)->NewGlobalRef(env, jni_client_connection_handler);
203     if (!callback_data->java_connection_handler) {
204         aws_jni_throw_runtime_exception(env, "ClientConnection.clientConnect: Unable to create global ref");
205         goto error;
206     }
207 
208     const size_t host_name_len = (*env)->GetArrayLength(env, jni_host_name);
209     jbyte *host_name = (*env)->GetPrimitiveArrayCritical(env, jni_host_name, NULL);
210     host_name_str = aws_string_new_from_array(allocator, (uint8_t *)host_name, host_name_len);
211     (*env)->ReleasePrimitiveArrayCritical(env, jni_host_name, host_name, 0);
212 
213     if (!host_name_str) {
214         aws_jni_throw_runtime_exception(env, "ServerListener.server_listener_new: Unable to allocate");
215         goto error;
216     }
217 
218     const char *c_str_host_name = aws_string_c_str(host_name_str);
219 
220     struct aws_event_stream_rpc_client_connection_options conn_options = {
221         .socket_options = socket_options,
222         .tls_options = conn_options_ptr,
223         .user_data = callback_data,
224         .port = (uint32_t)port,
225         .bootstrap = client_bootstrap,
226         .host_name = c_str_host_name,
227         .on_connection_setup = s_on_connection_setup,
228         .on_connection_shutdown = s_on_connection_shutdown,
229         .on_connection_protocol_message = s_connection_protocol_message,
230     };
231 
232     if (aws_event_stream_rpc_client_connection_connect(allocator, &conn_options)) {
233         goto error;
234     }
235 
236     aws_string_destroy(host_name_str);
237     aws_tls_connection_options_clean_up(&connection_options);
238 
239     return AWS_OP_SUCCESS;
240 
241 error:
242 
243     aws_string_destroy(host_name_str);
244 
245     s_destroy_connection_callback_data(callback_data, env);
246 
247     if (conn_options_ptr) {
248         aws_tls_connection_options_clean_up(conn_options_ptr);
249     }
250 
251     return AWS_OP_ERR;
252 }
253 
254 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_isClientConnectionOpen(JNIEnv * env,jclass jni_class,jlong jni_connection)255 jboolean JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_isClientConnectionOpen(
256     JNIEnv *env,
257     jclass jni_class,
258     jlong jni_connection) {
259     (void)env;
260     (void)jni_class;
261     aws_cache_jni_ids(env);
262 
263     struct aws_event_stream_rpc_client_connection *connection =
264         (struct aws_event_stream_rpc_client_connection *)jni_connection;
265     if (connection == NULL) {
266         return false;
267     }
268 
269     return aws_event_stream_rpc_client_connection_is_open(connection);
270 }
271 
272 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_closeClientConnection(JNIEnv * env,jclass jni_class,jlong jni_connection,jint error_code)273 void JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_closeClientConnection(
274     JNIEnv *env,
275     jclass jni_class,
276     jlong jni_connection,
277     jint error_code) {
278     (void)env;
279     (void)jni_class;
280     aws_cache_jni_ids(env);
281 
282     struct aws_event_stream_rpc_client_connection *connection =
283         (struct aws_event_stream_rpc_client_connection *)jni_connection;
284     if (connection == NULL) {
285         return;
286     }
287 
288     aws_event_stream_rpc_client_connection_close(connection, error_code);
289 }
290 
291 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_acquireClientConnection(JNIEnv * env,jclass jni_class,jlong jni_connection)292 void JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_acquireClientConnection(
293     JNIEnv *env,
294     jclass jni_class,
295     jlong jni_connection) {
296     (void)env;
297     (void)jni_class;
298     aws_cache_jni_ids(env);
299 
300     struct aws_event_stream_rpc_client_connection *connection =
301         (struct aws_event_stream_rpc_client_connection *)jni_connection;
302     if (connection == NULL) {
303         return;
304     }
305 
306     aws_event_stream_rpc_client_connection_acquire(connection);
307 }
308 
309 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_releaseClientConnection(JNIEnv * env,jclass jni_class,jlong jni_connection)310 void JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_releaseClientConnection(
311     JNIEnv *env,
312     jclass jni_class,
313     jlong jni_connection) {
314     (void)env;
315     (void)jni_class;
316     aws_cache_jni_ids(env);
317 
318     struct aws_event_stream_rpc_client_connection *connection =
319         (struct aws_event_stream_rpc_client_connection *)jni_connection;
320     if (connection == NULL) {
321         return;
322     }
323 
324     aws_event_stream_rpc_client_connection_release(connection);
325 }
326 
327 struct message_flush_callback_args {
328     JavaVM *jvm;
329     jobject callback;
330 };
331 
s_destroy_message_flush_callback_args(JNIEnv * env,struct message_flush_callback_args * callback_args)332 static void s_destroy_message_flush_callback_args(JNIEnv *env, struct message_flush_callback_args *callback_args) {
333     if (callback_args == NULL) {
334         return;
335     }
336 
337     if (callback_args->callback != NULL) {
338         (*env)->DeleteGlobalRef(env, callback_args->callback);
339     }
340 
341     aws_mem_release(aws_jni_get_allocator(), callback_args);
342 }
343 
s_message_flush_fn(int error_code,void * user_data)344 static void s_message_flush_fn(int error_code, void *user_data) {
345     struct message_flush_callback_args *callback_data = user_data;
346 
347     /********** JNI ENV ACQUIRE **********/
348     JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
349     if (env == NULL) {
350         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
351         return;
352     }
353 
354     (*env)->CallVoidMethod(
355         env, callback_data->callback, event_stream_server_message_flush_properties.callback, error_code);
356     aws_jni_check_and_clear_exception(env);
357 
358     JavaVM *jvm = callback_data->jvm;
359     s_destroy_message_flush_callback_args(env, callback_data);
360 
361     aws_jni_release_thread_env(jvm, env);
362     /********** JNI ENV RELEASE **********/
363 }
364 
365 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_sendProtocolMessage(JNIEnv * env,jclass jni_class,jlong jni_connection,jbyteArray headers,jbyteArray payload,jint message_type,jint message_flags,jobject callback)366 jint JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_sendProtocolMessage(
367     JNIEnv *env,
368     jclass jni_class,
369     jlong jni_connection,
370     jbyteArray headers,
371     jbyteArray payload,
372     jint message_type,
373     jint message_flags,
374     jobject callback) {
375     (void)jni_class;
376     aws_cache_jni_ids(env);
377 
378     struct aws_event_stream_rpc_client_connection *connection =
379         (struct aws_event_stream_rpc_client_connection *)jni_connection;
380 
381     struct message_flush_callback_args *callback_data = NULL;
382     int ret_val = AWS_OP_ERR;
383 
384     struct aws_event_stream_rpc_marshalled_message marshalled_message;
385     if (aws_event_stream_rpc_marshall_message_args_init(
386             &marshalled_message, aws_jni_get_allocator(), env, headers, payload, NULL, message_flags, message_type)) {
387         goto clean_up;
388     }
389 
390     if (connection == NULL) {
391         goto clean_up;
392     }
393 
394     callback_data = aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct message_flush_callback_args));
395 
396     if (!callback_data) {
397         aws_jni_throw_runtime_exception(env, "ClientConnection.sendProtocolMessage: allocation failed.");
398         goto clean_up;
399     }
400 
401     jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm);
402     if (jvmresult != 0) {
403         aws_jni_throw_runtime_exception(env, "ClientConnection.sendProtocolMessage: Unable to get JVM");
404         goto clean_up;
405     }
406 
407     callback_data->callback = (*env)->NewGlobalRef(env, callback);
408 
409     if (aws_event_stream_rpc_client_connection_send_protocol_message(
410             connection, &marshalled_message.message_args, s_message_flush_fn, callback_data)) {
411         aws_jni_throw_runtime_exception(env, "ClientConnection.sendProtocolMessage: send message failed");
412         goto clean_up;
413     }
414 
415     ret_val = AWS_OP_SUCCESS;
416 
417 clean_up:
418 
419     aws_event_stream_rpc_marshall_message_args_clean_up(&marshalled_message);
420     if (ret_val == AWS_OP_ERR) {
421         s_destroy_message_flush_callback_args(env, callback_data);
422     }
423 
424     return ret_val;
425 }
426 
427 struct continuation_callback_data {
428     JavaVM *jvm;
429     jobject java_continuation;
430     jobject java_continuation_handler;
431 };
432 
s_client_continuation_data_destroy(JNIEnv * env,struct continuation_callback_data * callback_data)433 static void s_client_continuation_data_destroy(JNIEnv *env, struct continuation_callback_data *callback_data) {
434     if (!callback_data) {
435         return;
436     }
437 
438     if (callback_data->java_continuation_handler) {
439         (*env)->DeleteGlobalRef(env, callback_data->java_continuation_handler);
440     }
441 
442     if (callback_data->java_continuation) {
443         (*env)->DeleteGlobalRef(env, callback_data->java_continuation);
444     }
445 
446     aws_mem_release(aws_jni_get_allocator(), callback_data);
447 }
448 
s_stream_continuation(struct aws_event_stream_rpc_client_continuation_token * token,const struct aws_event_stream_rpc_message_args * message_args,void * user_data)449 static void s_stream_continuation(
450     struct aws_event_stream_rpc_client_continuation_token *token,
451     const struct aws_event_stream_rpc_message_args *message_args,
452     void *user_data) {
453     (void)token;
454 
455     struct continuation_callback_data *callback_data = user_data;
456 
457     /********** JNI ENV ACQUIRE **********/
458     JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm);
459     if (env == NULL) {
460         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
461         return;
462     }
463 
464     jbyteArray headers_array = aws_event_stream_rpc_marshall_headers_to_byteArray(
465         aws_jni_get_allocator(), env, message_args->headers, message_args->headers_count);
466 
467     struct aws_byte_cursor payload_cur = aws_byte_cursor_from_buf(message_args->payload);
468     jbyteArray payload_byte_array = aws_jni_byte_array_from_cursor(env, &payload_cur);
469 
470     (*env)->CallVoidMethod(
471         env,
472         callback_data->java_continuation_handler,
473         event_stream_client_continuation_handler_properties.onContinuationMessage,
474         headers_array,
475         payload_byte_array,
476         (jint)message_args->message_type,
477         (jint)message_args->message_flags);
478 
479     (*env)->DeleteLocalRef(env, payload_byte_array);
480     (*env)->DeleteLocalRef(env, headers_array);
481     /* don't really care if they threw here, but we want to make the jvm happy that we checked */
482     aws_jni_check_and_clear_exception(env);
483 
484     aws_jni_release_thread_env(callback_data->jvm, env);
485     /********** JNI ENV RELEASE **********/
486 }
487 
s_stream_continuation_closed(struct aws_event_stream_rpc_client_continuation_token * token,void * user_data)488 static void s_stream_continuation_closed(
489     struct aws_event_stream_rpc_client_continuation_token *token,
490     void *user_data) {
491     (void)token;
492     struct continuation_callback_data *continuation_callback_data = user_data;
493 
494     /********** JNI ENV ACQUIRE **********/
495     JNIEnv *env = aws_jni_acquire_thread_env(continuation_callback_data->jvm);
496     if (env == NULL) {
497         /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
498         return;
499     }
500 
501     (*env)->CallVoidMethod(
502         env,
503         continuation_callback_data->java_continuation_handler,
504         event_stream_client_continuation_handler_properties.onContinuationClosed);
505     /* don't really care if they threw here, but we want to make the jvm happy that we checked */
506     aws_jni_check_and_clear_exception(env);
507 
508     JavaVM *jvm = continuation_callback_data->jvm;
509     s_client_continuation_data_destroy(env, continuation_callback_data);
510     aws_jni_release_thread_env(jvm, env);
511     /********** JNI ENV RELEASE **********/
512 }
513 
514 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnection_newClientStream(JNIEnv * env,jclass jni_class,jlong jni_connection,jobject continuation_handler)515 jlong JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnection_newClientStream(
516     JNIEnv *env,
517     jclass jni_class,
518     jlong jni_connection,
519     jobject continuation_handler) {
520     (void)jni_class;
521     aws_cache_jni_ids(env);
522 
523     struct aws_event_stream_rpc_client_connection *connection =
524         (struct aws_event_stream_rpc_client_connection *)jni_connection;
525 
526     struct continuation_callback_data *continuation_callback_data =
527         aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct continuation_callback_data));
528 
529     if (!continuation_callback_data || !connection) {
530         aws_event_stream_rpc_client_connection_close(connection, aws_last_error());
531         return (jlong)NULL;
532     }
533 
534     jint jvmresult = (*env)->GetJavaVM(env, &continuation_callback_data->jvm);
535     if (jvmresult != 0) {
536         aws_jni_throw_runtime_exception(env, "ClientConnection.newClientStream: Unable to get JVM");
537         goto error;
538     }
539 
540     continuation_callback_data->java_continuation_handler = (*env)->NewGlobalRef(env, continuation_handler);
541     if (!continuation_callback_data->java_continuation_handler) {
542         aws_jni_throw_runtime_exception(env, "ClientConnection.newClientStream: Unable to create reference");
543         goto error;
544     }
545 
546     struct aws_event_stream_rpc_client_stream_continuation_options continuation_options = {
547         .on_continuation_closed = s_stream_continuation_closed,
548         .on_continuation = s_stream_continuation,
549         .user_data = continuation_callback_data,
550     };
551 
552     struct aws_event_stream_rpc_client_continuation_token *token =
553         aws_event_stream_rpc_client_connection_new_stream(connection, &continuation_options);
554 
555     if (!token) {
556         aws_jni_throw_runtime_exception(env, "ClientConnection.newClientStream: Unable to create stream");
557         goto error;
558     }
559 
560     return (jlong)token;
561 
562 error:
563     s_client_continuation_data_destroy(env, continuation_callback_data);
564     return (jlong)NULL;
565 }
566 
567 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_activateContinuation(JNIEnv * env,jclass jni_class,jlong jni_continuation_ptr,jobject continuation,jbyteArray operation_name,jbyteArray headers,jbyteArray payload,jint message_type,jint message_flags,jobject callback)568 jint JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_activateContinuation(
569     JNIEnv *env,
570     jclass jni_class,
571     jlong jni_continuation_ptr,
572     jobject continuation,
573     jbyteArray operation_name,
574     jbyteArray headers,
575     jbyteArray payload,
576     jint message_type,
577     jint message_flags,
578     jobject callback) {
579     (void)jni_class;
580     aws_cache_jni_ids(env);
581 
582     struct aws_event_stream_rpc_client_continuation_token *continuation_token =
583         (struct aws_event_stream_rpc_client_continuation_token *)jni_continuation_ptr;
584 
585     struct continuation_callback_data *continuation_callback_data =
586         aws_event_stream_rpc_client_continuation_get_user_data(continuation_token);
587 
588     struct message_flush_callback_args *callback_data = NULL;
589 
590     int ret_val = AWS_OP_ERR;
591 
592     continuation_callback_data->java_continuation = (*env)->NewGlobalRef(env, continuation);
593 
594     if (!continuation_callback_data->java_continuation) {
595         aws_jni_throw_runtime_exception(
596             env, "ClientConnectionContinuation.activateContinuation: Unable to create reference");
597         goto clean_up;
598     }
599 
600     struct aws_event_stream_rpc_marshalled_message marshalled_message;
601     if (aws_event_stream_rpc_marshall_message_args_init(
602             &marshalled_message,
603             aws_jni_get_allocator(),
604             env,
605             headers,
606             payload,
607             operation_name,
608             message_flags,
609             message_type)) {
610         goto clean_up;
611     }
612 
613     callback_data = aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct message_flush_callback_args));
614 
615     if (!callback_data) {
616         aws_jni_throw_runtime_exception(env, "ClientConnectionContinuation.activateContinuation: allocation failed.");
617         goto clean_up;
618     }
619 
620     jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm);
621     if (jvmresult != 0) {
622         aws_jni_throw_runtime_exception(env, "ClientConnectionContinuation.activateContinuation: Unable to get JVM");
623         goto clean_up;
624     }
625 
626     callback_data->callback = (*env)->NewGlobalRef(env, callback);
627     if (callback_data->callback == NULL) {
628         aws_jni_throw_runtime_exception(
629             env, "ClientConnectionContinuation.activateContinuation: make global ref failed");
630         goto clean_up;
631     }
632 
633     struct aws_byte_cursor operation_cursor = aws_byte_cursor_from_buf(&marshalled_message.operation_buf);
634 
635     if (aws_event_stream_rpc_client_continuation_activate(
636             continuation_token,
637             operation_cursor,
638             &marshalled_message.message_args,
639             s_message_flush_fn,
640             callback_data)) {
641         aws_jni_throw_runtime_exception(env, "ClientConnectionContinuation.activateContinuation: send message failed");
642         goto clean_up;
643     }
644 
645     ret_val = AWS_OP_SUCCESS;
646 
647 clean_up:
648 
649     aws_event_stream_rpc_marshall_message_args_clean_up(&marshalled_message);
650 
651     if (ret_val == AWS_OP_ERR) {
652         s_destroy_message_flush_callback_args(env, callback_data);
653     }
654 
655     return ret_val;
656 }
657 
658 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_sendContinuationMessage(JNIEnv * env,jclass jni_class,jlong jni_continuation_ptr,jbyteArray headers,jbyteArray payload,jint message_type,jint message_flags,jobject callback)659 jint JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_sendContinuationMessage(
660     JNIEnv *env,
661     jclass jni_class,
662     jlong jni_continuation_ptr,
663     jbyteArray headers,
664     jbyteArray payload,
665     jint message_type,
666     jint message_flags,
667     jobject callback) {
668     (void)jni_class;
669     aws_cache_jni_ids(env);
670 
671     struct aws_event_stream_rpc_client_continuation_token *continuation_token =
672         (struct aws_event_stream_rpc_client_continuation_token *)jni_continuation_ptr;
673 
674     struct message_flush_callback_args *callback_data = NULL;
675 
676     int ret_val = AWS_OP_ERR;
677 
678     struct aws_event_stream_rpc_marshalled_message marshalled_message;
679     if (aws_event_stream_rpc_marshall_message_args_init(
680             &marshalled_message, aws_jni_get_allocator(), env, headers, payload, NULL, message_flags, message_type)) {
681         goto clean_up;
682     }
683 
684     if (continuation_token == NULL) {
685         goto clean_up;
686     }
687 
688     callback_data = aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct message_flush_callback_args));
689 
690     if (!callback_data) {
691         aws_jni_throw_runtime_exception(
692             env, "ClientConnectionContinuation.sendContinuationMessage: allocation failed.");
693         goto clean_up;
694     }
695 
696     jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm);
697     if (jvmresult != 0) {
698         aws_jni_throw_runtime_exception(env, "ClientConnectionContinuation.sendContinuationMessage: Unable to get JVM");
699         goto clean_up;
700     }
701 
702     callback_data->callback = (*env)->NewGlobalRef(env, callback);
703     if (callback_data->callback == NULL) {
704         aws_jni_throw_runtime_exception(
705             env, "ClientConnectionContinuation.sendContinuationMessage: make global ref failed");
706         goto clean_up;
707     }
708 
709     if (aws_event_stream_rpc_client_continuation_send_message(
710             continuation_token, &marshalled_message.message_args, s_message_flush_fn, callback_data)) {
711         aws_jni_throw_runtime_exception(
712             env, "ClientConnectionContinuation.sendContinuationMessage: send message failed");
713         goto clean_up;
714     }
715 
716     ret_val = AWS_OP_SUCCESS;
717 
718 clean_up:
719     aws_event_stream_rpc_marshall_message_args_clean_up(&marshalled_message);
720 
721     if (ret_val == AWS_OP_ERR) {
722         s_destroy_message_flush_callback_args(env, callback_data);
723     }
724 
725     return ret_val;
726 }
727 
728 JNIEXPORT
Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_releaseContinuation(JNIEnv * env,jclass jni_class,jlong jni_continuation_ptr)729 void JNICALL Java_software_amazon_awssdk_crt_eventstream_ClientConnectionContinuation_releaseContinuation(
730     JNIEnv *env,
731     jclass jni_class,
732     jlong jni_continuation_ptr) {
733     (void)env;
734     (void)jni_class;
735     aws_cache_jni_ids(env);
736 
737     struct aws_event_stream_rpc_client_continuation_token *continuation_token =
738         (struct aws_event_stream_rpc_client_continuation_token *)jni_continuation_ptr;
739 
740     aws_event_stream_rpc_client_continuation_release(continuation_token);
741 }
742