xref: /aosp_15_r20/external/crosvm/devices/src/virtio/vsock/sys/windows/vsock.rs (revision bb4ee6a4ae7042d18b07a98463b9c8b875e44b39)
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::RefCell;
6 use std::collections::BTreeMap;
7 use std::collections::HashMap;
8 use std::fmt;
9 use std::fmt::Display;
10 use std::io;
11 use std::io::Read;
12 use std::io::Write;
13 use std::os::windows::io::RawHandle;
14 use std::rc::Rc;
15 use std::result;
16 use std::sync::Arc;
17 use std::thread;
18 
19 use anyhow::anyhow;
20 use anyhow::Context;
21 use base::error;
22 use base::info;
23 use base::named_pipes;
24 use base::named_pipes::BlockingMode;
25 use base::named_pipes::FramingMode;
26 use base::named_pipes::OverlappedWrapper;
27 use base::named_pipes::PipeConnection;
28 use base::warn;
29 use base::AsRawDescriptor;
30 use base::Error as SysError;
31 use base::Event;
32 use base::EventExt;
33 use base::WorkerThread;
34 use cros_async::select3;
35 use cros_async::select6;
36 use cros_async::sync::RwLock;
37 use cros_async::AsyncError;
38 use cros_async::EventAsync;
39 use cros_async::Executor;
40 use cros_async::SelectResult;
41 use data_model::Le32;
42 use data_model::Le64;
43 use futures::channel::mpsc;
44 use futures::channel::oneshot;
45 use futures::pin_mut;
46 use futures::select;
47 use futures::select_biased;
48 use futures::stream::FuturesUnordered;
49 use futures::FutureExt;
50 use futures::SinkExt;
51 use futures::StreamExt;
52 use remain::sorted;
53 use serde::Deserialize;
54 use serde::Serialize;
55 use thiserror::Error as ThisError;
56 use vm_memory::GuestMemory;
57 use zerocopy::AsBytes;
58 use zerocopy::FromBytes;
59 use zerocopy::FromZeroes;
60 
61 use crate::virtio::async_utils;
62 use crate::virtio::copy_config;
63 use crate::virtio::create_stop_oneshot;
64 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
65 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
66 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
67 use crate::virtio::vsock::sys::windows::protocol::vsock_op;
68 use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
69 use crate::virtio::DescriptorChain;
70 use crate::virtio::DeviceType;
71 use crate::virtio::Interrupt;
72 use crate::virtio::Queue;
73 use crate::virtio::StoppedWorker;
74 use crate::virtio::VirtioDevice;
75 use crate::virtio::Writer;
76 use crate::Suspendable;
77 
78 #[sorted]
79 #[derive(ThisError, Debug)]
80 pub enum VsockError {
81     #[error("Failed to await next descriptor chain from queue: {0}")]
82     AwaitQueue(AsyncError),
83     #[error("OverlappedWrapper error.")]
84     BadOverlappedWrapper,
85     #[error("Failed to clone descriptor: {0}")]
86     CloneDescriptor(SysError),
87     #[error("Failed to create EventAsync: {0}")]
88     CreateEventAsync(AsyncError),
89     #[error("Failed to create wait context: {0}")]
90     CreateWaitContext(SysError),
91     #[error("Failed to read queue: {0}")]
92     ReadQueue(io::Error),
93     #[error("Failed to reset event object: {0}")]
94     ResetEventObject(SysError),
95     #[error("Failed to run executor: {0}")]
96     RunExecutor(AsyncError),
97     #[error("Failed to write to pipe, port {0}: {1}")]
98     WriteFailed(PortPair, io::Error),
99     #[error("Failed to write queue: {0}")]
100     WriteQueue(io::Error),
101 }
102 pub type Result<T> = result::Result<T, VsockError>;
103 
104 // Vsock has three virtio queues: rx, tx, and event.
105 const QUEUE_SIZE: u16 = 256;
106 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
107 // We overload port numbers so that if a message is to be received from
108 // CONNECTION_EVENT_PORT_NUM (an invalid port number), we recognize that a
109 // new connection was set up.
110 const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;
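// (The rx loop below waits on the sentinel pair `PortPair { host: CONNECTION_EVENT_PORT_NUM,
// guest: 0 }`; when it fires, the connection set has changed and the wait list is rebuilt.)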
111 
112 /// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
113 const KB: usize = 1024;
114 
115 /// Size of the buffer we read into from the host side named pipe. Note that data flows from the
116 /// host pipe -> this buffer -> rx queue.
117 /// This should be large enough to facilitate fast transmission of host data, see b/232950349.
118 const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;
119 
120 /// In the case where the host side named pipe does not have a specified buffer size, we'll default
121 /// to telling the guest that this is the amount of extra RX space available (i.e. buf_alloc).
122 /// This should be larger than the volume of data that the guest will generally send at one time to
123 /// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
124 const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;
125 
126 /// Minimum free buffer threshold to notify the peer with a credit update
127 /// message. This is in case we are ingesting many messages without an
128 /// opportunity to send a message back to the peer with a buffer size update.
129 /// This value is a percentage of `buf_alloc`.
130 /// TODO(b/204246759): This value was chosen without much more thought than "it
131 /// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC_BYTES, to a
132 /// value that makes empirical sense for the packet sizes that we expect to
133 /// receive.
134 /// TODO(b/239848326): Replace float with integer, in order to remove risk
135 /// of losing precision, i.e. change it to `10` and compute
136 /// `FOO * MIN_FREE_BUFFER_PCT / 100`.
137 const MIN_FREE_BUFFER_PCT: f64 = 0.1;
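
// Illustrative sketch of the integer arithmetic suggested by TODO(b/239848326) above. The
// constant and helper below use hypothetical names and are not referenced by the device;
// they only show the same percentage check written without floating point.
#[allow(dead_code)]
const MIN_FREE_BUFFER_PCT_INT: usize = 10;

#[allow(dead_code)]
fn below_free_buffer_threshold_int(
    buf_alloc: usize,
    recv_cnt: usize,
    prev_recv_cnt: usize,
) -> bool {
    // Same predicate as `check_free_buffer_threshold` further down, in integer math: the
    // free space the peer still believes we have, compared against 10% of `buf_alloc`.
    let threshold = buf_alloc * MIN_FREE_BUFFER_PCT_INT / 100;
    buf_alloc - (recv_cnt - prev_recv_cnt) < threshold
}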
138 
139 // Number of packets to buffer in the tx processing channels.
140 const CHANNEL_SIZE: usize = 256;
141 
142 type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;
143 
144 /// Virtio device for exposing vsock sockets to the guest OS through virtio.
145 pub struct Vsock {
146     guest_cid: u64,
147     host_guid: Option<String>,
148     features: u64,
149     acked_features: u64,
150     worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
151     /// Stores any active connections when the device sleeps. This allows us to sleep/wake
152     /// without disrupting active connections, which is useful when taking a snapshot.
153     sleeping_connections: Option<VsockConnectionMap>,
154     /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
155     /// Used to inform the guest all connections are broken when we restore a snapshot.
156     needs_transport_reset: bool,
157 }
158 
159 /// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
160 /// changed when this device is restored.
161 #[derive(Serialize, Deserialize)]
162 struct VsockSnapshot {
163     guest_cid: u64,
164     features: u64,
165     acked_features: u64,
166 }
167 
168 impl Vsock {
169     pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
170         Ok(Vsock {
171             guest_cid,
172             host_guid,
173             features: base_features,
174             acked_features: 0,
175             worker_thread: None,
176             sleeping_connections: None,
177             needs_transport_reset: false,
178         })
179     }
180 
181     fn get_config(&self) -> virtio_vsock_config {
182         virtio_vsock_config {
183             guest_cid: Le64::from(self.guest_cid),
184         }
185     }
186 
187     fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
188         if let Some(worker_thread) = self.worker_thread.take() {
189             if let Some(queues_and_conns) = worker_thread.stop() {
190                 StoppedWorker::WithQueues(Box::new(queues_and_conns))
191             } else {
192                 StoppedWorker::MissingQueues
193             }
194         } else {
195             StoppedWorker::AlreadyStopped
196         }
197     }
198 
199     fn start_worker(
200         &mut self,
201         mem: GuestMemory,
202         interrupt: Interrupt,
203         mut queues: VsockQueues,
204         existing_connections: Option<VsockConnectionMap>,
205     ) -> anyhow::Result<()> {
206         let rx_queue = queues.rx;
207         let tx_queue = queues.tx;
208         let event_queue = queues.event;
209 
210         let host_guid = self.host_guid.clone();
211         let guest_cid = self.guest_cid;
212         let needs_transport_reset = self.needs_transport_reset;
213         self.needs_transport_reset = false;
214         self.worker_thread = Some(WorkerThread::start(
215             "userspace_virtio_vsock",
216             move |kill_evt| {
217                 let mut worker = Worker::new(
218                     mem,
219                     interrupt,
220                     host_guid,
221                     guest_cid,
222                     existing_connections,
223                     needs_transport_reset,
224                 );
225                 let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);
226 
227                 match result {
228                     Err(e) => {
229                         error!("userspace vsock worker thread exited with error: {:?}", e);
230                         None
231                     }
232                     Ok(paused_queues_and_connections_option) => {
233                         paused_queues_and_connections_option
234                     }
235                 }
236             },
237         ));
238 
239         Ok(())
240     }
241 }
242 
243 impl VirtioDevice for Vsock {
244     fn keep_rds(&self) -> Vec<RawHandle> {
245         Vec::new()
246     }
247 
248     fn read_config(&self, offset: u64, data: &mut [u8]) {
249         copy_config(data, 0, self.get_config().as_bytes(), offset);
250     }
251 
252     fn device_type(&self) -> DeviceType {
253         DeviceType::Vsock
254     }
255 
256     fn queue_max_sizes(&self) -> &[u16] {
257         QUEUE_SIZES
258     }
259 
260     fn features(&self) -> u64 {
261         self.features
262     }
263 
264     fn ack_features(&mut self, value: u64) {
265         self.acked_features |= value;
266     }
267 
268     fn activate(
269         &mut self,
270         mem: GuestMemory,
271         interrupt: Interrupt,
272         mut queues: BTreeMap<usize, Queue>,
273     ) -> anyhow::Result<()> {
274         if queues.len() != QUEUE_SIZES.len() {
275             return Err(anyhow!(
276                 "Failed to activate vsock device. queues.len(): {} != {}",
277                 queues.len(),
278                 QUEUE_SIZES.len(),
279             ));
280         }
281 
282         let vsock_queues = VsockQueues {
283             rx: queues.remove(&0).unwrap(),
284             tx: queues.remove(&1).unwrap(),
285             event: queues.remove(&2).unwrap(),
286         };
287 
288         self.start_worker(mem, interrupt, vsock_queues, None)
289     }
290 
291     fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
292         match self.stop_worker() {
293             StoppedWorker::WithQueues(paused_queues_and_conns) => {
294                 let (queues, sleeping_connections) = *paused_queues_and_conns;
295                 self.sleeping_connections = Some(sleeping_connections);
296                 Ok(Some(queues.into()))
297             }
298             StoppedWorker::MissingQueues => {
299                 anyhow::bail!("vsock queue workers did not stop cleanly")
300             }
301             StoppedWorker::AlreadyStopped => {
302                 // The device isn't in the activated state.
303                 Ok(None)
304             }
305         }
306     }
307 
308     fn virtio_wake(
309         &mut self,
310         queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
311     ) -> anyhow::Result<()> {
312         if let Some((mem, interrupt, queues)) = queues_state {
313             let connections = self.sleeping_connections.take();
314             self.start_worker(
315                 mem,
316                 interrupt,
317                 queues
318                     .try_into()
319                     .expect("Failed to convert queue BTreeMap to VsockQueues"),
320                 connections,
321             )?;
322         }
323         Ok(())
324     }
325 
326     fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
327         serde_json::to_value(VsockSnapshot {
328             guest_cid: self.guest_cid,
329             features: self.features,
330             acked_features: self.acked_features,
331         })
332         .context("failed to serialize vsock snapshot")
333     }
334 
335     fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
336         let vsock_snapshot: VsockSnapshot =
337             serde_json::from_value(data).context("error deserializing vsock snapshot")?;
338         anyhow::ensure!(
339             self.guest_cid == vsock_snapshot.guest_cid,
340             "expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
341             self.guest_cid,
342             vsock_snapshot.guest_cid
343         );
344         anyhow::ensure!(
345             self.features == vsock_snapshot.features,
346             "vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
347             self.features,
348             vsock_snapshot.features
349         );
350         self.acked_features = vsock_snapshot.acked_features;
351         self.needs_transport_reset = true;
352 
353         Ok(())
354     }
355 }
356 
357 #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
358 pub struct PortPair {
359     host: u32,
360     guest: u32,
361 }
362 
363 impl Display for PortPair {
364     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365         write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
366     }
367 }
368 
369 impl PortPair {
370     fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
371         PortPair {
372             host: header.dst_port.to_native(),
373             guest: header.src_port.to_native(),
374         }
375     }
376 }
377 
378 // Note: variables herein do not have to be atomic because this struct is guarded
379 // by a RwLock.
380 struct VsockConnection {
381     // The guest port.
382     guest_port: Le32,
383 
384     // The actual named (asynchronous) pipe connection.
385     pipe: PipeConnection,
386     // The overlapped struct contains an event object for the named pipe.
387     // This lets us select() on the pipes by waiting on the events.
388     // This is for Reads only.
389     overlapped: Box<OverlappedWrapper>,
390     // Read buffer for the named pipes. These are needed because reads complete
391     // asynchronously.
392     buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,
393 
394     // Total free-running count of received bytes.
395     recv_cnt: usize,
396 
397     // Total free-running count of received bytes that the peer has been informed of.
398     prev_recv_cnt: usize,
399 
400     // Total auxiliary buffer space available to receive packets from the driver, not including
401     // the virtqueue itself. For us, this is the tx buffer on the named pipe into which we drain
402     // packets for the connection. Note that if the named pipe has a grow-on-demand TX buffer,
403     // we use DEFAULT_BUF_ALLOC_BYTES instead.
404     buf_alloc: usize,
405 
406     // Peer (driver) total free-running count of received bytes.
407     peer_recv_cnt: usize,
408 
409     // Peer (driver) total rx buffer allocated.
410     peer_buf_alloc: usize,
411 
412     // Total free-running count of transmitted bytes.
413     tx_cnt: usize,
414 
415     // State tracking for full buffer condition. Currently just used for logging. If the peer's
416     // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
417     // gets set to `true`. Once there's enough space in the buffer, this gets unset.
418     is_buffer_full: bool,
419 }
420 
421 struct Worker {
422     mem: GuestMemory,
423     interrupt: Interrupt,
424     host_guid: Option<String>,
425     guest_cid: u64,
426     // Map of (host, guest) port pair to a VsockConnection.
427     connections: VsockConnectionMap,
428     connection_event: Event,
429     device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
430     device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
431     send_protocol_reset: bool,
432 }
433 
434 impl Worker {
435     fn new(
436         mem: GuestMemory,
437         interrupt: Interrupt,
438         host_guid: Option<String>,
439         guest_cid: u64,
440         existing_connections: Option<VsockConnectionMap>,
441         send_protocol_reset: bool,
442     ) -> Worker {
443         // Buffer size here is arbitrary, but must be at least one since we need
444         // to be able to write a reset event to the channel when the device
445         // worker is brought up on a VM restore. Note that we send exactly one
446         // message per VM session, so we should never see these messages backing
447         // up.
448         let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);
449 
450         Worker {
451             mem,
452             interrupt,
453             host_guid,
454             guest_cid,
455             connections: existing_connections.unwrap_or_default(),
456             connection_event: Event::new().unwrap(),
457             device_event_queue_tx,
458             device_event_queue_rx: Some(device_event_queue_rx),
459             send_protocol_reset,
460         }
461     }
462 
463     async fn process_rx_queue(
464         &self,
465         recv_queue: Arc<RwLock<Queue>>,
466         mut rx_queue_evt: EventAsync,
467         ex: &Executor,
468         mut stop_rx: oneshot::Receiver<()>,
469     ) -> Result<()> {
470         'connections_changed: loop {
471             // Run continuously until exit evt
472 
473             // TODO(b/200810561): Optimize this FuturesUnordered code.
474             // Set up the EventAsyncs to select on
475             let futures = FuturesUnordered::new();
476             // This needs to be its own scope since it holds a RwLock on `self.connections`.
477             {
478                 let connections = self.connections.read_lock().await;
479                 for (port, connection) in connections.iter() {
480                     let h_evt = connection
481                         .overlapped
482                         .get_h_event_ref()
483                         .ok_or_else(|| {
484                             error!("Missing h_event in OverlappedWrapper.");
485                             VsockError::BadOverlappedWrapper
486                         })
487                         .unwrap()
488                         .try_clone()
489                         .map_err(|e| {
490                             error!("Could not clone h_event.");
491                             VsockError::CloneDescriptor(e)
492                         })?;
493                     let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
494                         error!("Could not create EventAsync.");
495                         VsockError::CreateEventAsync(e)
496                     })?;
497                     futures.push(wait_event_and_return_port_pair(evt_async, *port));
498                 }
499             }
500             let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
501                 error!("Could not clone connection_event.");
502                 VsockError::CloneDescriptor(e)
503             })?;
504             let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
505                 error!("Could not create EventAsync.");
506                 VsockError::CreateEventAsync(e)
507             })?;
508             futures.push(wait_event_and_return_port_pair(
509                 connection_evt_async,
510                 PortPair {
511                     host: CONNECTION_EVENT_PORT_NUM,
512                     guest: 0,
513                 },
514             ));
515 
516             // Wait to service the sockets. Note that for fairness, it is critical that we service
517             // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
518             // is used, as it returns all currently *ready* futures from the stream.
519             //
520             // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
521             // happens because it has at least one item, and we only request chunks once.
522             let futures_len = futures.len();
523             let mut ready_chunks = futures.ready_chunks(futures_len);
524             let ports = select_biased! {
525                 ports = ready_chunks.next() => {
526                     ports.expect("failed to wait on vsock sockets")
527                 }
528                 _ = stop_rx => {
529                     break;
530                 }
531             };
532 
533             for port in ports {
534                 if port.host == CONNECTION_EVENT_PORT_NUM {
535                     // New connection event. Set up futures again.
536                     if let Err(e) = self.connection_event.reset() {
537                         error!("vsock: port: {}: could not reset connection_event.", port);
538                         return Err(VsockError::ResetEventObject(e));
539                     }
540                     continue 'connections_changed;
541                 }
542                 let mut connections = self.connections.lock().await;
543                 let connection = if let Some(conn) = connections.get_mut(&port) {
544                     conn
545                 } else {
546                     // We could have been scheduled to run the rx queue *before* the connection was
547                     // closed. In that case, we do nothing. The code which closed the connection
548                     // (e.g. in response to a message in the tx/rx queues) will handle notifying
549                     // the guest/host as required.
550                     continue 'connections_changed;
551                 };
552 
553                 // Check if the peer has enough space in their buffer to
554                 // receive the maximum amount of data that we could possibly
555                 // read from the host pipe. If not, we should continue to
556                 // process other sockets as each socket has an independent
557                 // buffer.
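                // Worked example (illustrative numbers): with peer_buf_alloc = 64 KiB,
                // tx_cnt = 70 KiB and peer_recv_cnt = 10 KiB, the peer still holds 60 KiB
                // of unread data, leaving 4 KiB free; anything under TEMP_READ_BUF_SIZE_BYTES
                // (4 KiB) and we skip this connection for now.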
558                 let peer_free_buf_size =
559                     connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
560                 if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
561                     if !connection.is_buffer_full {
562                         warn!(
563                             "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
564                             port, peer_free_buf_size
565                         );
566                         connection.is_buffer_full = true;
567                     }
568                     continue;
569                 } else if connection.is_buffer_full {
570                     connection.is_buffer_full = false;
571                 }
572 
573                 let pipe_connection = &mut connection.pipe;
574                 let overlapped = &mut connection.overlapped;
575                 let guest_port = connection.guest_port;
576                 let buffer = &mut connection.buffer;
577 
578                 match overlapped.get_h_event_ref() {
579                     Some(h_event) => {
580                         if let Err(e) = h_event.reset() {
581                             error!(
582                                 "vsock: port: {}: Could not reset event in OverlappedWrapper.",
583                                 port
584                             );
585                             return Err(VsockError::ResetEventObject(e));
586                         }
587                     }
588                     None => {
589                         error!(
590                             "vsock: port: {}: missing h_event in OverlappedWrapper.",
591                             port
592                         );
593                         return Err(VsockError::BadOverlappedWrapper);
594                     }
595                 }
596 
597                 let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
598                     Ok(size) => size as usize,
599                     Err(e) => {
600                         error!("vsock: port {}: Failed to read from pipe {}", port, e);
601                         // TODO(b/237278629): Close the connection if we fail to read.
602                         continue 'connections_changed;
603                     }
604                 };
605 
606                 let response_header = virtio_vsock_hdr {
607                     src_cid: 2.into(),              // Host CID
608                     dst_cid: self.guest_cid.into(), // Guest CID
609                     src_port: Le32::from(port.host),
610                     dst_port: guest_port,
611                     len: Le32::from(data_size as u32),
612                     r#type: TYPE_STREAM_SOCKET.into(),
613                     op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
614                     buf_alloc: Le32::from(connection.buf_alloc as u32),
615                     fwd_cnt: Le32::from(connection.recv_cnt as u32),
616                     ..Default::default()
617                 };
618 
619                 connection.prev_recv_cnt = connection.recv_cnt;
620 
621                 // We have to only write to the queue once, so we construct a new buffer
622                 // with the concatenated header and data.
623                 const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
624                 let data_read = &buffer[..data_size];
625                 let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
626                 header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
627                 header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
628                 {
629                     let mut recv_queue_lock = recv_queue.lock().await;
630                     let write_fut = self
631                         .write_bytes_to_queue(
632                             &mut recv_queue_lock,
633                             &mut rx_queue_evt,
634                             &header_and_data[..],
635                         )
636                         .fuse();
637                     pin_mut!(write_fut);
638                     // If `stop_rx` is fired but the virtqueue is full, this loop will break
639                     // without draining the `header_and_data`.
640                     select_biased! {
641                         write = write_fut => {},
642                         _ = stop_rx => {
643                             break;
644                         }
645                     }
646                 }
647 
648                 connection.tx_cnt += data_size;
649 
650                 // Start reading again so we receive the message and
651                 // event signal immediately.
652 
653                 // SAFETY:
654                 // Unsafe because the read could happen at any time
655                 // after this function is called. We ensure safety
656                 // by allocating the buffer and overlapped struct
657                 // on the heap.
658                 unsafe {
659                     match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
660                         Ok(()) => {}
661                         Err(e) => {
662                             error!("vsock: port {}: Failed to read from pipe {}", port, e);
663                         }
664                     }
665                 }
666             }
667         }
668         Ok(())
669     }
670 
671     async fn process_tx_queue(
672         &self,
673         mut queue: Queue,
674         mut queue_evt: EventAsync,
675         mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
676         mut stop_rx: oneshot::Receiver<()>,
677     ) -> Result<Queue> {
678         loop {
679             // Run continuously until exit evt
680             let mut avail_desc = match queue
681                 .next_async_interruptable(&mut queue_evt, &mut stop_rx)
682                 .await
683             {
684                 Ok(Some(d)) => d,
685                 Ok(None) => break,
686                 Err(e) => {
687                     error!("vsock: Failed to read descriptor {}", e);
688                     return Err(VsockError::AwaitQueue(e));
689                 }
690             };
691 
692             let reader = &mut avail_desc.reader;
693             while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
694                 let header = match reader.read_obj::<virtio_vsock_hdr>() {
695                     Ok(hdr) => hdr,
696                     Err(e) => {
697                         error!("vsock: Error while reading header: {}", e);
698                         break;
699                     }
700                 };
701 
702                 let len = header.len.to_native() as usize;
703                 if reader.available_bytes() < len {
704                     error!("vsock: Error reading packet data");
705                     break;
706                 }
707 
708                 let mut data = vec![0_u8; len];
709                 if len > 0 {
710                     if let Err(e) = reader.read_exact(&mut data) {
711                         error!("vsock: failed to read data from tx packet: {:?}", e);
712                     }
713                 }
714 
715                 if let Err(e) = process_packets_queue.send((header, data)).await {
716                     error!(
717                         "Error while sending packet to queue, dropping packet: {:?}",
718                         e
719                     )
720                 };
721             }
722 
723             queue.add_used(avail_desc, 0);
724             queue.trigger_interrupt();
725         }
726 
727         Ok(queue)
728     }
729 
730     fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
731         match pipe.get_info() {
732             Ok(info) => {
733                 if info.outgoing_buffer_size > 0 {
734                     info.outgoing_buffer_size as usize
735                 } else {
736                     info!(
737                         "vsock: port {}: using default extra rx buffer size \
738                                             (named pipe does not have an explicit buffer size)",
739                         port
740                     );
741 
742                     // A zero buffer size implies that the buffer grows as
743                     // needed. We set our own cap here for flow control
744                     // purposes.
745                     DEFAULT_BUF_ALLOC_BYTES
746                 }
747             }
748             Err(e) => {
749                 error!(
750                     "vsock: port {}: failed to get named pipe info, using default \
751                                         buf size. Error: {}",
752                     port, e
753                 );
754                 DEFAULT_BUF_ALLOC_BYTES
755             }
756         }
757     }
758 
759     /// Processes a connection request and returns whether to return a response (true), or reset
760     /// (false).
761     async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
762         let port = PortPair::from_tx_header(&header);
763         info!("vsock: port {}: Received connection request", port);
764 
765         if self.connections.read_lock().await.contains_key(&port) {
766             // Connection exists, nothing for us to do.
767             warn!(
768                 "vsock: port {}: accepting connection request on already connected port",
769                 port
770             );
771             return true;
772         }
773 
774         if self.host_guid.is_none() {
775             error!(
776                 "vsock: port {}: Cannot accept guest-initiated connections \
777                         without host-guid, rejecting connection",
778                 port,
779             );
780             return false;
781         }
782 
783         // We have a new connection to establish.
784         let mut overlapped_wrapper =
785             Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
786         let pipe_result = named_pipes::create_client_pipe(
787             get_pipe_name(
788                 self.host_guid.as_ref().unwrap(),
789                 header.dst_port.to_native(),
790             )
791             .as_str(),
792             &FramingMode::Byte,
793             &BlockingMode::Wait,
794             true, /* overlapped */
795         );
796 
797         match pipe_result {
798             Ok(mut pipe_connection) => {
799                 let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
800                 info!("vsock: port {}: created client pipe", port);
801 
802                 // SAFETY:
803                 // Unsafe because the read could happen at any time
804                 // after this function is called. We ensure safety
805                 // by allocating the buffer and overlapped struct
806                 // on the heap.
807                 unsafe {
808                     match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
809                     {
810                         Ok(()) => {}
811                         Err(e) => {
812                             error!("vsock: port {}: Failed to read from pipe {}", port, e);
813                             return false;
814                         }
815                     }
816                 }
817                 info!("vsock: port {}: started read on client pipe", port);
818 
819                 let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
820                 let connection = VsockConnection {
821                     guest_port: header.src_port,
822                     pipe: pipe_connection,
823                     overlapped: overlapped_wrapper,
824                     peer_buf_alloc: header.buf_alloc.to_native() as usize,
825                     peer_recv_cnt: header.fwd_cnt.to_native() as usize,
826                     buf_alloc,
827                     buffer,
828                     // The connection has just been made, so we haven't received
829                     // anything yet.
830                     recv_cnt: 0_usize,
831                     prev_recv_cnt: 0_usize,
832                     tx_cnt: 0_usize,
833                     is_buffer_full: false,
834                 };
835                 self.connections.lock().await.insert(port, connection);
836                 self.connection_event.signal().unwrap_or_else(|_| {
837                     panic!(
838                         "Failed to signal new connection event for vsock port {}.",
839                         port
840                     )
841                 });
842                 info!("vsock: port {}: signaled connection ready", port);
843                 true
844             }
845             Err(e) => {
846                 info!(
847                     "vsock: No waiting pipe connection on port {}, \
848                                 not connecting (err: {:?})",
849                     port, e
850                 );
851                 false
852             }
853         }
854     }
855 
856     async fn handle_vsock_guest_data(
857         &self,
858         header: virtio_vsock_hdr,
859         data: &[u8],
860         ex: &Executor,
861     ) -> Result<()> {
862         let port = PortPair::from_tx_header(&header);
863         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
864         {
865             let mut connections = self.connections.lock().await;
866             if let Some(connection) = connections.get_mut(&port) {
867                 // Update peer buffer/recv counters
868                 connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
869                 connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
870 
871                 let pipe = &mut connection.pipe;
872                 // We have to provide an OVERLAPPED struct to write to the pipe.
873                 //
874                 // SAFETY: safe because data & overlapped_wrapper live until the
875                 // overlapped operation completes (we wait on completion below).
876                 if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
877                     return Err(VsockError::WriteFailed(port, e));
878                 }
879             } else {
880                 error!(
881                     "vsock: Guest attempted to send data packet over unconnected \
882                             port ({}), dropping packet",
883                     port
884                 );
885                 return Ok(());
886             }
887         }
888         if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
889             // Don't block the executor while the write completes. This time should
890             // always be negligible, but will sometimes be non-zero in cases where
891             // traffic is high on the NamedPipe, especially a duplex pipe.
892             if let Ok(cloned_event) = write_completed_event.try_clone() {
893                 if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
894                     let _ = async_event.next_val().await;
895                 } else {
896                     error!(
897                         "vsock: port {}: Failed to convert write event to async",
898                         port
899                     );
900                 }
901             } else {
902                 error!(
903                     "vsock: port {}: Failed to clone write completion event",
904                     port
905                 );
906             }
907         } else {
908             error!(
909                 "vsock: port: {}: Failed to get overlapped event for write",
910                 port
911             );
912         }
913 
914         let mut connections = self.connections.lock().await;
915         if let Some(connection) = connections.get_mut(&port) {
916             let pipe = &mut connection.pipe;
917             match pipe.get_overlapped_result(&mut overlapped_wrapper) {
918                 Ok(len) => {
919                     // We've received bytes from the guest, account for them in our
920                     // received bytes counter.
921                     connection.recv_cnt += len as usize;
922 
923                     if len != data.len() as u32 {
924                         return Err(VsockError::WriteFailed(
925                             port,
926                             std::io::Error::new(
927                                 std::io::ErrorKind::Other,
928                                 format!(
929                                     "port {} failed to write correct number of bytes:
930                                         (expected: {}, wrote: {})",
931                                     port,
932                                     data.len(),
933                                     len
934                                 ),
935                             ),
936                         ));
937                     }
938                 }
939                 Err(e) => {
940                     return Err(VsockError::WriteFailed(port, e));
941                 }
942             }
943         } else {
944             error!(
945                 "vsock: Guest attempted to send data packet over unconnected \
946                         port ({}), dropping packet",
947                 port
948             );
949         }
950         Ok(())
951     }
952 
953     async fn process_tx_packets(
954         &self,
955         send_queue: &Arc<RwLock<Queue>>,
956         rx_queue_evt: Event,
957         mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
958         ex: &Executor,
959         mut stop_rx: oneshot::Receiver<()>,
960     ) {
961         let mut packet_queues = HashMap::new();
962         let mut futures = FuturesUnordered::new();
963         // Push a pending future that will never complete into FuturesUnordered.
964         // This will keep us from spinning on spurious notifications when we
965         // don't have any open connections.
966         futures.push(std::future::pending::<PortPair>().left_future());
967 
968         let mut stop_future = FuturesUnordered::new();
969         stop_future.push(stop_rx);
970         loop {
971             let (new_packet, connection, stop_rx_res) =
972                 select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
973             match connection {
974                 SelectResult::Finished(Some(port)) => {
975                     packet_queues.remove(&port);
976                 }
977                 SelectResult::Finished(_) => {
978                     // This is only triggered when FuturesUnordered completes
979                     // all pending futures. Right now, this can never happen, as
980                     // we have a pending future that we push that will never
981                     // complete.
982                 }
983                 SelectResult::Pending(_) => {
984                     // Nothing to do.
985                 }
986             };
987             match new_packet {
988                 SelectResult::Finished(Some(packet)) => {
989                     let port = PortPair::from_tx_header(&packet.0);
990                     let queue = packet_queues.entry(port).or_insert_with(|| {
991                         let (send, recv) = mpsc::channel(CHANNEL_SIZE);
992                         let event_async = EventAsync::new(
993                             rx_queue_evt.try_clone().expect("Failed to clone event"),
994                             ex,
995                         )
996                         .expect("Failed to set up the rx queue event");
997                         futures.push(
998                             self.process_tx_packets_for_port(
999                                 port,
1000                                 recv,
1001                                 send_queue,
1002                                 event_async,
1003                                 ex,
1004                             )
1005                             .right_future(),
1006                         );
1007                         send
1008                     });
1009                     // Try to send the packet. Do not block other ports if the queue is full.
1010                     if let Err(e) = queue.try_send(packet) {
1011                         error!(
1012                             "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
1013                             port, e
1014                         )
1015                     }
1016                 }
1017                 SelectResult::Finished(_) => {
1018                     // Triggers when the channel is closed; no more packets coming.
1019                     packet_recv_queue.close();
1020                     return;
1021                 }
1022                 SelectResult::Pending(_) => {
1023                     // Nothing to do.
1024                 }
1025             }
1026             match stop_rx_res {
1027                 SelectResult::Finished(_) => {
1028                     break;
1029                 }
1030                 SelectResult::Pending(_) => {
1031                     // Nothing to do.
1032                 }
1033             }
1034         }
1035     }
1036 
1037     async fn process_tx_packets_for_port(
1038         &self,
1039         port: PortPair,
1040         mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
1041         send_queue: &Arc<RwLock<Queue>>,
1042         mut rx_queue_evt: EventAsync,
1043         ex: &Executor,
1044     ) -> PortPair {
1045         while let Some((header, data)) = packet_recv_queue.next().await {
1046             if !self
1047                 .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
1048                 .await
1049             {
1050                 packet_recv_queue.close();
1051                 if let Ok(Some(_)) = packet_recv_queue.try_next() {
1052                     warn!("vsock: closing port {} with unprocessed packets", port);
1053                 } else {
1054                     info!("vsock: closing port {} cleanly", port)
1055                 }
1056                 break;
1057             }
1058         }
1059         port
1060     }
1061 
1062     async fn handle_tx_packet(
1063         &self,
1064         header: virtio_vsock_hdr,
1065         data: &[u8],
1066         send_queue: &Arc<RwLock<Queue>>,
1067         rx_queue_evt: &mut EventAsync,
1068         ex: &Executor,
1069     ) -> bool {
1070         let mut is_open = true;
1071         let port = PortPair::from_tx_header(&header);
1072         match header.op.to_native() {
1073             vsock_op::VIRTIO_VSOCK_OP_INVALID => {
1074                 error!("vsock: Invalid Operation requested, dropping packet");
1075             }
1076             vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
1077                 let (resp_op, buf_alloc, fwd_cnt) =
1078                     if self.handle_vsock_connection_request(header).await {
1079                         let connections = self.connections.read_lock().await;
1080 
1081                         connections.get(&port).map_or_else(
1082                             || {
1083                                 warn!("vsock: port: {} connection closed during connect", port);
1084                                 is_open = false;
1085                                 (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
1086                             },
1087                             |conn| {
1088                                 (
1089                                     vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
1090                                     conn.buf_alloc as u32,
1091                                     conn.recv_cnt as u32,
1092                                 )
1093                             },
1094                         )
1095                     } else {
1096                         is_open = false;
1097                         (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
1098                     };
1099 
1100                 let response_header = virtio_vsock_hdr {
1101                     src_cid: { header.dst_cid },
1102                     dst_cid: { header.src_cid },
1103                     src_port: { header.dst_port },
1104                     dst_port: { header.src_port },
1105                     len: 0.into(),
1106                     r#type: TYPE_STREAM_SOCKET.into(),
1107                     op: resp_op.into(),
1108                     buf_alloc: Le32::from(buf_alloc),
1109                     fwd_cnt: Le32::from(fwd_cnt),
1110                     ..Default::default()
1111                 };
1112                 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
1113                 // bytes.
1114                 self.write_bytes_to_queue(
1115                     &mut *send_queue.lock().await,
1116                     rx_queue_evt,
1117                     response_header.as_bytes(),
1118                 )
1119                 .await
1120                 .expect("vsock: failed to write to queue");
1121                 info!(
1122                     "vsock: port {}: replied {} to connection request",
1123                     port,
1124                     if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {
1125                         "success"
1126                     } else {
1127                         "reset"
1128                     },
1129                 );
1130             }
1131             vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
1132                 // TODO(b/237811512): Implement this for host-initiated connections
1133             }
1134             vsock_op::VIRTIO_VSOCK_OP_RST => {
1135                 // TODO(b/237811512): Implement this for host-initiated connections
1136             }
1137             vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
1138                 // While the header provides flags to specify tx/rx-specific shutdown,
1139                 // we only support full shutdown.
1140                 // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
1141                 // while still maintaining easy reconnections.
1142                 let mut connections = self.connections.lock().await;
1143                 if connections.remove(&port).is_some() {
1144                     let mut response = virtio_vsock_hdr {
1145                         src_cid: { header.dst_cid },
1146                         dst_cid: { header.src_cid },
1147                         src_port: { header.dst_port },
1148                         dst_port: { header.src_port },
1149                         len: 0.into(),
1150                         r#type: TYPE_STREAM_SOCKET.into(),
1151                         op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1152                         // There is no buffer on a closed connection
1153                         buf_alloc: 0.into(),
1154                         // There is no fwd_cnt anymore on a closed connection
1155                         fwd_cnt: 0.into(),
1156                         ..Default::default()
1157                     };
1158                     // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
1159                     // bytes
1160                     self.write_bytes_to_queue(
1161                         &mut *send_queue.lock().await,
1162                         rx_queue_evt,
1163                         response.as_bytes_mut(),
1164                     )
1165                     .await
1166                     .expect("vsock: failed to write to queue");
1167                     self.connection_event
1168                         .signal()
1169                         .expect("vsock: failed to write to event");
1170                     info!("vsock: port: {}: disconnected by the guest", port);
1171                 } else {
1172                     error!("vsock: Attempted to close unopened port: {}", port);
1173                 }
1174                 is_open = false;
1175             }
1176             vsock_op::VIRTIO_VSOCK_OP_RW => {
1177                 match self.handle_vsock_guest_data(header, data, ex).await {
1178                     Ok(()) => {
1179                         if self
1180                             .check_free_buffer_threshold(header)
1181                             .await
1182                             .unwrap_or(false)
1183                         {
1184                             // Send a credit update if we're below the minimum free
1185                             // buffer size. We skip this if the connection is closed,
1186                             // which could've happened if we were closed on the other
1187                             // end.
1188                             info!(
1189                                 "vsock: port {}: Buffer below threshold; sending credit update.",
1190                                 port
1191                             );
1192                             self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
1193                                 .await;
1194                         }
1195                     }
1196                     Err(e) => {
1197                         error!("vsock: port {}: resetting connection: {}", port, e);
1198                         self.send_vsock_reset(send_queue, rx_queue_evt, header)
1199                             .await;
1200                         is_open = false;
1201                     }
1202                 }
1203             }
1204             // An update from our peer with their buffer state, which they are sending
1205             // (probably) due to a credit request *we* made.
1206             vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
1207                 let port = PortPair::from_tx_header(&header);
1208                 let mut connections = self.connections.lock().await;
1209                 if let Some(connection) = connections.get_mut(&port) {
1210                     connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
1211                     connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
1212                 } else {
1213                     error!("vsock: port {}: got credit update on unknown port", port);
1214                     is_open = false;
1215                 }
1216             }
1217             // A request from our peer to get *our* buffer state. We reply to the RX queue.
1218             vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
1219                 info!(
1220                     "vsock: port {}: Got credit request from peer; sending credit update.",
1221                     port,
1222                 );
1223                 self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
1224                     .await;
1225             }
1226             _ => {
1227                 error!(
1228                     "vsock: port {}: unknown operation requested, dropping packet",
1229                     port
1230                 );
1231             }
1232         }
1233         is_open
1234     }
1235 
1236     // Checks whether the amount of free buffer space that the peer believes *we* have
1237     // available is below our threshold percentage. If the connection is closed, returns `None`.
1238     async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
1239         let mut connections = self.connections.lock().await;
1240         let port = PortPair::from_tx_header(&header);
1241         connections.get_mut(&port).map(|connection| {
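            // `recv_cnt - prev_recv_cnt` is the data received since the last credit update we
            // sent, i.e. consumption the peer has not yet been credited for; subtracting it
            // from `buf_alloc` approximates how much free buffer the peer believes we still have.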
1242             let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
1243             connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
1244         })
1245     }
1246 
1247     async fn send_vsock_credit_update(
1248         &self,
1249         send_queue: &Arc<RwLock<Queue>>,
1250         rx_queue_evt: &mut EventAsync,
1251         header: virtio_vsock_hdr,
1252     ) {
1253         let mut connections = self.connections.lock().await;
1254         let port = PortPair::from_tx_header(&header);
1255 
1256         if let Some(connection) = connections.get_mut(&port) {
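            // Build the reply by swapping the src/dst cid and port from the guest's TX header,
            // since this packet travels in the opposite direction (device -> driver).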
1257             let mut response = virtio_vsock_hdr {
1258                 src_cid: { header.dst_cid },
1259                 dst_cid: { header.src_cid },
1260                 src_port: { header.dst_port },
1261                 dst_port: { header.src_port },
1262                 len: 0.into(),
1263                 r#type: TYPE_STREAM_SOCKET.into(),
1264                 op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
1265                 buf_alloc: Le32::from(connection.buf_alloc as u32),
1266                 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1267                 ..Default::default()
1268             };
1269 
1270             connection.prev_recv_cnt = connection.recv_cnt;
1271 
1272             // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1273             // to bytes
1274             self.write_bytes_to_queue(
1275                 &mut *send_queue.lock().await,
1276                 rx_queue_evt,
1277                 response.as_bytes_mut(),
1278             )
1279             .await
1280             .unwrap_or_else(|_| panic!("vsock: port {}: failed to write to queue", port));
1281         } else {
1282             error!(
1283                 "vsock: port {}: error sending credit update on unknown port",
1284                 port
1285             );
1286         }
1287     }
1288 
1289     async fn send_vsock_reset(
1290         &self,
1291         send_queue: &Arc<RwLock<Queue>>,
1292         rx_queue_evt: &mut EventAsync,
1293         header: virtio_vsock_hdr,
1294     ) {
1295         let mut connections = self.connections.lock().await;
1296         let port = PortPair::from_tx_header(&header);
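        // Unlike a credit update, a reset also removes the connection from our map, dropping
        // the host-side state for this port.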
1297         if let Some(connection) = connections.remove(&port) {
1298             let mut response = virtio_vsock_hdr {
1299                 src_cid: { header.dst_cid },
1300                 dst_cid: { header.src_cid },
1301                 src_port: { header.dst_port },
1302                 dst_port: { header.src_port },
1303                 len: 0.into(),
1304                 r#type: TYPE_STREAM_SOCKET.into(),
1305                 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1306                 buf_alloc: Le32::from(connection.buf_alloc as u32),
1307                 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1308                 ..Default::default()
1309             };
1310 
1311             // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1312             // to bytes
1313             self.write_bytes_to_queue(
1314                 &mut *send_queue.lock().await,
1315                 rx_queue_evt,
1316                 response.as_bytes_mut(),
1317             )
1318             .await
1319             .expect("failed to write to queue");
1320         } else {
1321             error!("vsock: port {}: error closing unknown port", port);
1322         }
1323     }
1324 
1325     async fn write_bytes_to_queue(
1326         &self,
1327         queue: &mut Queue,
1328         queue_evt: &mut EventAsync,
1329         bytes: &[u8],
1330     ) -> Result<()> {
1331         let mut avail_desc = match queue.next_async(queue_evt).await {
1332             Ok(d) => d,
1333             Err(e) => {
1334                 error!("vsock: failed to read descriptor: {}", e);
1335                 return Err(VsockError::AwaitQueue(e));
1336             }
1337         };
1338         self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1339     }
1340 
1341     async fn write_bytes_to_queue_interruptable(
1342         &self,
1343         queue: &mut Queue,
1344         queue_evt: &mut EventAsync,
1345         bytes: &[u8],
1346         mut stop_rx: &mut oneshot::Receiver<()>,
1347     ) -> Result<()> {
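        // A `None` descriptor here indicates the wait was interrupted (the stop oneshot fired),
        // so we bail out without writing anything rather than treating it as an error.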
1348         let mut avail_desc = match queue.next_async_interruptable(queue_evt, stop_rx).await {
1349             Ok(d) => match d {
1350                 Some(desc) => desc,
1351                 None => return Ok(()),
1352             },
1353             Err(e) => {
1354                 error!("vsock: failed to read descriptor: {}", e);
1355                 return Err(VsockError::AwaitQueue(e));
1356             }
1357         };
1358         self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1359     }
1360 
1361     fn write_bytes_to_queue_inner(
1362         &self,
1363         queue: &mut Queue,
1364         mut desc_chain: DescriptorChain,
1365         bytes: &[u8],
1366     ) -> Result<()> {
1367         let writer = &mut desc_chain.writer;
1368         let res = writer.write_all(bytes);
1369 
1370         if let Err(e) = res {
1371             error!(
1372                 "vsock: failed to write {} bytes to queue, err: {:?}",
1373                 bytes.len(),
1374                 e
1375             );
1376             return Err(VsockError::WriteQueue(e));
1377         }
1378 
1379         let bytes_written = writer.bytes_written() as u32;
1380         if bytes_written > 0 {
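            // Return the descriptor to the guest with the number of bytes written and notify
            // the driver via interrupt.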
1381             queue.add_used(desc_chain, bytes_written);
1382             queue.trigger_interrupt();
1383             Ok(())
1384         } else {
1385             error!("vsock: failed to write bytes to queue");
1386             Err(VsockError::WriteQueue(std::io::Error::new(
1387                 std::io::ErrorKind::Other,
1388                 "failed to write bytes to queue",
1389             )))
1390         }
1391     }
1392 
1393     async fn process_event_queue(
1394         &self,
1395         mut queue: Queue,
1396         mut queue_evt: EventAsync,
1397         mut stop_rx: oneshot::Receiver<()>,
1398         mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,
1399     ) -> Result<Queue> {
1400         loop {
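            // select_biased! polls its branches in the order written, so any pending device
            // event is delivered before a stop request is observed.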
1401             let vsock_event = select_biased! {
1402                 vsock_event = vsock_event_receiver.next() => {
1403                     vsock_event
1404                 }
1405                 _ = stop_rx => {
1406                     break;
1407                 }
1408             };
1409             let vsock_event = match vsock_event {
1410                 Some(event) => event,
1411                 None => break,
1412             };
1413             self.write_bytes_to_queue_interruptable(
1414                 &mut queue,
1415                 &mut queue_evt,
1416                 vsock_event.as_bytes(),
1417                 &mut stop_rx,
1418             )
1419             .await?;
1420         }
1421         Ok(queue)
1422     }
1423 
1424     fn run(
1425         mut self,
1426         rx_queue: Queue,
1427         tx_queue: Queue,
1428         event_queue: Queue,
1429         kill_evt: Event,
1430     ) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {
1431         let rx_queue_evt = rx_queue
1432             .event()
1433             .try_clone()
1434             .map_err(VsockError::CloneDescriptor)?;
1435 
1436         // Note that this mutex won't ever be contended because the HandleExecutor is single
1437         // threaded. We need the mutex for compile time correctness, but technically it is not
1438         // actually providing mandatory locking, at least not at the moment. If we later use a
1439         // multi-threaded executor, then this lock will be important.
1440         let rx_queue_arc = Arc::new(RwLock::new(rx_queue));
1441 
1442         // Run the executor and create the futures inside a scope so we don't hold an extra
1442         // reference to `rx_queue_arc`.
1443         let res = {
1444             let ex = Executor::new().unwrap();
1445 
1446             let rx_evt_async = EventAsync::new(
1447                 rx_queue_evt
1448                     .try_clone()
1449                     .map_err(VsockError::CloneDescriptor)?,
1450                 &ex,
1451             )
1452             .expect("Failed to set up the rx queue event");
1453             let mut stop_queue_oneshots = Vec::new();
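            // One stop oneshot per queue handler; the senders are collected here so that, once
            // kill_evt fires, every handler can be told to wind down and return its queue.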
1454 
1455             let vsock_event_receiver = self
1456                 .device_event_queue_rx
1457                 .take()
1458                 .expect("event queue rx must be present");
1459 
1460             let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1461             let rx_handler =
1462                 self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);
1463             let rx_handler = rx_handler.fuse();
1464             pin_mut!(rx_handler);
1465 
1466             let (send, recv) = mpsc::channel(CHANNEL_SIZE);
1467 
1468             let tx_evt_async = EventAsync::new(
1469                 tx_queue
1470                     .event()
1471                     .try_clone()
1472                     .map_err(VsockError::CloneDescriptor)?,
1473                 &ex,
1474             )
1475             .expect("Failed to set up the tx queue event");
1476             let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1477             let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
1478             let tx_handler = tx_handler.fuse();
1479             pin_mut!(tx_handler);
1480 
1481             let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1482             let packet_handler =
1483                 self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);
1484             let packet_handler = packet_handler.fuse();
1485             pin_mut!(packet_handler);
1486 
1487             let event_evt_async = EventAsync::new(
1488                 event_queue
1489                     .event()
1490                     .try_clone()
1491                     .map_err(VsockError::CloneDescriptor)?,
1492                 &ex,
1493             )
1494             .expect("Failed to set up the event queue event");
1495             let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1496             let event_handler = self.process_event_queue(
1497                 event_queue,
1498                 event_evt_async,
1499                 stop_rx,
1500                 vsock_event_receiver,
1501             );
1502             let event_handler = event_handler.fuse();
1503             pin_mut!(event_handler);
1504 
1505             // Process any requests to resample the irq value.
1506             let resample_handler = async_utils::handle_irq_resample(&ex, self.interrupt.clone());
1507             let resample_handler = resample_handler.fuse();
1508             pin_mut!(resample_handler);
1509 
1510             let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
1511             let kill_handler = kill_evt.next_val();
1512             pin_mut!(kill_handler);
1513 
1514             let mut device_event_queue_tx = self.device_event_queue_tx.clone();
1515             if self.send_protocol_reset {
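                // Emit a TRANSPORT_RESET event so the guest driver treats any previously
                // established connections as reset (per the virtio-vsock spec).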
1516                 ex.run_until(async move { device_event_queue_tx.send(
1517                    virtio_vsock_event {
1518                        id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET
1519                            .into(),
1520                    }).await
1521                 }).expect("failed to write to empty mpsc queue.");
1522             }
1523 
1524             ex.run_until(async {
1525                 select! {
1526                     _ = kill_handler.fuse() => (),
1527                     _ = rx_handler => return Err(anyhow!("rx_handler stopped unexpectedly.")),
1528                     _ = tx_handler => return Err(anyhow!("tx_handler stopped unexpectedly.")),
1529                     _ = packet_handler => return Err(anyhow!("packet_handler stopped unexpectedly.")),
1530                     _ = event_handler => return Err(anyhow!("event_handler stopped unexpectedly.")),
1531                     _ = resample_handler => return Err(anyhow!("resample_handler stopped unexpectedly.")),
1532                 }
1533                 // kill_evt has fired
1534 
1535                 for stop_tx in stop_queue_oneshots {
1536                     if stop_tx.send(()).is_err() {
1537                         return Err(anyhow!("failed to request stop for queue future"));
1538                     }
1539                 }
1540 
1541                 rx_handler.await.context("Failed to stop rx handler.")?;
1542                 packet_handler.await;
1543 
1544                 Ok((
1545                     tx_handler.await.context("Failed to stop tx handler.")?,
1546                     event_handler
1547                         .await
1548                         .context("Failed to stop event handler.")?,
1549                 ))
1550             })
1551         };
1552 
1553         // At this point, a request to stop this worker has been sent or an error has happened in
1554         // one of the futures, which will stop this worker anyway.
1555 
1556         let queues_and_connections = match res {
1557             Ok(main_future_res) => match main_future_res {
1558                 Ok((tx_queue, event_handler_queue)) => {
1559                     let rx_queue = match Arc::try_unwrap(rx_queue_arc) {
1560                         Ok(queue_rw_lock) => queue_rw_lock.into_inner(),
1561                         Err(_) => panic!("failed to recover queue from worker"),
1562                     };
1563                     let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);
1564                     Some((paused_queues, self.connections))
1565                 }
1566                 Err(e) => {
1567                     error!("Error happened in a vsock future: {}", e);
1568                     None
1569                 }
1570             },
1571             Err(e) => {
1572                 error!("error happened in executor: {}", e);
1573                 None
1574             }
1575         };
1576 
1577         Ok(queues_and_connections)
1578     }
1579 }
1580 
1581 /// Queues & events for the vsock device.
1582 struct VsockQueues {
1583     rx: Queue,
1584     tx: Queue,
1585     event: Queue,
1586 }
1587 
1588 impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
1589     type Error = anyhow::Error;
1590     fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
1591         if queues.len() < 3 {
1592             anyhow::bail!(
1593                 "{} queues were found, but an activated vsock must have at least 3 active queues.",
1594                 queues.len()
1595             );
1596         }
1597 
1598         Ok(VsockQueues {
1599             rx: queues.remove(&0).context("the rx queue is required.")?,
1600             tx: queues.remove(&1).context("the tx queue is required.")?,
1601             event: queues.remove(&2).context("the event queue is required.")?,
1602         })
1603     }
1604 }
1605 
1606 impl From<PausedQueues> for BTreeMap<usize, Queue> {
1607     fn from(queues: PausedQueues) -> Self {
1608         let mut ret = BTreeMap::new();
1609         ret.insert(0, queues.rx_queue);
1610         ret.insert(1, queues.tx_queue);
1611         ret.insert(2, queues.event_queue);
1612         ret
1613     }
1614 }
1615 
1616 struct PausedQueues {
1617     rx_queue: Queue,
1618     tx_queue: Queue,
1619     event_queue: Queue,
1620 }
1621 
1622 impl PausedQueues {
1623     fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {
1624         PausedQueues {
1625             rx_queue,
1626             tx_queue,
1627             event_queue,
1628         }
1629     }
1630 }
1631 
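// Builds the per-port named pipe path, e.g. get_pipe_name("<guid>", 1) yields
// `\\.\pipe\<guid>\vsock-1` (the doubled backslashes below are Rust string escapes).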
1632 fn get_pipe_name(guid: &str, pipe: u32) -> String {
1633     format!("\\\\.\\pipe\\{}\\vsock-{}", guid, pipe)
1634 }
1635 
1636 async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
1637     // This doesn't reset the event since we have to call GetOverlappedResult
1638     // on the OVERLAPPED struct first before resetting it.
1639     let _ = evt.get_io_source_ref().wait_for_handle().await;
1640     pair
1641 }
1642