1 package software.amazon.awssdk.crt.test; 2 3 4 import software.amazon.awssdk.crt.*; 5 6 import software.amazon.awssdk.crt.mqtt5.*; 7 8 import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.PublishEvents; 9 import software.amazon.awssdk.crt.mqtt5.packets.*; 10 11 12 import java.util.ArrayList; 13 import java.util.List; 14 import java.util.HashMap; 15 16 import java.util.concurrent.CompletableFuture; 17 import java.util.concurrent.Executors; 18 import java.util.concurrent.ScheduledExecutorService; 19 import java.util.concurrent.TimeUnit; 20 21 /* For environment variable setup, see SetupCrossCICrtEnvironment in the CRT builder */ 22 public class Mqtt5ClientTestFixture extends CrtTestFixture { 23 24 // MQTT5 Codebuild/Direct connections data 25 static final String AWS_TEST_MQTT5_DIRECT_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_HOST"); 26 static final String AWS_TEST_MQTT5_DIRECT_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_PORT"); 27 static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_HOST"); 28 static final String AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_BASIC_AUTH_PORT"); 29 static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_HOST"); 30 static final String AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_DIRECT_MQTT_TLS_PORT"); 31 // MQTT5 Codebuild/Websocket connections data 32 static final String AWS_TEST_MQTT5_WS_MQTT_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_HOST"); 33 static final String AWS_TEST_MQTT5_WS_MQTT_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_PORT"); 34 static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_HOST"); 35 static final String AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_BASIC_AUTH_PORT"); 36 static final String AWS_TEST_MQTT5_WS_MQTT_TLS_HOST = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_HOST"); 37 static final String AWS_TEST_MQTT5_WS_MQTT_TLS_PORT = System.getProperty("AWS_TEST_MQTT5_WS_MQTT_TLS_PORT"); 38 // MQTT5 Codebuild misc connections data 39 static final String AWS_TEST_MQTT5_BASIC_AUTH_USERNAME = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_USERNAME"); 40 static final String AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD = System.getProperty("AWS_TEST_MQTT5_BASIC_AUTH_PASSWORD"); 41 static final String AWS_TEST_MQTT5_CERTIFICATE_FILE = System.getProperty("AWS_TEST_MQTT5_CERTIFICATE_FILE"); 42 static final String AWS_TEST_MQTT5_KEY_FILE = System.getProperty("AWS_TEST_MQTT5_KEY_FILE"); 43 // MQTT5 Proxy 44 static final String AWS_TEST_MQTT5_PROXY_HOST = System.getProperty("AWS_TEST_MQTT5_PROXY_HOST"); 45 static final String AWS_TEST_MQTT5_PROXY_PORT = System.getProperty("AWS_TEST_MQTT5_PROXY_PORT"); 46 // MQTT5 Endpoint/Host credentials 47 static final String AWS_TEST_MQTT5_IOT_CORE_HOST = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_HOST"); 48 static final String AWS_TEST_MQTT5_IOT_CORE_REGION = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_REGION"); 49 static final String AWS_TEST_MQTT5_IOT_CORE_RSA_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT"); 50 static final String AWS_TEST_MQTT5_IOT_CORE_RSA_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY"); 51 // MQTT5 Static credential related 52 static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_ACCESS_KEY"); 53 static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SECRET_ACCESS_KEY"); 54 static final String AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN = System.getProperty("AWS_TEST_MQTT5_ROLE_CREDENTIAL_SESSION_TOKEN"); 55 // MQTT5 Cognito 56 static final String AWS_TEST_MQTT5_COGNITO_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_COGNITO_ENDPOINT"); 57 static final String AWS_TEST_MQTT5_COGNITO_IDENTITY = System.getProperty("AWS_TEST_MQTT5_COGNITO_IDENTITY"); 58 // MQTT5 Keystore 59 static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FORMAT"); 60 static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_FILE"); 61 static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_PASSWORD"); 62 static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_ALIAS"); 63 static final String AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_KEYSTORE_CERT_PASSWORD"); 64 // MQTT5 PKCS12 65 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY"); 66 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_PKCS12_KEY_PASSWORD"); 67 // MQTT5 PKCS11 68 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_LIB = System.getProperty("AWS_TEST_PKCS11_LIB"); 69 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_TOKEN_LABEL = System.getProperty("AWS_TEST_PKCS11_TOKEN_LABEL"); 70 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PIN = System.getProperty("AWS_TEST_PKCS11_PIN"); 71 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_PKEY_LABEL = System.getProperty("AWS_TEST_PKCS11_PKEY_LABEL"); 72 static final String AWS_TEST_MQTT5_IOT_CORE_PKCS11_CERT_FILE = System.getProperty("AWS_TEST_PKCS11_CERT_FILE"); 73 // MQTT5 X509 74 static final String AWS_TEST_MQTT5_IOT_CORE_X509_CERT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_CERT"); 75 static final String AWS_TEST_MQTT5_IOT_CORE_X509_KEY = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_KEY"); 76 static final String AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ENDPOINT"); 77 static final String AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_ROLE_ALIAS"); 78 static final String AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_X509_THING_NAME"); 79 // MQTT5 Windows Cert Store 80 static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_PFX_CERT_NO_PASS"); 81 static final String AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE = System.getProperty("AWS_TEST_MQTT5_IOT_CORE_WINDOWS_CERT_STORE"); 82 83 protected int OPERATION_TIMEOUT_TIME = 30; 84 Mqtt5ClientTestFixture()85 public Mqtt5ClientTestFixture() { 86 } 87 88 /** 89 * ============================================================ 90 * TEST HELPER FUNCTIONS 91 * ============================================================ 92 */ 93 94 static final class LifecycleEvents_Futured implements Mqtt5ClientOptions.LifecycleEvents { 95 CompletableFuture<Void> connectedFuture = new CompletableFuture<>(); 96 CompletableFuture<Void> stopFuture = new CompletableFuture<>(); 97 98 ConnAckPacket connectSuccessPacket = null; 99 NegotiatedSettings connectSuccessSettings = null; 100 101 int connectFailureCode = 0; 102 ConnAckPacket connectFailurePacket = null; 103 104 int disconnectFailureCode = 0; 105 DisconnectPacket disconnectPacket = null; 106 107 @Override onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn)108 public void onAttemptingConnect(Mqtt5Client client, OnAttemptingConnectReturn onAttemptingConnectReturn) {} 109 110 @Override onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn)111 public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn onConnectionSuccessReturn) { 112 ConnAckPacket connAckData = onConnectionSuccessReturn.getConnAckPacket(); 113 NegotiatedSettings negotiatedSettings = onConnectionSuccessReturn.getNegotiatedSettings(); 114 connectSuccessPacket = connAckData; 115 connectSuccessSettings = negotiatedSettings; 116 connectedFuture.complete(null); 117 } 118 119 @Override onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn)120 public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { 121 connectFailureCode = onConnectionFailureReturn.getErrorCode(); 122 connectFailurePacket = onConnectionFailureReturn.getConnAckPacket(); 123 connectedFuture.completeExceptionally(new Exception("Could not connect! Error name: " + CRT.awsErrorName(connectFailureCode))); 124 } 125 126 @Override onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn)127 public void onDisconnection(Mqtt5Client client, OnDisconnectionReturn onDisconnectionReturn) { 128 disconnectFailureCode = onDisconnectionReturn.getErrorCode(); 129 disconnectPacket = onDisconnectionReturn.getDisconnectPacket(); 130 } 131 132 @Override onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn)133 public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) { 134 stopFuture.complete(null); 135 } 136 } 137 138 static final class PublishEvents_Futured implements PublishEvents { 139 CompletableFuture<Void> publishReceivedFuture = new CompletableFuture<>(); 140 PublishPacket publishPacket = null; 141 142 @Override onMessageReceived(Mqtt5Client client, PublishReturn result)143 public void onMessageReceived(Mqtt5Client client, PublishReturn result) { 144 publishPacket = result.getPublishPacket(); 145 publishReceivedFuture.complete(null); 146 } 147 } 148 149 static final class PublishEvents_Futured_Counted implements PublishEvents { 150 static final ScheduledExecutorService TIMEOUT_SCHEDULER = Executors.newScheduledThreadPool(0); 151 152 // The "main" future which is intended to be used for waiting for all publishes. 153 CompletableFuture<Void> publishReceivedFuture = new CompletableFuture<>(); 154 // Additional future which helps with ensuring that no publishes beyond expected were received. 155 CompletableFuture<Void> afterCompletionFuture = new CompletableFuture<>(); 156 157 int currentPublishCount = 0; 158 int desiredPublishCount = 0; 159 int afterCompletionWaitSec = 1; 160 List<PublishPacket> publishPacketsReceived = new ArrayList<PublishPacket>(); 161 HashMap<Mqtt5Client, Integer> clientsReceived = new HashMap<Mqtt5Client, Integer>(); 162 163 @Override onMessageReceived(Mqtt5Client client, PublishReturn result)164 public void onMessageReceived(Mqtt5Client client, PublishReturn result) { 165 currentPublishCount += 1; 166 if (currentPublishCount == desiredPublishCount) { 167 publishReceivedFuture.complete(null); 168 // An optional wait to ensure that no packets beyond expectations are arrived. 169 TIMEOUT_SCHEDULER.schedule( 170 () -> afterCompletionFuture.complete(null), afterCompletionWaitSec, TimeUnit.SECONDS); 171 } else if (currentPublishCount > desiredPublishCount) { 172 afterCompletionFuture.completeExceptionally(new Throwable("Too many publish packets received")); 173 } 174 175 if (publishPacketsReceived.contains(result)) { 176 publishReceivedFuture.completeExceptionally(new Throwable("Duplicate publish packet received!")); 177 afterCompletionFuture.completeExceptionally(new Throwable("Duplicate publish packet received!")); 178 } 179 publishPacketsReceived.add(result.getPublishPacket()); 180 181 clientsReceived.merge(client, 1, Integer::sum); 182 } 183 } 184 } 185