<lambda>null1 @file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
2
3 package kotlinx.coroutines.debug
4
5 import kotlinx.coroutines.scheduling.*
6 import reactor.blockhound.*
7 import reactor.blockhound.integration.*
8
/**
 * A [BlockHoundIntegration] that registers, on a [BlockHound.Builder], the blocking-call allowances and the thread
 * predicates declared below.
 *
 * @suppress
 */
public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {

    /**
     * Applies every allowance group of this class to [builder] and then installs the thread predicates.
     */
    override fun applyTo(builder: BlockHound.Builder) {
        builder.apply {
            allowBlockingCallsInPrimitiveImplementations()
            allowBlockingWhenEnqueuingTasks()
            allowServiceLoaderInvocationsOnInit()
            allowBlockingCallsInReflectionImpl()
            allowBlockingCallsInDebugProbes()
            allowBlockingCallsInWorkQueue()
            // The stacktrace recovery cache is protected by a lock.
            allowBlockingCallsInside("kotlinx.coroutines.internal.ExceptionsConstructorKt", "tryCopyException")
            // The two predicates below make BlockHound report blocking calls only from threads that belong to the
            // coroutine thread pool and are currently executing a CPU-bound coroutine computation.
            addDynamicThreadPredicate { thread -> isSchedulerWorker(thread) }
            nonBlockingThreadPredicate { current -> current.or { thread -> mayNotBlock(thread) } }
        }
    }

    /**
     * Allows blocking calls inside every method of [className] listed in [methodNames], registering them one by one
     * in the order given.
     */
    private fun BlockHound.Builder.allowBlockingCallsInsideEach(className: String, vararg methodNames: String) {
        methodNames.forEach { methodName -> allowBlockingCallsInside(className, methodName) }
    }

    /**
     * Allows blocking calls in various coroutine structures, such as flows and channels.
     *
     * Their implementations take locks, but only around short pieces of fast, well-understood code, so locking
     * there does not affect the liveness of the program.
     */
    private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
        allowBlockingCallsInJobSupport()
        allowBlockingCallsInThreadSafeHeap()
        allowBlockingCallsInFlow()
        allowBlockingCallsInChannels()
    }

    /**
     * Allows blocking inside [kotlinx.coroutines.JobSupport].
     */
    private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.JobSupport",
            "finalizeFinishingState",
            "invokeOnCompletion",
            "makeCancelling",
            "tryMakeCompleting",
        )
    }

    /**
     * Allows blocking calls inside [kotlinx.coroutines.debug.internal.DebugProbesImpl].
     */
    private fun BlockHound.Builder.allowBlockingCallsInDebugProbes() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.debug.internal.DebugProbesImpl",
            "install",
            "uninstall",
            "hierarchyToString",
            "dumpCoroutinesInfo",
            "dumpDebuggerInfo",
            "dumpCoroutinesSynchronized",
            "updateRunningState",
            "updateState",
        )
    }

    /**
     * Allows blocking calls inside [kotlinx.coroutines.scheduling.WorkQueue].
     */
    private fun BlockHound.Builder.allowBlockingCallsInWorkQueue() {
        // `addLast` uses Thread.yield in a benign way.
        allowBlockingCallsInside("kotlinx.coroutines.scheduling.WorkQueue", "addLast")
    }

    /**
     * Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
     */
    private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.internal.ThreadSafeHeap",
            "clear",
            "peek",
            "removeFirstOrNull",
            "addLast",
        )
    }

    /**
     * Allows blocking inside the flow implementations: first the state flow, then the shared flow.
     */
    private fun BlockHound.Builder.allowBlockingCallsInFlow() {
        allowBlockingCallsInsideStateFlow()
        allowBlockingCallsInsideSharedFlow()
    }

    /**
     * Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
     */
    private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
        allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
    }

    /**
     * Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
     */
    private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.flow.SharedFlowImpl",
            "emitSuspend",
            "awaitValue",
            "getReplayCache",
            "tryEmit",
            "cancelEmitter",
            "tryTakeValue",
            "resetReplayCache",
        )
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.flow.internal.AbstractSharedFlow",
            "getSubscriptionCount",
            "allocateSlot",
            "freeSlot",
        )
    }

    /**
     * Allows blocking inside the channel implementations: first the broadcast channels, then the conflated ones.
     */
    private fun BlockHound.Builder.allowBlockingCallsInChannels() {
        allowBlockingCallsInBroadcastChannels()
        allowBlockingCallsInConflatedChannels()
    }

    /**
     * Allows blocking inside [kotlinx.coroutines.channels.BroadcastChannel].
     */
    private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannels() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.channels.BroadcastChannelImpl",
            "openSubscription",
            "removeSubscriber",
            "send",
            "trySend",
            "registerSelectForSend",
            "close",
            "cancelImpl",
            "isClosedForSend",
            "value",
            "valueOrNull",
        )
        // The nested subscriber classes need only `cancelImpl` to be allowed.
        allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberConflated", "cancelImpl")
        allowBlockingCallsInside("kotlinx.coroutines.channels.BroadcastChannelImpl\$SubscriberBuffered", "cancelImpl")
    }

    /**
     * Allows blocking inside [kotlinx.coroutines.channels.ConflatedBufferedChannel].
     */
    private fun BlockHound.Builder.allowBlockingCallsInConflatedChannels() {
        allowBlockingCallsInsideEach(
            "kotlinx.coroutines.channels.ConflatedBufferedChannel",
            "receive",
            "receiveCatching",
            "tryReceive",
            "registerSelectForReceive",
            "send",
            "trySend",
            "sendBroadcast",
            "registerSelectForSend",
            "close",
            "cancelImpl",
            "isClosedForSend",
            "isClosedForReceive",
            "isEmpty",
        )
        allowBlockingCallsInside(
            "kotlinx.coroutines.channels.ConflatedBufferedChannel\$ConflatedChannelIterator",
            "hasNext",
        )
    }

    /**
     * Allows blocking when enqueuing tasks into a thread pool.
     *
     * Without this rule, the following code breaks:
     * ```
     * withContext(Dispatchers.Default) {
     *     withContext(newSingleThreadContext("singleThreadedContext")) {
     *     }
     * }
     * ```
     */
    private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
        // `execute` may block as part of its implementation, but is probably safe.
        allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
    }

    /**
     * Allows instances of [java.util.ServiceLoader] being called.
     *
     * Every instance is listed individually. An alternative would be to allow the operations performed by service
     * loaders in general, since they can generally be considered safe. That was not done because ServiceLoader has
     * a large API surface, part of which is hidden as implementation details (in particular, the implementation of
     * its iterator is completely opaque). Depending on the particular names used inside ServiceLoader's
     * implementation would be brittle, so clearance rules are provided only for some specific instances.
     */
    private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
        allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
        allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
        // Not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking.
        allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
    }

    /**
     * Allows some blocking calls from the reflection API.
     *
     * The API is big, so other blocking calls will surely show up, but with these rules in place at least some
     * simple examples work without problems.
     */
    private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
        allowBlockingCallsInside(
            "kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider",
            "findPackage",
        )
    }

}
187