use crate::api::icd::*;
use crate::core::context::*;
use crate::core::device::*;
use crate::core::event::*;
use crate::core::platform::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::context::PipeContext;
use mesa_rust_util::properties::*;
use rusticl_opencl_gen::*;

use std::cmp;
use std::mem;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::thread;
use std::thread::JoinHandle;

/// State tracking wrapper for [PipeContext]
///
/// Used for tracking bound GPU state to lower CPU overhead and centralize state tracking
pub struct QueueContext {
    // need to use ManuallyDrop so we can recycle the context without cloning
    ctx: ManuallyDrop<PipeContext>,
    dev: &'static Device,
    use_stream: bool,
}

impl QueueContext {
    fn new_for(device: &'static Device) -> CLResult<Self> {
        let ctx = device.create_context().ok_or(CL_OUT_OF_HOST_MEMORY)?;

        Ok(Self {
            ctx: ManuallyDrop::new(ctx),
            dev: device,
            use_stream: device.prefers_real_buffer_in_cb0(),
        })
    }

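    /// Binds `data` as constant buffer 0 on the wrapped context, using the device's preferred
    /// upload path.
    ///
    /// A minimal usage sketch (hypothetical call site; `queue_ctx` and the payload are made up
    /// for illustration):
    ///
    /// ```ignore
    /// let cb0_data: Vec<u8> = vec![0u8; 64];
    /// queue_ctx.update_cb0(&cb0_data)?;
    /// ```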
    pub fn update_cb0(&self, data: &[u8]) -> CLResult<()> {
        // only update if we actually bind data
        if !data.is_empty() {
            if self.use_stream {
                if !self.ctx.set_constant_buffer_stream(0, data) {
                    return Err(CL_OUT_OF_RESOURCES);
                }
            } else {
                self.ctx.set_constant_buffer(0, data);
            }
        }
        Ok(())
    }
}

// This should go away once we've moved all state tracking into QueueContext
impl Deref for QueueContext {
    type Target = PipeContext;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}

impl Drop for QueueContext {
    fn drop(&mut self) {
        let ctx = unsafe { ManuallyDrop::take(&mut self.ctx) };
        ctx.set_constant_buffer(0, &[]);
        self.dev.recycle_context(ctx);
    }
}
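
// A rough lifecycle sketch of the recycling scheme above (comment only; `dev` stands for any
// &'static Device). ManuallyDrop lets Drop hand the PipeContext back to the device instead of
// destroying it:
//
//     let ctx = QueueContext::new_for(dev)?; // create_context() may hand out a recycled context
//     drop(ctx);                             // unbinds cb0, then dev.recycle_context(ctx)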

struct QueueState {
    pending: Vec<Arc<Event>>,
    last: Weak<Event>,
    // `Sync` on `Sender` was only stabilized in 1.72; until then, keep it inside our Mutex.
    // See https://github.com/rust-lang/rust/commit/5f56956b3c7edb9801585850d1f41b0aeb1888ff
    chan_in: mpsc::Sender<Vec<Arc<Event>>>,
}

pub struct Queue {
    pub base: CLObjectBase<CL_INVALID_COMMAND_QUEUE>,
    pub context: Arc<Context>,
    pub device: &'static Device,
    pub props: cl_command_queue_properties,
    pub props_v2: Option<Properties<cl_queue_properties>>,
    state: Mutex<QueueState>,
    _thrd: JoinHandle<()>,
}

impl_cl_type_trait!(cl_command_queue, Queue, CL_INVALID_COMMAND_QUEUE);

fn flush_events(evs: &mut Vec<Arc<Event>>, pipe: &PipeContext) {
    if !evs.is_empty() {
        pipe.flush().wait();
        evs.drain(..).for_each(|e| e.signal());
    }
}

impl Queue {
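    /// Creates a command queue together with the worker thread that executes its events.
    ///
    /// A minimal construction sketch (hypothetical call site; `ctx` and `dev` would come from
    /// the usual context and device creation paths):
    ///
    /// ```ignore
    /// let queue = Queue::new(ctx, dev, CL_QUEUE_PROFILING_ENABLE as u64, None)?;
    /// ```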
    pub fn new(
        context: Arc<Context>,
        device: &'static Device,
        props: cl_command_queue_properties,
        props_v2: Option<Properties<cl_queue_properties>>,
    ) -> CLResult<Arc<Queue>> {
        // We assume that memory allocation is the only possible failure. Any other failure
        // reason should be detected earlier (e.g. when checking for CAPs).
        let ctx = QueueContext::new_for(device)?;
        let (tx_q, rx_t) = mpsc::channel::<Vec<Arc<Event>>>();
        Ok(Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Queue),
            context,
            device,
            props,
            props_v2,
            state: Mutex::new(QueueState {
                pending: Vec::new(),
                last: Weak::new(),
                chan_in: tx_q,
            }),
            _thrd: thread::Builder::new()
                .name("rusticl queue thread".into())
                .spawn(move || {
                    // Track the error of all executed events. This is only needed for in-order
                    // queues, so for out-of-order queues we'll need to update this.
                    // Also, the OpenCL specification gives us enough freedom to do whatever we
                    // want in case of any event running into an error while executing:
                    //
                    //   Unsuccessful completion results in abnormal termination of the command
                    //   which is indicated by setting the event status to a negative value. In
                    //   this case, the command-queue associated with the abnormally terminated
                    //   command and all other command-queues in the same context may no longer
                    //   be available and their behavior is implementation-defined.
                    //
                    // TODO: use pipe_context::set_device_reset_callback to get notified about
                    //       gone GPU contexts
                    let mut last_err = CL_SUCCESS as cl_int;
                    loop {
                        let new_events = match rx_t.recv() {
                            Ok(evs) => evs,
                            Err(_) => break,
                        };
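
                        // `flushed` batches events whose commands have been recorded on the
                        // pipe context but not yet signaled; flush_events() submits them with a
                        // single flush and then signals the whole batch.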
                        let mut flushed = Vec::new();

                        for e in new_events {
                            // If we hit any deps from another queue, flush so we don't risk a
                            // deadlock, e.g. when the other queue in turn waits on work we have
                            // recorded here but not yet submitted.
                            if e.deps.iter().any(|ev| ev.queue != e.queue) {
                                flush_events(&mut flushed, &ctx);
                            }

                            // check if any dependency has an error
                            for dep in &e.deps {
                                // We have to wait on user events or events from other queues.
                                let dep_err = if dep.is_user() || dep.queue != e.queue {
                                    dep.wait()
                                } else {
                                    dep.status()
                                };

                                last_err = cmp::min(last_err, dep_err);
                            }
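                            // Error codes are negative cl_int values while CL_SUCCESS is 0, so
                            // taking the minimum makes failures sticky: once any dependency
                            // failed, last_err stays negative.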

                            if last_err < 0 {
                                // If a dependency failed, fail this event as well.
                                e.set_user_status(last_err);
                                continue;
                            }

                            // If there is an execution error, don't bother signaling it as the
                            // context might be in a broken state. How queues behave after any
                            // event hit an error is entirely implementation defined.
                            last_err = e.call(&ctx);
                            if last_err < 0 {
                                continue;
                            }

                            if e.is_user() {
                                // On each user event we flush our events as the application
                                // might wait on them before signaling user events.
                                flush_events(&mut flushed, &ctx);

                                // Wait on user events as they are synchronization points in the
                                // application's control.
                                e.wait();
                            } else if Platform::dbg().sync_every_event {
                                flushed.push(e);
                                flush_events(&mut flushed, &ctx);
                            } else {
                                flushed.push(e);
                            }
                        }

                        flush_events(&mut flushed, &ctx);
                    }
                })
                .unwrap(),
        }))
    }

    pub fn queue(&self, e: Arc<Event>) {
        if self.is_profiling_enabled() {
            e.set_time(EventTimes::Queued, self.device.screen().get_timestamp());
        }
        self.state.lock().unwrap().pending.push(e);
    }

    pub fn flush(&self, wait: bool) -> CLResult<()> {
        let mut state = self.state.lock().unwrap();
        let events = mem::take(&mut state.pending);
        let mut queues = Event::deep_unflushed_queues(&events);

        // Update `last` if and only if we get new events; this prevents breaking application
        // code doing things like `clFlush(q); clFinish(q);`
        if let Some(last) = events.last() {
            state.last = Arc::downgrade(last);

            // This should never ever error, but if it does, return an error.
            state
                .chan_in
                .send(events)
                .map_err(|_| CL_OUT_OF_HOST_MEMORY)?;
        }

        let last = wait.then(|| state.last.clone());

        // We have to unlock before actually flushing, otherwise we'll run into deadlocks when a
        // queue gets flushed concurrently.
        drop(state);

        // We need to flush out other queues implicitly and this _has_ to happen after taking the
        // pending events, otherwise we'll risk deadlocks when waiting on events.
        queues.remove(self);
        for q in queues {
            q.flush(false)?;
        }

        if let Some(last) = last {
            // Waiting on the last event is good enough here as the queue will process it in
            // order. It's not a problem if the weak ref is invalid, as that means the work is
            // already done and waiting isn't necessary anymore.
            last.upgrade().map(|e| e.wait());
        }
        Ok(())
    }

    pub fn is_profiling_enabled(&self) -> bool {
        (self.props & (CL_QUEUE_PROFILING_ENABLE as u64)) != 0
    }
}
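
// A rough usage sketch tying the above together (hypothetical call site; mapping clFlush and
// clFinish onto flush(false) and flush(true) is an assumption based on the `wait` flag, not
// something this file spells out):
//
//     queue.queue(event);    // stash the event; the worker thread is not involved yet
//     queue.flush(false)?;   // hand all pending events to the worker thread (clFlush)
//     queue.flush(true)?;    // additionally wait for the last event (clFinish)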

impl Drop for Queue {
    fn drop(&mut self) {
        // When deleting the application-side object, we have to flush. From the OpenCL spec:
        //
        //   clReleaseCommandQueue performs an implicit flush to issue any previously queued
        //   OpenCL commands in command_queue.
        //
        // TODO: maybe we have to do it on every release?
        let _ = self.flush(true);
    }
}