1 // Copyright 2015 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 package org.chromium.net.impl; 6 7 import android.content.Context; 8 import android.net.ConnectivityManager; 9 import android.net.Network; 10 import android.net.TrafficStats; 11 import android.os.Build; 12 import android.util.Log; 13 14 import androidx.annotation.Nullable; 15 import androidx.annotation.RequiresApi; 16 import androidx.annotation.VisibleForTesting; 17 18 import org.chromium.net.CronetException; 19 import org.chromium.net.InlineExecutionProhibitedException; 20 import org.chromium.net.NetworkException; 21 import org.chromium.net.ThreadStatsUid; 22 import org.chromium.net.UploadDataProvider; 23 import org.chromium.net.UrlResponseInfo; 24 import org.chromium.net.impl.CronetLogger.CronetTrafficInfo; 25 import org.chromium.net.impl.JavaUrlRequestUtils.CheckedRunnable; 26 import org.chromium.net.impl.JavaUrlRequestUtils.DirectPreventingExecutor; 27 import org.chromium.net.impl.JavaUrlRequestUtils.State; 28 29 import java.io.IOException; 30 import java.io.InputStream; 31 import java.io.OutputStream; 32 import java.net.HttpURLConnection; 33 import java.net.URI; 34 import java.net.URL; 35 import java.nio.ByteBuffer; 36 import java.nio.channels.Channels; 37 import java.nio.channels.ReadableByteChannel; 38 import java.nio.channels.WritableByteChannel; 39 import java.time.Duration; 40 import java.util.AbstractMap.SimpleEntry; 41 import java.util.ArrayDeque; 42 import java.util.ArrayList; 43 import java.util.Collections; 44 import java.util.List; 45 import java.util.Map; 46 import java.util.Objects; 47 import java.util.TreeMap; 48 import java.util.concurrent.Executor; 49 import java.util.concurrent.RejectedExecutionException; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 import java.util.concurrent.atomic.AtomicInteger; 52 53 import javax.annotation.concurrent.GuardedBy; 54 55 /** Pure java UrlRequest, backed by {@link HttpURLConnection}. */ 56 final class JavaUrlRequest extends UrlRequestBase { 57 private static final String X_ANDROID = "X-Android"; 58 private static final String X_ANDROID_SELECTED_TRANSPORT = "X-Android-Selected-Transport"; 59 private static final String TAG = JavaUrlRequest.class.getSimpleName(); 60 private static final int DEFAULT_CHUNK_LENGTH = 61 JavaUploadDataSinkBase.DEFAULT_UPLOAD_BUFFER_SIZE; 62 private static final String USER_AGENT = "User-Agent"; 63 private final AsyncUrlRequestCallback mCallbackAsync; 64 private final Executor mExecutor; 65 private final String mUserAgent; 66 private final Map<String, String> mRequestHeaders = 67 new TreeMap<>(String.CASE_INSENSITIVE_ORDER); 68 private final List<String> mUrlChain = new ArrayList<>(); 69 70 /** 71 * This is the source of thread safety in this class - no other synchronization is performed. 72 * By compare-and-swapping from one state to another, we guarantee that operations aren't 73 * running concurrently. Only the winner of a CAS proceeds. 74 * 75 * <p>A caller can lose a CAS for three reasons - user error (two calls to read() without 76 * waiting for the read to succeed), runtime error (network code or user code throws an 77 * exception), or cancellation. 78 */ 79 private final AtomicInteger /* State */ mState = new AtomicInteger(State.NOT_STARTED); 80 81 private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false); 82 83 private final boolean mAllowDirectExecutor; 84 85 /* These don't change with redirects */ 86 private String mInitialMethod; 87 private VersionSafeCallbacks.UploadDataProviderWrapper mUploadDataProvider; 88 private Executor mUploadExecutor; 89 90 /** 91 * Holds a subset of StatusValues - {@link State#STARTED} can represent 92 * {@link Status#SENDING_REQUEST} or {@link Status#WAITING_FOR_RESPONSE}. While the distinction 93 * isn't needed to implement the logic in this class, it is needed to implement 94 * {@link #getStatus(StatusListener)}. 95 * 96 * <p>Concurrency notes - this value is not atomically updated with mState, so there is some 97 * risk that we'd get an inconsistent snapshot of both - however, it also happens that this 98 * value is only used with the STARTED state, so it's inconsequential. 99 */ 100 @StatusValues private volatile int mAdditionalStatusDetails = Status.INVALID; 101 102 /* These change with redirects. */ 103 private String mCurrentUrl; 104 @Nullable private ReadableByteChannel mResponseChannel; // Only accessed on mExecutor. 105 private UrlResponseInfoImpl mUrlResponseInfo; 106 private String mPendingRedirectUrl; 107 private HttpURLConnection mCurrentUrlConnection; // Only accessed on mExecutor. 108 private OutputStreamDataSink mOutputStreamDataSink; // Only accessed on mExecutor. 109 private final JavaCronetEngine mEngine; 110 private final int mCronetEngineId; 111 private final CronetLogger mLogger; 112 113 private final long mNetworkHandle; 114 115 // Executor that runs one task at a time on an underlying Executor. 116 // NOTE: Do not use to wrap user supplied Executor as lock is held while underlying execute() 117 // is called. 118 private static final class SerializingExecutor implements Executor { 119 private final Executor mUnderlyingExecutor; 120 private final Runnable mRunTasks = this::runTasks; 121 122 // Queue of tasks to run. Tasks are added to the end and taken from the front. 123 // Synchronized on itself. 124 @GuardedBy("mTaskQueue") 125 private final ArrayDeque<Runnable> mTaskQueue = new ArrayDeque<>(); 126 127 // Indicates if mRunTasks is actively running tasks. Synchronized on mTaskQueue. 128 @GuardedBy("mTaskQueue") 129 private boolean mRunning; 130 SerializingExecutor(Executor underlyingExecutor)131 SerializingExecutor(Executor underlyingExecutor) { 132 mUnderlyingExecutor = underlyingExecutor; 133 } 134 135 @Override execute(Runnable command)136 public void execute(Runnable command) { 137 synchronized (mTaskQueue) { 138 mTaskQueue.addLast(command); 139 try { 140 mUnderlyingExecutor.execute(mRunTasks); 141 } catch (RejectedExecutionException e) { 142 // If shutting down, do not add new tasks to the queue. 143 mTaskQueue.removeLast(); 144 } 145 } 146 } 147 runTasks()148 private void runTasks() { 149 Runnable task; 150 synchronized (mTaskQueue) { 151 if (mRunning) { 152 return; 153 } 154 task = mTaskQueue.pollFirst(); 155 mRunning = task != null; 156 } 157 while (task != null) { 158 boolean threw = true; 159 try { 160 task.run(); 161 threw = false; 162 } finally { 163 synchronized (mTaskQueue) { 164 if (threw) { 165 // If task.run() threw, this method will abort without 166 // looping again, so repost to keep running tasks. 167 mRunning = false; 168 try { 169 mUnderlyingExecutor.execute(mRunTasks); 170 } catch (RejectedExecutionException e) { 171 // Give up if a task run at shutdown throws. 172 } 173 } else { 174 task = mTaskQueue.pollFirst(); 175 mRunning = task != null; 176 } 177 } 178 } 179 } 180 } 181 } 182 183 /** 184 * @param executor The executor used for reading and writing from sockets 185 * @param userExecutor The executor used to dispatch to {@code callback} 186 */ JavaUrlRequest( JavaCronetEngine engine, Callback callback, final Executor executor, Executor userExecutor, String url, String userAgent, boolean allowDirectExecutor, boolean trafficStatsTagSet, int trafficStatsTag, final boolean trafficStatsUidSet, final int trafficStatsUid, long networkHandle)187 JavaUrlRequest( 188 JavaCronetEngine engine, 189 Callback callback, 190 final Executor executor, 191 Executor userExecutor, 192 String url, 193 String userAgent, 194 boolean allowDirectExecutor, 195 boolean trafficStatsTagSet, 196 int trafficStatsTag, 197 final boolean trafficStatsUidSet, 198 final int trafficStatsUid, 199 long networkHandle) { 200 Objects.requireNonNull(url, "URL is required"); 201 Objects.requireNonNull(callback, "Listener is required"); 202 Objects.requireNonNull(executor, "Executor is required"); 203 Objects.requireNonNull(userExecutor, "userExecutor is required"); 204 205 mAllowDirectExecutor = allowDirectExecutor; 206 mCallbackAsync = new AsyncUrlRequestCallback(callback, userExecutor); 207 final int trafficStatsTagToUse = 208 trafficStatsTagSet ? trafficStatsTag : TrafficStats.getThreadStatsTag(); 209 mExecutor = 210 new SerializingExecutor( 211 (command) -> { 212 executor.execute( 213 () -> { 214 int oldTag = TrafficStats.getThreadStatsTag(); 215 TrafficStats.setThreadStatsTag(trafficStatsTagToUse); 216 if (trafficStatsUidSet) { 217 ThreadStatsUid.set(trafficStatsUid); 218 } 219 try { 220 command.run(); 221 } finally { 222 if (trafficStatsUidSet) { 223 ThreadStatsUid.clear(); 224 } 225 TrafficStats.setThreadStatsTag(oldTag); 226 } 227 }); 228 }); 229 mEngine = engine; 230 mCronetEngineId = engine.getCronetEngineId(); 231 mLogger = engine.getCronetLogger(); 232 mCurrentUrl = url; 233 mUserAgent = userAgent; 234 mNetworkHandle = networkHandle; 235 } 236 237 @Override setHttpMethod(String method)238 public void setHttpMethod(String method) { 239 checkNotStarted(); 240 if (method == null) { 241 throw new NullPointerException("Method is required."); 242 } 243 if ("OPTIONS".equalsIgnoreCase(method) 244 || "GET".equalsIgnoreCase(method) 245 || "HEAD".equalsIgnoreCase(method) 246 || "POST".equalsIgnoreCase(method) 247 || "PUT".equalsIgnoreCase(method) 248 || "DELETE".equalsIgnoreCase(method) 249 || "TRACE".equalsIgnoreCase(method) 250 || "PATCH".equalsIgnoreCase(method)) { 251 mInitialMethod = method; 252 } else { 253 throw new IllegalArgumentException("Invalid http method " + method); 254 } 255 } 256 checkNotStarted()257 private void checkNotStarted() { 258 @State int state = mState.get(); 259 if (state != State.NOT_STARTED) { 260 throw new IllegalStateException("Request is already started. State is: " + state); 261 } 262 } 263 264 @Override addHeader(String header, String value)265 public void addHeader(String header, String value) { 266 checkNotStarted(); 267 if (!isValidHeaderName(header) || value.contains("\r\n")) { 268 throw new IllegalArgumentException("Invalid header with headername: " + header); 269 } 270 if (mRequestHeaders.containsKey(header)) { 271 mRequestHeaders.remove(header); 272 } 273 mRequestHeaders.put(header, value); 274 } 275 isValidHeaderName(String header)276 private boolean isValidHeaderName(String header) { 277 for (int i = 0; i < header.length(); i++) { 278 char c = header.charAt(i); 279 switch (c) { 280 case '(': 281 case ')': 282 case '<': 283 case '>': 284 case '@': 285 case ',': 286 case ';': 287 case ':': 288 case '\\': 289 case '\'': 290 case '/': 291 case '[': 292 case ']': 293 case '?': 294 case '=': 295 case '{': 296 case '}': 297 return false; 298 default: 299 if (Character.isISOControl(c) || Character.isWhitespace(c)) { 300 return false; 301 } 302 } 303 } 304 return true; 305 } 306 307 @Override setUploadDataProvider(UploadDataProvider uploadDataProvider, Executor executor)308 public void setUploadDataProvider(UploadDataProvider uploadDataProvider, Executor executor) { 309 if (uploadDataProvider == null) { 310 throw new NullPointerException("Invalid UploadDataProvider."); 311 } 312 if (!mRequestHeaders.containsKey("Content-Type")) { 313 throw new IllegalArgumentException( 314 "Requests with upload data must have a Content-Type."); 315 } 316 checkNotStarted(); 317 if (mInitialMethod == null) { 318 mInitialMethod = "POST"; 319 } 320 this.mUploadDataProvider = 321 new VersionSafeCallbacks.UploadDataProviderWrapper(uploadDataProvider); 322 if (mAllowDirectExecutor) { 323 this.mUploadExecutor = executor; 324 } else { 325 this.mUploadExecutor = new DirectPreventingExecutor(executor); 326 } 327 } 328 329 private final class OutputStreamDataSink extends JavaUploadDataSinkBase { 330 private final HttpURLConnection mUrlConnection; 331 private final AtomicBoolean mOutputChannelClosed = new AtomicBoolean(false); 332 private WritableByteChannel mOutputChannel; 333 private OutputStream mUrlConnectionOutputStream; 334 OutputStreamDataSink( final Executor userExecutor, Executor executor, HttpURLConnection urlConnection, VersionSafeCallbacks.UploadDataProviderWrapper provider)335 OutputStreamDataSink( 336 final Executor userExecutor, 337 Executor executor, 338 HttpURLConnection urlConnection, 339 VersionSafeCallbacks.UploadDataProviderWrapper provider) { 340 super(userExecutor, executor, provider); 341 mUrlConnection = urlConnection; 342 } 343 344 @Override initializeRead()345 protected void initializeRead() throws IOException { 346 if (mOutputChannel == null) { 347 mAdditionalStatusDetails = Status.CONNECTING; 348 mUrlConnection.setDoOutput(true); 349 mUrlConnection.connect(); 350 mAdditionalStatusDetails = Status.SENDING_REQUEST; 351 mUrlConnectionOutputStream = mUrlConnection.getOutputStream(); 352 mOutputChannel = Channels.newChannel(mUrlConnectionOutputStream); 353 } 354 } 355 closeOutputChannel()356 void closeOutputChannel() throws IOException { 357 if (mOutputChannel != null 358 && mOutputChannelClosed.compareAndSet( 359 /* expected= */ false, /* updated= */ true)) { 360 mOutputChannel.close(); 361 } 362 } 363 364 @Override finish()365 protected void finish() throws IOException { 366 closeOutputChannel(); 367 fireGetHeaders(); 368 } 369 370 @Override initializeStart(long totalBytes)371 protected void initializeStart(long totalBytes) { 372 if (totalBytes > 0) { 373 mUrlConnection.setFixedLengthStreamingMode(totalBytes); 374 } else { 375 mUrlConnection.setChunkedStreamingMode(DEFAULT_CHUNK_LENGTH); 376 } 377 } 378 379 @Override processSuccessfulRead(ByteBuffer buffer)380 protected int processSuccessfulRead(ByteBuffer buffer) throws IOException { 381 int totalBytesProcessed = 0; 382 while (buffer.hasRemaining()) { 383 totalBytesProcessed += mOutputChannel.write(buffer); 384 } 385 // Forces a chunk to be sent, rather than buffering to the DEFAULT_CHUNK_LENGTH. 386 // This allows clients to trickle-upload bytes as they become available without 387 // introducing latency due to buffering. 388 mUrlConnectionOutputStream.flush(); 389 return totalBytesProcessed; 390 } 391 392 @Override getErrorSettingRunnable(CheckedRunnable runnable)393 protected Runnable getErrorSettingRunnable(CheckedRunnable runnable) { 394 return errorSetting(runnable); 395 } 396 397 @Override getUploadErrorSettingRunnable(CheckedRunnable runnable)398 protected Runnable getUploadErrorSettingRunnable(CheckedRunnable runnable) { 399 return uploadErrorSetting(runnable); 400 } 401 402 @Override processUploadError(Throwable exception)403 protected void processUploadError(Throwable exception) { 404 enterUploadErrorState(exception); 405 } 406 } 407 408 @Override start()409 public void start() { 410 mAdditionalStatusDetails = Status.CONNECTING; 411 mEngine.incrementActiveRequestCount(); 412 transitionStates( 413 State.NOT_STARTED, 414 State.STARTED, 415 () -> { 416 mUrlChain.add(mCurrentUrl); 417 fireOpenConnection(); 418 }); 419 } 420 enterErrorState(final CronetException error)421 private void enterErrorState(final CronetException error) { 422 if (setTerminalState(State.ERROR)) { 423 fireDisconnect(); 424 fireCloseUploadDataProvider(); 425 mCallbackAsync.onFailed(mUrlResponseInfo, error); 426 } 427 } 428 setTerminalState(@tate int error)429 private boolean setTerminalState(@State int error) { 430 while (true) { 431 @State int oldState = mState.get(); 432 switch (oldState) { 433 case State.NOT_STARTED: 434 throw new IllegalStateException("Can't enter error state before start"); 435 case State.ERROR: // fallthrough 436 case State.COMPLETE: // fallthrough 437 case State.CANCELLED: 438 return false; // Already in a terminal state 439 default: 440 if (mState.compareAndSet(/* expected= */ oldState, /* updated= */ error)) { 441 return true; 442 } 443 } 444 } 445 } 446 447 /** Ends the request with an error, caused by an exception thrown from user code. */ enterUserErrorState(final Throwable error)448 private void enterUserErrorState(final Throwable error) { 449 enterErrorState( 450 new CallbackExceptionImpl("Exception received from UrlRequest.Callback", error)); 451 } 452 453 /** Ends the request with an error, caused by an exception thrown from user code. */ enterUploadErrorState(final Throwable error)454 private void enterUploadErrorState(final Throwable error) { 455 enterErrorState( 456 new CallbackExceptionImpl("Exception received from UploadDataProvider", error)); 457 } 458 enterCronetErrorState(final Throwable error)459 private void enterCronetErrorState(final Throwable error) { 460 // TODO(clm) mapping from Java exception (UnknownHostException, for example) to net error 461 // code goes here. 462 enterErrorState(new CronetExceptionImpl("System error", error)); 463 } 464 465 /** 466 * Atomically swaps from the expected state to a new state. If the swap fails, and it's not 467 * due to an earlier error or cancellation, throws an exception. 468 * 469 * @param afterTransition Callback to run after transition completes successfully. 470 */ transitionStates( @tate int expected, @State int newState, Runnable afterTransition)471 private void transitionStates( 472 @State int expected, @State int newState, Runnable afterTransition) { 473 if (!mState.compareAndSet(expected, newState)) { 474 @State int state = mState.get(); 475 if (!(state == State.CANCELLED || state == State.ERROR)) { 476 throw new IllegalStateException( 477 "Invalid state transition - expected " + expected + " but was " + state); 478 } 479 } else { 480 afterTransition.run(); 481 } 482 } 483 484 @Override followRedirect()485 public void followRedirect() { 486 transitionStates( 487 State.AWAITING_FOLLOW_REDIRECT, 488 State.STARTED, 489 new Runnable() { 490 @Override 491 public void run() { 492 mCurrentUrl = mPendingRedirectUrl; 493 mPendingRedirectUrl = null; 494 fireOpenConnection(); 495 } 496 }); 497 } 498 fireGetHeaders()499 private void fireGetHeaders() { 500 mAdditionalStatusDetails = Status.WAITING_FOR_RESPONSE; 501 mExecutor.execute( 502 errorSetting( 503 () -> { 504 if (mCurrentUrlConnection == null) { 505 return; // We've been cancelled 506 } 507 final List<Map.Entry<String, String>> headerList = new ArrayList<>(); 508 String selectedTransport = "http/1.1"; 509 String headerKey; 510 for (int i = 0; 511 (headerKey = mCurrentUrlConnection.getHeaderFieldKey(i)) 512 != null; 513 i++) { 514 if (X_ANDROID_SELECTED_TRANSPORT.equalsIgnoreCase(headerKey)) { 515 selectedTransport = mCurrentUrlConnection.getHeaderField(i); 516 } 517 if (!headerKey.startsWith(X_ANDROID)) { 518 headerList.add( 519 new SimpleEntry<>( 520 headerKey, 521 mCurrentUrlConnection.getHeaderField(i))); 522 } 523 } 524 525 int responseCode = mCurrentUrlConnection.getResponseCode(); 526 // Important to copy the list here, because although we never 527 // concurrently modify the list ourselves, user code might iterate 528 // over it while we're redirecting, and that would throw 529 // ConcurrentModificationException. 530 mUrlResponseInfo = 531 new UrlResponseInfoImpl( 532 new ArrayList<>(mUrlChain), 533 responseCode, 534 mCurrentUrlConnection.getResponseMessage(), 535 Collections.unmodifiableList(headerList), 536 false, 537 selectedTransport, 538 "", 539 0); 540 // TODO(clm) actual redirect handling? post -> get and whatnot? 541 if (responseCode >= 300 && responseCode < 400) { 542 List<String> locationFields = 543 mUrlResponseInfo.getAllHeaders().get("location"); 544 if (locationFields != null) { 545 fireRedirectReceived(locationFields.get(0)); 546 return; 547 } 548 } 549 fireCloseUploadDataProvider(); 550 if (responseCode >= 400) { 551 InputStream inputStream = mCurrentUrlConnection.getErrorStream(); 552 mResponseChannel = 553 inputStream == null 554 ? null 555 : InputStreamChannel.wrap(inputStream); 556 mCallbackAsync.onResponseStarted(mUrlResponseInfo); 557 } else { 558 mResponseChannel = 559 InputStreamChannel.wrap( 560 mCurrentUrlConnection.getInputStream()); 561 mCallbackAsync.onResponseStarted(mUrlResponseInfo); 562 } 563 })); 564 } 565 fireCloseUploadDataProvider()566 private void fireCloseUploadDataProvider() { 567 if (mUploadDataProvider != null 568 && mUploadProviderClosed.compareAndSet( 569 /* expected= */ false, /* updated= */ true)) { 570 try { 571 mUploadExecutor.execute(uploadErrorSetting(mUploadDataProvider::close)); 572 } catch (RejectedExecutionException e) { 573 Log.e(TAG, "Exception when closing uploadDataProvider", e); 574 } 575 } 576 } 577 fireRedirectReceived(final String locationField)578 private void fireRedirectReceived(final String locationField) { 579 transitionStates( 580 State.STARTED, 581 State.REDIRECT_RECEIVED, 582 () -> { 583 mPendingRedirectUrl = URI.create(mCurrentUrl).resolve(locationField).toString(); 584 mUrlChain.add(mPendingRedirectUrl); 585 transitionStates( 586 State.REDIRECT_RECEIVED, 587 State.AWAITING_FOLLOW_REDIRECT, 588 () -> { 589 mCallbackAsync.onRedirectReceived( 590 mUrlResponseInfo, mPendingRedirectUrl); 591 }); 592 }); 593 } 594 fireOpenConnection()595 private void fireOpenConnection() { 596 mExecutor.execute( 597 errorSetting( 598 () -> { 599 // If we're cancelled, then our old connection will be disconnected 600 // for us and we shouldn't open a new one. 601 if (mState.get() == State.CANCELLED) { 602 return; 603 } 604 605 final URL url = new URL(mCurrentUrl); 606 if (mCurrentUrlConnection != null) { 607 mCurrentUrlConnection.disconnect(); 608 mCurrentUrlConnection = null; 609 } 610 611 if (mNetworkHandle == CronetEngineBase.DEFAULT_NETWORK_HANDLE 612 || Build.VERSION.SDK_INT < Build.VERSION_CODES.M) { 613 mCurrentUrlConnection = (HttpURLConnection) url.openConnection(); 614 } else { 615 Network network = getNetworkFromHandle(mNetworkHandle); 616 if (network == null) { 617 throw new NetworkExceptionImpl( 618 "Network bound to request not found", 619 NetworkException.ERROR_ADDRESS_UNREACHABLE, 620 -4 /*Invalid argument*/); 621 } 622 mCurrentUrlConnection = 623 (HttpURLConnection) network.openConnection(url); 624 } 625 mCurrentUrlConnection.setInstanceFollowRedirects(false); 626 if (!mRequestHeaders.containsKey(USER_AGENT)) { 627 mRequestHeaders.put(USER_AGENT, mUserAgent); 628 } 629 for (Map.Entry<String, String> entry : mRequestHeaders.entrySet()) { 630 mCurrentUrlConnection.setRequestProperty( 631 entry.getKey(), entry.getValue()); 632 } 633 if (mInitialMethod == null) { 634 mInitialMethod = "GET"; 635 } 636 mCurrentUrlConnection.setRequestMethod(mInitialMethod); 637 if (mUploadDataProvider != null) { 638 mOutputStreamDataSink = 639 new OutputStreamDataSink( 640 mUploadExecutor, 641 mExecutor, 642 mCurrentUrlConnection, 643 mUploadDataProvider); 644 mOutputStreamDataSink.start(mUrlChain.size() == 1); 645 } else { 646 mAdditionalStatusDetails = Status.CONNECTING; 647 mCurrentUrlConnection.connect(); 648 fireGetHeaders(); 649 } 650 })); 651 } 652 errorSetting(final CheckedRunnable delegate)653 private Runnable errorSetting(final CheckedRunnable delegate) { 654 return () -> { 655 try { 656 delegate.run(); 657 } catch (Throwable t) { 658 enterCronetErrorState(t); 659 } 660 }; 661 } 662 663 private Runnable userErrorSetting(final CheckedRunnable delegate) { 664 return () -> { 665 try { 666 delegate.run(); 667 } catch (Throwable t) { 668 enterUserErrorState(t); 669 } 670 }; 671 } 672 673 private Runnable uploadErrorSetting(final CheckedRunnable delegate) { 674 return () -> { 675 try { 676 delegate.run(); 677 } catch (Throwable t) { 678 enterUploadErrorState(t); 679 } 680 }; 681 } 682 683 @Override 684 public void read(final ByteBuffer buffer) { 685 Preconditions.checkDirect(buffer); 686 Preconditions.checkHasRemaining(buffer); 687 CheckedRunnable doRead = 688 () -> { 689 int read = mResponseChannel == null ? -1 : mResponseChannel.read(buffer); 690 processReadResult(read, buffer); 691 }; 692 transitionStates( 693 State.AWAITING_READ, 694 State.READING, 695 () -> { 696 mExecutor.execute(errorSetting(doRead)); 697 }); 698 } 699 700 private void processReadResult(int read, final ByteBuffer buffer) throws IOException { 701 if (read != -1) { 702 mCallbackAsync.onReadCompleted(mUrlResponseInfo, buffer); 703 } else { 704 if (mResponseChannel != null) { 705 mResponseChannel.close(); 706 } 707 if (mState.compareAndSet( 708 /* expected= */ State.READING, /* updated= */ State.COMPLETE)) { 709 fireDisconnect(); 710 mCallbackAsync.onSucceeded(mUrlResponseInfo); 711 } 712 } 713 } 714 715 private void fireDisconnect() { 716 mExecutor.execute( 717 () -> { 718 if (mOutputStreamDataSink != null) { 719 try { 720 mOutputStreamDataSink.closeOutputChannel(); 721 } catch (IOException e) { 722 Log.e(TAG, "Exception when closing OutputChannel", e); 723 } 724 } 725 if (mCurrentUrlConnection != null) { 726 mCurrentUrlConnection.disconnect(); 727 mCurrentUrlConnection = null; 728 } 729 }); 730 } 731 732 @Override 733 public void cancel() { 734 @State int oldState = mState.getAndSet(State.CANCELLED); 735 switch (oldState) { 736 // We've just scheduled some user code to run. When they perform their next 737 // operation, they'll observe it and fail. However, if user code is cancelling in 738 // response to one of these callbacks, we'll never actually cancel! 739 // TODO(clm) figure out if it's possible to avoid concurrency in user callbacks. 740 case State.REDIRECT_RECEIVED: 741 case State.AWAITING_FOLLOW_REDIRECT: 742 case State.AWAITING_READ: 743 744 // User code is waiting on us - cancel away! 745 case State.STARTED: 746 case State.READING: 747 fireDisconnect(); 748 fireCloseUploadDataProvider(); 749 mCallbackAsync.onCanceled(mUrlResponseInfo); 750 break; 751 // The rest are all termination cases - we're too late to cancel. 752 case State.ERROR: 753 case State.COMPLETE: 754 case State.CANCELLED: 755 break; 756 default: 757 break; 758 } 759 } 760 761 @Override 762 public boolean isDone() { 763 @State int state = mState.get(); 764 return state == State.COMPLETE || state == State.ERROR || state == State.CANCELLED; 765 } 766 767 /** 768 * Estimates the byte size of the headers in their on-wire format. 769 * We are not really interested in their specific size but something which is close enough. 770 */ 771 @VisibleForTesting 772 static long estimateHeadersSizeInBytesList(Map<String, List<String>> headers) { 773 if (headers == null) return 0; 774 775 long responseHeaderSizeInBytes = 0; 776 for (Map.Entry<String, List<String>> entry : headers.entrySet()) { 777 String key = entry.getKey(); 778 if (key != null) responseHeaderSizeInBytes += key.length(); 779 if (entry.getValue() == null) continue; 780 781 for (String content : entry.getValue()) { 782 if (content != null) responseHeaderSizeInBytes += content.length(); 783 } 784 } 785 return responseHeaderSizeInBytes; 786 } 787 788 /** 789 * Estimates the byte size of the headers in their on-wire format. 790 * We are not really interested in their specific size but something which is close enough. 791 */ 792 @VisibleForTesting 793 static long estimateHeadersSizeInBytes(Map<String, String> headers) { 794 if (headers == null) return 0; 795 long responseHeaderSizeInBytes = 0; 796 for (Map.Entry<String, String> entry : headers.entrySet()) { 797 String key = entry.getKey(); 798 if (key != null) responseHeaderSizeInBytes += key.length(); 799 String value = entry.getValue(); 800 if (value != null) responseHeaderSizeInBytes += value.length(); 801 } 802 return responseHeaderSizeInBytes; 803 } 804 805 private static long parseContentLengthString(String contentLength) { 806 try { 807 return Long.parseLong(contentLength); 808 } catch (NumberFormatException e) { 809 return 0; 810 } 811 } 812 813 @Override 814 public void getStatus(StatusListener listener) { 815 @State int state = mState.get(); 816 int extraStatus = this.mAdditionalStatusDetails; 817 818 @StatusValues final int status; 819 switch (state) { 820 case State.ERROR: 821 case State.COMPLETE: 822 case State.CANCELLED: 823 case State.NOT_STARTED: 824 status = Status.INVALID; 825 break; 826 case State.STARTED: 827 status = extraStatus; 828 break; 829 case State.REDIRECT_RECEIVED: 830 case State.AWAITING_FOLLOW_REDIRECT: 831 case State.AWAITING_READ: 832 status = Status.IDLE; 833 break; 834 case State.READING: 835 status = Status.READING_RESPONSE; 836 break; 837 default: 838 throw new IllegalStateException("Switch is exhaustive: " + state); 839 } 840 841 mCallbackAsync.sendStatus( 842 new VersionSafeCallbacks.UrlRequestStatusListener(listener), status); 843 } 844 845 /** This wrapper ensures that callbacks are always called on the correct executor */ 846 private final class AsyncUrlRequestCallback { 847 final VersionSafeCallbacks.UrlRequestCallback mCallback; 848 final Executor mUserExecutor; 849 final Executor mFallbackExecutor; 850 851 AsyncUrlRequestCallback(Callback callback, final Executor userExecutor) { 852 this.mCallback = new VersionSafeCallbacks.UrlRequestCallback(callback); 853 if (mAllowDirectExecutor) { 854 this.mUserExecutor = userExecutor; 855 this.mFallbackExecutor = null; 856 } else { 857 mUserExecutor = new DirectPreventingExecutor(userExecutor); 858 mFallbackExecutor = userExecutor; 859 } 860 } 861 862 void sendStatus( 863 final VersionSafeCallbacks.UrlRequestStatusListener listener, final int status) { 864 mUserExecutor.execute( 865 () -> { 866 listener.onStatus(status); 867 }); 868 } 869 870 void execute(CheckedRunnable runnable) { 871 try { 872 mUserExecutor.execute(userErrorSetting(runnable)); 873 } catch (RejectedExecutionException e) { 874 enterErrorState(new CronetExceptionImpl("Exception posting task to executor", e)); 875 } 876 } 877 878 void onRedirectReceived(final UrlResponseInfo info, final String newLocationUrl) { 879 execute( 880 () -> { 881 mCallback.onRedirectReceived(JavaUrlRequest.this, info, newLocationUrl); 882 }); 883 } 884 885 void onResponseStarted(UrlResponseInfo info) { 886 execute( 887 () -> { 888 if (mState.compareAndSet( 889 /* expected= */ State.STARTED, 890 /* updated= */ State.AWAITING_READ)) { 891 mCallback.onResponseStarted(JavaUrlRequest.this, mUrlResponseInfo); 892 } 893 }); 894 } 895 896 void onReadCompleted(final UrlResponseInfo info, final ByteBuffer byteBuffer) { 897 execute( 898 () -> { 899 if (mState.compareAndSet( 900 /* expected= */ State.READING, 901 /* updated= */ State.AWAITING_READ)) { 902 mCallback.onReadCompleted(JavaUrlRequest.this, info, byteBuffer); 903 } 904 }); 905 } 906 907 /** 908 * Builds the {@link CronetTrafficInfo} associated to this request internal state. 909 * This helper methods makes strong assumptions about the state of the request. For this 910 * reason it should only be called within {@link JavaUrlRequest#maybeReportMetrics} where 911 * these assumptions are guaranteed to be true. 912 * @return the {@link CronetTrafficInfo} associated to this request internal state 913 */ 914 @RequiresApi(Build.VERSION_CODES.O) 915 private CronetTrafficInfo buildCronetTrafficInfo() { 916 assert mRequestHeaders != null; 917 918 // Most of the CronetTrafficInfo fields have similar names/semantics. To avoid bugs due 919 // to typos everything is final, this means that things have to initialized through an 920 // if/else. 921 final Map<String, List<String>> responseHeaders; 922 final String negotiatedProtocol; 923 final int httpStatusCode; 924 final boolean wasCached; 925 if (mUrlResponseInfo != null) { 926 responseHeaders = mUrlResponseInfo.getAllHeaders(); 927 negotiatedProtocol = mUrlResponseInfo.getNegotiatedProtocol(); 928 httpStatusCode = mUrlResponseInfo.getHttpStatusCode(); 929 wasCached = mUrlResponseInfo.wasCached(); 930 } else { 931 responseHeaders = Collections.emptyMap(); 932 negotiatedProtocol = ""; 933 httpStatusCode = 0; 934 wasCached = false; 935 } 936 937 final long requestHeaderSizeInBytes; 938 final long requestBodySizeInBytes; 939 if (wasCached) { 940 requestHeaderSizeInBytes = 0; 941 requestBodySizeInBytes = 0; 942 } else { 943 requestHeaderSizeInBytes = estimateHeadersSizeInBytes(mRequestHeaders); 944 // TODO(stefanoduo): Add logic to keep track of request body size. 945 requestBodySizeInBytes = -1; 946 } 947 948 final long responseBodySizeInBytes; 949 final long responseHeaderSizeInBytes; 950 if (wasCached) { 951 responseHeaderSizeInBytes = 0; 952 responseBodySizeInBytes = 0; 953 } else { 954 responseHeaderSizeInBytes = estimateHeadersSizeInBytesList(responseHeaders); 955 // Content-Length is not mandatory, if missing report a non-valid response body size 956 // for the time being. 957 if (responseHeaders.containsKey("Content-Length")) { 958 responseBodySizeInBytes = 959 parseContentLengthString(responseHeaders.get("Content-Length").get(0)); 960 } else { 961 // TODO(stefanoduo): Add logic to keep track of response body size. 962 responseBodySizeInBytes = -1; 963 } 964 } 965 966 final Duration headersLatency = Duration.ofSeconds(0); 967 final Duration totalLatency = Duration.ofSeconds(0); 968 969 @State int state = mState.get(); 970 CronetTrafficInfo.RequestTerminalState requestTerminalState; 971 switch (state) { 972 case State.COMPLETE: 973 requestTerminalState = CronetTrafficInfo.RequestTerminalState.SUCCEEDED; 974 break; 975 case State.ERROR: 976 requestTerminalState = CronetTrafficInfo.RequestTerminalState.ERROR; 977 break; 978 case State.CANCELLED: 979 requestTerminalState = CronetTrafficInfo.RequestTerminalState.CANCELLED; 980 break; 981 default: 982 throw new IllegalStateException( 983 "Internal Cronet error: attempted to report metrics but current state (" 984 + state 985 + ") is not a done state!"); 986 } 987 988 return new CronetTrafficInfo( 989 requestHeaderSizeInBytes, 990 requestBodySizeInBytes, 991 responseHeaderSizeInBytes, 992 responseBodySizeInBytes, 993 httpStatusCode, 994 headersLatency, 995 totalLatency, 996 negotiatedProtocol, 997 // There is no connection migration for the fallback implementation. 998 false, // wasConnectionMigrationAttempted 999 false, // didConnectionMigrationSucceed 1000 requestTerminalState); 1001 } 1002 1003 // Maybe report metrics. This method should only be called on Callback's executor thread and 1004 // after Callback's onSucceeded, onFailed and onCanceled. 1005 private void maybeReportMetrics() { 1006 if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { 1007 try { 1008 mLogger.logCronetTrafficInfo(mCronetEngineId, buildCronetTrafficInfo()); 1009 } catch (RuntimeException e) { 1010 // Handle any issue gracefully, we should never crash due failures while 1011 // logging. 1012 Log.i(TAG, "Error while trying to log CronetTrafficInfo: ", e); 1013 } 1014 } 1015 } 1016 1017 void onCanceled(final UrlResponseInfo info) { 1018 closeResponseChannel(); 1019 mUserExecutor.execute( 1020 () -> { 1021 try { 1022 mCallback.onCanceled(JavaUrlRequest.this, info); 1023 } catch (Exception exception) { 1024 Log.e(TAG, "Exception in onCanceled method", exception); 1025 } 1026 maybeReportMetrics(); 1027 mEngine.decrementActiveRequestCount(); 1028 }); 1029 } 1030 1031 void onSucceeded(final UrlResponseInfo info) { 1032 mUserExecutor.execute( 1033 () -> { 1034 try { 1035 mCallback.onSucceeded(JavaUrlRequest.this, info); 1036 } catch (Exception exception) { 1037 Log.e(TAG, "Exception in onSucceeded method", exception); 1038 } 1039 maybeReportMetrics(); 1040 mEngine.decrementActiveRequestCount(); 1041 }); 1042 } 1043 1044 void onFailed(final UrlResponseInfo urlResponseInfo, final CronetException e) { 1045 closeResponseChannel(); 1046 Runnable runnable = 1047 () -> { 1048 try { 1049 mCallback.onFailed(JavaUrlRequest.this, urlResponseInfo, e); 1050 } catch (Exception exception) { 1051 Log.e(TAG, "Exception in onFailed method", exception); 1052 } 1053 maybeReportMetrics(); 1054 mEngine.decrementActiveRequestCount(); 1055 }; 1056 try { 1057 mUserExecutor.execute(runnable); 1058 } catch (InlineExecutionProhibitedException wasDirect) { 1059 if (mFallbackExecutor != null) { 1060 mFallbackExecutor.execute(runnable); 1061 } 1062 } 1063 } 1064 } 1065 1066 private void closeResponseChannel() { 1067 mExecutor.execute( 1068 () -> { 1069 if (mResponseChannel != null) { 1070 try { 1071 mResponseChannel.close(); 1072 } catch (IOException e) { 1073 e.printStackTrace(); 1074 } 1075 mResponseChannel = null; 1076 } 1077 }); 1078 } 1079 1080 private Network getNetworkFromHandle(long networkHandle) { 1081 Network[] networks = 1082 ((ConnectivityManager) 1083 mEngine.getContext().getSystemService(Context.CONNECTIVITY_SERVICE)) 1084 .getAllNetworks(); 1085 1086 for (Network network : networks) { 1087 if (network.getNetworkHandle() == networkHandle) return network; 1088 } 1089 1090 return null; 1091 } 1092 1093 @Override 1094 public int getTrafficStatsUid() { 1095 throw new IllegalStateException("Not Implemented"); 1096 } 1097 1098 @Override 1099 public int getPriority() { 1100 throw new IllegalStateException("Not Implemented"); 1101 } 1102 1103 @Override 1104 public boolean hasTrafficStatsTag() { 1105 throw new IllegalStateException("Not Implemented"); 1106 } 1107 1108 @Override 1109 public boolean hasTrafficStatsUid() { 1110 throw new IllegalStateException("Not Implemented"); 1111 } 1112 1113 @Override 1114 public int getTrafficStatsTag() { 1115 throw new IllegalStateException("Not Implemented"); 1116 } 1117 1118 @Override 1119 public boolean isDirectExecutorAllowed() { 1120 throw new IllegalStateException("Not Implemented"); 1121 } 1122 1123 @Override 1124 public boolean isCacheDisabled() { 1125 throw new IllegalStateException("Not Implemented"); 1126 } 1127 1128 @Override 1129 public UrlResponseInfo.HeaderBlock getHeaders() { 1130 throw new IllegalStateException("Not Implemented"); 1131 } 1132 1133 @Override 1134 public String getHttpMethod() { 1135 throw new IllegalStateException("Not Implemented"); 1136 } 1137 } 1138