xref: /aosp_15_r20/external/aws-crt-java/src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTestFixture.java (revision 3c7ae9de214676c52d19f01067dc1a404272dc11)
1 package software.amazon.awssdk.crt.test;
2 
3 
4 import software.amazon.awssdk.crt.*;
5 
6 import software.amazon.awssdk.crt.mqtt5.*;
7 
8 import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.PublishEvents;
9 import software.amazon.awssdk.crt.mqtt5.packets.*;
10 
11 
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.HashMap;
15 
16 import java.util.concurrent.CompletableFuture;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.TimeUnit;
20 
21 /* For environment variable setup, see SetupCrossCICrtEnvironment in the CRT builder */
22 public class Mqtt5ClientTestFixture extends CrtTestFixture {
23 
24     // MQTT5 Codebuild/Direct connections data
25     static final String AWS_TEST_MQTT5_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_HOST");
26     static final String AWS_TEST_MQTT5_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_PORT");
27     static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST");
28     static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT");
29     static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST");
30     static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT");
31     // MQTT5 Codebuild/Websocket connections data
32     static final String AWS_TEST_MQTT5_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_HOST");
33     static final String AWS_TEST_MQTT5_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_PORT");
34     static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST");
35     static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT");
36     static final String AWS_TEST_MQTT5_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_HOST");
37     static final String AWS_TEST_MQTT5_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_PORT");
38     // MQTT5 Codebuild misc connections data
39     static final String AWS_TEST_MQTT5_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_USERNAME");
40     static final String AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD");
41     static final String AWS_TEST_MQTT5_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT5_CERTIFICATE_FILE");
42     static final String AWS_TEST_MQTT5_KEY_FILE = System.getProperty("AWS_TEST_MQTT5_KEY_FILE");
43     // MQTT5 Proxy
44     static final String AWS_TEST_MQTT5_PROXY_HOST = System.getProperty("AWS_TEST_MQTT5_PROXY_HOST");
45     static final String AWS_TEST_MQTT5_PROXY_PORT = System.getProperty("AWS_TEST_MQTT5_PROXY_PORT");
46     // MQTT5 Endpoint/Host credentials
47     static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST");
48     static final String AWS_TEST_MQTT5_IOT_CORE_REGION = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_REGION");
49     static final String AWS_TEST_MQTT5_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT");
50     static final String AWS_TEST_MQTT5_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY");
51     // MQTT5 Static credential related
52     static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY");
53     static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY");
54     static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN");
55     // MQTT5 Cognito
56     static final String AWS_TEST_MQTT5_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_COGNITO_ENDPOINT");
57     static final String AWS_TEST_MQTT5_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT5_COGNITO_IDENTITY");
58     // MQTT5 Keystore
59     static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT");
60     static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE");
61     static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD");
62     static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS");
63     static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD");
64     // MQTT5 PKCS12
65     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY");
66     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD");
67     // MQTT5 PKCS11
68     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB");
69     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL");
70     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN");
71     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL");
72     static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE");
73     // MQTT5 X509
74     static final String AWS_TEST_MQTT5_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_CERT");
75     static final String AWS_TEST_MQTT5_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_KEY");
76     static final String AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT");
77     static final String AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS");
78     static final String AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME");
79     // MQTT5 Windows Cert Store
80     static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS");
81     static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE");
82 
83     protected int OPERATION_TIMEOUT_TIME = 30;
84 
Mqtt5ClientTestFixture()85     public Mqtt5ClientTestFixture() {
86     }
87 
88     /**
89      * ============================================================
90      * TEST HELPER FUNCTIONS
91      * ============================================================
92      */
93 
94     static final class LifecycleEvents_Futured implements Mqtt5ClientOptions.LifecycleEvents {
95         CompletableFuture<Void> connectedFuture = new CompletableFuture<>();
96         CompletableFuture<Void> stopFuture = new CompletableFuture<>();
97 
98         ConnAckPacket connectSuccessPacket = null;
99         NegotiatedSettings connectSuccessSettings = null;
100 
101         int connectFailureCode = 0;
102         ConnAckPacket connectFailurePacket = null;
103 
104         int disconnectFailureCode = 0;
105         DisconnectPacket disconnectPacket = null;
106 
107         @Override
onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn)108         public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {}
109 
110         @Override
onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn)111         public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) {
112             ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket();
113             NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings();
114             connectSuccessPacket = connAckData;
115             connectSuccessSettings = negotiatedSettings;
116             connectedFuture.complete(null);
117         }
118 
119         @Override
onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn)120         public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
121             connectFailureCode = onConnectionFailureReturn.getErrorCode();
122             connectFailurePacket = onConnectionFailureReturn.getConnAckPacket();
123             connectedFuture.completeExceptionally(new Exception("Could not connect! Error name: " + CRT.awsErrorName(connectFailureCode)));
124         }
125 
126         @Override
onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn)127         public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) {
128             disconnectFailureCode = onDisconnectionReturn.getErrorCode();
129             disconnectPacket = onDisconnectionReturn.getDisconnectPacket();
130         }
131 
132         @Override
onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn)133         public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
134             stopFuture.complete(null);
135         }
136     }
137 
138     static final class PublishEvents_Futured implements PublishEvents {
139         CompletableFuture<Void> publishReceivedFuture = new CompletableFuture<>();
140         PublishPacket publishPacket = null;
141 
142         @Override
onMessageReceived(Mqtt5Client client, PublishReturn result)143         public void onMessageReceived(Mqtt5Client client, PublishReturn result) {
144             publishPacket = result.getPublishPacket();
145             publishReceivedFuture.complete(null);
146         }
147     }
148 
149     static final class PublishEvents_Futured_Counted implements PublishEvents {
150         static final ScheduledExecutorService TIMEOUT_SCHEDULER = Executors.newScheduledThreadPool(0);
151 
152         // The "main" future which is intended to be used for waiting for all publishes.
153         CompletableFuture<Void> publishReceivedFuture = new CompletableFuture<>();
154         // Additional future which helps with ensuring that no publishes beyond expected were received.
155         CompletableFuture<Void> afterCompletionFuture = new CompletableFuture<>();
156 
157         int currentPublishCount = 0;
158         int desiredPublishCount = 0;
159         int afterCompletionWaitSec = 1;
160         List<PublishPacket> publishPacketsReceived = new ArrayList<PublishPacket>();
161         HashMap<Mqtt5Client, Integer> clientsReceived = new HashMap<Mqtt5Client, Integer>();
162 
163         @Override
onMessageReceived(Mqtt5Client client, PublishReturn result)164         public void onMessageReceived(Mqtt5Client client, PublishReturn result) {
165             currentPublishCount += 1;
166             if (currentPublishCount == desiredPublishCount) {
167                 publishReceivedFuture.complete(null);
168                 // An optional wait to ensure that no packets beyond expectations are arrived.
169                 TIMEOUT_SCHEDULER.schedule(
170                         () -> afterCompletionFuture.complete(null), afterCompletionWaitSec, TimeUnit.SECONDS);
171             } else if (currentPublishCount > desiredPublishCount) {
172                 afterCompletionFuture.completeExceptionally(new Throwable("Too many publish packets received"));
173             }
174 
175             if (publishPacketsReceived.contains(result)) {
176                 publishReceivedFuture.completeExceptionally(new Throwable("Duplicate publish packet received!"));
177                 afterCompletionFuture.completeExceptionally(new Throwable("Duplicate publish packet received!"));
178             }
179             publishPacketsReceived.add(result.getPublishPacket());
180 
181             clientsReceived.merge(client, 1, Integer::sum);
182         }
183     }
184 }
185