xref: /aosp_15_r20/external/okio/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt (revision f9742813c14b702d71392179818a9e591da8620c)
/*
 * Copyright (C) 2014 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package okio
17 
18 import java.io.IOException
19 import java.io.InterruptedIOException
20 import java.util.concurrent.TimeUnit
21 import java.util.concurrent.locks.Condition
22 import java.util.concurrent.locks.ReentrantLock
23 import kotlin.concurrent.withLock
24 
/**
 * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
 * implement timeouts where they aren't supported natively, such as to sockets that are blocked on
 * writing.
 *
 * Subclasses should override [timedOut] to take action when a timeout occurs. This method will be
 * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise,
 * we risk starving other timeouts from being triggered.
 *
 * Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the
 * timeout to each operation on the wrapped stream.
 *
 * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward.
 * The return value of [exit] indicates whether a timeout was triggered. Note that the call to
 * [timedOut] is asynchronous, and may be called after [exit].
 */
open class AsyncTimeout : Timeout() {
  /** One of the `STATE_*` constants. Every read and write in this class happens under [lock]. */
  private var state = STATE_IDLE

  /** The next node in the linked list.  */
  private var next: AsyncTimeout? = null

  /** If scheduled, this is the time that the watchdog should time this out.  */
  private var timeoutAt = 0L

  /**
   * Begins tracking this timeout. When neither a timeout nor a deadline is configured this returns
   * immediately without touching the queue; otherwise the node is marked as queued and inserted
   * into the shared, sorted queue.
   *
   * @throws IllegalStateException if this timeout is not idle, i.e. a previous [enter] has not
   *     been balanced by a call to [exit].
   */
  fun enter() {
    val timeoutNanos = timeoutNanos()
    val hasDeadline = hasDeadline()
    if (timeoutNanos == 0L && !hasDeadline) {
      return // No timeout and no deadline? Don't bother with the queue.
    }

    lock.withLock {
      check(state == STATE_IDLE) { "Unbalanced enter/exit" }
      state = STATE_IN_QUEUE
      insertIntoQueue(this, timeoutNanos, hasDeadline)
    }
  }

  /**
   * Stops tracking this timeout and resets it to idle. If the node is still queued it is unlinked
   * from the queue.
   *
   * @return true if the timeout occurred, i.e. the watchdog had already removed this node and
   *     marked it as timed out. Returns false when called while idle, queued or canceled.
   */
  fun exit(): Boolean {
    lock.withLock {
      val oldState = this.state
      state = STATE_IDLE

      if (oldState == STATE_IN_QUEUE) {
        // Still pending: unlink it so the watchdog never fires for this enter/exit pair.
        removeFromQueue(this)
        return false
      } else {
        return oldState == STATE_TIMED_OUT
      }
    }
  }

  /**
   * Delegates to the superclass and then, if this node is currently queued, removes it from the
   * queue and marks it as canceled. From any other state only the superclass call has an effect.
   */
  override fun cancel() {
    super.cancel()

    lock.withLock {
      if (state == STATE_IN_QUEUE) {
        removeFromQueue(this)
        state = STATE_CANCELED
      }
    }
  }

  /**
   * Returns the amount of time left until the time out. This will be negative if the timeout has
   * elapsed and the timeout should occur immediately.
   */
  private fun remainingNanos(now: Long) = timeoutAt - now

  /**
   * Invoked by the watchdog thread when the time between calls to [enter] and [exit] has exceeded
   * the timeout.
   */
  protected open fun timedOut() {}

  /**
   * Returns a new sink that delegates to [sink], using this to implement timeouts. This works
   * best if [timedOut] is overridden to interrupt [sink]'s current operation.
   */
  fun sink(sink: Sink): Sink {
    return object : Sink {
      override fun write(source: Buffer, byteCount: Long) {
        checkOffsetAndCount(source.size, 0, byteCount)

        var remaining = byteCount
        while (remaining > 0L) {
          // Count how many bytes to write. This loop guarantees we split on a segment boundary.
          var toWrite = 0L
          var s = source.head!!
          while (toWrite < TIMEOUT_WRITE_SIZE) {
            val segmentSize = s.limit - s.pos
            toWrite += segmentSize.toLong()
            if (toWrite >= remaining) {
              // The requested byte count ends inside this segment; don't write past it.
              toWrite = remaining
              break
            }
            s = s.next!!
          }

          // Emit one write. Only this section is subject to the timeout.
          withTimeout { sink.write(source, toWrite) }
          remaining -= toWrite
        }
      }

      override fun flush() {
        withTimeout { sink.flush() }
      }

      override fun close() {
        withTimeout { sink.close() }
      }

      override fun timeout() = this@AsyncTimeout

      override fun toString() = "AsyncTimeout.sink($sink)"
    }
  }

  /**
   * Returns a new source that delegates to [source], using this to implement timeouts. This works
   * best if [timedOut] is overridden to interrupt [source]'s current operation.
   */
  fun source(source: Source): Source {
    return object : Source {
      override fun read(sink: Buffer, byteCount: Long): Long {
        return withTimeout { source.read(sink, byteCount) }
      }

      override fun close() {
        withTimeout { source.close() }
      }

      override fun timeout() = this@AsyncTimeout

      override fun toString() = "AsyncTimeout.source($source)"
    }
  }

  /**
   * Surrounds [block] with calls to [enter] and [exit], throwing an exception from
   * [newTimeoutException] if a timeout occurred.
   *
   * If [block] throws an [IOException] and the timeout fired, the timeout exception is thrown with
   * the original exception as its cause; if the timeout did not fire the original exception is
   * rethrown unchanged.
   */
  inline fun <T> withTimeout(block: () -> T): T {
    // Only set once block() has returned normally, so that the finally clause never replaces an
    // exception that is already propagating.
    var throwOnTimeout = false
    enter()
    try {
      val result = block()
      throwOnTimeout = true
      return result
    } catch (e: IOException) {
      // This exit() resets the state to idle, so the exit() in the finally clause returns false.
      throw if (!exit()) e else `access$newTimeoutException`(e)
    } finally {
      val timedOut = exit()
      if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null)
    }
  }

  @PublishedApi // Binary compatible trampoline function
  internal fun `access$newTimeoutException`(cause: IOException?) = newTimeoutException(cause)

  /**
   * Returns an [IOException] to represent a timeout. By default this method returns
   * [InterruptedIOException]. If [cause] is non-null it is set as the cause of the
   * returned exception.
   */
  protected open fun newTimeoutException(cause: IOException?): IOException {
    val e = InterruptedIOException("timeout")
    if (cause != null) {
      e.initCause(cause)
    }
    return e
  }

  /**
   * Daemon thread named "Okio Watchdog" that repeatedly waits for the front of the queue to time
   * out and then invokes [timedOut] on it. The thread returns from [run] once [awaitTimeout]
   * reports that the queue stayed empty for the idle timeout.
   */
  private class Watchdog : Thread("Okio Watchdog") {
    init {
      isDaemon = true
    }

    override fun run() {
      while (true) {
        try {
          val expired = lock.withLock {
            val node = awaitTimeout()

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to insertIntoQueue().
            if (node === head) {
              head = null
              return
            }
            node
          }

          // Close the timed out node, if one was found. This runs after the lock is released.
          expired?.timedOut()
        } catch (ignored: InterruptedException) {
          // Interrupts are ignored; loop around and wait again.
        }
      }
    }
  }

  private companion object {
    /** Guards [head], the links between queued nodes, and each node's state and trigger time. */
    val lock: ReentrantLock = ReentrantLock()

    /** Awaited by the watchdog; signaled when a node is inserted at the front of the queue. */
    val condition: Condition = lock.newCondition()

    /**
     * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
     * connections may suffer timeouts even when they're making (slow) progress. Without this,
     * writing a single 1 MiB buffer may never succeed on a sufficiently slow connection.
     */
    private const val TIMEOUT_WRITE_SIZE = 64 * 1024

    /** Duration for the watchdog thread to be idle before it shuts itself down.  */
    private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
    private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)

    /*
     *                                       .-------------.
     *                                       |             |
     *            .------------ exit() ------|  CANCELED   |
     *            |                          |             |
     *            |                          '-------------'
     *            |                                 ^
     *            |                                 |  cancel()
     *            v                                 |
     *     .-------------.                   .-------------.
     *     |             |---- enter() ----->|             |
     *     |    IDLE     |                   |  IN QUEUE   |
     *     |             |<---- exit() ------|             |
     *     '-------------'                   '-------------'
     *            ^                                 |
     *            |                                 |  time out
     *            |                                 v
     *            |                          .-------------.
     *            |                          |             |
     *            '------------ exit() ------|  TIMED OUT  |
     *                                       |             |
     *                                       '-------------'
     *
     * Notes:
     *  * enter() crashes if called from a state other than IDLE.
     *  * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to
     *    track entered but not in the queue.
     *  * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for
     *    early implementations to support cases where enter() is a no-op.
     *  * cancel() is a no-op from every state but IN QUEUE.
     */

    private const val STATE_IDLE = 0
    private const val STATE_IN_QUEUE = 1
    private const val STATE_TIMED_OUT = 2
    private const val STATE_CANCELED = 3

    /**
     * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
     * triggered. The queue is guarded by [lock].
     *
     * Head's 'next' points to the first element of the linked list. The first element is the next
     * node to time out, or null if the queue is empty. The head is null until the watchdog thread
     * is started and also after being idle for [AsyncTimeout.IDLE_TIMEOUT_MILLIS].
     */
    private var head: AsyncTimeout? = null

    /**
     * Computes when [node] should fire and links it into the queue in sorted order, starting the
     * watchdog thread first if the queue has no head node yet. Signals [condition] when [node]
     * becomes the new front of the queue. The only caller, [enter], invokes this while holding
     * [lock].
     *
     * @param node the timeout to schedule.
     * @param timeoutNanos the timeout duration in nanoseconds, or 0 for no timeout.
     * @param hasDeadline whether [node] has a deadline that should also be considered.
     */
    private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
      // Start the watchdog thread and create the head node when the first timeout is scheduled.
      if (head == null) {
        head = AsyncTimeout()
        Watchdog().start()
      }

      val now = System.nanoTime()
      node.timeoutAt = when {
        // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
        // around, minOf() is undefined for absolute values, but meaningful for relative ones.
        timeoutNanos != 0L && hasDeadline ->
          now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
        timeoutNanos != 0L -> now + timeoutNanos
        hasDeadline -> node.deadlineNanoTime()
        else -> throw AssertionError()
      }

      // Insert the node in sorted order.
      val remainingNanos = node.remainingNanos(now)
      var prev = head!!
      while (true) {
        val following = prev.next
        if (following == null || remainingNanos < following.remainingNanos(now)) {
          node.next = following
          prev.next = node
          if (prev === head) {
            // Wake up the watchdog when inserting at the front.
            condition.signal()
          }
          break
        }
        prev = following
      }
    }

    /**
     * Unlinks [node] from the queue.
     *
     * @throws IllegalStateException if [node] is not reachable from [head].
     */
    private fun removeFromQueue(node: AsyncTimeout) {
      var prev = head
      while (prev != null) {
        if (prev.next === node) {
          prev.next = node.next
          node.next = null
          return
        }
        prev = prev.next
      }

      error("node was not found in the queue")
    }

    /**
     * Removes and returns the node at the head of the list, waiting for it to time out if
     * necessary. This returns [head] if there was no node at the head of the list when starting,
     * and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a
     * new node was inserted while waiting. Otherwise, this returns the node being waited on that
     * has been removed.
     */
    @Throws(InterruptedException::class)
    fun awaitTimeout(): AsyncTimeout? {
      // Get the next eligible node.
      val node = head!!.next

      // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
      if (node == null) {
        val startNanos = System.nanoTime()
        condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
        // Only report idleness when the full idle period has really elapsed; an early wake-up
        // falls through to null so the caller simply tries again.
        return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
          head // The idle timeout elapsed.
        } else {
          null // The situation has changed.
        }
      }

      val waitNanos = node.remainingNanos(System.nanoTime())

      // The head of the queue hasn't timed out yet. Await that.
      if (waitNanos > 0) {
        condition.await(waitNanos, TimeUnit.NANOSECONDS)
        return null
      }

      // The head of the queue has timed out. Remove it.
      head!!.next = node.next
      node.next = null
      node.state = STATE_TIMED_OUT
      return node
    }
  }
}
382