1 /* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package okio 17 18 import java.io.IOException 19 import java.io.InterruptedIOException 20 import java.util.concurrent.TimeUnit 21 import java.util.concurrent.locks.Condition 22 import java.util.concurrent.locks.ReentrantLock 23 import kotlin.concurrent.withLock 24 25 /** 26 * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to 27 * implement timeouts where they aren't supported natively, such as to sockets that are blocked on 28 * writing. 29 * 30 * Subclasses should override [timedOut] to take action when a timeout occurs. This method will be 31 * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise, 32 * we risk starving other timeouts from being triggered. 33 * 34 * Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the 35 * timeout to each operation on the wrapped stream. 36 * 37 * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward. 38 * The return value of [exit] indicates whether a timeout was triggered. Note that the call to 39 * [timedOut] is asynchronous, and may be called after [exit]. 40 */ 41 open class AsyncTimeout : Timeout() { 42 private var state = STATE_IDLE 43 44 /** The next node in the linked list. */ 45 private var next: AsyncTimeout? = null 46 47 /** If scheduled, this is the time that the watchdog should time this out. */ 48 private var timeoutAt = 0L 49 enternull50 fun enter() { 51 val timeoutNanos = timeoutNanos() 52 val hasDeadline = hasDeadline() 53 if (timeoutNanos == 0L && !hasDeadline) { 54 return // No timeout and no deadline? Don't bother with the queue. 55 } 56 57 lock.withLock { 58 check(state == STATE_IDLE) { "Unbalanced enter/exit" } 59 state = STATE_IN_QUEUE 60 insertIntoQueue(this, timeoutNanos, hasDeadline) 61 } 62 } 63 64 /** Returns true if the timeout occurred. */ exitnull65 fun exit(): Boolean { 66 lock.withLock { 67 val oldState = this.state 68 state = STATE_IDLE 69 70 if (oldState == STATE_IN_QUEUE) { 71 removeFromQueue(this) 72 return false 73 } else { 74 return oldState == STATE_TIMED_OUT 75 } 76 } 77 } 78 cancelnull79 override fun cancel() { 80 super.cancel() 81 82 lock.withLock { 83 if (state == STATE_IN_QUEUE) { 84 removeFromQueue(this) 85 state = STATE_CANCELED 86 } 87 } 88 } 89 90 /** 91 * Returns the amount of time left until the time out. This will be negative if the timeout has 92 * elapsed and the timeout should occur immediately. 93 */ remainingNanosnull94 private fun remainingNanos(now: Long) = timeoutAt - now 95 96 /** 97 * Invoked by the watchdog thread when the time between calls to [enter] and [exit] has exceeded 98 * the timeout. 99 */ 100 protected open fun timedOut() {} 101 102 /** 103 * Returns a new sink that delegates to [sink], using this to implement timeouts. This works 104 * best if [timedOut] is overridden to interrupt [sink]'s current operation. 105 */ sinknull106 fun sink(sink: Sink): Sink { 107 return object : Sink { 108 override fun write(source: Buffer, byteCount: Long) { 109 checkOffsetAndCount(source.size, 0, byteCount) 110 111 var remaining = byteCount 112 while (remaining > 0L) { 113 // Count how many bytes to write. This loop guarantees we split on a segment boundary. 114 var toWrite = 0L 115 var s = source.head!! 116 while (toWrite < TIMEOUT_WRITE_SIZE) { 117 val segmentSize = s.limit - s.pos 118 toWrite += segmentSize.toLong() 119 if (toWrite >= remaining) { 120 toWrite = remaining 121 break 122 } 123 s = s.next!! 124 } 125 126 // Emit one write. Only this section is subject to the timeout. 127 withTimeout { sink.write(source, toWrite) } 128 remaining -= toWrite 129 } 130 } 131 132 override fun flush() { 133 withTimeout { sink.flush() } 134 } 135 136 override fun close() { 137 withTimeout { sink.close() } 138 } 139 140 override fun timeout() = this@AsyncTimeout 141 142 override fun toString() = "AsyncTimeout.sink($sink)" 143 } 144 } 145 146 /** 147 * Returns a new source that delegates to [source], using this to implement timeouts. This works 148 * best if [timedOut] is overridden to interrupt [source]'s current operation. 149 */ sourcenull150 fun source(source: Source): Source { 151 return object : Source { 152 override fun read(sink: Buffer, byteCount: Long): Long { 153 return withTimeout { source.read(sink, byteCount) } 154 } 155 156 override fun close() { 157 withTimeout { source.close() } 158 } 159 160 override fun timeout() = this@AsyncTimeout 161 162 override fun toString() = "AsyncTimeout.source($source)" 163 } 164 } 165 166 /** 167 * Surrounds [block] with calls to [enter] and [exit], throwing an exception from 168 * [newTimeoutException] if a timeout occurred. 169 */ withTimeoutnull170 inline fun <T> withTimeout(block: () -> T): T { 171 var throwOnTimeout = false 172 enter() 173 try { 174 val result = block() 175 throwOnTimeout = true 176 return result 177 } catch (e: IOException) { 178 throw if (!exit()) e else `access$newTimeoutException`(e) 179 } finally { 180 val timedOut = exit() 181 if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null) 182 } 183 } 184 185 @PublishedApi // Binary compatible trampoline function access$newTimeoutExceptionnull186 internal fun `access$newTimeoutException`(cause: IOException?) = newTimeoutException(cause) 187 188 /** 189 * Returns an [IOException] to represent a timeout. By default this method returns 190 * [InterruptedIOException]. If [cause] is non-null it is set as the cause of the 191 * returned exception. 192 */ 193 protected open fun newTimeoutException(cause: IOException?): IOException { 194 val e = InterruptedIOException("timeout") 195 if (cause != null) { 196 e.initCause(cause) 197 } 198 return e 199 } 200 201 private class Watchdog : Thread("Okio Watchdog") { 202 init { 203 isDaemon = true 204 } 205 runnull206 override fun run() { 207 while (true) { 208 try { 209 var timedOut: AsyncTimeout? 210 lock.withLock { 211 timedOut = awaitTimeout() 212 213 // The queue is completely empty. Let this thread exit and let another watchdog thread 214 // get created on the next call to scheduleTimeout(). 215 if (timedOut === head) { 216 head = null 217 return 218 } 219 } 220 221 // Close the timed out node, if one was found. 222 timedOut?.timedOut() 223 } catch (ignored: InterruptedException) { 224 } 225 } 226 } 227 } 228 229 private companion object { 230 val lock: ReentrantLock = ReentrantLock() 231 val condition: Condition = lock.newCondition() 232 233 /** 234 * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow 235 * connections may suffer timeouts even when they're making (slow) progress. Without this, 236 * writing a single 1 MiB buffer may never succeed on a sufficiently slow connection. 237 */ 238 private const val TIMEOUT_WRITE_SIZE = 64 * 1024 239 240 /** Duration for the watchdog thread to be idle before it shuts itself down. */ 241 private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60) 242 private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS) 243 244 /* 245 * .-------------. 246 * | | 247 * .------------ exit() ------| CANCELED | 248 * | | | 249 * | '-------------' 250 * | ^ 251 * | | cancel() 252 * v | 253 * .-------------. .-------------. 254 * | |---- enter() ----->| | 255 * | IDLE | | IN QUEUE | 256 * | |<---- exit() ------| | 257 * '-------------' '-------------' 258 * ^ | 259 * | | time out 260 * | v 261 * | .-------------. 262 * | | | 263 * '------------ exit() ------| TIMED OUT | 264 * | | 265 * '-------------' 266 * 267 * Notes: 268 * * enter() crashes if called from a state other than IDLE. 269 * * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to 270 * track entered but not in the queue. 271 * * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for 272 * early implementations to support cases where enter() as a no-op. 273 * * cancel() is a no-op from every state but IN QUEUE. 274 */ 275 276 private const val STATE_IDLE = 0 277 private const val STATE_IN_QUEUE = 1 278 private const val STATE_TIMED_OUT = 2 279 private const val STATE_CANCELED = 3 280 281 /** 282 * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be 283 * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue. 284 * 285 * Head's 'next' points to the first element of the linked list. The first element is the next 286 * node to time out, or null if the queue is empty. The head is null until the watchdog thread 287 * is started and also after being idle for [AsyncTimeout.IDLE_TIMEOUT_MILLIS]. 288 */ 289 private var head: AsyncTimeout? = null 290 insertIntoQueuenull291 private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { 292 // Start the watchdog thread and create the head node when the first timeout is scheduled. 293 if (head == null) { 294 head = AsyncTimeout() 295 Watchdog().start() 296 } 297 298 val now = System.nanoTime() 299 if (timeoutNanos != 0L && hasDeadline) { 300 // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap 301 // around, minOf() is undefined for absolute values, but meaningful for relative ones. 302 node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) 303 } else if (timeoutNanos != 0L) { 304 node.timeoutAt = now + timeoutNanos 305 } else if (hasDeadline) { 306 node.timeoutAt = node.deadlineNanoTime() 307 } else { 308 throw AssertionError() 309 } 310 311 // Insert the node in sorted order. 312 val remainingNanos = node.remainingNanos(now) 313 var prev = head!! 314 while (true) { 315 if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { 316 node.next = prev.next 317 prev.next = node 318 if (prev === head) { 319 // Wake up the watchdog when inserting at the front. 320 condition.signal() 321 } 322 break 323 } 324 prev = prev.next!! 325 } 326 } 327 328 /** Returns true if the timeout occurred. */ removeFromQueuenull329 private fun removeFromQueue(node: AsyncTimeout) { 330 var prev = head 331 while (prev != null) { 332 if (prev.next === node) { 333 prev.next = node.next 334 node.next = null 335 return 336 } 337 prev = prev.next 338 } 339 340 error("node was not found in the queue") 341 } 342 343 /** 344 * Removes and returns the node at the head of the list, waiting for it to time out if 345 * necessary. This returns [head] if there was no node at the head of the list when starting, 346 * and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a 347 * new node was inserted while waiting. Otherwise, this returns the node being waited on that 348 * has been removed. 349 */ 350 @Throws(InterruptedException::class) awaitTimeoutnull351 fun awaitTimeout(): AsyncTimeout? { 352 // Get the next eligible node. 353 val node = head!!.next 354 355 // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. 356 if (node == null) { 357 val startNanos = System.nanoTime() 358 condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) 359 return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) { 360 head // The idle timeout elapsed. 361 } else { 362 null // The situation has changed. 363 } 364 } 365 366 val waitNanos = node.remainingNanos(System.nanoTime()) 367 368 // The head of the queue hasn't timed out yet. Await that. 369 if (waitNanos > 0) { 370 condition.await(waitNanos, TimeUnit.NANOSECONDS) 371 return null 372 } 373 374 // The head of the queue has timed out. Remove it. 375 head!!.next = node.next 376 node.next = null 377 node.state = STATE_TIMED_OUT 378 return node 379 } 380 } 381 } 382