// xref: /aosp_15_r20/external/kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt (revision 7a7160fed73afa6648ef8aa100d4a336fe921d9a)
1 package kotlinx.coroutines.channels
2 
3 import kotlinx.coroutines.*
4 import kotlin.coroutines.*
5 
/**
 * Timing mode for the [ticker] function: selects how the ticker schedules the next element
 * when the consumer receives the previous one late.
 *
 * **Note: Ticker channels are not currently integrated with structured concurrency and their API will change in the future.**
 */
@ObsoleteCoroutinesApi
public enum class TickerMode {
    /**
     * Adjusts the delay to maintain a fixed period if the consumer cannot keep up or is otherwise slow.
     * **This is the default mode.**
     *
     * ```
     * val channel = ticker(delay = 100)
     * delay(350) // 250 ms late
     * println(channel.tryReceive().getOrNull()) // prints Unit
     * println(channel.tryReceive().getOrNull()) // prints null
     *
     * delay(50)
     * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted
     * delay(50)
     * println(channel.tryReceive().getOrNull()) // prints null, we're not late relative to the previous element
     * ```
     */
    FIXED_PERIOD,

    /**
     * Maintains a fixed delay between produced elements if the consumer cannot keep up or is otherwise slow:
     * the wait for the next element starts only after the previous element has been sent.
     */
    FIXED_DELAY
}
36 
/**
 * Creates a channel that produces the first item after the given initial delay and subsequent items with the
 * given delay between them.
 *
 * The resulting channel is a _rendezvous channel_ (it is created with `capacity = 0`). When the receiver of this
 * channel does not keep up with receiving the elements, they are not sent due to backpressure. The actual
 * timing behavior of the ticker in this case is controlled by the [mode] parameter, which
 * is set to [TickerMode.FIXED_PERIOD] by default. See [TickerMode] for other details.
 *
 * This channel stops producing elements immediately after [ReceiveChannel.cancel] invocation.
 *
 * **Note** the producer of this channel is dispatched via [Dispatchers.Unconfined] by default and started eagerly.
 * A dispatcher supplied in [context] replaces that default, because [context] is added on the right-hand side
 * of the context sum.
 *
 * **Note: Ticker channels are not currently integrated with structured concurrency and their API will change in the future.**
 * The producing coroutine is launched in [GlobalScope].
 *
 * @param delayMillis delay between each element in milliseconds; must be non-negative.
 * @param initialDelayMillis delay after which the first element will be produced, in milliseconds
 * (it is equal to [delayMillis] by default); must be non-negative.
 * @param context context of the producing coroutine.
 * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
 * @return the receiving side of the ticker channel; every element is [Unit].
 * @throws IllegalArgumentException if [delayMillis] or [initialDelayMillis] is negative.
 */
@ObsoleteCoroutinesApi
public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
    // Validate eagerly, before any coroutine is started, so that bad arguments fail at the call site.
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
    return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) {
        // Exhaustive over TickerMode on purpose: no `else`, so a new mode is a compile-time error here.
        when (mode) {
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel)
            TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel)
        }
    }
}
73 
/**
 * Producer loop for [TickerMode.FIXED_PERIOD]: sends [Unit] to [channel] forever, trying to keep the
 * elements on a fixed time grid even when a send completes late.
 *
 * The loop tracks a `deadline` (in the units of `nanoTime()`) for the element that follows the one being sent.
 * After each send:
 *  * if the deadline is still in the future, it waits for the remaining time;
 *  * if the deadline has already passed (and the period is non-zero), it waits only until the next multiple
 *    of the period counted from the missed deadline, and moves the deadline to that instant.
 *
 * The function never returns normally; it only ends when `send` or `delay` throws.
 *
 * `nanoTime`, `delayToNanos` and `delayNanosToMillis` are helpers declared outside this file; they are
 * presumably the obvious clock read and ms/ns conversions — verify against their definitions.
 *
 * @param delayMillis period between elements, in milliseconds.
 * @param initialDelayMillis delay before the first element, in milliseconds.
 * @param channel the channel the ticks are sent to.
 */
private suspend fun fixedPeriodTicker(
    delayMillis: Long,
    initialDelayMillis: Long,
    channel: SendChannel<Unit>
) {
    // The first deadline is computed *before* suspending, so time spent in the initial delay counts against it.
    // NOTE(review): the deadline arithmetic below is unchecked Long addition; confirm that very large
    // delays cannot overflow it.
    var deadline = nanoTime() + delayToNanos(initialDelayMillis)
    delay(initialDelayMillis)
    val delayNs = delayToNanos(delayMillis)
    while (true) {
        // Advance to the deadline of the *next* element before sending the current one:
        // `send` may suspend for an arbitrary time, and that time must count against the next period.
        deadline += delayNs
        channel.send(Unit)
        val now = nanoTime()
        val nextDelay = (deadline - now).coerceAtLeast(0)
        if (nextDelay == 0L && delayNs != 0L) {
            // We are late (or exactly on the deadline). `(now - deadline) % delayNs` is how far we are into
            // the current period, so the remainder of the period is the wait until the next grid point.
            // The `delayNs != 0L` guard also protects the modulo from division by zero.
            val adjustedDelay = delayNs - (now - deadline) % delayNs
            deadline = now + adjustedDelay
            delay(delayNanosToMillis(adjustedDelay))
        } else {
            // On schedule (or zero period): wait for whatever is left until the deadline.
            delay(delayNanosToMillis(nextDelay))
        }
    }
}
96 
/**
 * Producer loop for [TickerMode.FIXED_DELAY]: waits [initialDelayMillis], then forever sends [Unit] to
 * [channel] and waits [delayMillis] after each send completes.
 *
 * Because the wait starts only after `send` returns, a slow receiver pushes all later elements back;
 * no attempt is made to catch up. The function never returns normally; it only ends when `send` or
 * `delay` throws.
 *
 * @param delayMillis delay after each sent element, in milliseconds.
 * @param initialDelayMillis delay before the first element, in milliseconds.
 * @param channel the channel the ticks are sent to.
 */
private suspend fun fixedDelayTicker(
    delayMillis: Long,
    initialDelayMillis: Long,
    channel: SendChannel<Unit>
) {
    delay(initialDelayMillis)
    while (true) {
        channel.send(Unit)
        delay(delayMillis)
    }
}
108