1 package kotlinx.coroutines.channels
2
3 import kotlinx.coroutines.*
4 import kotlin.coroutines.*
5
/**
 * Selects how a [ticker] channel schedules its elements when the consumer falls behind.
 *
 * **Note: Ticker channels are not currently integrated with structured concurrency and their api will change in the future.**
 */
@ObsoleteCoroutinesApi
public enum class TickerMode {
    /**
     * Keeps a fixed period between elements: when the consumer cannot keep up or is otherwise slow,
     * the following delay is adjusted so that elements stay aligned to the period.
     * **This is the default mode.**
     *
     * ```
     * val channel = ticker(delay = 100)
     * delay(350) // 250 ms late
     * println(channel.tryReceive().getOrNull()) // prints Unit
     * println(channel.tryReceive().getOrNull()) // prints null
     *
     * delay(50)
     * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted
     * delay(50)
     * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element
     * ```
     */
    FIXED_PERIOD,

    /**
     * Keeps a fixed delay between produced elements when the consumer cannot keep up or is otherwise slow.
     */
    FIXED_DELAY,
}
36
/**
 * Creates a channel that emits [Unit] for the first time after [initialDelayMillis] and then keeps
 * emitting with [delayMillis] between subsequent elements.
 *
 * The returned channel is a _rendezvous channel_: if the receiver does not keep up, elements are not
 * sent due to backpressure. How the ticker times its elements in that situation is decided by [mode],
 * which is [TickerMode.FIXED_PERIOD] by default. See [TickerMode] for details.
 *
 * The channel stops producing elements immediately after [ReceiveChannel.cancel] is invoked.
 *
 * **Note** the producer of this channel is dispatched via [Dispatchers.Unconfined] by default and started eagerly.
 *
 * **Note: Ticker channels are not currently integrated with structured concurrency and their api will change in the future.**
 *
 * @param delayMillis delay between each element in milliseconds.
 * @param initialDelayMillis delay in milliseconds after which the first element is produced
 * (equal to [delayMillis] by default).
 * @param context context of the producing coroutine; it is added on top of [Dispatchers.Unconfined].
 * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default).
 * @return the receiving side of the ticker channel.
 * @throws IllegalArgumentException if [delayMillis] or [initialDelayMillis] is negative.
 */
@ObsoleteCoroutinesApi
public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit> {
    require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
    require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }

    // Elements of the caller-supplied context override the Unconfined default.
    val producerContext = Dispatchers.Unconfined + context

    // capacity = 0: the producer hands each element directly to a receiver (rendezvous).
    return GlobalScope.produce(producerContext, capacity = 0) {
        when (mode) {
            TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel)
            TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel)
        }
    }
}
73
/**
 * Ticker loop for [TickerMode.FIXED_PERIOD].
 *
 * Waits [initialDelayMillis], then repeatedly sends [Unit] to [channel], tracking a deadline for the
 * next element so that a late send shortens (or realigns) the following delay instead of shifting
 * the whole schedule.
 *
 * The loop has no exit condition: it ends only when [SendChannel.send] or [delay] throws
 * (presumably on cancellation or when the channel is closed).
 *
 * @param delayMillis period between elements in milliseconds.
 * @param initialDelayMillis delay before the first element in milliseconds.
 * @param channel channel that receives the ticks.
 */
private suspend fun fixedPeriodTicker(
    delayMillis: Long,
    initialDelayMillis: Long,
    channel: SendChannel<Unit>
) {
    // Time at which the upcoming element is due, in the units returned by nanoTime().
    var nextDeadline = nanoTime() + delayToNanos(initialDelayMillis)
    delay(initialDelayMillis)
    val periodNanos = delayToNanos(delayMillis)
    while (true) {
        // Advance the deadline before sending: send() may suspend for an arbitrary time.
        nextDeadline += periodNanos
        channel.send(Unit)
        val currentNanos = nanoTime()
        val remainingNanos = maxOf(nextDeadline - currentNanos, 0L)
        val sleepNanos = if (remainingNanos != 0L || periodNanos == 0L) {
            // Still ahead of the deadline (or the period is zero): wait out what is left.
            remainingNanos
        } else {
            // The deadline has already passed: drop the missed ticks and wait only until the
            // next point that is a whole number of periods past the old deadline.
            val realignedNanos = periodNanos - (currentNanos - nextDeadline) % periodNanos
            nextDeadline = currentNanos + realignedNanos
            realignedNanos
        }
        delay(delayNanosToMillis(sleepNanos))
    }
}
96
/**
 * Ticker loop for [TickerMode.FIXED_DELAY].
 *
 * Waits [initialDelayMillis], then alternates forever between sending [Unit] to [channel] and
 * waiting [delayMillis]. The delay starts only after the send has completed, so time spent
 * suspended in send is not compensated for.
 *
 * The loop has no exit condition: it ends only when [SendChannel.send] or [delay] throws
 * (presumably on cancellation or when the channel is closed).
 *
 * @param delayMillis delay after each sent element in milliseconds.
 * @param initialDelayMillis delay before the first element in milliseconds.
 * @param channel channel that receives the ticks.
 */
private suspend fun fixedDelayTicker(
    delayMillis: Long,
    initialDelayMillis: Long,
    channel: SendChannel<Unit>
) {
    delay(initialDelayMillis)
    do {
        channel.send(Unit)
        delay(delayMillis)
    } while (true)
}
108