package software.amazon.awssdk.crt.http;

import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsConnectionOptions;
import software.amazon.awssdk.crt.AsyncCallback;
import software.amazon.awssdk.crt.io.TlsContext;

import java.util.concurrent.CompletableFuture;
import java.net.URI;
import java.nio.charset.Charset;

/**
 * Manages a Pool of HTTP/2 Streams. Creates and manages HTTP/2 connections
 * under the hood.
 */
public class Http2StreamManager extends CrtResource {

    private static final String HTTP = "http";
    private static final String HTTPS = "https";
    private static final int DEFAULT_HTTP_PORT = 80;
    private static final int DEFAULT_HTTPS_PORT = 443;
    private static final Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;

    private final URI uri;
    private final int port;
    private final int maxConnections;
    private final int idealConcurrentStreamsPerConnection;
    private final int maxConcurrentStreamsPerConnection;
    /* Completed from onShutdownComplete() */
    private final CompletableFuture<Void> shutdownComplete = new CompletableFuture<>();

    /**
     * Factory function for Http2StreamManager instances
     *
     * @param options configuration options
     * @return a new instance of an Http2StreamManager
     */
    public static Http2StreamManager create(Http2StreamManagerOptions options) {
        return new Http2StreamManager(options);
    }

    /**
     * Validates the options, unpacks them into the flat argument list expected by
     * the native constructor, and acquires the resulting native handle.
     *
     * @param options configuration options; validated via
     *                {@code options.validateOptions()} before anything else is read
     */
    private Http2StreamManager(Http2StreamManagerOptions options) {
        options.validateOptions();

        HttpClientConnectionManagerOptions connectionManagerOptions = options.getConnectionManagerOptions();
        URI uri = connectionManagerOptions.getUri();
        ClientBootstrap clientBootstrap = connectionManagerOptions.getClientBootstrap();
        SocketOptions socketOptions = connectionManagerOptions.getSocketOptions();
        boolean useTls = HTTPS.equals(uri.getScheme());
        TlsContext tlsContext = connectionManagerOptions.getTlsContext();
        TlsConnectionOptions tlsConnectionOptions = connectionManagerOptions.getTlsConnectionOptions();
        int maxConnections = connectionManagerOptions.getMaxConnections();
        int port = resolvePort(connectionManagerOptions.getPort(), uri);

        int maxConcurrentStreamsPerConnection = options.getMaxConcurrentStreamsPerConnection();
        int idealConcurrentStreamsPerConnection = options.getIdealConcurrentStreamsPerConnection();

        this.uri = uri;
        this.port = port;
        this.maxConnections = maxConnections;
        this.idealConcurrentStreamsPerConnection = idealConcurrentStreamsPerConnection;
        this.maxConcurrentStreamsPerConnection = maxConcurrentStreamsPerConnection;

        /* Proxy settings: zero / null placeholders are passed to native when no proxy is configured */
        int proxyConnectionType = 0;
        String proxyHost = null;
        int proxyPort = 0;
        TlsContext proxyTlsContext = null;
        int proxyAuthorizationType = 0;
        String proxyAuthorizationUsername = null;
        String proxyAuthorizationPassword = null;
        HttpProxyOptions proxyOptions = connectionManagerOptions.getProxyOptions();

        if (proxyOptions != null) {
            proxyConnectionType = proxyOptions.getConnectionType().getValue();
            proxyHost = proxyOptions.getHost();
            proxyPort = proxyOptions.getPort();
            proxyTlsContext = proxyOptions.getTlsContext();
            proxyAuthorizationType = proxyOptions.getAuthorizationType().getValue();
            proxyAuthorizationUsername = proxyOptions.getAuthorizationUsername();
            proxyAuthorizationPassword = proxyOptions.getAuthorizationPassword();
        }

        /* Monitoring settings: zeros are passed to native when no monitoring is configured */
        HttpMonitoringOptions monitoringOptions = connectionManagerOptions.getMonitoringOptions();
        long monitoringThroughputThresholdInBytesPerSecond = 0;
        int monitoringFailureIntervalInSeconds = 0;
        if (monitoringOptions != null) {
            monitoringThroughputThresholdInBytesPerSecond = monitoringOptions.getMinThroughputBytesPerSecond();
            monitoringFailureIntervalInSeconds = monitoringOptions.getAllowableThroughputFailureIntervalSeconds();
        }

        acquireNativeHandle(http2StreamManagerNew(this,
                clientBootstrap.getNativeHandle(),
                socketOptions.getNativeHandle(),
                useTls && tlsContext != null ? tlsContext.getNativeHandle() : 0,
                useTls && tlsConnectionOptions != null ? tlsConnectionOptions.getNativeHandle() : 0,
                Http2ConnectionSetting.marshallSettingsForJNI(options.getInitialSettingsList()),
                uri.getHost().getBytes(UTF8),
                port,
                proxyConnectionType,
                proxyHost != null ? proxyHost.getBytes(UTF8) : null,
                proxyPort,
                proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0,
                proxyAuthorizationType,
                proxyAuthorizationUsername != null ? proxyAuthorizationUsername.getBytes(UTF8) : null,
                proxyAuthorizationPassword != null ? proxyAuthorizationPassword.getBytes(UTF8) : null,
                connectionManagerOptions.isManualWindowManagement(),
                monitoringThroughputThresholdInBytesPerSecond,
                monitoringFailureIntervalInSeconds,
                maxConnections,
                idealConcurrentStreamsPerConnection,
                maxConcurrentStreamsPerConnection,
                options.hasPriorKnowledge(),
                options.shouldCloseConnectionOnServerError(),
                options.getConnectionPingPeriodMs(),
                options.getConnectionPingTimeoutMs()));

        /*
         * we don't need to add a reference to socketOptions since it's copied during
         * connection manager construction
         */
        addReferenceTo(clientBootstrap);
        /*
         * The native call above already tolerates a null tlsContext while TLS is in
         * use, so apply the same guard here instead of handing null to
         * addReferenceTo().
         */
        if (useTls && tlsContext != null) {
            addReferenceTo(tlsContext);
        }
    }

    /**
     * Picks the port to connect to: the explicitly configured port if one was
     * given, otherwise the port embedded in the URI, otherwise the default port
     * for the URI scheme.
     *
     * @param configuredPort port taken from the connection manager options; -1 is
     *                       treated as "not set"
     * @param uri            endpoint URI used as the fallback source for the port
     * @return the resolved port; remains -1 if nothing was configured and the
     *         scheme is neither "http" nor "https"
     */
    private static int resolvePort(int configuredPort, URI uri) {
        if (configuredPort != -1) {
            return configuredPort;
        }

        int resolved = uri.getPort();
        /* Pick a default port based on the scheme if one wasn't set */
        if (resolved == -1) {
            if (HTTP.equals(uri.getScheme())) {
                resolved = DEFAULT_HTTP_PORT;
            }
            if (HTTPS.equals(uri.getScheme())) {
                resolved = DEFAULT_HTTPS_PORT;
            }
        }
        return resolved;
    }

    /**
     * Request a Http2Stream from StreamManager.
     *
     * @param request       The Request to make to the Server.
     * @param streamHandler The Stream Handler to be called from the Native
     *                      EventLoop
     * @return A future for a Http2Stream that will be completed when the stream is
     *         acquired.
     */
    public CompletableFuture<Http2Stream> acquireStream(Http2Request request,
            HttpStreamBaseResponseHandler streamHandler) {

        return this.acquireStream((HttpRequestBase) request, streamHandler);
    }

    /**
     * Request a Http2Stream from StreamManager, using an {@link HttpRequest} to
     * describe the request.
     *
     * @param request       The Request to make to the Server.
     * @param streamHandler The Stream Handler to be called from the Native
     *                      EventLoop
     * @return A future for a Http2Stream that will be completed when the stream is
     *         acquired.
     */
    public CompletableFuture<Http2Stream> acquireStream(HttpRequest request,
            HttpStreamBaseResponseHandler streamHandler) {

        return this.acquireStream((HttpRequestBase) request, streamHandler);
    }

    /**
     * Shared implementation behind both public acquireStream overloads.
     *
     * @param request       The Request to make to the Server.
     * @param streamHandler The Stream Handler to be called from the Native
     *                      EventLoop
     * @return A future that is completed exceptionally with an
     *         {@link IllegalStateException} if this manager has already been
     *         closed, or with the {@link CrtRuntimeException} thrown by the native
     *         acquire call; otherwise completion is driven by the callback handed
     *         to native.
     */
    private CompletableFuture<Http2Stream> acquireStream(HttpRequestBase request,
            HttpStreamBaseResponseHandler streamHandler) {

        CompletableFuture<Http2Stream> completionFuture = new CompletableFuture<>();
        AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null);
        if (isNull()) {
            completionFuture.completeExceptionally(new IllegalStateException(
                    "Http2StreamManager has been closed, can't acquire new streams"));
            return completionFuture;
        }
        try {
            http2StreamManagerAcquireStream(this.getNativeHandle(),
                    request.marshalForJni(),
                    request.getBodyStream(),
                    new HttpStreamResponseHandlerNativeAdapter(streamHandler),
                    acquireStreamCompleted);
        } catch (CrtRuntimeException ex) {
            /* Report the failure through the future rather than throwing to the caller */
            completionFuture.completeExceptionally(ex);
        }
        return completionFuture;
    }

    /**
     * @return maximum number of connections this connection manager will pool
     */
    public int getMaxConnections() {
        return maxConnections;
    }

    /**
     * @return concurrency metrics for the current manager
     * @throws IllegalStateException if this manager has already been closed
     */
    public HttpManagerMetrics getManagerMetrics() {
        if (isNull()) {
            throw new IllegalStateException("Http2StreamManager has been closed, can't fetch metrics");
        }
        return http2StreamManagerFetchMetrics(getNativeHandle());
    }

    /**
     * Called from Native when all Streams from this Stream manager have finished
     * and the underlying resources, such as connections opened under the hood,
     * have been cleaned up.
     * Begins releasing the Native Resources that Http2StreamManager depends on and
     * completes the shutdown future.
     */
    private void onShutdownComplete() {
        releaseReferences();

        this.shutdownComplete.complete(null);
    }

    /**
     * Determines whether a resource releases its dependencies at the same time the
     * native handle is released or if it waits.
     * Resources that wait are responsible for calling releaseReferences() manually.
     *
     * @return always false; this class calls releaseReferences() itself from
     *         onShutdownComplete()
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return false;
    }

    /**
     * Closes this Stream Manager and any pending acquisitions by releasing the
     * native handle, if it is still held.
     */
    @Override
    protected void releaseNativeHandle() {
        if (!isNull()) {
            /*
             * Release our Native pointer and schedule tasks on the Native Event Loop to
             * start sending HTTP/TLS/TCP
             * connection shutdown messages to peers for any open Connections.
             */
            http2StreamManagerRelease(getNativeHandle());
        }
    }

    /**
     * @return a future that is completed once onShutdownComplete() has run for
     *         this manager
     */
    public CompletableFuture<Void> getShutdownCompleteFuture() {
        return shutdownComplete;
    }

    /*******************************************************************************
     * Native methods
     ******************************************************************************/

    private static native long http2StreamManagerNew(Http2StreamManager thisObj,
            long client_bootstrap,
            long socketOptions,
            long tlsContext,
            long tlsConnectionOptions,
            long[] marshalledSettings,
            byte[] endpoint,
            int port,
            int proxyConnectionType,
            byte[] proxyHost,
            int proxyPort,
            long proxyTlsContext,
            int proxyAuthorizationType,
            byte[] proxyAuthorizationUsername,
            byte[] proxyAuthorizationPassword,
            boolean isManualWindowManagement,
            long monitoringThroughputThresholdInBytesPerSecond,
            int monitoringFailureIntervalInSeconds,
            int maxConns,
            int ideal_concurrent_streams_per_connection,
            int max_concurrent_streams_per_connection,
            boolean priorKnowledge,
            boolean closeConnectionOnServerError,
            int connectionPingPeriodMs,
            int connectionPingTimeoutMs) throws CrtRuntimeException;

    private static native void http2StreamManagerRelease(long stream_manager) throws CrtRuntimeException;

    private static native void http2StreamManagerAcquireStream(long stream_manager,
            byte[] marshalledRequest,
            HttpRequestBodyStream bodyStream,
            HttpStreamResponseHandlerNativeAdapter responseHandler,
            AsyncCallback completedCallback) throws CrtRuntimeException;

    private static native HttpManagerMetrics http2StreamManagerFetchMetrics(long stream_manager) throws CrtRuntimeException;
}