1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4 
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11 
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 // Hide imports which are not used when generating documentation.
20 #[cfg(windows)]
21 mod doc {
22     pub(super) use crate::os::windows::ffi::OsStrExt;
23     pub(super) mod windows_sys {
24         pub(crate) use windows_sys::{
25             Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26             Win32::System::SystemServices::*,
27         };
28     }
29     pub(super) use mio::windows as mio_windows;
30 }
31 
32 // NB: none of these shows up in public API, so don't document them.
33 #[cfg(not(windows))]
34 mod doc {
35     pub(super) mod mio_windows {
36         pub type NamedPipe = crate::doc::NotDefinedHere;
37     }
38 }
39 
40 use self::doc::*;
41 
42 /// A [Windows named pipe] server.
43 ///
44 /// Accepting client connections involves creating a server with
45 /// [`ServerOptions::create`] and waiting for clients to connect using
46 /// [`NamedPipeServer::connect`].
47 ///
48 /// To avoid having clients sporadically fail with
49 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50 /// ensure that at least one server instance is available at all times. This
51 /// means that the typical listen loop for a server is a bit involved, because
52 /// we have to ensure that we never drop a server accidentally while a client
53 /// might connect.
54 ///
55 /// So a correctly implemented server looks like this:
56 ///
57 /// ```no_run
58 /// use std::io;
59 /// use tokio::net::windows::named_pipe::ServerOptions;
60 ///
61 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62 ///
63 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
64 /// // The first server needs to be constructed early so that clients can
65 /// // be correctly connected. Otherwise calling .wait will cause the client to
66 /// // error.
67 /// //
68 /// // Here we also make use of `first_pipe_instance`, which will ensure that
69 /// // there are no other servers up and running already.
70 /// let mut server = ServerOptions::new()
71 ///     .first_pipe_instance(true)
72 ///     .create(PIPE_NAME)?;
73 ///
74 /// // Spawn the server loop.
75 /// let server = tokio::spawn(async move {
76 ///     loop {
77 ///         // Wait for a client to connect.
78 ///         server.connect().await?;
79 ///         let connected_client = server;
80 ///
81 ///         // Construct the next server to be connected before sending the one
82 ///         // we already have of onto a task. This ensures that the server
83 ///         // isn't closed (after it's done in the task) before a new one is
84 ///         // available. Otherwise the client might error with
85 ///         // `io::ErrorKind::NotFound`.
86 ///         server = ServerOptions::new().create(PIPE_NAME)?;
87 ///
88 ///         let client = tokio::spawn(async move {
89 ///             /* use the connected client */
90 /// #           Ok::<_, std::io::Error>(())
91 ///         });
92 /// #       if true { break } // needed for type inference to work
93 ///     }
94 ///
95 ///     Ok::<_, io::Error>(())
96 /// });
97 ///
98 /// /* do something else not server related here */
99 /// # Ok(()) }
100 /// ```
101 ///
102 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
103 #[derive(Debug)]
104 pub struct NamedPipeServer {
105     io: PollEvented<mio_windows::NamedPipe>,
106 }
107 
108 impl NamedPipeServer {
109     /// Constructs a new named pipe server from the specified raw handle.
110     ///
111     /// This function will consume ownership of the handle given, passing
112     /// responsibility for closing the handle to the returned object.
113     ///
114     /// This function is also unsafe as the primitives currently returned have
115     /// the contract that they are the sole owner of the file descriptor they
116     /// are wrapping. Usage of this function could accidentally allow violating
117     /// this contract which can cause memory unsafety in code that relies on it
118     /// being true.
119     ///
120     /// # Errors
121     ///
122     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
123     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
124     ///
125     /// [Tokio Runtime]: crate::runtime::Runtime
126     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>127     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
128         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
129 
130         Ok(Self {
131             io: PollEvented::new(named_pipe)?,
132         })
133     }
134 
135     /// Retrieves information about the named pipe the server is associated
136     /// with.
137     ///
138     /// ```no_run
139     /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
140     ///
141     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
142     ///
143     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
144     /// let server = ServerOptions::new()
145     ///     .pipe_mode(PipeMode::Message)
146     ///     .max_instances(5)
147     ///     .create(PIPE_NAME)?;
148     ///
149     /// let server_info = server.info()?;
150     ///
151     /// assert_eq!(server_info.end, PipeEnd::Server);
152     /// assert_eq!(server_info.mode, PipeMode::Message);
153     /// assert_eq!(server_info.max_instances, 5);
154     /// # Ok(()) }
155     /// ```
info(&self) -> io::Result<PipeInfo>156     pub fn info(&self) -> io::Result<PipeInfo> {
157         // Safety: we're ensuring the lifetime of the named pipe.
158         unsafe { named_pipe_info(self.io.as_raw_handle()) }
159     }
160 
161     /// Enables a named pipe server process to wait for a client process to
162     /// connect to an instance of a named pipe. A client process connects by
163     /// creating a named pipe with the same name.
164     ///
165     /// This corresponds to the [`ConnectNamedPipe`] system call.
166     ///
167     /// # Cancel safety
168     ///
169     /// This method is cancellation safe in the sense that if it is used as the
170     /// event in a [`select!`](crate::select) statement and some other branch
171     /// completes first, then no connection events have been lost.
172     ///
173     /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
174     ///
175     /// # Example
176     ///
177     /// ```no_run
178     /// use tokio::net::windows::named_pipe::ServerOptions;
179     ///
180     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
181     ///
182     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
183     /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
184     ///
185     /// // Wait for a client to connect.
186     /// pipe.connect().await?;
187     ///
188     /// // Use the connected client...
189     /// # Ok(()) }
190     /// ```
connect(&self) -> io::Result<()>191     pub async fn connect(&self) -> io::Result<()> {
192         match self.io.connect() {
193             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
194                 self.io
195                     .registration()
196                     .async_io(Interest::WRITABLE, || self.io.connect())
197                     .await
198             }
199             x => x,
200         }
201     }
202 
203     /// Disconnects the server end of a named pipe instance from a client
204     /// process.
205     ///
206     /// ```
207     /// use tokio::io::AsyncWriteExt;
208     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
209     /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
210     ///
211     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
212     ///
213     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
214     /// let server = ServerOptions::new()
215     ///     .create(PIPE_NAME)?;
216     ///
217     /// let mut client = ClientOptions::new()
218     ///     .open(PIPE_NAME)?;
219     ///
220     /// // Wait for a client to become connected.
221     /// server.connect().await?;
222     ///
223     /// // Forcibly disconnect the client.
224     /// server.disconnect()?;
225     ///
226     /// // Write fails with an OS-specific error after client has been
227     /// // disconnected.
228     /// let e = client.write(b"ping").await.unwrap_err();
229     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
230     /// # Ok(()) }
231     /// ```
disconnect(&self) -> io::Result<()>232     pub fn disconnect(&self) -> io::Result<()> {
233         self.io.disconnect()
234     }
235 
236     /// Waits for any of the requested ready states.
237     ///
238     /// This function is usually paired with `try_read()` or `try_write()`. It
239     /// can be used to concurrently read / write to the same pipe on a single
240     /// task without splitting the pipe.
241     ///
242     /// The function may complete without the pipe being ready. This is a
243     /// false-positive and attempting an operation will return with
244     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
245     /// [`Ready`] set, so you should always check the returned value and possibly
246     /// wait again if the requested states are not set.
247     ///
248     /// # Examples
249     ///
250     /// Concurrently read and write to the pipe on the same task without
251     /// splitting.
252     ///
253     /// ```no_run
254     /// use tokio::io::Interest;
255     /// use tokio::net::windows::named_pipe;
256     /// use std::error::Error;
257     /// use std::io;
258     ///
259     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
260     ///
261     /// #[tokio::main]
262     /// async fn main() -> Result<(), Box<dyn Error>> {
263     ///     let server = named_pipe::ServerOptions::new()
264     ///         .create(PIPE_NAME)?;
265     ///
266     ///     loop {
267     ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
268     ///
269     ///         if ready.is_readable() {
270     ///             let mut data = vec![0; 1024];
271     ///             // Try to read data, this may still fail with `WouldBlock`
272     ///             // if the readiness event is a false positive.
273     ///             match server.try_read(&mut data) {
274     ///                 Ok(n) => {
275     ///                     println!("read {} bytes", n);
276     ///                 }
277     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
278     ///                     continue;
279     ///                 }
280     ///                 Err(e) => {
281     ///                     return Err(e.into());
282     ///                 }
283     ///             }
284     ///         }
285     ///
286     ///         if ready.is_writable() {
287     ///             // Try to write data, this may still fail with `WouldBlock`
288     ///             // if the readiness event is a false positive.
289     ///             match server.try_write(b"hello world") {
290     ///                 Ok(n) => {
291     ///                     println!("write {} bytes", n);
292     ///                 }
293     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
294     ///                     continue;
295     ///                 }
296     ///                 Err(e) => {
297     ///                     return Err(e.into());
298     ///                 }
299     ///             }
300     ///         }
301     ///     }
302     /// }
303     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>304     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
305         let event = self.io.registration().readiness(interest).await?;
306         Ok(event.ready)
307     }
308 
309     /// Waits for the pipe to become readable.
310     ///
311     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
312     /// paired with `try_read()`.
313     ///
314     /// # Examples
315     ///
316     /// ```no_run
317     /// use tokio::net::windows::named_pipe;
318     /// use std::error::Error;
319     /// use std::io;
320     ///
321     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
322     ///
323     /// #[tokio::main]
324     /// async fn main() -> Result<(), Box<dyn Error>> {
325     ///     let server = named_pipe::ServerOptions::new()
326     ///         .create(PIPE_NAME)?;
327     ///
328     ///     let mut msg = vec![0; 1024];
329     ///
330     ///     loop {
331     ///         // Wait for the pipe to be readable
332     ///         server.readable().await?;
333     ///
334     ///         // Try to read data, this may still fail with `WouldBlock`
335     ///         // if the readiness event is a false positive.
336     ///         match server.try_read(&mut msg) {
337     ///             Ok(n) => {
338     ///                 msg.truncate(n);
339     ///                 break;
340     ///             }
341     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
342     ///                 continue;
343     ///             }
344     ///             Err(e) => {
345     ///                 return Err(e.into());
346     ///             }
347     ///         }
348     ///     }
349     ///
350     ///     println!("GOT = {:?}", msg);
351     ///     Ok(())
352     /// }
353     /// ```
readable(&self) -> io::Result<()>354     pub async fn readable(&self) -> io::Result<()> {
355         self.ready(Interest::READABLE).await?;
356         Ok(())
357     }
358 
359     /// Polls for read readiness.
360     ///
361     /// If the pipe is not currently ready for reading, this method will
362     /// store a clone of the `Waker` from the provided `Context`. When the pipe
363     /// becomes ready for reading, `Waker::wake` will be called on the waker.
364     ///
365     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
366     /// the `Waker` from the `Context` passed to the most recent call is
367     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
368     /// second, independent waker.)
369     ///
370     /// This function is intended for cases where creating and pinning a future
371     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
372     /// preferred, as this supports polling from multiple tasks at once.
373     ///
374     /// # Return value
375     ///
376     /// The function returns:
377     ///
378     /// * `Poll::Pending` if the pipe is not ready for reading.
379     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
380     /// * `Poll::Ready(Err(e))` if an error is encountered.
381     ///
382     /// # Errors
383     ///
384     /// This function may encounter any standard I/O error except `WouldBlock`.
385     ///
386     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>387     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
388         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
389     }
390 
391     /// Tries to read data from the pipe into the provided buffer, returning how
392     /// many bytes were read.
393     ///
394     /// Receives any pending data from the pipe but does not wait for new data
395     /// to arrive. On success, returns the number of bytes read. Because
396     /// `try_read()` is non-blocking, the buffer does not have to be stored by
397     /// the async task and can exist entirely on the stack.
398     ///
399     /// Usually, [`readable()`] or [`ready()`] is used with this function.
400     ///
401     /// [`readable()`]: NamedPipeServer::readable()
402     /// [`ready()`]: NamedPipeServer::ready()
403     ///
404     /// # Return
405     ///
406     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
407     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
408     ///
409     /// 1. The pipe's read half is closed and will no longer yield data.
410     /// 2. The specified buffer was 0 bytes in length.
411     ///
412     /// If the pipe is not ready to read data,
413     /// `Err(io::ErrorKind::WouldBlock)` is returned.
414     ///
415     /// # Examples
416     ///
417     /// ```no_run
418     /// use tokio::net::windows::named_pipe;
419     /// use std::error::Error;
420     /// use std::io;
421     ///
422     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
423     ///
424     /// #[tokio::main]
425     /// async fn main() -> Result<(), Box<dyn Error>> {
426     ///     let server = named_pipe::ServerOptions::new()
427     ///         .create(PIPE_NAME)?;
428     ///
429     ///     loop {
430     ///         // Wait for the pipe to be readable
431     ///         server.readable().await?;
432     ///
433     ///         // Creating the buffer **after** the `await` prevents it from
434     ///         // being stored in the async task.
435     ///         let mut buf = [0; 4096];
436     ///
437     ///         // Try to read data, this may still fail with `WouldBlock`
438     ///         // if the readiness event is a false positive.
439     ///         match server.try_read(&mut buf) {
440     ///             Ok(0) => break,
441     ///             Ok(n) => {
442     ///                 println!("read {} bytes", n);
443     ///             }
444     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
445     ///                 continue;
446     ///             }
447     ///             Err(e) => {
448     ///                 return Err(e.into());
449     ///             }
450     ///         }
451     ///     }
452     ///
453     ///     Ok(())
454     /// }
455     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>456     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
457         self.io
458             .registration()
459             .try_io(Interest::READABLE, || (&*self.io).read(buf))
460     }
461 
462     /// Tries to read data from the pipe into the provided buffers, returning
463     /// how many bytes were read.
464     ///
465     /// Data is copied to fill each buffer in order, with the final buffer
466     /// written to possibly being only partially filled. This method behaves
467     /// equivalently to a single call to [`try_read()`] with concatenated
468     /// buffers.
469     ///
470     /// Receives any pending data from the pipe but does not wait for new data
471     /// to arrive. On success, returns the number of bytes read. Because
472     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
473     /// stored by the async task and can exist entirely on the stack.
474     ///
475     /// Usually, [`readable()`] or [`ready()`] is used with this function.
476     ///
477     /// [`try_read()`]: NamedPipeServer::try_read()
478     /// [`readable()`]: NamedPipeServer::readable()
479     /// [`ready()`]: NamedPipeServer::ready()
480     ///
481     /// # Return
482     ///
483     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
484     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
485     /// and will no longer yield data. If the pipe is not ready to read data
486     /// `Err(io::ErrorKind::WouldBlock)` is returned.
487     ///
488     /// # Examples
489     ///
490     /// ```no_run
491     /// use tokio::net::windows::named_pipe;
492     /// use std::error::Error;
493     /// use std::io::{self, IoSliceMut};
494     ///
495     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
496     ///
497     /// #[tokio::main]
498     /// async fn main() -> Result<(), Box<dyn Error>> {
499     ///     let server = named_pipe::ServerOptions::new()
500     ///         .create(PIPE_NAME)?;
501     ///
502     ///     loop {
503     ///         // Wait for the pipe to be readable
504     ///         server.readable().await?;
505     ///
506     ///         // Creating the buffer **after** the `await` prevents it from
507     ///         // being stored in the async task.
508     ///         let mut buf_a = [0; 512];
509     ///         let mut buf_b = [0; 1024];
510     ///         let mut bufs = [
511     ///             IoSliceMut::new(&mut buf_a),
512     ///             IoSliceMut::new(&mut buf_b),
513     ///         ];
514     ///
515     ///         // Try to read data, this may still fail with `WouldBlock`
516     ///         // if the readiness event is a false positive.
517     ///         match server.try_read_vectored(&mut bufs) {
518     ///             Ok(0) => break,
519     ///             Ok(n) => {
520     ///                 println!("read {} bytes", n);
521     ///             }
522     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
523     ///                 continue;
524     ///             }
525     ///             Err(e) => {
526     ///                 return Err(e.into());
527     ///             }
528     ///         }
529     ///     }
530     ///
531     ///     Ok(())
532     /// }
533     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>534     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
535         self.io
536             .registration()
537             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
538     }
539 
540     cfg_io_util! {
541         /// Tries to read data from the stream into the provided buffer, advancing the
542         /// buffer's internal cursor, returning how many bytes were read.
543         ///
544         /// Receives any pending data from the pipe but does not wait for new data
545         /// to arrive. On success, returns the number of bytes read. Because
546         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
547         /// the async task and can exist entirely on the stack.
548         ///
549         /// Usually, [`readable()`] or [`ready()`] is used with this function.
550         ///
551         /// [`readable()`]: NamedPipeServer::readable()
552         /// [`ready()`]: NamedPipeServer::ready()
553         ///
554         /// # Return
555         ///
556         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
557         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
558         /// and will no longer yield data. If the stream is not ready to read data
559         /// `Err(io::ErrorKind::WouldBlock)` is returned.
560         ///
561         /// # Examples
562         ///
563         /// ```no_run
564         /// use tokio::net::windows::named_pipe;
565         /// use std::error::Error;
566         /// use std::io;
567         ///
568         /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
569         ///
570         /// #[tokio::main]
571         /// async fn main() -> Result<(), Box<dyn Error>> {
572         ///     let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
573         ///
574         ///     loop {
575         ///         // Wait for the pipe to be readable
576         ///         server.readable().await?;
577         ///
578         ///         let mut buf = Vec::with_capacity(4096);
579         ///
580         ///         // Try to read data, this may still fail with `WouldBlock`
581         ///         // if the readiness event is a false positive.
582         ///         match server.try_read_buf(&mut buf) {
583         ///             Ok(0) => break,
584         ///             Ok(n) => {
585         ///                 println!("read {} bytes", n);
586         ///             }
587         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
588         ///                 continue;
589         ///             }
590         ///             Err(e) => {
591         ///                 return Err(e.into());
592         ///             }
593         ///         }
594         ///     }
595         ///
596         ///     Ok(())
597         /// }
598         /// ```
599         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
600             self.io.registration().try_io(Interest::READABLE, || {
601                 use std::io::Read;
602 
603                 let dst = buf.chunk_mut();
604                 let dst =
605                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
606 
607                 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
608                 // buffer.
609                 let n = (&*self.io).read(dst)?;
610 
611                 unsafe {
612                     buf.advance_mut(n);
613                 }
614 
615                 Ok(n)
616             })
617         }
618     }
619 
620     /// Waits for the pipe to become writable.
621     ///
622     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
623     /// paired with `try_write()`.
624     ///
625     /// # Examples
626     ///
627     /// ```no_run
628     /// use tokio::net::windows::named_pipe;
629     /// use std::error::Error;
630     /// use std::io;
631     ///
632     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
633     ///
634     /// #[tokio::main]
635     /// async fn main() -> Result<(), Box<dyn Error>> {
636     ///     let server = named_pipe::ServerOptions::new()
637     ///         .create(PIPE_NAME)?;
638     ///
639     ///     loop {
640     ///         // Wait for the pipe to be writable
641     ///         server.writable().await?;
642     ///
643     ///         // Try to write data, this may still fail with `WouldBlock`
644     ///         // if the readiness event is a false positive.
645     ///         match server.try_write(b"hello world") {
646     ///             Ok(n) => {
647     ///                 break;
648     ///             }
649     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650     ///                 continue;
651     ///             }
652     ///             Err(e) => {
653     ///                 return Err(e.into());
654     ///             }
655     ///         }
656     ///     }
657     ///
658     ///     Ok(())
659     /// }
660     /// ```
writable(&self) -> io::Result<()>661     pub async fn writable(&self) -> io::Result<()> {
662         self.ready(Interest::WRITABLE).await?;
663         Ok(())
664     }
665 
666     /// Polls for write readiness.
667     ///
668     /// If the pipe is not currently ready for writing, this method will
669     /// store a clone of the `Waker` from the provided `Context`. When the pipe
670     /// becomes ready for writing, `Waker::wake` will be called on the waker.
671     ///
672     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
673     /// the `Waker` from the `Context` passed to the most recent call is
674     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
675     /// second, independent waker.)
676     ///
677     /// This function is intended for cases where creating and pinning a future
678     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
679     /// preferred, as this supports polling from multiple tasks at once.
680     ///
681     /// # Return value
682     ///
683     /// The function returns:
684     ///
685     /// * `Poll::Pending` if the pipe is not ready for writing.
686     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
687     /// * `Poll::Ready(Err(e))` if an error is encountered.
688     ///
689     /// # Errors
690     ///
691     /// This function may encounter any standard I/O error except `WouldBlock`.
692     ///
693     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>694     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
695         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
696     }
697 
698     /// Tries to write a buffer to the pipe, returning how many bytes were
699     /// written.
700     ///
701     /// The function will attempt to write the entire contents of `buf`, but
702     /// only part of the buffer may be written.
703     ///
704     /// This function is usually paired with `writable()`.
705     ///
706     /// # Return
707     ///
708     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
709     /// number of bytes written. If the pipe is not ready to write data,
710     /// `Err(io::ErrorKind::WouldBlock)` is returned.
711     ///
712     /// # Examples
713     ///
714     /// ```no_run
715     /// use tokio::net::windows::named_pipe;
716     /// use std::error::Error;
717     /// use std::io;
718     ///
719     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
720     ///
721     /// #[tokio::main]
722     /// async fn main() -> Result<(), Box<dyn Error>> {
723     ///     let server = named_pipe::ServerOptions::new()
724     ///         .create(PIPE_NAME)?;
725     ///
726     ///     loop {
727     ///         // Wait for the pipe to be writable
728     ///         server.writable().await?;
729     ///
730     ///         // Try to write data, this may still fail with `WouldBlock`
731     ///         // if the readiness event is a false positive.
732     ///         match server.try_write(b"hello world") {
733     ///             Ok(n) => {
734     ///                 break;
735     ///             }
736     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
737     ///                 continue;
738     ///             }
739     ///             Err(e) => {
740     ///                 return Err(e.into());
741     ///             }
742     ///         }
743     ///     }
744     ///
745     ///     Ok(())
746     /// }
747     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>748     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
749         self.io
750             .registration()
751             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
752     }
753 
754     /// Tries to write several buffers to the pipe, returning how many bytes
755     /// were written.
756     ///
757     /// Data is written from each buffer in order, with the final buffer read
758     /// from possible being only partially consumed. This method behaves
759     /// equivalently to a single call to [`try_write()`] with concatenated
760     /// buffers.
761     ///
762     /// This function is usually paired with `writable()`.
763     ///
764     /// [`try_write()`]: NamedPipeServer::try_write()
765     ///
766     /// # Return
767     ///
768     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
769     /// number of bytes written. If the pipe is not ready to write data,
770     /// `Err(io::ErrorKind::WouldBlock)` is returned.
771     ///
772     /// # Examples
773     ///
774     /// ```no_run
775     /// use tokio::net::windows::named_pipe;
776     /// use std::error::Error;
777     /// use std::io;
778     ///
779     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
780     ///
781     /// #[tokio::main]
782     /// async fn main() -> Result<(), Box<dyn Error>> {
783     ///     let server = named_pipe::ServerOptions::new()
784     ///         .create(PIPE_NAME)?;
785     ///
786     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
787     ///
788     ///     loop {
789     ///         // Wait for the pipe to be writable
790     ///         server.writable().await?;
791     ///
792     ///         // Try to write data, this may still fail with `WouldBlock`
793     ///         // if the readiness event is a false positive.
794     ///         match server.try_write_vectored(&bufs) {
795     ///             Ok(n) => {
796     ///                 break;
797     ///             }
798     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
799     ///                 continue;
800     ///             }
801     ///             Err(e) => {
802     ///                 return Err(e.into());
803     ///             }
804     ///         }
805     ///     }
806     ///
807     ///     Ok(())
808     /// }
809     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>810     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
811         self.io
812             .registration()
813             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
814     }
815 
816     /// Tries to read or write from the pipe using a user-provided IO operation.
817     ///
818     /// If the pipe is ready, the provided closure is called. The closure
819     /// should attempt to perform IO operation from the pipe by manually
820     /// calling the appropriate syscall. If the operation fails because the
821     /// pipe is not actually ready, then the closure should return a
822     /// `WouldBlock` error and the readiness flag is cleared. The return value
823     /// of the closure is then returned by `try_io`.
824     ///
825     /// If the pipe is not ready, then the closure is not called
826     /// and a `WouldBlock` error is returned.
827     ///
828     /// The closure should only return a `WouldBlock` error if it has performed
829     /// an IO operation on the pipe that failed due to the pipe not being
830     /// ready. Returning a `WouldBlock` error in any other situation will
831     /// incorrectly clear the readiness flag, which can cause the pipe to
832     /// behave incorrectly.
833     ///
834     /// The closure should not perform the IO operation using any of the
835     /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
836     /// the readiness flag and can cause the pipe to behave incorrectly.
837     ///
838     /// This method is not intended to be used with combined interests.
839     /// The closure should perform only one type of IO operation, so it should not
840     /// require more than one ready state. This method may panic or sleep forever
841     /// if it is called with a combined interest.
842     ///
843     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
844     ///
845     /// [`readable()`]: NamedPipeServer::readable()
846     /// [`writable()`]: NamedPipeServer::writable()
847     /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>848     pub fn try_io<R>(
849         &self,
850         interest: Interest,
851         f: impl FnOnce() -> io::Result<R>,
852     ) -> io::Result<R> {
853         self.io.registration().try_io(interest, f)
854     }
855 
856     /// Reads or writes from the pipe using a user-provided IO operation.
857     ///
858     /// The readiness of the pipe is awaited and when the pipe is ready,
859     /// the provided closure is called. The closure should attempt to perform
860     /// IO operation on the pipe by manually calling the appropriate syscall.
861     /// If the operation fails because the pipe is not actually ready,
862     /// then the closure should return a `WouldBlock` error. In such case the
863     /// readiness flag is cleared and the pipe readiness is awaited again.
864     /// This loop is repeated until the closure returns an `Ok` or an error
865     /// other than `WouldBlock`.
866     ///
867     /// The closure should only return a `WouldBlock` error if it has performed
868     /// an IO operation on the pipe that failed due to the pipe not being
869     /// ready. Returning a `WouldBlock` error in any other situation will
870     /// incorrectly clear the readiness flag, which can cause the pipe to
871     /// behave incorrectly.
872     ///
873     /// The closure should not perform the IO operation using any of the methods
874     /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
875     /// readiness flag and can cause the pipe to behave incorrectly.
876     ///
877     /// This method is not intended to be used with combined interests.
878     /// The closure should perform only one type of IO operation, so it should not
879     /// require more than one ready state. This method may panic or sleep forever
880     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>881     pub async fn async_io<R>(
882         &self,
883         interest: Interest,
884         f: impl FnMut() -> io::Result<R>,
885     ) -> io::Result<R> {
886         self.io.registration().async_io(interest, f).await
887     }
888 }
889 
890 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>891     fn poll_read(
892         self: Pin<&mut Self>,
893         cx: &mut Context<'_>,
894         buf: &mut ReadBuf<'_>,
895     ) -> Poll<io::Result<()>> {
896         unsafe { self.io.poll_read(cx, buf) }
897     }
898 }
899 
900 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>901     fn poll_write(
902         self: Pin<&mut Self>,
903         cx: &mut Context<'_>,
904         buf: &[u8],
905     ) -> Poll<io::Result<usize>> {
906         self.io.poll_write(cx, buf)
907     }
908 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>909     fn poll_write_vectored(
910         self: Pin<&mut Self>,
911         cx: &mut Context<'_>,
912         bufs: &[io::IoSlice<'_>],
913     ) -> Poll<io::Result<usize>> {
914         self.io.poll_write_vectored(cx, bufs)
915     }
916 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>917     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
918         Poll::Ready(Ok(()))
919     }
920 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>921     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
922         self.poll_flush(cx)
923     }
924 }
925 
926 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle927     fn as_raw_handle(&self) -> RawHandle {
928         self.io.as_raw_handle()
929     }
930 }
931 
932 impl AsHandle for NamedPipeServer {
as_handle(&self) -> BorrowedHandle<'_>933     fn as_handle(&self) -> BorrowedHandle<'_> {
934         unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
935     }
936 }
937 
938 /// A [Windows named pipe] client.
939 ///
940 /// Constructed using [`ClientOptions::open`].
941 ///
942 /// Connecting a client correctly involves a few steps. When connecting through
943 /// [`ClientOptions::open`], it might error indicating one of two things:
944 ///
945 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
946 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
947 ///   for a while and try again.
948 ///
949 /// So a correctly implemented client looks like this:
950 ///
951 /// ```no_run
952 /// use std::time::Duration;
953 /// use tokio::net::windows::named_pipe::ClientOptions;
954 /// use tokio::time;
955 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
956 ///
957 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
958 ///
959 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
960 /// let client = loop {
961 ///     match ClientOptions::new().open(PIPE_NAME) {
962 ///         Ok(client) => break client,
963 ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
964 ///         Err(e) => return Err(e),
965 ///     }
966 ///
967 ///     time::sleep(Duration::from_millis(50)).await;
968 /// };
969 ///
970 /// /* use the connected client */
971 /// # Ok(()) }
972 /// ```
973 ///
974 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
975 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
976 #[derive(Debug)]
977 pub struct NamedPipeClient {
978     io: PollEvented<mio_windows::NamedPipe>,
979 }
980 
981 impl NamedPipeClient {
982     /// Constructs a new named pipe client from the specified raw handle.
983     ///
984     /// This function will consume ownership of the handle given, passing
985     /// responsibility for closing the handle to the returned object.
986     ///
987     /// This function is also unsafe as the primitives currently returned have
988     /// the contract that they are the sole owner of the file descriptor they
989     /// are wrapping. Usage of this function could accidentally allow violating
990     /// this contract which can cause memory unsafety in code that relies on it
991     /// being true.
992     ///
993     /// # Errors
994     ///
995     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
996     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
997     ///
998     /// [Tokio Runtime]: crate::runtime::Runtime
999     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>1000     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1001         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1002 
1003         Ok(Self {
1004             io: PollEvented::new(named_pipe)?,
1005         })
1006     }
1007 
1008     /// Retrieves information about the named pipe the client is associated
1009     /// with.
1010     ///
1011     /// ```no_run
1012     /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1013     ///
1014     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1015     ///
1016     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1017     /// let client = ClientOptions::new()
1018     ///     .open(PIPE_NAME)?;
1019     ///
1020     /// let client_info = client.info()?;
1021     ///
1022     /// assert_eq!(client_info.end, PipeEnd::Client);
1023     /// assert_eq!(client_info.mode, PipeMode::Message);
1024     /// assert_eq!(client_info.max_instances, 5);
1025     /// # Ok(()) }
1026     /// ```
info(&self) -> io::Result<PipeInfo>1027     pub fn info(&self) -> io::Result<PipeInfo> {
1028         // Safety: we're ensuring the lifetime of the named pipe.
1029         unsafe { named_pipe_info(self.io.as_raw_handle()) }
1030     }
1031 
1032     /// Waits for any of the requested ready states.
1033     ///
1034     /// This function is usually paired with `try_read()` or `try_write()`. It
1035     /// can be used to concurrently read / write to the same pipe on a single
1036     /// task without splitting the pipe.
1037     ///
1038     /// The function may complete without the pipe being ready. This is a
1039     /// false-positive and attempting an operation will return with
1040     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1041     /// [`Ready`] set, so you should always check the returned value and possibly
1042     /// wait again if the requested states are not set.
1043     ///
1044     /// # Examples
1045     ///
1046     /// Concurrently read and write to the pipe on the same task without
1047     /// splitting.
1048     ///
1049     /// ```no_run
1050     /// use tokio::io::Interest;
1051     /// use tokio::net::windows::named_pipe;
1052     /// use std::error::Error;
1053     /// use std::io;
1054     ///
1055     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1056     ///
1057     /// #[tokio::main]
1058     /// async fn main() -> Result<(), Box<dyn Error>> {
1059     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1060     ///
1061     ///     loop {
1062     ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1063     ///
1064     ///         if ready.is_readable() {
1065     ///             let mut data = vec![0; 1024];
1066     ///             // Try to read data, this may still fail with `WouldBlock`
1067     ///             // if the readiness event is a false positive.
1068     ///             match client.try_read(&mut data) {
1069     ///                 Ok(n) => {
1070     ///                     println!("read {} bytes", n);
1071     ///                 }
1072     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1073     ///                     continue;
1074     ///                 }
1075     ///                 Err(e) => {
1076     ///                     return Err(e.into());
1077     ///                 }
1078     ///             }
1079     ///         }
1080     ///
1081     ///         if ready.is_writable() {
1082     ///             // Try to write data, this may still fail with `WouldBlock`
1083     ///             // if the readiness event is a false positive.
1084     ///             match client.try_write(b"hello world") {
1085     ///                 Ok(n) => {
1086     ///                     println!("write {} bytes", n);
1087     ///                 }
1088     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1089     ///                     continue;
1090     ///                 }
1091     ///                 Err(e) => {
1092     ///                     return Err(e.into());
1093     ///                 }
1094     ///             }
1095     ///         }
1096     ///     }
1097     /// }
1098     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>1099     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1100         let event = self.io.registration().readiness(interest).await?;
1101         Ok(event.ready)
1102     }
1103 
1104     /// Waits for the pipe to become readable.
1105     ///
1106     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1107     /// paired with `try_read()`.
1108     ///
1109     /// # Examples
1110     ///
1111     /// ```no_run
1112     /// use tokio::net::windows::named_pipe;
1113     /// use std::error::Error;
1114     /// use std::io;
1115     ///
1116     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1117     ///
1118     /// #[tokio::main]
1119     /// async fn main() -> Result<(), Box<dyn Error>> {
1120     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1121     ///
1122     ///     let mut msg = vec![0; 1024];
1123     ///
1124     ///     loop {
1125     ///         // Wait for the pipe to be readable
1126     ///         client.readable().await?;
1127     ///
1128     ///         // Try to read data, this may still fail with `WouldBlock`
1129     ///         // if the readiness event is a false positive.
1130     ///         match client.try_read(&mut msg) {
1131     ///             Ok(n) => {
1132     ///                 msg.truncate(n);
1133     ///                 break;
1134     ///             }
1135     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1136     ///                 continue;
1137     ///             }
1138     ///             Err(e) => {
1139     ///                 return Err(e.into());
1140     ///             }
1141     ///         }
1142     ///     }
1143     ///
1144     ///     println!("GOT = {:?}", msg);
1145     ///     Ok(())
1146     /// }
1147     /// ```
readable(&self) -> io::Result<()>1148     pub async fn readable(&self) -> io::Result<()> {
1149         self.ready(Interest::READABLE).await?;
1150         Ok(())
1151     }
1152 
1153     /// Polls for read readiness.
1154     ///
1155     /// If the pipe is not currently ready for reading, this method will
1156     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1157     /// becomes ready for reading, `Waker::wake` will be called on the waker.
1158     ///
1159     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1160     /// the `Waker` from the `Context` passed to the most recent call is
1161     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1162     /// second, independent waker.)
1163     ///
1164     /// This function is intended for cases where creating and pinning a future
1165     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1166     /// preferred, as this supports polling from multiple tasks at once.
1167     ///
1168     /// # Return value
1169     ///
1170     /// The function returns:
1171     ///
1172     /// * `Poll::Pending` if the pipe is not ready for reading.
1173     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1174     /// * `Poll::Ready(Err(e))` if an error is encountered.
1175     ///
1176     /// # Errors
1177     ///
1178     /// This function may encounter any standard I/O error except `WouldBlock`.
1179     ///
1180     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1181     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1182         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1183     }
1184 
1185     /// Tries to read data from the pipe into the provided buffer, returning how
1186     /// many bytes were read.
1187     ///
1188     /// Receives any pending data from the pipe but does not wait for new data
1189     /// to arrive. On success, returns the number of bytes read. Because
1190     /// `try_read()` is non-blocking, the buffer does not have to be stored by
1191     /// the async task and can exist entirely on the stack.
1192     ///
1193     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1194     ///
1195     /// [`readable()`]: NamedPipeClient::readable()
1196     /// [`ready()`]: NamedPipeClient::ready()
1197     ///
1198     /// # Return
1199     ///
1200     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1201     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1202     ///
1203     /// 1. The pipe's read half is closed and will no longer yield data.
1204     /// 2. The specified buffer was 0 bytes in length.
1205     ///
1206     /// If the pipe is not ready to read data,
1207     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1208     ///
1209     /// # Examples
1210     ///
1211     /// ```no_run
1212     /// use tokio::net::windows::named_pipe;
1213     /// use std::error::Error;
1214     /// use std::io;
1215     ///
1216     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1217     ///
1218     /// #[tokio::main]
1219     /// async fn main() -> Result<(), Box<dyn Error>> {
1220     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1221     ///
1222     ///     loop {
1223     ///         // Wait for the pipe to be readable
1224     ///         client.readable().await?;
1225     ///
1226     ///         // Creating the buffer **after** the `await` prevents it from
1227     ///         // being stored in the async task.
1228     ///         let mut buf = [0; 4096];
1229     ///
1230     ///         // Try to read data, this may still fail with `WouldBlock`
1231     ///         // if the readiness event is a false positive.
1232     ///         match client.try_read(&mut buf) {
1233     ///             Ok(0) => break,
1234     ///             Ok(n) => {
1235     ///                 println!("read {} bytes", n);
1236     ///             }
1237     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1238     ///                 continue;
1239     ///             }
1240     ///             Err(e) => {
1241     ///                 return Err(e.into());
1242     ///             }
1243     ///         }
1244     ///     }
1245     ///
1246     ///     Ok(())
1247     /// }
1248     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1249     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1250         self.io
1251             .registration()
1252             .try_io(Interest::READABLE, || (&*self.io).read(buf))
1253     }
1254 
1255     /// Tries to read data from the pipe into the provided buffers, returning
1256     /// how many bytes were read.
1257     ///
1258     /// Data is copied to fill each buffer in order, with the final buffer
1259     /// written to possibly being only partially filled. This method behaves
1260     /// equivalently to a single call to [`try_read()`] with concatenated
1261     /// buffers.
1262     ///
1263     /// Receives any pending data from the pipe but does not wait for new data
1264     /// to arrive. On success, returns the number of bytes read. Because
1265     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1266     /// stored by the async task and can exist entirely on the stack.
1267     ///
1268     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1269     ///
1270     /// [`try_read()`]: NamedPipeClient::try_read()
1271     /// [`readable()`]: NamedPipeClient::readable()
1272     /// [`ready()`]: NamedPipeClient::ready()
1273     ///
1274     /// # Return
1275     ///
1276     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1277     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1278     /// and will no longer yield data. If the pipe is not ready to read data
1279     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1280     ///
1281     /// # Examples
1282     ///
1283     /// ```no_run
1284     /// use tokio::net::windows::named_pipe;
1285     /// use std::error::Error;
1286     /// use std::io::{self, IoSliceMut};
1287     ///
1288     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1289     ///
1290     /// #[tokio::main]
1291     /// async fn main() -> Result<(), Box<dyn Error>> {
1292     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1293     ///
1294     ///     loop {
1295     ///         // Wait for the pipe to be readable
1296     ///         client.readable().await?;
1297     ///
1298     ///         // Creating the buffer **after** the `await` prevents it from
1299     ///         // being stored in the async task.
1300     ///         let mut buf_a = [0; 512];
1301     ///         let mut buf_b = [0; 1024];
1302     ///         let mut bufs = [
1303     ///             IoSliceMut::new(&mut buf_a),
1304     ///             IoSliceMut::new(&mut buf_b),
1305     ///         ];
1306     ///
1307     ///         // Try to read data, this may still fail with `WouldBlock`
1308     ///         // if the readiness event is a false positive.
1309     ///         match client.try_read_vectored(&mut bufs) {
1310     ///             Ok(0) => break,
1311     ///             Ok(n) => {
1312     ///                 println!("read {} bytes", n);
1313     ///             }
1314     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1315     ///                 continue;
1316     ///             }
1317     ///             Err(e) => {
1318     ///                 return Err(e.into());
1319     ///             }
1320     ///         }
1321     ///     }
1322     ///
1323     ///     Ok(())
1324     /// }
1325     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1326     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1327         self.io
1328             .registration()
1329             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1330     }
1331 
1332     cfg_io_util! {
1333         /// Tries to read data from the stream into the provided buffer, advancing the
1334         /// buffer's internal cursor, returning how many bytes were read.
1335         ///
1336         /// Receives any pending data from the pipe but does not wait for new data
1337         /// to arrive. On success, returns the number of bytes read. Because
1338         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1339         /// the async task and can exist entirely on the stack.
1340         ///
1341         /// Usually, [`readable()`] or [`ready()`] is used with this function.
1342         ///
1343         /// [`readable()`]: NamedPipeClient::readable()
1344         /// [`ready()`]: NamedPipeClient::ready()
1345         ///
1346         /// # Return
1347         ///
1348         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1349         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
1350         /// and will no longer yield data. If the stream is not ready to read data
1351         /// `Err(io::ErrorKind::WouldBlock)` is returned.
1352         ///
1353         /// # Examples
1354         ///
1355         /// ```no_run
1356         /// use tokio::net::windows::named_pipe;
1357         /// use std::error::Error;
1358         /// use std::io;
1359         ///
1360         /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1361         ///
1362         /// #[tokio::main]
1363         /// async fn main() -> Result<(), Box<dyn Error>> {
1364         ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1365         ///
1366         ///     loop {
1367         ///         // Wait for the pipe to be readable
1368         ///         client.readable().await?;
1369         ///
1370         ///         let mut buf = Vec::with_capacity(4096);
1371         ///
1372         ///         // Try to read data, this may still fail with `WouldBlock`
1373         ///         // if the readiness event is a false positive.
1374         ///         match client.try_read_buf(&mut buf) {
1375         ///             Ok(0) => break,
1376         ///             Ok(n) => {
1377         ///                 println!("read {} bytes", n);
1378         ///             }
1379         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1380         ///                 continue;
1381         ///             }
1382         ///             Err(e) => {
1383         ///                 return Err(e.into());
1384         ///             }
1385         ///         }
1386         ///     }
1387         ///
1388         ///     Ok(())
1389         /// }
1390         /// ```
1391         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1392             self.io.registration().try_io(Interest::READABLE, || {
1393                 use std::io::Read;
1394 
1395                 let dst = buf.chunk_mut();
1396                 let dst =
1397                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1398 
1399                 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1400                 // buffer.
1401                 let n = (&*self.io).read(dst)?;
1402 
1403                 unsafe {
1404                     buf.advance_mut(n);
1405                 }
1406 
1407                 Ok(n)
1408             })
1409         }
1410     }
1411 
1412     /// Waits for the pipe to become writable.
1413     ///
1414     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1415     /// paired with `try_write()`.
1416     ///
1417     /// # Examples
1418     ///
1419     /// ```no_run
1420     /// use tokio::net::windows::named_pipe;
1421     /// use std::error::Error;
1422     /// use std::io;
1423     ///
1424     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1425     ///
1426     /// #[tokio::main]
1427     /// async fn main() -> Result<(), Box<dyn Error>> {
1428     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1429     ///
1430     ///     loop {
1431     ///         // Wait for the pipe to be writable
1432     ///         client.writable().await?;
1433     ///
1434     ///         // Try to write data, this may still fail with `WouldBlock`
1435     ///         // if the readiness event is a false positive.
1436     ///         match client.try_write(b"hello world") {
1437     ///             Ok(n) => {
1438     ///                 break;
1439     ///             }
1440     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1441     ///                 continue;
1442     ///             }
1443     ///             Err(e) => {
1444     ///                 return Err(e.into());
1445     ///             }
1446     ///         }
1447     ///     }
1448     ///
1449     ///     Ok(())
1450     /// }
1451     /// ```
writable(&self) -> io::Result<()>1452     pub async fn writable(&self) -> io::Result<()> {
1453         self.ready(Interest::WRITABLE).await?;
1454         Ok(())
1455     }
1456 
1457     /// Polls for write readiness.
1458     ///
1459     /// If the pipe is not currently ready for writing, this method will
1460     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1461     /// becomes ready for writing, `Waker::wake` will be called on the waker.
1462     ///
1463     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1464     /// the `Waker` from the `Context` passed to the most recent call is
1465     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1466     /// second, independent waker.)
1467     ///
1468     /// This function is intended for cases where creating and pinning a future
1469     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1470     /// preferred, as this supports polling from multiple tasks at once.
1471     ///
1472     /// # Return value
1473     ///
1474     /// The function returns:
1475     ///
1476     /// * `Poll::Pending` if the pipe is not ready for writing.
1477     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1478     /// * `Poll::Ready(Err(e))` if an error is encountered.
1479     ///
1480     /// # Errors
1481     ///
1482     /// This function may encounter any standard I/O error except `WouldBlock`.
1483     ///
1484     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1485     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1486         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1487     }
1488 
1489     /// Tries to write a buffer to the pipe, returning how many bytes were
1490     /// written.
1491     ///
1492     /// The function will attempt to write the entire contents of `buf`, but
1493     /// only part of the buffer may be written.
1494     ///
1495     /// This function is usually paired with `writable()`.
1496     ///
1497     /// # Return
1498     ///
1499     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1500     /// number of bytes written. If the pipe is not ready to write data,
1501     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1502     ///
1503     /// # Examples
1504     ///
1505     /// ```no_run
1506     /// use tokio::net::windows::named_pipe;
1507     /// use std::error::Error;
1508     /// use std::io;
1509     ///
1510     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1511     ///
1512     /// #[tokio::main]
1513     /// async fn main() -> Result<(), Box<dyn Error>> {
1514     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1515     ///
1516     ///     loop {
1517     ///         // Wait for the pipe to be writable
1518     ///         client.writable().await?;
1519     ///
1520     ///         // Try to write data, this may still fail with `WouldBlock`
1521     ///         // if the readiness event is a false positive.
1522     ///         match client.try_write(b"hello world") {
1523     ///             Ok(n) => {
1524     ///                 break;
1525     ///             }
1526     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1527     ///                 continue;
1528     ///             }
1529     ///             Err(e) => {
1530     ///                 return Err(e.into());
1531     ///             }
1532     ///         }
1533     ///     }
1534     ///
1535     ///     Ok(())
1536     /// }
1537     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1538     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1539         self.io
1540             .registration()
1541             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1542     }
1543 
1544     /// Tries to write several buffers to the pipe, returning how many bytes
1545     /// were written.
1546     ///
1547     /// Data is written from each buffer in order, with the final buffer read
1548     /// from possible being only partially consumed. This method behaves
1549     /// equivalently to a single call to [`try_write()`] with concatenated
1550     /// buffers.
1551     ///
1552     /// This function is usually paired with `writable()`.
1553     ///
1554     /// [`try_write()`]: NamedPipeClient::try_write()
1555     ///
1556     /// # Return
1557     ///
1558     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1559     /// number of bytes written. If the pipe is not ready to write data,
1560     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1561     ///
1562     /// # Examples
1563     ///
1564     /// ```no_run
1565     /// use tokio::net::windows::named_pipe;
1566     /// use std::error::Error;
1567     /// use std::io;
1568     ///
1569     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1570     ///
1571     /// #[tokio::main]
1572     /// async fn main() -> Result<(), Box<dyn Error>> {
1573     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1574     ///
1575     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1576     ///
1577     ///     loop {
1578     ///         // Wait for the pipe to be writable
1579     ///         client.writable().await?;
1580     ///
1581     ///         // Try to write data, this may still fail with `WouldBlock`
1582     ///         // if the readiness event is a false positive.
1583     ///         match client.try_write_vectored(&bufs) {
1584     ///             Ok(n) => {
1585     ///                 break;
1586     ///             }
1587     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1588     ///                 continue;
1589     ///             }
1590     ///             Err(e) => {
1591     ///                 return Err(e.into());
1592     ///             }
1593     ///         }
1594     ///     }
1595     ///
1596     ///     Ok(())
1597     /// }
1598     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1599     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1600         self.io
1601             .registration()
1602             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1603     }
1604 
1605     /// Tries to read or write from the pipe using a user-provided IO operation.
1606     ///
1607     /// If the pipe is ready, the provided closure is called. The closure
1608     /// should attempt to perform IO operation from the pipe by manually
1609     /// calling the appropriate syscall. If the operation fails because the
1610     /// pipe is not actually ready, then the closure should return a
1611     /// `WouldBlock` error and the readiness flag is cleared. The return value
1612     /// of the closure is then returned by `try_io`.
1613     ///
1614     /// If the pipe is not ready, then the closure is not called
1615     /// and a `WouldBlock` error is returned.
1616     ///
1617     /// The closure should only return a `WouldBlock` error if it has performed
1618     /// an IO operation on the pipe that failed due to the pipe not being
1619     /// ready. Returning a `WouldBlock` error in any other situation will
1620     /// incorrectly clear the readiness flag, which can cause the pipe to
1621     /// behave incorrectly.
1622     ///
1623     /// The closure should not perform the IO operation using any of the methods
1624     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1625     /// readiness flag and can cause the pipe to behave incorrectly.
1626     ///
1627     /// This method is not intended to be used with combined interests.
1628     /// The closure should perform only one type of IO operation, so it should not
1629     /// require more than one ready state. This method may panic or sleep forever
1630     /// if it is called with a combined interest.
1631     ///
1632     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1633     ///
1634     /// [`readable()`]: NamedPipeClient::readable()
1635     /// [`writable()`]: NamedPipeClient::writable()
1636     /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1637     pub fn try_io<R>(
1638         &self,
1639         interest: Interest,
1640         f: impl FnOnce() -> io::Result<R>,
1641     ) -> io::Result<R> {
1642         self.io.registration().try_io(interest, f)
1643     }
1644 
1645     /// Reads or writes from the pipe using a user-provided IO operation.
1646     ///
1647     /// The readiness of the pipe is awaited and when the pipe is ready,
1648     /// the provided closure is called. The closure should attempt to perform
1649     /// IO operation on the pipe by manually calling the appropriate syscall.
1650     /// If the operation fails because the pipe is not actually ready,
1651     /// then the closure should return a `WouldBlock` error. In such case the
1652     /// readiness flag is cleared and the pipe readiness is awaited again.
1653     /// This loop is repeated until the closure returns an `Ok` or an error
1654     /// other than `WouldBlock`.
1655     ///
1656     /// The closure should only return a `WouldBlock` error if it has performed
1657     /// an IO operation on the pipe that failed due to the pipe not being
1658     /// ready. Returning a `WouldBlock` error in any other situation will
1659     /// incorrectly clear the readiness flag, which can cause the pipe to
1660     /// behave incorrectly.
1661     ///
1662     /// The closure should not perform the IO operation using any of the methods
1663     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1664     /// readiness flag and can cause the pipe to behave incorrectly.
1665     ///
1666     /// This method is not intended to be used with combined interests.
1667     /// The closure should perform only one type of IO operation, so it should not
1668     /// require more than one ready state. This method may panic or sleep forever
1669     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1670     pub async fn async_io<R>(
1671         &self,
1672         interest: Interest,
1673         f: impl FnMut() -> io::Result<R>,
1674     ) -> io::Result<R> {
1675         self.io.registration().async_io(interest, f).await
1676     }
1677 }
1678 
1679 impl AsyncRead for NamedPipeClient {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1680     fn poll_read(
1681         self: Pin<&mut Self>,
1682         cx: &mut Context<'_>,
1683         buf: &mut ReadBuf<'_>,
1684     ) -> Poll<io::Result<()>> {
1685         unsafe { self.io.poll_read(cx, buf) }
1686     }
1687 }
1688 
1689 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1690     fn poll_write(
1691         self: Pin<&mut Self>,
1692         cx: &mut Context<'_>,
1693         buf: &[u8],
1694     ) -> Poll<io::Result<usize>> {
1695         self.io.poll_write(cx, buf)
1696     }
1697 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1698     fn poll_write_vectored(
1699         self: Pin<&mut Self>,
1700         cx: &mut Context<'_>,
1701         bufs: &[io::IoSlice<'_>],
1702     ) -> Poll<io::Result<usize>> {
1703         self.io.poll_write_vectored(cx, bufs)
1704     }
1705 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1706     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1707         Poll::Ready(Ok(()))
1708     }
1709 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1710     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1711         self.poll_flush(cx)
1712     }
1713 }
1714 
1715 impl AsRawHandle for NamedPipeClient {
as_raw_handle(&self) -> RawHandle1716     fn as_raw_handle(&self) -> RawHandle {
1717         self.io.as_raw_handle()
1718     }
1719 }
1720 
1721 impl AsHandle for NamedPipeClient {
as_handle(&self) -> BorrowedHandle<'_>1722     fn as_handle(&self) -> BorrowedHandle<'_> {
1723         unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1724     }
1725 }
1726 
1727 /// A builder structure for construct a named pipe with named pipe-specific
1728 /// options. This is required to use for named pipe servers who wants to modify
1729 /// pipe-related options.
1730 ///
1731 /// See [`ServerOptions::create`].
1732 #[derive(Debug, Clone)]
1733 pub struct ServerOptions {
1734     // dwOpenMode
1735     access_inbound: bool,
1736     access_outbound: bool,
1737     first_pipe_instance: bool,
1738     write_dac: bool,
1739     write_owner: bool,
1740     access_system_security: bool,
1741     // dwPipeMode
1742     pipe_mode: PipeMode,
1743     reject_remote_clients: bool,
1744     // other options
1745     max_instances: u32,
1746     out_buffer_size: u32,
1747     in_buffer_size: u32,
1748     default_timeout: u32,
1749 }
1750 
1751 impl ServerOptions {
1752     /// Creates a new named pipe builder with the default settings.
1753     ///
1754     /// ```
1755     /// use tokio::net::windows::named_pipe::ServerOptions;
1756     ///
1757     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1758     ///
1759     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1760     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1761     /// # Ok(()) }
1762     /// ```
new() -> ServerOptions1763     pub fn new() -> ServerOptions {
1764         ServerOptions {
1765             access_inbound: true,
1766             access_outbound: true,
1767             first_pipe_instance: false,
1768             write_dac: false,
1769             write_owner: false,
1770             access_system_security: false,
1771             pipe_mode: PipeMode::Byte,
1772             reject_remote_clients: true,
1773             max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1774             out_buffer_size: 65536,
1775             in_buffer_size: 65536,
1776             default_timeout: 0,
1777         }
1778     }
1779 
1780     /// The pipe mode.
1781     ///
1782     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1783     /// documentation of what each mode means.
1784     ///
1785     /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in  [`dwPipeMode`].
1786     ///
1787     /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1788     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1789         self.pipe_mode = pipe_mode;
1790         self
1791     }
1792 
1793     /// The flow of data in the pipe goes from client to server only.
1794     ///
1795     /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1796     ///
1797     /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1798     ///
1799     /// # Errors
1800     ///
1801     /// Server side prevents connecting by denying inbound access, client errors
1802     /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1803     /// the connection.
1804     ///
1805     /// ```
1806     /// use std::io;
1807     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1808     ///
1809     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1810     ///
1811     /// # #[tokio::main] async fn main() -> io::Result<()> {
1812     /// let _server = ServerOptions::new()
1813     ///     .access_inbound(false)
1814     ///     .create(PIPE_NAME)?;
1815     ///
1816     /// let e = ClientOptions::new()
1817     ///     .open(PIPE_NAME)
1818     ///     .unwrap_err();
1819     ///
1820     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1821     /// # Ok(()) }
1822     /// ```
1823     ///
1824     /// Disabling writing allows a client to connect, but errors with
1825     /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1826     ///
1827     /// ```
1828     /// use std::io;
1829     /// use tokio::io::AsyncWriteExt;
1830     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1831     ///
1832     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1833     ///
1834     /// # #[tokio::main] async fn main() -> io::Result<()> {
1835     /// let server = ServerOptions::new()
1836     ///     .access_inbound(false)
1837     ///     .create(PIPE_NAME)?;
1838     ///
1839     /// let mut client = ClientOptions::new()
1840     ///     .write(false)
1841     ///     .open(PIPE_NAME)?;
1842     ///
1843     /// server.connect().await?;
1844     ///
1845     /// let e = client.write(b"ping").await.unwrap_err();
1846     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1847     /// # Ok(()) }
1848     /// ```
1849     ///
1850     /// # Examples
1851     ///
1852     /// A unidirectional named pipe that only supports server-to-client
1853     /// communication.
1854     ///
1855     /// ```
1856     /// use std::io;
1857     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1858     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1859     ///
1860     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1861     ///
1862     /// # #[tokio::main] async fn main() -> io::Result<()> {
1863     /// let mut server = ServerOptions::new()
1864     ///     .access_inbound(false)
1865     ///     .create(PIPE_NAME)?;
1866     ///
1867     /// let mut client = ClientOptions::new()
1868     ///     .write(false)
1869     ///     .open(PIPE_NAME)?;
1870     ///
1871     /// server.connect().await?;
1872     ///
1873     /// let write = server.write_all(b"ping");
1874     ///
1875     /// let mut buf = [0u8; 4];
1876     /// let read = client.read_exact(&mut buf);
1877     ///
1878     /// let ((), read) = tokio::try_join!(write, read)?;
1879     ///
1880     /// assert_eq!(read, 4);
1881     /// assert_eq!(&buf[..], b"ping");
1882     /// # Ok(()) }
1883     /// ```
access_inbound(&mut self, allowed: bool) -> &mut Self1884     pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1885         self.access_inbound = allowed;
1886         self
1887     }
1888 
1889     /// The flow of data in the pipe goes from server to client only.
1890     ///
1891     /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1892     ///
1893     /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1894     ///
1895     /// # Errors
1896     ///
1897     /// Server side prevents connecting by denying outbound access, client
1898     /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1899     /// create the connection.
1900     ///
1901     /// ```
1902     /// use std::io;
1903     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1904     ///
1905     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1906     ///
1907     /// # #[tokio::main] async fn main() -> io::Result<()> {
1908     /// let server = ServerOptions::new()
1909     ///     .access_outbound(false)
1910     ///     .create(PIPE_NAME)?;
1911     ///
1912     /// let e = ClientOptions::new()
1913     ///     .open(PIPE_NAME)
1914     ///     .unwrap_err();
1915     ///
1916     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1917     /// # Ok(()) }
1918     /// ```
1919     ///
1920     /// Disabling reading allows a client to connect, but attempting to read
1921     /// will error with [`std::io::ErrorKind::PermissionDenied`].
1922     ///
1923     /// ```
1924     /// use std::io;
1925     /// use tokio::io::AsyncReadExt;
1926     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1927     ///
1928     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1929     ///
1930     /// # #[tokio::main] async fn main() -> io::Result<()> {
1931     /// let server = ServerOptions::new()
1932     ///     .access_outbound(false)
1933     ///     .create(PIPE_NAME)?;
1934     ///
1935     /// let mut client = ClientOptions::new()
1936     ///     .read(false)
1937     ///     .open(PIPE_NAME)?;
1938     ///
1939     /// server.connect().await?;
1940     ///
1941     /// let mut buf = [0u8; 4];
1942     /// let e = client.read(&mut buf).await.unwrap_err();
1943     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1944     /// # Ok(()) }
1945     /// ```
1946     ///
1947     /// # Examples
1948     ///
1949     /// A unidirectional named pipe that only supports client-to-server
1950     /// communication.
1951     ///
1952     /// ```
1953     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1954     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1955     ///
1956     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1957     ///
1958     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1959     /// let mut server = ServerOptions::new()
1960     ///     .access_outbound(false)
1961     ///     .create(PIPE_NAME)?;
1962     ///
1963     /// let mut client = ClientOptions::new()
1964     ///     .read(false)
1965     ///     .open(PIPE_NAME)?;
1966     ///
1967     /// server.connect().await?;
1968     ///
1969     /// let write = client.write_all(b"ping");
1970     ///
1971     /// let mut buf = [0u8; 4];
1972     /// let read = server.read_exact(&mut buf);
1973     ///
1974     /// let ((), read) = tokio::try_join!(write, read)?;
1975     ///
1976     /// println!("done reading and writing");
1977     ///
1978     /// assert_eq!(read, 4);
1979     /// assert_eq!(&buf[..], b"ping");
1980     /// # Ok(()) }
1981     /// ```
access_outbound(&mut self, allowed: bool) -> &mut Self1982     pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1983         self.access_outbound = allowed;
1984         self
1985     }
1986 
1987     /// If you attempt to create multiple instances of a pipe with this flag
1988     /// set, creation of the first server instance succeeds, but creation of any
1989     /// subsequent instances will fail with
1990     /// [`std::io::ErrorKind::PermissionDenied`].
1991     ///
1992     /// This option is intended to be used with servers that want to ensure that
1993     /// they are the only process listening for clients on a given named pipe.
1994     /// This is accomplished by enabling it for the first server instance
1995     /// created in a process.
1996     ///
1997     /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1998     ///
1999     /// # Errors
2000     ///
2001     /// If this option is set and more than one instance of the server for a
2002     /// given named pipe exists, calling [`create`] will fail with
2003     /// [`std::io::ErrorKind::PermissionDenied`].
2004     ///
2005     /// ```
2006     /// use std::io;
2007     /// use tokio::net::windows::named_pipe::ServerOptions;
2008     ///
2009     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2010     ///
2011     /// # #[tokio::main] async fn main() -> io::Result<()> {
2012     /// let server1 = ServerOptions::new()
2013     ///     .first_pipe_instance(true)
2014     ///     .create(PIPE_NAME)?;
2015     ///
2016     /// // Second server errs, since it's not the first instance.
2017     /// let e = ServerOptions::new()
2018     ///     .first_pipe_instance(true)
2019     ///     .create(PIPE_NAME)
2020     ///     .unwrap_err();
2021     ///
2022     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2023     /// # Ok(()) }
2024     /// ```
2025     ///
2026     /// # Examples
2027     ///
2028     /// ```
2029     /// use std::io;
2030     /// use tokio::net::windows::named_pipe::ServerOptions;
2031     ///
2032     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2033     ///
2034     /// # #[tokio::main] async fn main() -> io::Result<()> {
2035     /// let mut builder = ServerOptions::new();
2036     /// builder.first_pipe_instance(true);
2037     ///
2038     /// let server = builder.create(PIPE_NAME)?;
2039     /// let e = builder.create(PIPE_NAME).unwrap_err();
2040     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2041     /// drop(server);
2042     ///
2043     /// // OK: since, we've closed the other instance.
2044     /// let _server2 = builder.create(PIPE_NAME)?;
2045     /// # Ok(()) }
2046     /// ```
2047     ///
2048     /// [`create`]: ServerOptions::create
2049     /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
first_pipe_instance(&mut self, first: bool) -> &mut Self2050     pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2051         self.first_pipe_instance = first;
2052         self
2053     }
2054 
2055     /// Requests permission to modify the pipe's discretionary access control list.
2056     ///
2057     /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2058     ///
2059     /// # Examples
2060     ///
2061     /// ```
2062     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2063     ///
2064     /// use tokio::net::windows::named_pipe::ServerOptions;
2065     /// use windows_sys::{
2066     ///     Win32::Foundation::ERROR_SUCCESS,
2067     ///     Win32::Security::DACL_SECURITY_INFORMATION,
2068     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2069     /// };
2070     ///
2071     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2072     ///
2073     /// # #[tokio::main] async fn main() -> io::Result<()> {
2074     /// let mut pipe_template = ServerOptions::new();
2075     /// pipe_template.write_dac(true);
2076     /// let pipe = pipe_template.create(PIPE_NAME)?;
2077     ///
2078     /// unsafe {
2079     ///     assert_eq!(
2080     ///         ERROR_SUCCESS,
2081     ///         SetSecurityInfo(
2082     ///             pipe.as_raw_handle() as _,
2083     ///             SE_KERNEL_OBJECT,
2084     ///             DACL_SECURITY_INFORMATION,
2085     ///             ptr::null_mut(),
2086     ///             ptr::null_mut(),
2087     ///             ptr::null_mut(),
2088     ///             ptr::null_mut(),
2089     ///         )
2090     ///     );
2091     /// }
2092     ///
2093     /// # Ok(()) }
2094     /// ```
2095     ///
2096     /// ```
2097     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2098     ///
2099     /// use tokio::net::windows::named_pipe::ServerOptions;
2100     /// use windows_sys::{
2101     ///     Win32::Foundation::ERROR_ACCESS_DENIED,
2102     ///     Win32::Security::DACL_SECURITY_INFORMATION,
2103     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2104     /// };
2105     ///
2106     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2107     ///
2108     /// # #[tokio::main] async fn main() -> io::Result<()> {
2109     /// let mut pipe_template = ServerOptions::new();
2110     /// pipe_template.write_dac(false);
2111     /// let pipe = pipe_template.create(PIPE_NAME)?;
2112     ///
2113     /// unsafe {
2114     ///     assert_eq!(
2115     ///         ERROR_ACCESS_DENIED,
2116     ///         SetSecurityInfo(
2117     ///             pipe.as_raw_handle() as _,
2118     ///             SE_KERNEL_OBJECT,
2119     ///             DACL_SECURITY_INFORMATION,
2120     ///             ptr::null_mut(),
2121     ///             ptr::null_mut(),
2122     ///             ptr::null_mut(),
2123     ///             ptr::null_mut(),
2124     ///         )
2125     ///     );
2126     /// }
2127     ///
2128     /// # Ok(()) }
2129     /// ```
2130     ///
2131     /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_dac(&mut self, requested: bool) -> &mut Self2132     pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2133         self.write_dac = requested;
2134         self
2135     }
2136 
2137     /// Requests permission to modify the pipe's owner.
2138     ///
2139     /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2140     ///
2141     /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_owner(&mut self, requested: bool) -> &mut Self2142     pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2143         self.write_owner = requested;
2144         self
2145     }
2146 
2147     /// Requests permission to modify the pipe's system access control list.
2148     ///
2149     /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2150     ///
2151     /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
access_system_security(&mut self, requested: bool) -> &mut Self2152     pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2153         self.access_system_security = requested;
2154         self
2155     }
2156 
2157     /// Indicates whether this server can accept remote clients or not. Remote
2158     /// clients are disabled by default.
2159     ///
2160     /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2161     ///
2162     /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
reject_remote_clients(&mut self, reject: bool) -> &mut Self2163     pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2164         self.reject_remote_clients = reject;
2165         self
2166     }
2167 
2168     /// The maximum number of instances that can be created for this pipe. The
2169     /// first instance of the pipe can specify this value; the same number must
2170     /// be specified for other instances of the pipe. Acceptable values are in
2171     /// the range 1 through 254. The default value is unlimited.
2172     ///
2173     /// This corresponds to specifying [`nMaxInstances`].
2174     ///
2175     /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2176     ///
2177     /// # Errors
2178     ///
2179     /// The same numbers of `max_instances` have to be used by all servers. Any
2180     /// additional servers trying to be built which uses a mismatching value
2181     /// might error.
2182     ///
2183     /// ```
2184     /// use std::io;
2185     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2186     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2187     ///
2188     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2189     ///
2190     /// # #[tokio::main] async fn main() -> io::Result<()> {
2191     /// let mut server = ServerOptions::new();
2192     /// server.max_instances(2);
2193     ///
2194     /// let s1 = server.create(PIPE_NAME)?;
2195     /// let c1 = ClientOptions::new().open(PIPE_NAME);
2196     ///
2197     /// let s2 = server.create(PIPE_NAME)?;
2198     /// let c2 = ClientOptions::new().open(PIPE_NAME);
2199     ///
2200     /// // Too many servers!
2201     /// let e = server.create(PIPE_NAME).unwrap_err();
2202     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2203     ///
2204     /// // Still too many servers even if we specify a higher value!
2205     /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2206     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2207     /// # Ok(()) }
2208     /// ```
2209     ///
2210     /// # Panics
2211     ///
2212     /// This function will panic if more than 254 instances are specified. If
2213     /// you do not wish to set an instance limit, leave it unspecified.
2214     ///
2215     /// ```should_panic
2216     /// use tokio::net::windows::named_pipe::ServerOptions;
2217     ///
2218     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2219     /// let builder = ServerOptions::new().max_instances(255);
2220     /// # Ok(()) }
2221     /// ```
2222     #[track_caller]
max_instances(&mut self, instances: usize) -> &mut Self2223     pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2224         assert!(instances < 255, "cannot specify more than 254 instances");
2225         self.max_instances = instances as u32;
2226         self
2227     }
2228 
2229     /// The number of bytes to reserve for the output buffer.
2230     ///
2231     /// This corresponds to specifying [`nOutBufferSize`].
2232     ///
2233     /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
out_buffer_size(&mut self, buffer: u32) -> &mut Self2234     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2235         self.out_buffer_size = buffer;
2236         self
2237     }
2238 
2239     /// The number of bytes to reserve for the input buffer.
2240     ///
2241     /// This corresponds to specifying [`nInBufferSize`].
2242     ///
2243     /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
in_buffer_size(&mut self, buffer: u32) -> &mut Self2244     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2245         self.in_buffer_size = buffer;
2246         self
2247     }
2248 
2249     /// Creates the named pipe identified by `addr` for use as a server.
2250     ///
2251     /// This uses the [`CreateNamedPipe`] function.
2252     ///
2253     /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2254     ///
2255     /// # Errors
2256     ///
2257     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2258     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2259     ///
2260     /// [Tokio Runtime]: crate::runtime::Runtime
2261     /// [enabled I/O]: crate::runtime::Builder::enable_io
2262     ///
2263     /// # Examples
2264     ///
2265     /// ```
2266     /// use tokio::net::windows::named_pipe::ServerOptions;
2267     ///
2268     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2269     ///
2270     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2271     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2272     /// # Ok(()) }
2273     /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>2274     pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2275         // Safety: We're calling create_with_security_attributes_raw w/ a null
2276         // pointer which disables it.
2277         unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2278     }
2279 
2280     /// Creates the named pipe identified by `addr` for use as a server.
2281     ///
2282     /// This is the same as [`create`] except that it supports providing the raw
2283     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2284     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2285     ///
2286     /// # Errors
2287     ///
2288     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2289     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2290     ///
2291     /// [Tokio Runtime]: crate::runtime::Runtime
2292     /// [enabled I/O]: crate::runtime::Builder::enable_io
2293     ///
2294     /// # Safety
2295     ///
2296     /// The `attrs` argument must either be null or point at a valid instance of
2297     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2298     /// behavior is identical to calling the [`create`] method.
2299     ///
2300     /// [`create`]: ServerOptions::create
2301     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2302     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
create_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeServer>2303     pub unsafe fn create_with_security_attributes_raw(
2304         &self,
2305         addr: impl AsRef<OsStr>,
2306         attrs: *mut c_void,
2307     ) -> io::Result<NamedPipeServer> {
2308         let addr = encode_addr(addr);
2309 
2310         let pipe_mode = {
2311             let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2312                 windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2313             } else {
2314                 windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2315             };
2316             if self.reject_remote_clients {
2317                 mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2318             } else {
2319                 mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2320             }
2321             mode
2322         };
2323         let open_mode = {
2324             let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2325             if self.access_inbound {
2326                 mode |= windows_sys::PIPE_ACCESS_INBOUND;
2327             }
2328             if self.access_outbound {
2329                 mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2330             }
2331             if self.first_pipe_instance {
2332                 mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2333             }
2334             if self.write_dac {
2335                 mode |= windows_sys::WRITE_DAC;
2336             }
2337             if self.write_owner {
2338                 mode |= windows_sys::WRITE_OWNER;
2339             }
2340             if self.access_system_security {
2341                 mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2342             }
2343             mode
2344         };
2345 
2346         let h = windows_sys::CreateNamedPipeW(
2347             addr.as_ptr(),
2348             open_mode,
2349             pipe_mode,
2350             self.max_instances,
2351             self.out_buffer_size,
2352             self.in_buffer_size,
2353             self.default_timeout,
2354             attrs as *mut _,
2355         );
2356 
2357         if h == windows_sys::INVALID_HANDLE_VALUE {
2358             return Err(io::Error::last_os_error());
2359         }
2360 
2361         NamedPipeServer::from_raw_handle(h as _)
2362     }
2363 }
2364 
2365 /// A builder suitable for building and interacting with named pipes from the
2366 /// client side.
2367 ///
2368 /// See [`ClientOptions::open`].
2369 #[derive(Debug, Clone)]
2370 pub struct ClientOptions {
2371     generic_read: bool,
2372     generic_write: bool,
2373     security_qos_flags: u32,
2374     pipe_mode: PipeMode,
2375 }
2376 
2377 impl ClientOptions {
2378     /// Creates a new named pipe builder with the default settings.
2379     ///
2380     /// ```
2381     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2382     ///
2383     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2384     ///
2385     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2386     /// // Server must be created in order for the client creation to succeed.
2387     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2388     /// let client = ClientOptions::new().open(PIPE_NAME)?;
2389     /// # Ok(()) }
2390     /// ```
new() -> Self2391     pub fn new() -> Self {
2392         Self {
2393             generic_read: true,
2394             generic_write: true,
2395             security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2396                 | windows_sys::SECURITY_SQOS_PRESENT,
2397             pipe_mode: PipeMode::Byte,
2398         }
2399     }
2400 
2401     /// If the client supports reading data. This is enabled by default.
2402     ///
2403     /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2404     ///
2405     /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2406     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
read(&mut self, allowed: bool) -> &mut Self2407     pub fn read(&mut self, allowed: bool) -> &mut Self {
2408         self.generic_read = allowed;
2409         self
2410     }
2411 
2412     /// If the created pipe supports writing data. This is enabled by default.
2413     ///
2414     /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2415     ///
2416     /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2417     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
write(&mut self, allowed: bool) -> &mut Self2418     pub fn write(&mut self, allowed: bool) -> &mut Self {
2419         self.generic_write = allowed;
2420         self
2421     }
2422 
2423     /// Sets qos flags which are combined with other flags and attributes in the
2424     /// call to [`CreateFile`].
2425     ///
2426     /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2427     /// calling this function would override that value completely with the
2428     /// argument specified.
2429     ///
2430     /// When `security_qos_flags` is not set, a malicious program can gain the
2431     /// elevated privileges of a privileged Rust process when it allows opening
2432     /// user-specified paths, by tricking it into opening a named pipe. So
2433     /// arguably `security_qos_flags` should also be set when opening arbitrary
2434     /// paths. However the bits can then conflict with other flags, specifically
2435     /// `FILE_FLAG_OPEN_NO_RECALL`.
2436     ///
2437     /// For information about possible values, see [Impersonation Levels] on the
2438     /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2439     /// automatically when using this method.
2440     ///
2441     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2442     /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2443     /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2444     pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2445         // See: https://github.com/rust-lang/rust/pull/58216
2446         self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2447         self
2448     }
2449 
2450     /// The pipe mode.
2451     ///
2452     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2453     /// documentation of what each mode means.
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self2454     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2455         self.pipe_mode = pipe_mode;
2456         self
2457     }
2458 
2459     /// Opens the named pipe identified by `addr`.
2460     ///
2461     /// This opens the client using [`CreateFile`] with the
2462     /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2463     ///
2464     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2465     ///
2466     /// # Errors
2467     ///
2468     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2469     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2470     ///
2471     /// There are a few errors you need to take into account when creating a
2472     /// named pipe on the client side:
2473     ///
2474     /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2475     ///   does not exist. Presumably the server is not up.
2476     /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2477     ///   but the server is not currently waiting for a connection. Please see the
2478     ///   examples for how to check for this error.
2479     ///
2480     /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2481     /// [enabled I/O]: crate::runtime::Builder::enable_io
2482     /// [Tokio Runtime]: crate::runtime::Runtime
2483     ///
2484     /// A connect loop that waits until a pipe becomes available looks like
2485     /// this:
2486     ///
2487     /// ```no_run
2488     /// use std::time::Duration;
2489     /// use tokio::net::windows::named_pipe::ClientOptions;
2490     /// use tokio::time;
2491     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2492     ///
2493     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2494     ///
2495     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2496     /// let client = loop {
2497     ///     match ClientOptions::new().open(PIPE_NAME) {
2498     ///         Ok(client) => break client,
2499     ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2500     ///         Err(e) => return Err(e),
2501     ///     }
2502     ///
2503     ///     time::sleep(Duration::from_millis(50)).await;
2504     /// };
2505     ///
2506     /// // use the connected client.
2507     /// # Ok(()) }
2508     /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2509     pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2510         // Safety: We're calling open_with_security_attributes_raw w/ a null
2511         // pointer which disables it.
2512         unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2513     }
2514 
2515     /// Opens the named pipe identified by `addr`.
2516     ///
2517     /// This is the same as [`open`] except that it supports providing the raw
2518     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2519     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2520     ///
2521     /// # Safety
2522     ///
2523     /// The `attrs` argument must either be null or point at a valid instance of
2524     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2525     /// behavior is identical to calling the [`open`] method.
2526     ///
2527     /// [`open`]: ClientOptions::open
2528     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2529     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2530     pub unsafe fn open_with_security_attributes_raw(
2531         &self,
2532         addr: impl AsRef<OsStr>,
2533         attrs: *mut c_void,
2534     ) -> io::Result<NamedPipeClient> {
2535         let addr = encode_addr(addr);
2536 
2537         let desired_access = {
2538             let mut access = 0;
2539             if self.generic_read {
2540                 access |= windows_sys::GENERIC_READ;
2541             }
2542             if self.generic_write {
2543                 access |= windows_sys::GENERIC_WRITE;
2544             }
2545             access
2546         };
2547 
2548         // NB: We could use a platform specialized `OpenOptions` here, but since
2549         // we have access to windows_sys it ultimately doesn't hurt to use
2550         // `CreateFile` explicitly since it allows the use of our already
2551         // well-structured wide `addr` to pass into CreateFileW.
2552         let h = windows_sys::CreateFileW(
2553             addr.as_ptr(),
2554             desired_access,
2555             0,
2556             attrs as *mut _,
2557             windows_sys::OPEN_EXISTING,
2558             self.get_flags(),
2559             0,
2560         );
2561 
2562         if h == windows_sys::INVALID_HANDLE_VALUE {
2563             return Err(io::Error::last_os_error());
2564         }
2565 
2566         if matches!(self.pipe_mode, PipeMode::Message) {
2567             let mode = windows_sys::PIPE_READMODE_MESSAGE;
2568             let result =
2569                 windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2570 
2571             if result == 0 {
2572                 return Err(io::Error::last_os_error());
2573             }
2574         }
2575 
2576         NamedPipeClient::from_raw_handle(h as _)
2577     }
2578 
get_flags(&self) -> u322579     fn get_flags(&self) -> u32 {
2580         self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2581     }
2582 }
2583 
2584 /// The pipe mode of a named pipe.
2585 ///
2586 /// Set through [`ServerOptions::pipe_mode`].
2587 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2588 #[non_exhaustive]
2589 pub enum PipeMode {
2590     /// Data is written to the pipe as a stream of bytes. The pipe does not
2591     /// distinguish bytes written during different write operations.
2592     ///
2593     /// Corresponds to [`PIPE_TYPE_BYTE`].
2594     ///
2595     /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
2596     Byte,
2597     /// Data is written to the pipe as a stream of messages. The pipe treats the
2598     /// bytes written during each write operation as a message unit. Any reading
2599     /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
2600     /// completely.
2601     ///
2602     /// Corresponds to [`PIPE_TYPE_MESSAGE`].
2603     ///
2604     /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
2605     /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
2606     Message,
2607 }
2608 
2609 /// Indicates the end of a named pipe.
2610 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2611 #[non_exhaustive]
2612 pub enum PipeEnd {
2613     /// The named pipe refers to the client end of a named pipe instance.
2614     ///
2615     /// Corresponds to [`PIPE_CLIENT_END`].
2616     ///
2617     /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
2618     Client,
2619     /// The named pipe refers to the server end of a named pipe instance.
2620     ///
2621     /// Corresponds to [`PIPE_SERVER_END`].
2622     ///
2623     /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
2624     Server,
2625 }
2626 
2627 /// Information about a named pipe.
2628 ///
2629 /// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2630 #[derive(Debug, Clone)]
2631 #[non_exhaustive]
2632 pub struct PipeInfo {
2633     /// Indicates the mode of a named pipe.
2634     pub mode: PipeMode,
2635     /// Indicates the end of a named pipe.
2636     pub end: PipeEnd,
2637     /// The maximum number of instances that can be created for this pipe.
2638     pub max_instances: u32,
2639     /// The number of bytes to reserve for the output buffer.
2640     pub out_buffer_size: u32,
2641     /// The number of bytes to reserve for the input buffer.
2642     pub in_buffer_size: u32,
2643 }
2644 
2645 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2646 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2647     let len = addr.as_ref().encode_wide().count();
2648     let mut vec = Vec::with_capacity(len + 1);
2649     vec.extend(addr.as_ref().encode_wide());
2650     vec.push(0);
2651     vec.into_boxed_slice()
2652 }
2653 
2654 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2655 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2656     let mut flags = 0;
2657     let mut out_buffer_size = 0;
2658     let mut in_buffer_size = 0;
2659     let mut max_instances = 0;
2660 
2661     let result = windows_sys::GetNamedPipeInfo(
2662         handle as _,
2663         &mut flags,
2664         &mut out_buffer_size,
2665         &mut in_buffer_size,
2666         &mut max_instances,
2667     );
2668 
2669     if result == 0 {
2670         return Err(io::Error::last_os_error());
2671     }
2672 
2673     let mut end = PipeEnd::Client;
2674     let mut mode = PipeMode::Byte;
2675 
2676     if flags & windows_sys::PIPE_SERVER_END != 0 {
2677         end = PipeEnd::Server;
2678     }
2679 
2680     if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2681         mode = PipeMode::Message;
2682     }
2683 
2684     Ok(PipeInfo {
2685         end,
2686         mode,
2687         out_buffer_size,
2688         in_buffer_size,
2689         max_instances,
2690     })
2691 }
2692