// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring {0}")]
    Setup(libc::c_int),
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize,  // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}

// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }

    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events was
            // 16 bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some(offset) => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe);

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }
}

/// Enum to represent all io_uring operations.
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Create a new `URingAllowlist` which allows no operation.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
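    ///
    /// A minimal sketch of building an allowlist and passing it to `URingContext::new`; the
    /// operations chosen here are arbitrary examples, not a recommended set.
    ///
    /// ```no_run
    /// # use io_uring::{URingAllowlist, URingContext, URingOperation};
    /// let mut allowlist = URingAllowlist::new();
    /// allowlist
    ///     .allow_submit_operation(URingOperation::Nop)
    ///     .allow_submit_operation(URingOperation::PollAdd);
    /// // Only NOP and POLL_ADD sqes may be submitted to this ring.
    /// let uring = URingContext::new(16, Some(&allowlist)).unwrap();
    /// uring.add_nop(7).unwrap();
    /// ```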
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // SAFETY:
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if given.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
                // `restrictions` contains a valid pointer and length.
                io_uring_register(
                    fd,
                    IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enables the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify the memory.
                io_uring_register(fd, IORING_REGISTER_ENABLE_RINGS, null::<c_void>(), 0)
                    .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any
            // error is checked.
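            // The three mappings below follow the io_uring mmap layout:
            //   1. the SQ ring metadata plus the array of `sq_entries` u32 index slots, mapped at
            //      IORING_OFF_SQ_RING,
            //   2. the array of `sq_entries` io_uring_sqe structs, mapped at IORING_OFF_SQES,
            //   3. the CQ ring metadata with `cq_entries` io_uring_cqe structs stored inline,
            //      mapped at IORING_OFF_CQ_RING.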
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
            })
        }
    }

    /// # Safety
    /// See 'writev' but accepts an iterator instead of a vector if there isn't already a vector in
    /// existence.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the
    /// caller guarantees there are no other references to that memory and that the memory lives
    /// until the transaction is complete and that completion has been returned from the `wait`
    /// function. In addition there must not be any mutable references to the data pointed to by
    /// `iovecs` until the operation completes. Ensure that the fd remains open until the op
    /// completes as well. The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// # Safety
    /// See 'readv' but accepts an iterator instead of a vector if there isn't already a vector in
    /// existence.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of
    /// the io_uring itself and for waking up a thread that's blocked inside a wait() call.
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
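    ///
    /// A minimal usage sketch (illustrative only; the user_data values are arbitrary and
    /// /dev/zero stands in for any pollable FD):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::os::unix::io::AsRawFd;
    /// # use std::path::Path;
    /// # use base::EventType;
    /// # use io_uring::URingContext;
    /// let f = File::open(Path::new("/dev/zero")).unwrap();
    /// let uring = URingContext::new(16, None).unwrap();
    /// uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 10).unwrap();
    /// // Ask the kernel to drop the pending poll before waiting on it.
    /// uring.remove_poll_fd(f.as_raw_fd(), EventType::Read, 11).unwrap();
    /// uring.submit().unwrap();
    /// ```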
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempt to cancel an already issued request. addr must contain the user_data field of the
    /// request that should be cancelled. The cancellation request will complete with one of the
    /// following result codes. If found, the res field of the cqe will contain 0. If not found,
    /// res will contain -ENOENT. If found and cancellation was attempted, the res field will
    /// contain -EALREADY; in this case, the request may or may not terminate. In general, requests
    /// that are interruptible (like socket IO) will get cancelled, while disk IO requests cannot
    /// be cancelled if already started.
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res =
            // SAFETY:
            // Safe because the only memory modified is in the completion queue.
            unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };

        // An EINTR means we did successfully submit the events.
        if res.is_ok() || res == Err(libc::EINTR) {
            self.submit_ring.lock().complete_submit(added);
        } else {
            self.submit_ring.lock().fail_submit(added);
        }

        match res {
            Ok(()) => Ok(()),
            // EBUSY means that some completed events need to be processed before more can
            // be submitted, so wait for some sqes to finish without submitting new ones.
            // EINTR means we were interrupted while waiting, so start waiting again.
            Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
                loop {
                    let res =
                        // SAFETY:
                        // Safe because the only memory modified is in the completion queue.
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
            Err(e) => Err(Error::RingEnter(e)),
        }
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator to
    /// any completed operations. `wait` blocks until at least one completion is ready. If
    /// called without any new events added, this simply waits for any existing events to
    /// complete and returns as soon as one or more is ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the
            // completion ring, so just return the completion ring.
            // This can only happen when `wait_nr` is 0, so we know there are already entries in
            // the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        // SAFETY:
        // Safe because the mut borrow of self restricts to one mutable reference at a time and
        // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
        let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        // SAFETY:
        // Safe because we trust that the kernel has returned enough memory in io_uring_setup
        // and mmap, and the index is kept within range by the ring_mask.
        unsafe {
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that two threads don't try to pop the same completed
        // op from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// SAFETY:
// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
// SAFETY: See safety comments for impl Send.
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // entries up to the given head have been consumed and their slots may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}
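
// A minimal smoke-test sketch: it assumes the kernel running the tests supports io_uring and
// relies on a NOP completing with a result of 0.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn nop_completes_with_matching_user_data() {
        let uring = URingContext::new(4, None).unwrap();
        uring.add_nop(55).unwrap();
        // `wait` submits the queued NOP and blocks for its completion.
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 55_u64);
        assert_eq!(res.unwrap(), 0_u32);
    }
}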