1 use std::ffi::OsStr;
2 use std::io::{self, Read, Write};
3 use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
4 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
5 use std::sync::atomic::{AtomicBool, AtomicUsize};
6 use std::sync::{Arc, Mutex};
7 use std::{fmt, mem, slice};
8 
9 use windows_sys::Win32::Foundation::{
10     ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
11     ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
12 };
13 use windows_sys::Win32::Storage::FileSystem::{
14     ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
15 };
16 use windows_sys::Win32::System::Pipes::{
17     ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
18     PIPE_UNLIMITED_INSTANCES,
19 };
20 use windows_sys::Win32::System::IO::{
21     CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
22 };
23 
24 use crate::event::Source;
25 use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
26 use crate::sys::windows::{Event, Handle, Overlapped};
27 use crate::Registry;
28 use crate::{Interest, Token};
29 
/// Non-blocking Windows named pipe.
///
/// This structure internally contains a `HANDLE` which represents the named
/// pipe, and also maintains state associated with the mio event loop and active
/// I/O operations that have been scheduled to translate IOCP to a readiness
/// model.
///
/// Note, IOCP is a *completion* based model whereas mio is a *readiness* based
/// model. To bridge this, `NamedPipe` performs internal buffering. Writes are
/// copied into an internal buffer and that buffer is submitted to IOCP. IOCP
/// reads are submitted using internal buffers and `NamedPipe::read` reads from
/// this internal buffer.
///
/// # Trait implementations
///
/// The `Read` and `Write` traits are implemented for `NamedPipe` and for
/// `&NamedPipe`. This represents that a named pipe can be concurrently read and
/// written to and also can be read and written to at all. Typically a named
/// pipe needs to be connected to a client before it can be read or written,
/// however.
///
/// Note that for I/O operations on a named pipe to succeed, the named pipe
/// needs to be associated with an event loop. Until this happens all I/O
/// operations will return a "would block" error.
///
/// # Managing connections
///
/// The `NamedPipe` type supports a `connect` method to connect to a client and
/// a `disconnect` method to disconnect from that client. These two methods only
/// work once a named pipe is associated with an event loop.
///
/// The `connect` method will succeed asynchronously and a completion can be
/// detected once the object receives a writable notification.
///
/// # Named pipe clients
///
/// To create a client of a named pipe server, use the `OpenOptions` type in
/// the standard library to create a `File` that connects to the named pipe.
/// Then combine the `into_raw_handle` method with
/// `NamedPipe::from_raw_handle` to convert that file into a named pipe that
/// can operate asynchronously. Don't forget to pass the
/// `FILE_FLAG_OVERLAPPED` flag when opening the `File`.
pub struct NamedPipe {
    // Shared state. Additional references to this `Arc` are leaked with
    // `mem::forget` whenever an overlapped operation is started (see `connect`).
    inner: Arc<Inner>,
}
75 
/// State shared between a `NamedPipe` and its in-flight overlapped operations.
///
/// # Notes
///
/// The memory layout of this structure must be fixed as the
/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
    // NOTE: careful modifying the order of these four fields, the `ptr_from_*`
    // methods depend on the layout!
    connect: Overlapped,
    read: Overlapped,
    write: Overlapped,
    event: Overlapped,
    // END NOTE.
    // The pipe's `HANDLE`, passed to every system call made on the pipe.
    handle: Handle,
    // `true` while a `connect` call owns the `connect` overlapped instance.
    connecting: AtomicBool,
    // Registration and read/write state, see `Io`.
    io: Mutex<Io>,
    // Buffer pool; NOTE(review): presumably backs `get_buffer`/`put_buffer`,
    // which are defined outside this section — confirm.
    pool: Mutex<BufferPool>,
}
94 
95 impl Inner {
96     /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
97     ///
98     /// # Unsafety
99     ///
100     /// Caller must ensure `ptr` is pointing to `Inner.connect`.
ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner101     unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
102         // `connect` is the first field, so the pointer are the same.
103         ptr.cast()
104     }
105 
106     /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner107     unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
108         // `read` is after `connect: Overlapped`.
109         (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
110     }
111 
112     /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner113     unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
114         // `write` is after `connect: Overlapped` and `read: Overlapped`.
115         (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
116     }
117 
118     /// Same as [`ptr_from_conn_overlapped`] but for `Inner.event`.
ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner119     unsafe fn ptr_from_event_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
120         // `event` is after `connect: Overlapped`, `read: Overlapped`, and `write: Overlapped`.
121         (ptr as *mut Overlapped).wrapping_sub(3) as *const Inner
122     }
123 
124     /// Issue a connection request with the specified overlapped operation.
125     ///
126     /// This function will issue a request to connect a client to this server,
127     /// returning immediately after starting the overlapped operation.
128     ///
129     /// If this function immediately succeeds then `Ok(true)` is returned. If
130     /// the overlapped operation is enqueued and pending, then `Ok(false)` is
131     /// returned. Otherwise an error is returned indicating what went wrong.
132     ///
133     /// # Unsafety
134     ///
135     /// This function is unsafe because the kernel requires that the
136     /// `overlapped` pointer is valid until the end of the I/O operation. The
137     /// kernel also requires that `overlapped` is unique for this I/O operation
138     /// and is not in use for any other I/O.
139     ///
140     /// To safely use this function callers must ensure that this pointer is
141     /// valid until the I/O operation is completed, typically via completion
142     /// ports and waiting to receive the completion notification on the port.
connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool>143     pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
144         if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
145             return Ok(true);
146         }
147 
148         let err = io::Error::last_os_error();
149 
150         match err.raw_os_error().map(|e| e as u32) {
151             Some(ERROR_PIPE_CONNECTED) => Ok(true),
152             Some(ERROR_NO_DATA) => Ok(true),
153             Some(ERROR_IO_PENDING) => Ok(false),
154             _ => Err(err),
155         }
156     }
157 
158     /// Disconnects this named pipe from any connected client.
disconnect(&self) -> io::Result<()>159     pub fn disconnect(&self) -> io::Result<()> {
160         if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
161             Err(io::Error::last_os_error())
162         } else {
163             Ok(())
164         }
165     }
166 
167     /// Issues an overlapped read operation to occur on this pipe.
168     ///
169     /// This function will issue an asynchronous read to occur in an overlapped
170     /// fashion, returning immediately. The `buf` provided will be filled in
171     /// with data and the request is tracked by the `overlapped` function
172     /// provided.
173     ///
174     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
175     /// `n` is the number of bytes read. If an asynchronous operation is
176     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
177     /// it is returned.
178     ///
179     /// When this operation completes (or if it completes immediately), another
180     /// mechanism must be used to learn how many bytes were transferred (such as
181     /// looking at the filed in the IOCP status message).
182     ///
183     /// # Unsafety
184     ///
185     /// This function is unsafe because the kernel requires that the `buf` and
186     /// `overlapped` pointers to be valid until the end of the I/O operation.
187     /// The kernel also requires that `overlapped` is unique for this I/O
188     /// operation and is not in use for any other I/O.
189     ///
190     /// To safely use this function callers must ensure that the pointers are
191     /// valid until the I/O operation is completed, typically via completion
192     /// ports and waiting to receive the completion notification on the port.
read_overlapped( &self, buf: &mut [u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>193     pub unsafe fn read_overlapped(
194         &self,
195         buf: &mut [u8],
196         overlapped: *mut OVERLAPPED,
197     ) -> io::Result<Option<usize>> {
198         let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
199         let res = ReadFile(
200             self.handle.raw(),
201             buf.as_mut_ptr() as *mut _,
202             len,
203             std::ptr::null_mut(),
204             overlapped,
205         );
206         if res == 0 {
207             let err = io::Error::last_os_error();
208             if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
209                 return Err(err);
210             }
211         }
212 
213         let mut bytes = 0;
214         let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
215         if res == 0 {
216             let err = io::Error::last_os_error();
217             if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
218                 Ok(None)
219             } else {
220                 Err(err)
221             }
222         } else {
223             Ok(Some(bytes as usize))
224         }
225     }
226 
227     /// Issues an overlapped write operation to occur on this pipe.
228     ///
229     /// This function will issue an asynchronous write to occur in an overlapped
230     /// fashion, returning immediately. The `buf` provided will be filled in
231     /// with data and the request is tracked by the `overlapped` function
232     /// provided.
233     ///
234     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
235     /// `n` is the number of bytes written. If an asynchronous operation is
236     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
237     /// it is returned.
238     ///
239     /// When this operation completes (or if it completes immediately), another
240     /// mechanism must be used to learn how many bytes were transferred (such as
241     /// looking at the filed in the IOCP status message).
242     ///
243     /// # Unsafety
244     ///
245     /// This function is unsafe because the kernel requires that the `buf` and
246     /// `overlapped` pointers to be valid until the end of the I/O operation.
247     /// The kernel also requires that `overlapped` is unique for this I/O
248     /// operation and is not in use for any other I/O.
249     ///
250     /// To safely use this function callers must ensure that the pointers are
251     /// valid until the I/O operation is completed, typically via completion
252     /// ports and waiting to receive the completion notification on the port.
write_overlapped( &self, buf: &[u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>253     pub unsafe fn write_overlapped(
254         &self,
255         buf: &[u8],
256         overlapped: *mut OVERLAPPED,
257     ) -> io::Result<Option<usize>> {
258         let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
259         let res = WriteFile(
260             self.handle.raw(),
261             buf.as_ptr() as *const _,
262             len,
263             std::ptr::null_mut(),
264             overlapped,
265         );
266         if res == 0 {
267             let err = io::Error::last_os_error();
268             if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
269                 return Err(err);
270             }
271         }
272 
273         let mut bytes = 0;
274         let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
275         if res == 0 {
276             let err = io::Error::last_os_error();
277             if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
278                 Ok(None)
279             } else {
280                 Err(err)
281             }
282         } else {
283             Ok(Some(bytes as usize))
284         }
285     }
286 
287     /// Calls the `GetOverlappedResult` function to get the result of an
288     /// overlapped operation for this handle.
289     ///
290     /// This function takes the `OVERLAPPED` argument which must have been used
291     /// to initiate an overlapped I/O operation, and returns either the
292     /// successful number of bytes transferred during the operation or an error
293     /// if one occurred.
294     ///
295     /// # Unsafety
296     ///
297     /// This function is unsafe as `overlapped` must have previously been used
298     /// to execute an operation for this handle, and it must also be a valid
299     /// pointer to an `Overlapped` instance.
300     #[inline]
result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize>301     unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
302         let mut transferred = 0;
303         let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
304         if r == 0 {
305             Err(io::Error::last_os_error())
306         } else {
307             Ok(transferred as usize)
308         }
309     }
310 }
311 
/// Checks that every `Inner::ptr_from_*_overlapped` helper maps the address of
/// its `Overlapped` field back to the address of the containing `Inner`, i.e.
/// that the field order of the `#[repr(C)]` struct still matches the offsets
/// hard-coded in those helpers.
#[test]
fn ptr_from() {
    use std::mem::ManuallyDrop;
    use std::ptr;

    // The pipe wraps a null handle, so keep its destructor from running.
    let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
    let inner: &Inner = &pipe.inner;
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
        "`ptr_from_conn_overlapped` incorrect"
    );
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
        "`ptr_from_read_overlapped` incorrect"
    );
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
        "`ptr_from_write_overlapped` incorrect"
    );
    // `event` is the fourth layout-dependent field and needs the same check.
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_event_overlapped(&inner.event as *const _ as *mut OVERLAPPED) },
        "`ptr_from_event_overlapped` incorrect"
    );
}
335 
/// Mutable state of a named pipe, kept behind `Inner::io`'s mutex.
struct Io {
    // Uniquely identifies the selector associated with this named pipe
    cp: Option<Arc<CompletionPort>>,
    // Token used to identify events; `None` while the pipe is not registered,
    // in which case reads and writes report "would block".
    token: Option<Token>,
    // State of the read half, see `State`.
    read: State,
    // State of the write half, see `State`.
    write: State,
    // Error handed out (once) by `NamedPipe::take_error`.
    connect_error: Option<io::Error>,
}
345 
/// State of one half (read or write) of a named pipe.
#[derive(Debug)]
enum State {
    // No operation in flight and no result waiting.
    None,
    // An overlapped operation using this buffer has been submitted. The
    // `usize` is the write position; the read side stores 0 and ignores it.
    Pending(Vec<u8>, usize),
    // Read side: buffered data plus the offset of the next unread byte.
    // Write side: set when a write finished immediately on submission.
    Ok(Vec<u8>, usize),
    // An error waiting to be handed to the caller of `read`/`write`.
    Err(io::Error),
}
353 
// Odd tokens are for named pipes: the counter starts at 1 and `register`
// always advances it by 2, so every value handed out stays odd.
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(1);
356 
/// Builds the "would block" error returned whenever an operation cannot make
/// progress right now.
fn would_block() -> io::Error {
    io::Error::from(io::ErrorKind::WouldBlock)
}
360 
361 impl NamedPipe {
362     /// Creates a new named pipe at the specified `addr` given a "reasonable
363     /// set" of initial configuration options.
new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe>364     pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
365         use std::os::windows::ffi::OsStrExt;
366         let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
367 
368         // Safety: syscall
369         let h = unsafe {
370             CreateNamedPipeW(
371                 name.as_ptr(),
372                 PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
373                 PIPE_TYPE_BYTE,
374                 PIPE_UNLIMITED_INSTANCES,
375                 65536,
376                 65536,
377                 0,
378                 std::ptr::null_mut(),
379             )
380         };
381 
382         if h == INVALID_HANDLE_VALUE {
383             Err(io::Error::last_os_error())
384         } else {
385             // Safety: nothing actually unsafe about this. The trait fn includes
386             // `unsafe`.
387             Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
388         }
389     }
390 
391     /// Attempts to call `ConnectNamedPipe`, if possible.
392     ///
393     /// This function will attempt to connect this pipe to a client in an
394     /// asynchronous fashion. If the function immediately establishes a
395     /// connection to a client then `Ok(())` is returned. Otherwise if a
396     /// connection attempt was issued and is now in progress then a "would
397     /// block" error is returned.
398     ///
399     /// When the connection is finished then this object will be flagged as
400     /// being ready for a write, or otherwise in the writable state.
401     ///
402     /// # Errors
403     ///
404     /// This function will return a "would block" error if the pipe has not yet
405     /// been registered with an event loop, if the connection operation has
406     /// previously been issued but has not yet completed, or if the connect
407     /// itself was issued and didn't finish immediately.
408     ///
409     /// Normal I/O errors from the call to `ConnectNamedPipe` are returned
410     /// immediately.
connect(&self) -> io::Result<()>411     pub fn connect(&self) -> io::Result<()> {
412         // "Acquire the connecting lock" or otherwise just make sure we're the
413         // only operation that's using the `connect` overlapped instance.
414         if self.inner.connecting.swap(true, SeqCst) {
415             return Err(would_block());
416         }
417 
418         // Now that we've flagged ourselves in the connecting state, issue the
419         // connection attempt. Afterwards interpret the return value and set
420         // internal state accordingly.
421         let res = unsafe {
422             let overlapped = self.inner.connect.as_ptr() as *mut _;
423             self.inner.connect_overlapped(overlapped)
424         };
425 
426         match res {
427             // The connection operation finished immediately, so let's schedule
428             // reads/writes and such.
429             Ok(true) => {
430                 self.inner.connecting.store(false, SeqCst);
431                 Inner::post_register(&self.inner, None);
432                 Ok(())
433             }
434 
435             // If the overlapped operation was successful and didn't finish
436             // immediately then we forget a copy of the arc we hold
437             // internally. This ensures that when the completion status comes
438             // in for the I/O operation finishing it'll have a reference
439             // associated with it and our data will still be valid. The
440             // `connect_done` function will "reify" this forgotten pointer to
441             // drop the refcount on the other side.
442             Ok(false) => {
443                 mem::forget(self.inner.clone());
444                 Err(would_block())
445             }
446 
447             Err(e) => {
448                 self.inner.connecting.store(false, SeqCst);
449                 Err(e)
450             }
451         }
452     }
453 
454     /// Takes any internal error that has happened after the last I/O operation
455     /// which hasn't been retrieved yet.
456     ///
457     /// This is particularly useful when detecting failed attempts to `connect`.
458     /// After a completed `connect` flags this pipe as writable then callers
459     /// must invoke this method to determine whether the connection actually
460     /// succeeded. If this function returns `None` then a client is connected,
461     /// otherwise it returns an error of what happened and a client shouldn't be
462     /// connected.
take_error(&self) -> io::Result<Option<io::Error>>463     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
464         Ok(self.inner.io.lock().unwrap().connect_error.take())
465     }
466 
467     /// Disconnects this named pipe from a connected client.
468     ///
469     /// This function will disconnect the pipe from a connected client, if any,
470     /// transitively calling the `DisconnectNamedPipe` function.
471     ///
472     /// After a `disconnect` is issued, then a `connect` may be called again to
473     /// connect to another client.
disconnect(&self) -> io::Result<()>474     pub fn disconnect(&self) -> io::Result<()> {
475         self.inner.disconnect()
476     }
477 }
478 
479 impl FromRawHandle for NamedPipe {
from_raw_handle(handle: RawHandle) -> NamedPipe480     unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
481         NamedPipe {
482             inner: Arc::new(Inner {
483                 handle: Handle::new(handle as HANDLE),
484                 connect: Overlapped::new(connect_done),
485                 connecting: AtomicBool::new(false),
486                 read: Overlapped::new(read_done),
487                 write: Overlapped::new(write_done),
488                 event: Overlapped::new(event_done),
489                 io: Mutex::new(Io {
490                     cp: None,
491                     token: None,
492                     read: State::None,
493                     write: State::None,
494                     connect_error: None,
495                 }),
496                 pool: Mutex::new(BufferPool::with_capacity(2)),
497             }),
498         }
499     }
500 }
501 
502 impl Read for NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>503     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
504         <&NamedPipe as Read>::read(&mut &*self, buf)
505     }
506 }
507 
508 impl Write for NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>509     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
510         <&NamedPipe as Write>::write(&mut &*self, buf)
511     }
512 
flush(&mut self) -> io::Result<()>513     fn flush(&mut self) -> io::Result<()> {
514         <&NamedPipe as Write>::flush(&mut &*self)
515     }
516 }
517 
518 impl<'a> Read for &'a NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>519     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
520         let mut state = self.inner.io.lock().unwrap();
521 
522         if state.token.is_none() {
523             return Err(would_block());
524         }
525 
526         match mem::replace(&mut state.read, State::None) {
527             // In theory not possible with `token` checked above,
528             // but return would block for now.
529             State::None => Err(would_block()),
530 
531             // A read is in flight, still waiting for it to finish
532             State::Pending(buf, amt) => {
533                 state.read = State::Pending(buf, amt);
534                 Err(would_block())
535             }
536 
537             // We previously read something into `data`, try to copy out some
538             // data. If we copy out all the data schedule a new read and
539             // otherwise store the buffer to get read later.
540             State::Ok(data, cur) => {
541                 let n = {
542                     let mut remaining = &data[cur..];
543                     remaining.read(buf)?
544                 };
545                 let next = cur + n;
546                 if next != data.len() {
547                     state.read = State::Ok(data, next);
548                 } else {
549                     self.inner.put_buffer(data);
550                     Inner::schedule_read(&self.inner, &mut state, None);
551                 }
552                 Ok(n)
553             }
554 
555             // Looks like an in-flight read hit an error, return that here while
556             // we schedule a new one.
557             State::Err(e) => {
558                 Inner::schedule_read(&self.inner, &mut state, None);
559                 if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
560                     Ok(0)
561                 } else {
562                     Err(e)
563                 }
564             }
565         }
566     }
567 }
568 
569 impl<'a> Write for &'a NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>570     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
571         // Make sure there's no writes pending
572         let mut io = self.inner.io.lock().unwrap();
573 
574         if io.token.is_none() {
575             return Err(would_block());
576         }
577 
578         match io.write {
579             State::None => {}
580             State::Err(_) => match mem::replace(&mut io.write, State::None) {
581                 State::Err(e) => return Err(e),
582                 // `io` is locked, so this branch is unreachable
583                 _ => unreachable!(),
584             },
585             // any other state should be handled in `write_done`
586             _ => {
587                 return Err(would_block());
588             }
589         }
590 
591         // Move `buf` onto the heap and fire off the write
592         let mut owned_buf = self.inner.get_buffer();
593         owned_buf.extend(buf);
594         match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
595             // Some bytes are written immediately
596             Some(n) => Ok(n),
597             // Write operation is anqueued for whole buffer
598             None => Ok(buf.len()),
599         }
600     }
601 
flush(&mut self) -> io::Result<()>602     fn flush(&mut self) -> io::Result<()> {
603         Ok(())
604     }
605 }
606 
607 impl Source for NamedPipe {
register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()>608     fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
609         let mut io = self.inner.io.lock().unwrap();
610 
611         io.check_association(registry, false)?;
612 
613         if io.token.is_some() {
614             return Err(io::Error::new(
615                 io::ErrorKind::AlreadyExists,
616                 "I/O source already registered with a `Registry`",
617             ));
618         }
619 
620         if io.cp.is_none() {
621             let selector = registry.selector();
622 
623             io.cp = Some(selector.clone_port());
624 
625             let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
626             selector.inner.cp.add_handle(inner_token, self)?;
627         }
628 
629         io.token = Some(token);
630         drop(io);
631 
632         Inner::post_register(&self.inner, None);
633 
634         Ok(())
635     }
636 
reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()>637     fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
638         let mut io = self.inner.io.lock().unwrap();
639 
640         io.check_association(registry, true)?;
641 
642         io.token = Some(token);
643         drop(io);
644 
645         Inner::post_register(&self.inner, None);
646 
647         Ok(())
648     }
649 
deregister(&mut self, registry: &Registry) -> io::Result<()>650     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
651         let mut io = self.inner.io.lock().unwrap();
652 
653         io.check_association(registry, true)?;
654 
655         if io.token.is_none() {
656             return Err(io::Error::new(
657                 io::ErrorKind::NotFound,
658                 "I/O source not registered with `Registry`",
659             ));
660         }
661 
662         io.token = None;
663         Ok(())
664     }
665 }
666 
667 impl AsRawHandle for NamedPipe {
as_raw_handle(&self) -> RawHandle668     fn as_raw_handle(&self) -> RawHandle {
669         self.inner.handle.raw() as RawHandle
670     }
671 }
672 
673 impl fmt::Debug for NamedPipe {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result674     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
675         self.inner.handle.fmt(f)
676     }
677 }
678 
679 impl Drop for NamedPipe {
drop(&mut self)680     fn drop(&mut self) {
681         // Cancel pending reads/connects, but don't cancel writes to ensure that
682         // everything is flushed out.
683         unsafe {
684             if self.inner.connecting.load(SeqCst) {
685                 drop(cancel(&self.inner.handle, &self.inner.connect));
686             }
687 
688             let io = self.inner.io.lock().unwrap();
689             if let State::Pending(..) = io.read {
690                 drop(cancel(&self.inner.handle, &self.inner.read));
691             }
692         }
693     }
694 }
695 
696 impl Inner {
697     /// Schedules a read to happen in the background, executing an overlapped
698     /// operation.
699     ///
700     /// This function returns `true` if a normal error happens or if the read
701     /// is scheduled in the background. If the pipe is no longer connected
702     /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is
703     /// scheduled.
schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool704     fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
705         // Check to see if a read is already scheduled/completed
706         match io.read {
707             State::None => {}
708             _ => return true,
709         }
710 
711         // Allocate a buffer and schedule the read.
712         let mut buf = me.get_buffer();
713         let e = unsafe {
714             let overlapped = me.read.as_ptr() as *mut _;
715             let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
716             me.read_overlapped(slice, overlapped)
717         };
718 
719         match e {
720             // See `NamedPipe::connect` above for the rationale behind `forget`
721             Ok(_) => {
722                 io.read = State::Pending(buf, 0); // 0 is ignored on read side
723                 mem::forget(me.clone());
724                 true
725             }
726 
727             // If ERROR_PIPE_LISTENING happens then it's not a real read error,
728             // we just need to wait for a connect.
729             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false,
730 
731             // If some other error happened, though, we're now readable to give
732             // out the error.
733             Err(e) => {
734                 io.read = State::Err(e);
735                 io.notify_readable(me, events);
736                 true
737             }
738         }
739     }
740 
741     /// Maybe schedules overlapped write operation.
742     ///
743     /// * `None` means that overlapped operation was enqueued
744     /// * `Some(n)` means that `n` bytes was immediately written.
745     ///   Note, that `write_done` will fire anyway to clean up the state.
maybe_schedule_write( me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, ) -> io::Result<Option<usize>>746     fn maybe_schedule_write(
747         me: &Arc<Inner>,
748         buf: Vec<u8>,
749         pos: usize,
750         io: &mut Io,
751     ) -> io::Result<Option<usize>> {
752         // Very similar to `schedule_read` above, just done for the write half.
753         let e = unsafe {
754             let overlapped = me.write.as_ptr() as *mut _;
755             me.write_overlapped(&buf[pos..], overlapped)
756         };
757 
758         // See `connect` above for the rationale behind `forget`
759         match e {
760             // `n` bytes are written immediately
761             Ok(Some(n)) => {
762                 io.write = State::Ok(buf, pos);
763                 mem::forget(me.clone());
764                 Ok(Some(n))
765             }
766             // write operation is enqueued
767             Ok(None) => {
768                 io.write = State::Pending(buf, pos);
769                 mem::forget(me.clone());
770                 Ok(None)
771             }
772             Err(e) => Err(e),
773         }
774     }
775 
schedule_write( me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>, )776     fn schedule_write(
777         me: &Arc<Inner>,
778         buf: Vec<u8>,
779         pos: usize,
780         io: &mut Io,
781         events: Option<&mut Vec<Event>>,
782     ) {
783         match Inner::maybe_schedule_write(me, buf, pos, io) {
784             Ok(Some(_)) => {
785                 // immediate result will be handled in `write_done`,
786                 // so we'll reinterpret the `Ok` state
787                 let state = mem::replace(&mut io.write, State::None);
788                 io.write = match state {
789                     State::Ok(buf, pos) => State::Pending(buf, pos),
790                     // io is locked, so this branch is unreachable
791                     _ => unreachable!(),
792                 };
793                 mem::forget(me.clone());
794             }
795             Ok(None) => (),
796             Err(e) => {
797                 io.write = State::Err(e);
798                 io.notify_writable(me, events);
799             }
800         }
801     }
802 
post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>)803     fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
804         let mut io = me.io.lock().unwrap();
805         #[allow(clippy::needless_option_as_deref)]
806         if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
807             if let State::None = io.write {
808                 io.notify_writable(me, events);
809             }
810         }
811     }
812 
get_buffer(&self) -> Vec<u8>813     fn get_buffer(&self) -> Vec<u8> {
814         self.pool.lock().unwrap().get(4 * 1024)
815     }
816 
put_buffer(&self, buf: Vec<u8>)817     fn put_buffer(&self, buf: Vec<u8>) {
818         self.pool.lock().unwrap().put(buf)
819     }
820 }
821 
cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()>822 unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
823     let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
824     // `CancelIoEx` returns 0 on error:
825     // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
826     if ret == 0 {
827         Err(io::Error::last_os_error())
828     } else {
829         Ok(())
830     }
831 }
832 
connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)833 fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
834     let status = CompletionStatus::from_entry(status);
835 
836     // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
837     // the refcount is available to us due to the `mem::forget` in
838     // `connect` above.
839     let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
840 
841     // Flag ourselves as no longer using the `connect` overlapped instances.
842     let prev = me.connecting.swap(false, SeqCst);
843     assert!(prev, "NamedPipe was not previously connecting");
844 
845     // Stash away our connect error if one happened
846     debug_assert_eq!(status.bytes_transferred(), 0);
847     unsafe {
848         match me.result(status.overlapped()) {
849             Ok(n) => debug_assert_eq!(n, 0),
850             Err(e) => me.io.lock().unwrap().connect_error = Some(e),
851         }
852     }
853 
854     // We essentially just finished a registration, so kick off a
855     // read and register write readiness.
856     Inner::post_register(&me, events);
857 }
858 
read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)859 fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
860     let status = CompletionStatus::from_entry(status);
861 
862     // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
863     // the refcount is available to us due to the `mem::forget` in
864     // `schedule_read` above.
865     let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
866 
867     // Move from the `Pending` to `Ok` state.
868     let mut io = me.io.lock().unwrap();
869     let mut buf = match mem::replace(&mut io.read, State::None) {
870         State::Pending(buf, _) => buf,
871         _ => unreachable!(),
872     };
873     unsafe {
874         match me.result(status.overlapped()) {
875             Ok(n) => {
876                 debug_assert_eq!(status.bytes_transferred() as usize, n);
877                 buf.set_len(status.bytes_transferred() as usize);
878                 io.read = State::Ok(buf, 0);
879             }
880             Err(e) => {
881                 debug_assert_eq!(status.bytes_transferred(), 0);
882                 io.read = State::Err(e);
883             }
884         }
885     }
886 
887     // Flag our readiness that we've got data.
888     io.notify_readable(&me, events);
889 }
890 
write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)891 fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
892     let status = CompletionStatus::from_entry(status);
893 
894     // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
895     // the refcount is available to us due to the `mem::forget` in
896     // `schedule_write` above.
897     let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
898 
899     // Make the state change out of `Pending`. If we wrote the entire buffer
900     // then we're writable again and otherwise we schedule another write.
901     let mut io = me.io.lock().unwrap();
902     let (buf, pos) = match mem::replace(&mut io.write, State::None) {
903         // `Ok` here means, that the operation was completed immediately
904         // `bytes_transferred` is already reported to a client
905         State::Ok(..) => {
906             io.notify_writable(&me, events);
907             return;
908         }
909         State::Pending(buf, pos) => (buf, pos),
910         _ => unreachable!(),
911     };
912 
913     unsafe {
914         match me.result(status.overlapped()) {
915             Ok(n) => {
916                 debug_assert_eq!(status.bytes_transferred() as usize, n);
917                 let new_pos = pos + (status.bytes_transferred() as usize);
918                 if new_pos == buf.len() {
919                     me.put_buffer(buf);
920                     io.notify_writable(&me, events);
921                 } else {
922                     Inner::schedule_write(&me, buf, new_pos, &mut io, events);
923                 }
924             }
925             Err(e) => {
926                 debug_assert_eq!(status.bytes_transferred(), 0);
927                 io.write = State::Err(e);
928                 io.notify_writable(&me, events);
929             }
930         }
931     }
932 }
933 
event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)934 fn event_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
935     let status = CompletionStatus::from_entry(status);
936 
937     // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
938     // the refcount is available to us due to the `mem::forget` in
939     // `schedule_write` above.
940     let me = unsafe { Arc::from_raw(Inner::ptr_from_event_overlapped(status.overlapped())) };
941 
942     let io = me.io.lock().unwrap();
943 
944     // Make sure the I/O handle is still registered with the selector
945     if io.token.is_some() {
946         // This method is also called during `Selector::drop` to perform
947         // cleanup. In this case, `events` is `None` and we don't need to track
948         // the event.
949         if let Some(events) = events {
950             let mut ev = Event::from_completion_status(&status);
951             // Reverse the `.data` alteration done in `schedule_event`. This
952             // alteration was done so the selector recognized the event as one from
953             // a named pipe.
954             ev.data >>= 1;
955             events.push(ev);
956         }
957     }
958 }
959 
960 impl Io {
check_association(&self, registry: &Registry, required: bool) -> io::Result<()>961     fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
962         match self.cp {
963             Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
964                 io::ErrorKind::AlreadyExists,
965                 "I/O source already registered with a different `Registry`",
966             )),
967             None if required => Err(io::Error::new(
968                 io::ErrorKind::NotFound,
969                 "I/O source not registered with `Registry`",
970             )),
971             _ => Ok(()),
972         }
973     }
974 
notify_readable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>)975     fn notify_readable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
976         if let Some(token) = self.token {
977             let mut ev = Event::new(token);
978             ev.set_readable();
979 
980             if let Some(events) = events {
981                 events.push(ev);
982             } else {
983                 self.schedule_event(me, ev);
984             }
985         }
986     }
987 
notify_writable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>)988     fn notify_writable(&self, me: &Arc<Inner>, events: Option<&mut Vec<Event>>) {
989         if let Some(token) = self.token {
990             let mut ev = Event::new(token);
991             ev.set_writable();
992 
993             if let Some(events) = events {
994                 events.push(ev);
995             } else {
996                 self.schedule_event(me, ev);
997             }
998         }
999     }
1000 
schedule_event(&self, me: &Arc<Inner>, mut event: Event)1001     fn schedule_event(&self, me: &Arc<Inner>, mut event: Event) {
1002         // Alter the token so that the selector will identify the IOCP event as
1003         // one for a named pipe. This will be reversed in `event_done`
1004         //
1005         // `data` for named pipes is an auto-incrementing counter. Because
1006         // `data` is `u64` we do not risk losing the most-significant bit
1007         // (unless a user creates 2^62 named pipes during the lifetime of the
1008         // process).
1009         event.data <<= 1;
1010         event.data += 1;
1011 
1012         let completion_status =
1013             event.to_completion_status_with_overlapped(me.event.as_ptr() as *mut _);
1014 
1015         match self.cp.as_ref().unwrap().post(completion_status) {
1016             Ok(_) => {
1017                 // Increase the ref count of `Inner` for the completion event.
1018                 mem::forget(me.clone());
1019             }
1020             Err(_) => {
1021                 // Nothing to do here
1022             }
1023         }
1024     }
1025 }
1026 
/// A bounded stash of reusable byte buffers.
///
/// Buffers are handed out with `get` and given back with `put`. The pool never
/// grows beyond the capacity reserved in `with_capacity`; buffers returned
/// while it is full are simply dropped.
struct BufferPool {
    /// Idle buffers. Each one is emptied (length zero) before it is stored,
    /// while its allocation is kept for reuse.
    pool: Vec<Vec<u8>>,
}

impl BufferPool {
    /// Creates an empty pool with room reserved for `cap` idle buffers.
    fn with_capacity(cap: usize) -> BufferPool {
        BufferPool {
            pool: Vec::with_capacity(cap),
        }
    }

    /// Returns the most recently stored idle buffer, or a new empty buffer
    /// with capacity `default_cap` when the pool is empty.
    ///
    /// A recycled buffer keeps whatever capacity it had when it was returned,
    /// which may differ from `default_cap`.
    fn get(&mut self, default_cap: usize) -> Vec<u8> {
        self.pool
            .pop()
            .unwrap_or_else(|| Vec::with_capacity(default_cap))
    }

    /// Gives `buf` back to the pool.
    ///
    /// The buffer is emptied and stored if the pool's reserved capacity is not
    /// yet used up; otherwise it is dropped.
    fn put(&mut self, mut buf: Vec<u8>) {
        if self.pool.len() < self.pool.capacity() {
            // `clear` resets the length to zero and keeps the allocation; no
            // `unsafe` is needed for that.
            buf.clear();
            self.pool.push(buf);
        }
    }
}
1053