use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::cmp;
use std::mem;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;

/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
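///
/// Illustrative sketch (not a doctest) of how the queue worker thread is expected to drive this
/// wrapper; `device` is assumed to be a valid `&'static Device`:
///
/// ```ignore
/// let ctx = QueueContext::new_for(device)?;
/// // Bind kernel inputs to constant buffer 0 before running work on the context.
/// ctx.update_cb0(&[0u8; 64])?;
/// // Until all state tracking has moved here, `Deref` still exposes the raw `PipeContext`.
/// ctx.flush().wait();
/// ```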
pub struct QueueContext {
    // need to use ManuallyDrop so we can recycle the context without cloning
    ctx: ManuallyDrop<PipeContext>,
    dev: &'static Device,
    use_stream: bool,
}

impl QueueContext {
    fn new_for(device: &'static Device) -> CLResult<Self> {
        let ctx = device.create_context().ok_or(CL_OUT_OF_HOST_MEMORY)?;

        Ok(Self {
            ctx: ManuallyDrop::new(ctx),
            dev: device,
            use_stream: device.prefers_real_buffer_in_cb0(),
        })
    }

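    /// Uploads `data` as constant buffer 0 on the wrapped context, using a streaming upload on
    /// devices that prefer a real buffer in cb0 and a plain constant buffer bind otherwise.
    /// Empty `data` leaves the current binding untouched.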
    pub fn update_cb0(&self, data: &[u8]) -> CLResult<()> {
        // only update if we actually bind data
        if !data.is_empty() {
            if self.use_stream {
                if !self.ctx.set_constant_buffer_stream(0, data) {
                    return Err(CL_OUT_OF_RESOURCES);
                }
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
        Ok(())
    }
}

// This should go away once we've moved all state tracking into QueueContext
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

impl Drop for QueueContext {
    fn drop(&mut self) {
        let ctx = unsafe { ManuallyDrop::take(&mut self.ctx) };
        ctx.set_constant_buffer(0, &[]);
        self.dev.recycle_context(ctx);
    }
}

struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was only stabilized in 1.72, so until then we keep it inside our Mutex.
    // See https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    state: Mutex<QueueState>,
    _thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

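/// Waits on the flush of `pipe` and then signals and drains all events in `evs`. Does nothing
/// when `evs` is empty.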
fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

impl Queue {
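    /// Creates the queue together with its worker thread. Pending events are handed to the
    /// thread in batches through the channel stored in [QueueState]; the thread executes them in
    /// order on the queue's [QueueContext] and signals them after flushing the pipe context.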
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Option<Properties<cl_queue_properties>>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure. Any other failure reason
        // should be detected earlier (e.g. when checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context: context,
            device: device,
            props: props,
            props_v2: props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            _thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || {
                    // Track the error of all executed events. This is only needed for in-order
                    // queues, so we'll need to update this for out-of-order queues.
                    // Also, the OpenCL specification gives us enough freedom to do whatever we
                    // want in case any event runs into an error while executing:
                    //
                    //   Unsuccessful completion results in abnormal termination of the command
                    //   which is indicated by setting the event status to a negative value. In
                    //   this case, the command-queue associated with the abnormally terminated
                    //   command and all other command-queues in the same context may no longer
                    //   be available and their behavior is implementation-defined.
                    //
                    // TODO: use pipe_context::set_device_reset_callback to get notified about
                    // gone GPU contexts
                    let mut last_err = CL_SUCCESS as cl_int;
                    loop {
                        let r = rx_t.recv();
                        if r.is_err() {
                            break;
                        }

                        let new_events = r.unwrap();
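                        // Events that have been executed but not yet signaled; they get signaled
                        // in batches once the pipe context has been flushed (see flush_events).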
                        let mut flushed = Vec::new();

                        for e in new_events {
                            // If we hit any deps from another queue, flush so we don't risk a
                            // deadlock.
                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
                                flush_events(&mut flushed, &ctx);
                            }

                            // check if any dependency has an error
                            for dep in &e.deps {
                                // We have to wait on user events or events from other queues.
                                let dep_err = if dep.is_user() || dep.queue != e.queue {
                                    dep.wait()
                                } else {
                                    dep.status()
                                };

                                last_err = cmp::min(last_err, dep_err);
                            }

                            if last_err < 0 {
                                // If a dependency failed, fail this event as well.
                                e.set_user_status(last_err);
                                continue;
                            }

                            // If there is an execution error, don't bother signaling it as the
                            // context might be in a broken state. How queues behave after any
                            // event hits an error is entirely implementation-defined.
                            last_err = e.call(&ctx);
                            if last_err < 0 {
                                continue;
                            }

                            if e.is_user() {
                                // On each user event we flush our events as the application
                                // might wait on them before signaling user events.
                                flush_events(&mut flushed, &ctx);

                                // Wait on user events as they are synchronization points in the
                                // application's control.
                                e.wait();
                            } else if Platform::dbg().sync_every_event {
                                flushed.push(e);
                                flush_events(&mut flushed, &ctx);
                            } else {
                                flushed.push(e);
                            }
                        }

                        flush_events(&mut flushed, &ctx);
                    }
                })
                .unwrap(),
        }))
    }

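    /// Adds `e` to the list of pending events. Nothing is submitted to the worker thread until
    /// [Queue::flush] is called.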
    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }

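    /// Hands all pending events over to the worker thread and implicitly flushes any other
    /// queues they depend on. With `wait` set, this additionally blocks until the last submitted
    /// event has completed.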
    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update `last` if and only if we get new events; this prevents breaking application code
        // doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never ever error, but if it does, return an error.
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly, and this _has_ to happen after taking the
        // pending events, otherwise we risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here as the queue will process it in
            // order. It's not a problem if the weak ref is invalid, as that means the work is
            // already done and waiting isn't necessary anymore.
            last.upgrade().map(|e| e.wait());
        }
        Ok(())
    }

    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        // When deleting the application-side object, we have to flush.
        // From the OpenCL spec:
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued
        //   OpenCL commands in command_queue.
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}