1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::cell::RefCell;
6 use std::collections::BTreeMap;
7 use std::collections::HashMap;
8 use std::fmt;
9 use std::fmt::Display;
10 use std::io;
11 use std::io::Read;
12 use std::io::Write;
13 use std::os::windows::io::RawHandle;
14 use std::rc::Rc;
15 use std::result;
16 use std::sync::Arc;
17 use std::thread;
18
19 use anyhow::anyhow;
20 use anyhow::Context;
21 use base::error;
22 use base::info;
23 use base::named_pipes;
24 use base::named_pipes::BlockingMode;
25 use base::named_pipes::FramingMode;
26 use base::named_pipes::OverlappedWrapper;
27 use base::named_pipes::PipeConnection;
28 use base::warn;
29 use base::AsRawDescriptor;
30 use base::Error as SysError;
31 use base::Event;
32 use base::EventExt;
33 use base::WorkerThread;
34 use cros_async::select3;
35 use cros_async::select6;
36 use cros_async::sync::RwLock;
37 use cros_async::AsyncError;
38 use cros_async::EventAsync;
39 use cros_async::Executor;
40 use cros_async::SelectResult;
41 use data_model::Le32;
42 use data_model::Le64;
43 use futures::channel::mpsc;
44 use futures::channel::oneshot;
45 use futures::pin_mut;
46 use futures::select;
47 use futures::select_biased;
48 use futures::stream::FuturesUnordered;
49 use futures::FutureExt;
50 use futures::SinkExt;
51 use futures::StreamExt;
52 use remain::sorted;
53 use serde::Deserialize;
54 use serde::Serialize;
55 use thiserror::Error as ThisError;
56 use vm_memory::GuestMemory;
57 use zerocopy::AsBytes;
58 use zerocopy::FromBytes;
59 use zerocopy::FromZeroes;
60
61 use crate::virtio::async_utils;
62 use crate::virtio::copy_config;
63 use crate::virtio::create_stop_oneshot;
64 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
65 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
66 use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
67 use crate::virtio::vsock::sys::windows::protocol::vsock_op;
68 use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
69 use crate::virtio::DescriptorChain;
70 use crate::virtio::DeviceType;
71 use crate::virtio::Interrupt;
72 use crate::virtio::Queue;
73 use crate::virtio::StoppedWorker;
74 use crate::virtio::VirtioDevice;
75 use crate::virtio::Writer;
76 use crate::Suspendable;
77
78 #[sorted]
79 #[derive(ThisError, Debug)]
80 pub enum VsockError {
81 #[error("Failed to await next descriptor chain from queue: {0}")]
82 AwaitQueue(AsyncError),
83 #[error("OverlappedWrapper error.")]
84 BadOverlappedWrapper,
85 #[error("Failed to clone descriptor: {0}")]
86 CloneDescriptor(SysError),
87 #[error("Failed to create EventAsync: {0}")]
88 CreateEventAsync(AsyncError),
89 #[error("Failed to create wait context: {0}")]
90 CreateWaitContext(SysError),
91 #[error("Failed to read queue: {0}")]
92 ReadQueue(io::Error),
93 #[error("Failed to reset event object: {0}")]
94 ResetEventObject(SysError),
95 #[error("Failed to run executor: {0}")]
96 RunExecutor(AsyncError),
97 #[error("Failed to write to pipe, port {0}: {1}")]
98 WriteFailed(PortPair, io::Error),
99 #[error("Failed to write queue: {0}")]
100 WriteQueue(io::Error),
101 }
102 pub type Result<T> = result::Result<T, VsockError>;
103
104 // Vsock has three virtio queues: rx, tx, and event.
105 const QUEUE_SIZE: u16 = 256;
106 const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
107 // We overload port numbers so that if a message is received from
108 // CONNECTION_EVENT_PORT_NUM (an invalid port number), we recognize that a
109 // new connection was set up.
110 const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;
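// For illustration: in `process_rx_queue` below this sentinel appears as the pair
// `PortPair { host: CONNECTION_EVENT_PORT_NUM, guest: 0 }`; a wakeup for that pair means
// "the connection set changed, rebuild the wait list" rather than "data arrived on a port".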
111
112 /// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
113 const KB: usize = 1024;
114
115 /// Size of the buffer we read into from the host side named pipe. Note that data flows from the
116 /// host pipe -> this buffer -> rx queue.
117 /// This should be large enough to facilitate fast transmission of host data, see b/232950349.
118 const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;
119
120 /// In the case where the host side named pipe does not have a specified buffer size, we'll default
121 /// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
122 /// This should be larger than the volume of data that the guest will generally send at one time to
123 /// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
124 const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;
125
126 /// Minimum free buffer threshold to notify the peer with a credit update
127 /// message. This is in case we are ingesting many messages without an
128 /// opportunity to send a message back to the peer with a buffer size update.
129 /// This value is a percentage of `buf_alloc`.
130 /// TODO(b/204246759): This value was chosen without much more thought than "it
131 /// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC, to a
132 /// value that makes empirical sense for the packet sizes that we expect to
133 /// receive.
134 /// TODO(b/239848326): Replace float with integer, in order to remove risk
135 /// of losing precision. Ie. change to `10` and perform
136 /// `FOO * MIN_FREE_BUFFER_PCT / 100`
137 const MIN_FREE_BUFFER_PCT: f64 = 0.1;
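// For illustration, a rough worked example using the defaults above: with
// `buf_alloc == DEFAULT_BUF_ALLOC_BYTES` (128 KiB == 131072 bytes), the threshold computed in
// `check_free_buffer_threshold` below is `(0.1 * 131072.0) as usize == 13107` bytes. Once the
// bytes received since the last credit report (`recv_cnt - prev_recv_cnt`) leave less than
// ~12.8 KiB of the advertised buffer, we proactively send a VIRTIO_VSOCK_OP_CREDIT_UPDATE
// instead of waiting for the peer to ask for one.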
138
139 // Number of packets to buffer in the tx processing channels.
140 const CHANNEL_SIZE: usize = 256;
141
142 type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;
143
144 /// Virtio vsock device, which provides socket connections between the guest and the host.
145 pub struct Vsock {
146 guest_cid: u64,
147 host_guid: Option<String>,
148 features: u64,
149 acked_features: u64,
150 worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
151 /// Stores any active connections when the device sleeps. This allows us to sleep/wake
152 /// without disrupting active connections, which is useful when taking a snapshot.
153 sleeping_connections: Option<VsockConnectionMap>,
154 /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
155 /// Used to inform the guest all connections are broken when we restore a snapshot.
156 needs_transport_reset: bool,
157 }
158
159 /// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
160 /// changed when this device is restored.
161 #[derive(Serialize, Deserialize)]
162 struct VsockSnapshot {
163 guest_cid: u64,
164 features: u64,
165 acked_features: u64,
166 }
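// For illustration, `virtio_snapshot` below serializes this as a small JSON object, e.g.
// `{"guest_cid": 3, "features": ..., "acked_features": ...}` (values are examples only).
// `virtio_restore` checks `guest_cid` and `features` against the live device, adopts
// `acked_features`, and sets `needs_transport_reset` so the guest is told that any
// pre-snapshot connections are gone.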
167
168 impl Vsock {
169 pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
170 Ok(Vsock {
171 guest_cid,
172 host_guid,
173 features: base_features,
174 acked_features: 0,
175 worker_thread: None,
176 sleeping_connections: None,
177 needs_transport_reset: false,
178 })
179 }
180
181 fn get_config(&self) -> virtio_vsock_config {
182 virtio_vsock_config {
183 guest_cid: Le64::from(self.guest_cid),
184 }
185 }
186
187 fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
188 if let Some(worker_thread) = self.worker_thread.take() {
189 if let Some(queues_and_conns) = worker_thread.stop() {
190 StoppedWorker::WithQueues(Box::new(queues_and_conns))
191 } else {
192 StoppedWorker::MissingQueues
193 }
194 } else {
195 StoppedWorker::AlreadyStopped
196 }
197 }
198
199 fn start_worker(
200 &mut self,
201 mem: GuestMemory,
202 interrupt: Interrupt,
203 mut queues: VsockQueues,
204 existing_connections: Option<VsockConnectionMap>,
205 ) -> anyhow::Result<()> {
206 let rx_queue = queues.rx;
207 let tx_queue = queues.tx;
208 let event_queue = queues.event;
209
210 let host_guid = self.host_guid.clone();
211 let guest_cid = self.guest_cid;
212 let needs_transport_reset = self.needs_transport_reset;
213 self.needs_transport_reset = false;
214 self.worker_thread = Some(WorkerThread::start(
215 "userspace_virtio_vsock",
216 move |kill_evt| {
217 let mut worker = Worker::new(
218 mem,
219 interrupt,
220 host_guid,
221 guest_cid,
222 existing_connections,
223 needs_transport_reset,
224 );
225 let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);
226
227 match result {
228 Err(e) => {
229 error!("userspace vsock worker thread exited with error: {:?}", e);
230 None
231 }
232 Ok(paused_queues_and_connections_option) => {
233 paused_queues_and_connections_option
234 }
235 }
236 },
237 ));
238
239 Ok(())
240 }
241 }
242
243 impl VirtioDevice for Vsock {
244 fn keep_rds(&self) -> Vec<RawHandle> {
245 Vec::new()
246 }
247
248 fn read_config(&self, offset: u64, data: &mut [u8]) {
249 copy_config(data, 0, self.get_config().as_bytes(), offset);
250 }
251
252 fn device_type(&self) -> DeviceType {
253 DeviceType::Vsock
254 }
255
256 fn queue_max_sizes(&self) -> &[u16] {
257 QUEUE_SIZES
258 }
259
260 fn features(&self) -> u64 {
261 self.features
262 }
263
264 fn ack_features(&mut self, value: u64) {
265 self.acked_features |= value;
266 }
267
268 fn activate(
269 &mut self,
270 mem: GuestMemory,
271 interrupt: Interrupt,
272 mut queues: BTreeMap<usize, Queue>,
273 ) -> anyhow::Result<()> {
274 if queues.len() != QUEUE_SIZES.len() {
275 return Err(anyhow!(
276 "Failed to activate vsock device. queues.len(): {} != {}",
277 queues.len(),
278 QUEUE_SIZES.len(),
279 ));
280 }
281
282 let vsock_queues = VsockQueues {
283 rx: queues.remove(&0).unwrap(),
284 tx: queues.remove(&1).unwrap(),
285 event: queues.remove(&2).unwrap(),
286 };
287
288 self.start_worker(mem, interrupt, vsock_queues, None)
289 }
290
291 fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
292 match self.stop_worker() {
293 StoppedWorker::WithQueues(paused_queues_and_conns) => {
294 let (queues, sleeping_connections) = *paused_queues_and_conns;
295 self.sleeping_connections = Some(sleeping_connections);
296 Ok(Some(queues.into()))
297 }
298 StoppedWorker::MissingQueues => {
299 anyhow::bail!("vsock queue workers did not stop cleanly")
300 }
301 StoppedWorker::AlreadyStopped => {
302 // The device isn't in the activated state.
303 Ok(None)
304 }
305 }
306 }
307
308 fn virtio_wake(
309 &mut self,
310 queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
311 ) -> anyhow::Result<()> {
312 if let Some((mem, interrupt, queues)) = queues_state {
313 let connections = self.sleeping_connections.take();
314 self.start_worker(
315 mem,
316 interrupt,
317 queues
318 .try_into()
319 .expect("Failed to convert queue BTreeMap to VsockQueues"),
320 connections,
321 )?;
322 }
323 Ok(())
324 }
325
326 fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
327 serde_json::to_value(VsockSnapshot {
328 guest_cid: self.guest_cid,
329 features: self.features,
330 acked_features: self.acked_features,
331 })
332 .context("failed to serialize vsock snapshot")
333 }
334
335 fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
336 let vsock_snapshot: VsockSnapshot =
337 serde_json::from_value(data).context("error deserializing vsock snapshot")?;
338 anyhow::ensure!(
339 self.guest_cid == vsock_snapshot.guest_cid,
340 "expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
341 self.guest_cid,
342 vsock_snapshot.guest_cid
343 );
344 anyhow::ensure!(
345 self.features == vsock_snapshot.features,
346 "vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
347 self.features,
348 vsock_snapshot.features
349 );
350 self.acked_features = vsock_snapshot.acked_features;
351 self.needs_transport_reset = true;
352
353 Ok(())
354 }
355 }
356
357 #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
358 pub struct PortPair {
359 host: u32,
360 guest: u32,
361 }
362
363 impl Display for PortPair {
364 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365 write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
366 }
367 }
368
369 impl PortPair {
370 fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
371 PortPair {
372 host: header.dst_port.to_native(),
373 guest: header.src_port.to_native(),
374 }
375 }
376 }
377
378 // Note: variables herein do not have to be atomic because this struct is guarded
379 // by a RwLock.
380 struct VsockConnection {
381 // The guest port.
382 guest_port: Le32,
383
384 // The actual named (asynchronous) pipe connection.
385 pipe: PipeConnection,
386 // The overlapped struct contains an event object for the named pipe.
387 // This lets us select() on the pipes by waiting on the events.
388 // This is for Reads only.
389 overlapped: Box<OverlappedWrapper>,
390 // Read buffer for the named pipes. These are needed because reads complete
391 // asynchronously.
392 buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,
393
394 // Total free-running count of received bytes.
395 recv_cnt: usize,
396
397 // Total free-running count of received bytes that the peer has been informed of.
398 prev_recv_cnt: usize,
399
400 // Total auxiliary buffer space available to receive packets from the driver, not including
401 // the virtqueue itself. For us, this is the tx buffer on the named pipe into which we drain
402 // packets for the connection. Note that if the named pipe has a grow-on-demand TX buffer,
403 // we use DEFAULT_BUF_ALLOC_BYTES instead.
404 buf_alloc: usize,
405
406 // Peer (driver) total free-running count of received bytes.
407 peer_recv_cnt: usize,
408
409 // Peer (driver) total rx buffer allocated.
410 peer_buf_alloc: usize,
411
412 // Total free-running count of transmitted bytes.
413 tx_cnt: usize,
414
415 // State tracking for full buffer condition. Currently just used for logging. If the peer's
416 // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
417 // gets set to `true`. Once there's enough space in the buffer, this gets unset.
418 is_buffer_full: bool,
419 }
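// For illustration, the counters above drive flow control in both directions (a sketch of the
// arithmetic used in `process_rx_queue` and `check_free_buffer_threshold`):
//
//   // Guest-side space left for host -> guest data; if this is smaller than
//   // TEMP_READ_BUF_SIZE_BYTES, draining of the named pipe for this connection is paused.
//   let peer_free = peer_buf_alloc - (tx_cnt - peer_recv_cnt);
//
//   // What the guest believes is still free on *our* side, based on the last fwd_cnt we
//   // reported; if this drops below MIN_FREE_BUFFER_PCT * buf_alloc, a credit update is sent.
//   let our_free = buf_alloc - (recv_cnt - prev_recv_cnt);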
420
421 struct Worker {
422 mem: GuestMemory,
423 interrupt: Interrupt,
424 host_guid: Option<String>,
425 guest_cid: u64,
426 // Map of (host, guest) port pairs to their VsockConnections.
427 connections: VsockConnectionMap,
428 connection_event: Event,
429 device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
430 device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
431 send_protocol_reset: bool,
432 }
433
434 impl Worker {
435 fn new(
436 mem: GuestMemory,
437 interrupt: Interrupt,
438 host_guid: Option<String>,
439 guest_cid: u64,
440 existing_connections: Option<VsockConnectionMap>,
441 send_protocol_reset: bool,
442 ) -> Worker {
443 // Buffer size here is arbitrary, but must be at least one since we need
444 // to be able to write a reset event to the channel when the device
445 // worker is brought up on a VM restore. Note that we send exactly one
446 // message per VM session, so we should never see these messages backing
447 // up.
448 let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);
449
450 Worker {
451 mem,
452 interrupt,
453 host_guid,
454 guest_cid,
455 connections: existing_connections.unwrap_or_default(),
456 connection_event: Event::new().unwrap(),
457 device_event_queue_tx,
458 device_event_queue_rx: Some(device_event_queue_rx),
459 send_protocol_reset,
460 }
461 }
462
463 async fn process_rx_queue(
464 &self,
465 recv_queue: Arc<RwLock<Queue>>,
466 mut rx_queue_evt: EventAsync,
467 ex: &Executor,
468 mut stop_rx: oneshot::Receiver<()>,
469 ) -> Result<()> {
470 'connections_changed: loop {
471 // Run continuously until exit evt
472
473 // TODO(b/200810561): Optimize this FuturesUnordered code.
474 // Set up the EventAsyncs to select on
475 let futures = FuturesUnordered::new();
476 // This needs to be its own scope since it holds a RwLock on `self.connections`.
477 {
478 let connections = self.connections.read_lock().await;
479 for (port, connection) in connections.iter() {
480 let h_evt = connection
481 .overlapped
482 .get_h_event_ref()
483 .ok_or_else(|| {
484 error!("Missing h_event in OverlappedWrapper.");
485 VsockError::BadOverlappedWrapper
486 })
487 .unwrap()
488 .try_clone()
489 .map_err(|e| {
490 error!("Could not clone h_event.");
491 VsockError::CloneDescriptor(e)
492 })?;
493 let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
494 error!("Could not create EventAsync.");
495 VsockError::CreateEventAsync(e)
496 })?;
497 futures.push(wait_event_and_return_port_pair(evt_async, *port));
498 }
499 }
500 let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
501 error!("Could not clone connection_event.");
502 VsockError::CloneDescriptor(e)
503 })?;
504 let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
505 error!("Could not create EventAsync.");
506 VsockError::CreateEventAsync(e)
507 })?;
508 futures.push(wait_event_and_return_port_pair(
509 connection_evt_async,
510 PortPair {
511 host: CONNECTION_EVENT_PORT_NUM,
512 guest: 0,
513 },
514 ));
515
516 // Wait to service the sockets. Note that for fairness, it is critical that we service
517 // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
518 // is used, as it returns all currently *ready* futures from the stream.
519 //
520 // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
521 // happens because it has at least one item, and we only request chunks once.
522 let futures_len = futures.len();
523 let mut ready_chunks = futures.ready_chunks(futures_len);
524 let ports = select_biased! {
525 ports = ready_chunks.next() => {
526 ports.expect("failed to wait on vsock sockets")
527 }
528 _ = stop_rx => {
529 break;
530 }
531 };
532
533 for port in ports {
534 if port.host == CONNECTION_EVENT_PORT_NUM {
535 // New connection event. Setup futures again.
536 if let Err(e) = self.connection_event.reset() {
537 error!("vsock: port: {}: could not reset connection_event.", port);
538 return Err(VsockError::ResetEventObject(e));
539 }
540 continue 'connections_changed;
541 }
542 let mut connections = self.connections.lock().await;
543 let connection = if let Some(conn) = connections.get_mut(&port) {
544 conn
545 } else {
546 // We could have been scheduled to run the rx queue *before* the connection was
547 // closed. In that case, we do nothing. The code which closed the connection
548 // (e.g. in response to a message in the tx/rx queues) will handle notifying
549 // the guest/host as required.
550 continue 'connections_changed;
551 };
552
553 // Check if the peer has enough space in their buffer to
554 // receive the maximum amount of data that we could possibly
555 // read from the host pipe. If not, we should continue to
556 // process other sockets as each socket has an independent
557 // buffer.
558 let peer_free_buf_size =
559 connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
560 if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
561 if !connection.is_buffer_full {
562 warn!(
563 "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
564 port, peer_free_buf_size
565 );
566 connection.is_buffer_full = true;
567 }
568 continue;
569 } else if connection.is_buffer_full {
570 connection.is_buffer_full = false;
571 }
572
573 let pipe_connection = &mut connection.pipe;
574 let overlapped = &mut connection.overlapped;
575 let guest_port = connection.guest_port;
576 let buffer = &mut connection.buffer;
577
578 match overlapped.get_h_event_ref() {
579 Some(h_event) => {
580 if let Err(e) = h_event.reset() {
581 error!(
582 "vsock: port: {}: Could not reset event in OverlappedWrapper.",
583 port
584 );
585 return Err(VsockError::ResetEventObject(e));
586 }
587 }
588 None => {
589 error!(
590 "vsock: port: {}: missing h_event in OverlappedWrapper.",
591 port
592 );
593 return Err(VsockError::BadOverlappedWrapper);
594 }
595 }
596
597 let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
598 Ok(size) => size as usize,
599 Err(e) => {
600 error!("vsock: port {}: Failed to read from pipe {}", port, e);
601 // TODO(b/237278629): Close the connection if we fail to read.
602 continue 'connections_changed;
603 }
604 };
605
606 let response_header = virtio_vsock_hdr {
607 src_cid: 2.into(), // Host CID
608 dst_cid: self.guest_cid.into(), // Guest CID
609 src_port: Le32::from(port.host),
610 dst_port: guest_port,
611 len: Le32::from(data_size as u32),
612 r#type: TYPE_STREAM_SOCKET.into(),
613 op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
614 buf_alloc: Le32::from(connection.buf_alloc as u32),
615 fwd_cnt: Le32::from(connection.recv_cnt as u32),
616 ..Default::default()
617 };
618
619 connection.prev_recv_cnt = connection.recv_cnt;
620
621 // We have to only write to the queue once, so we construct a new buffer
622 // with the concatenated header and data.
623 const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
624 let data_read = &buffer[..data_size];
625 let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
626 header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
627 header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
628 {
629 let mut recv_queue_lock = recv_queue.lock().await;
630 let write_fut = self
631 .write_bytes_to_queue(
632 &mut recv_queue_lock,
633 &mut rx_queue_evt,
634 &header_and_data[..],
635 )
636 .fuse();
637 pin_mut!(write_fut);
638 // If `stop_rx` is fired but the virtqueue is full, this loop will break
639 // without draining `header_and_data`.
640 select_biased! {
641 write = write_fut => {},
642 _ = stop_rx => {
643 break;
644 }
645 }
646 }
647
648 connection.tx_cnt += data_size;
649
650 // Start reading again so we receive the message and
651 // event signal immediately.
652
653 // SAFETY:
654 // Unsafe because the read could happen at any time
655 // after this function is called. We ensure safety
656 // by allocating the buffer and overlapped struct
657 // on the heap.
658 unsafe {
659 match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
660 Ok(()) => {}
661 Err(e) => {
662 error!("vsock: port {}: Failed to read from pipe {}", port, e);
663 }
664 }
665 }
666 }
667 }
668 Ok(())
669 }
670
671 async fn process_tx_queue(
672 &self,
673 mut queue: Queue,
674 mut queue_evt: EventAsync,
675 mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
676 mut stop_rx: oneshot::Receiver<()>,
677 ) -> Result<Queue> {
678 loop {
679 // Run continuously until exit evt
680 let mut avail_desc = match queue
681 .next_async_interruptable(&mut queue_evt, &mut stop_rx)
682 .await
683 {
684 Ok(Some(d)) => d,
685 Ok(None) => break,
686 Err(e) => {
687 error!("vsock: Failed to read descriptor {}", e);
688 return Err(VsockError::AwaitQueue(e));
689 }
690 };
691
692 let reader = &mut avail_desc.reader;
693 while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
694 let header = match reader.read_obj::<virtio_vsock_hdr>() {
695 Ok(hdr) => hdr,
696 Err(e) => {
697 error!("vsock: Error while reading header: {}", e);
698 break;
699 }
700 };
701
702 let len = header.len.to_native() as usize;
703 if reader.available_bytes() < len {
704 error!("vsock: Error reading packet data");
705 break;
706 }
707
708 let mut data = vec![0_u8; len];
709 if len > 0 {
710 if let Err(e) = reader.read_exact(&mut data) {
711 error!("vosck: failed to read data from tx packet: {:?}", e);
712 }
713 }
714
715 if let Err(e) = process_packets_queue.send((header, data)).await {
716 error!(
717 "Error while sending packet to queue, dropping packet: {:?}",
718 e
719 )
720 };
721 }
722
723 queue.add_used(avail_desc, 0);
724 queue.trigger_interrupt();
725 }
726
727 Ok(queue)
728 }
729
730 fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
731 match pipe.get_info() {
732 Ok(info) => {
733 if info.outgoing_buffer_size > 0 {
734 info.outgoing_buffer_size as usize
735 } else {
736 info!(
737 "vsock: port {}: using default extra rx buffer size \
738 (named pipe does not have an explicit buffer size)",
739 port
740 );
741
742 // A zero buffer size implies that the buffer grows as
743 // needed. We set our own cap here for flow control
744 // purposes.
745 DEFAULT_BUF_ALLOC_BYTES
746 }
747 }
748 Err(e) => {
749 error!(
750 "vsock: port {}: failed to get named pipe info, using default \
751 buf size. Error: {}",
752 port, e
753 );
754 DEFAULT_BUF_ALLOC_BYTES
755 }
756 }
757 }
758
759 /// Processes a connection request and returns whether to return a response (true), or reset
760 /// (false).
761 async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
762 let port = PortPair::from_tx_header(&header);
763 info!("vsock: port {}: Received connection request", port);
764
765 if self.connections.read_lock().await.contains_key(&port) {
766 // Connection exists, nothing for us to do.
767 warn!(
768 "vsock: port {}: accepting connection request on already connected port",
769 port
770 );
771 return true;
772 }
773
774 if self.host_guid.is_none() {
775 error!(
776 "vsock: port {}: Cannot accept guest-initiated connections \
777 without host-guid, rejecting connection",
778 port,
779 );
780 return false;
781 }
782
783 // We have a new connection to establish.
784 let mut overlapped_wrapper =
785 Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
786 let pipe_result = named_pipes::create_client_pipe(
787 get_pipe_name(
788 self.host_guid.as_ref().unwrap(),
789 header.dst_port.to_native(),
790 )
791 .as_str(),
792 &FramingMode::Byte,
793 &BlockingMode::Wait,
794 true, /* overlapped */
795 );
796
797 match pipe_result {
798 Ok(mut pipe_connection) => {
799 let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
800 info!("vsock: port {}: created client pipe", port);
801
802 // SAFETY:
803 // Unsafe because the read could happen at any time
804 // after this function is called. We ensure safety
805 // by allocating the buffer and overlapped struct
806 // on the heap.
807 unsafe {
808 match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
809 {
810 Ok(()) => {}
811 Err(e) => {
812 error!("vsock: port {}: Failed to read from pipe {}", port, e);
813 return false;
814 }
815 }
816 }
817 info!("vsock: port {}: started read on client pipe", port);
818
819 let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
820 let connection = VsockConnection {
821 guest_port: header.src_port,
822 pipe: pipe_connection,
823 overlapped: overlapped_wrapper,
824 peer_buf_alloc: header.buf_alloc.to_native() as usize,
825 peer_recv_cnt: header.fwd_cnt.to_native() as usize,
826 buf_alloc,
827 buffer,
828 // The connection has just been made, so we haven't received
829 // anything yet.
830 recv_cnt: 0_usize,
831 prev_recv_cnt: 0_usize,
832 tx_cnt: 0_usize,
833 is_buffer_full: false,
834 };
835 self.connections.lock().await.insert(port, connection);
836 self.connection_event.signal().unwrap_or_else(|_| {
837 panic!(
838 "Failed to signal new connection event for vsock port {}.",
839 port
840 )
841 });
842 info!("vsock: port {}: signaled connection ready", port);
843 true
844 }
845 Err(e) => {
846 info!(
847 "vsock: No waiting pipe connection on port {}, \
848 not connecting (err: {:?})",
849 port, e
850 );
851 false
852 }
853 }
854 }
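// For illustration, the guest-initiated handshake handled above, together with the
// OP_REQUEST branch of `handle_tx_packet` (a sketch of the flow, not additional behavior):
//
//   guest -> host: OP_REQUEST  (src_port = guest port, dst_port = host port)
//   host:          connects \\.\pipe\{host_guid}\vsock-{dst_port} and starts an overlapped read
//   host -> guest: OP_RESPONSE on success, or OP_RST if no pipe server is waiting or no
//                  host_guid is configured
//
// After that, OP_RW packets carry data in both directions and OP_SHUTDOWN tears the
// connection down (answered with an OP_RST).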
855
856 async fn handle_vsock_guest_data(
857 &self,
858 header: virtio_vsock_hdr,
859 data: &[u8],
860 ex: &Executor,
861 ) -> Result<()> {
862 let port = PortPair::from_tx_header(&header);
863 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
864 {
865 let mut connections = self.connections.lock().await;
866 if let Some(connection) = connections.get_mut(&port) {
867 // Update peer buffer/recv counters
868 connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
869 connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
870
871 let pipe = &mut connection.pipe;
872 // We have to provide a OVERLAPPED struct to write to the pipe.
873 //
874 // SAFETY: safe because data & overlapped_wrapper live until the
875 // overlapped operation completes (we wait on completion below).
876 if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
877 return Err(VsockError::WriteFailed(port, e));
878 }
879 } else {
880 error!(
881 "vsock: Guest attempted to send data packet over unconnected \
882 port ({}), dropping packet",
883 port
884 );
885 return Ok(());
886 }
887 }
888 if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
889 // Don't block the executor while the write completes. This time should
890 // always be negligible, but will sometimes be non-zero in cases where
891 // traffic is high on the NamedPipe, especially a duplex pipe.
892 if let Ok(cloned_event) = write_completed_event.try_clone() {
893 if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
894 let _ = async_event.next_val().await;
895 } else {
896 error!(
897 "vsock: port {}: Failed to convert write event to async",
898 port
899 );
900 }
901 } else {
902 error!(
903 "vsock: port {}: Failed to clone write completion event",
904 port
905 );
906 }
907 } else {
908 error!(
909 "vsock: port: {}: Failed to get overlapped event for write",
910 port
911 );
912 }
913
914 let mut connections = self.connections.lock().await;
915 if let Some(connection) = connections.get_mut(&port) {
916 let pipe = &mut connection.pipe;
917 match pipe.get_overlapped_result(&mut overlapped_wrapper) {
918 Ok(len) => {
919 // We've received bytes from the guest, account for them in our
920 // received bytes counter.
921 connection.recv_cnt += len as usize;
922
923 if len != data.len() as u32 {
924 return Err(VsockError::WriteFailed(
925 port,
926 std::io::Error::new(
927 std::io::ErrorKind::Other,
928 format!(
929 "port {} failed to write correct number of bytes:
930 (expected: {}, wrote: {})",
931 port,
932 data.len(),
933 len
934 ),
935 ),
936 ));
937 }
938 }
939 Err(e) => {
940 return Err(VsockError::WriteFailed(port, e));
941 }
942 }
943 } else {
944 error!(
945 "vsock: Guest attempted to send data packet over unconnected \
946 port ({}), dropping packet",
947 port
948 );
949 }
950 Ok(())
951 }
952
953 async fn process_tx_packets(
954 &self,
955 send_queue: &Arc<RwLock<Queue>>,
956 rx_queue_evt: Event,
957 mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
958 ex: &Executor,
959 mut stop_rx: oneshot::Receiver<()>,
960 ) {
961 let mut packet_queues = HashMap::new();
962 let mut futures = FuturesUnordered::new();
963 // Push a pending future that will never complete into FuturesUnordered.
964 // This will keep us from spinning on spurious notifications when we
965 // don't have any open connections.
966 futures.push(std::future::pending::<PortPair>().left_future());
967
968 let mut stop_future = FuturesUnordered::new();
969 stop_future.push(stop_rx);
970 loop {
971 let (new_packet, connection, stop_rx_res) =
972 select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
973 match connection {
974 SelectResult::Finished(Some(port)) => {
975 packet_queues.remove(&port);
976 }
977 SelectResult::Finished(_) => {
978 // This is only triggered when FuturesUnordered completes
979 // all pending futures. Right now, this can never happen, as
980 // we have a pending future that we push that will never
981 // complete.
982 }
983 SelectResult::Pending(_) => {
984 // Nothing to do.
985 }
986 };
987 match new_packet {
988 SelectResult::Finished(Some(packet)) => {
989 let port = PortPair::from_tx_header(&packet.0);
990 let queue = packet_queues.entry(port).or_insert_with(|| {
991 let (send, recv) = mpsc::channel(CHANNEL_SIZE);
992 let event_async = EventAsync::new(
993 rx_queue_evt.try_clone().expect("Failed to clone event"),
994 ex,
995 )
996 .expect("Failed to set up the rx queue event");
997 futures.push(
998 self.process_tx_packets_for_port(
999 port,
1000 recv,
1001 send_queue,
1002 event_async,
1003 ex,
1004 )
1005 .right_future(),
1006 );
1007 send
1008 });
1009 // Try to send the packet. Do not block other ports if the queue is full.
1010 if let Err(e) = queue.try_send(packet) {
1011 error!(
1012 "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
1013 port, e
1014 )
1015 }
1016 }
1017 SelectResult::Finished(_) => {
1018 // Triggers when the channel is closed; no more packets coming.
1019 packet_recv_queue.close();
1020 return;
1021 }
1022 SelectResult::Pending(_) => {
1023 // Nothing to do.
1024 }
1025 }
1026 match stop_rx_res {
1027 SelectResult::Finished(_) => {
1028 break;
1029 }
1030 SelectResult::Pending(_) => {
1031 // Nothing to do.
1032 }
1033 }
1034 }
1035 }
1036
1037 async fn process_tx_packets_for_port(
1038 &self,
1039 port: PortPair,
1040 mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
1041 send_queue: &Arc<RwLock<Queue>>,
1042 mut rx_queue_evt: EventAsync,
1043 ex: &Executor,
1044 ) -> PortPair {
1045 while let Some((header, data)) = packet_recv_queue.next().await {
1046 if !self
1047 .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
1048 .await
1049 {
1050 packet_recv_queue.close();
1051 if let Ok(Some(_)) = packet_recv_queue.try_next() {
1052 warn!("vsock: closing port {} with unprocessed packets", port);
1053 } else {
1054 info!("vsock: closing port {} cleanly", port)
1055 }
1056 break;
1057 }
1058 }
1059 port
1060 }
1061
1062 async fn handle_tx_packet(
1063 &self,
1064 header: virtio_vsock_hdr,
1065 data: &[u8],
1066 send_queue: &Arc<RwLock<Queue>>,
1067 rx_queue_evt: &mut EventAsync,
1068 ex: &Executor,
1069 ) -> bool {
1070 let mut is_open = true;
1071 let port = PortPair::from_tx_header(&header);
1072 match header.op.to_native() {
1073 vsock_op::VIRTIO_VSOCK_OP_INVALID => {
1074 error!("vsock: Invalid Operation requested, dropping packet");
1075 }
1076 vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
1077 let (resp_op, buf_alloc, fwd_cnt) =
1078 if self.handle_vsock_connection_request(header).await {
1079 let connections = self.connections.read_lock().await;
1080
1081 connections.get(&port).map_or_else(
1082 || {
1083 warn!("vsock: port: {} connection closed during connect", port);
1084 is_open = false;
1085 (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
1086 },
1087 |conn| {
1088 (
1089 vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
1090 conn.buf_alloc as u32,
1091 conn.recv_cnt as u32,
1092 )
1093 },
1094 )
1095 } else {
1096 is_open = false;
1097 (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
1098 };
1099
1100 let response_header = virtio_vsock_hdr {
1101 src_cid: { header.dst_cid },
1102 dst_cid: { header.src_cid },
1103 src_port: { header.dst_port },
1104 dst_port: { header.src_port },
1105 len: 0.into(),
1106 r#type: TYPE_STREAM_SOCKET.into(),
1107 op: resp_op.into(),
1108 buf_alloc: Le32::from(buf_alloc),
1109 fwd_cnt: Le32::from(fwd_cnt),
1110 ..Default::default()
1111 };
1112 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
1113 // bytes.
1114 self.write_bytes_to_queue(
1115 &mut *send_queue.lock().await,
1116 rx_queue_evt,
1117 response_header.as_bytes(),
1118 )
1119 .await
1120 .expect("vsock: failed to write to queue");
1121 info!(
1122 "vsock: port {}: replied {} to connection request",
1123 port,
1124 if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {
1125 "success"
1126 } else {
1127 "reset"
1128 },
1129 );
1130 }
1131 vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
1132 // TODO(b/237811512): Implement this for host-initiated connections
1133 }
1134 vsock_op::VIRTIO_VSOCK_OP_RST => {
1135 // TODO(b/237811512): Implement this for host-initiated connections
1136 }
1137 vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
1138 // While the header provides flags to specify tx/rx-specific shutdown,
1139 // we only support full shutdown.
1140 // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
1141 // while still maintaining easy reconnections.
1142 let mut connections = self.connections.lock().await;
1143 if connections.remove(&port).is_some() {
1144 let mut response = virtio_vsock_hdr {
1145 src_cid: { header.dst_cid },
1146 dst_cid: { header.src_cid },
1147 src_port: { header.dst_port },
1148 dst_port: { header.src_port },
1149 len: 0.into(),
1150 r#type: TYPE_STREAM_SOCKET.into(),
1151 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1152 // There is no buffer on a closed connection
1153 buf_alloc: 0.into(),
1154 // There is no fwd_cnt anymore on a closed connection
1155 fwd_cnt: 0.into(),
1156 ..Default::default()
1157 };
1158 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
1159 // bytes
1160 self.write_bytes_to_queue(
1161 &mut *send_queue.lock().await,
1162 rx_queue_evt,
1163 response.as_bytes_mut(),
1164 )
1165 .await
1166 .expect("vsock: failed to write to queue");
1167 self.connection_event
1168 .signal()
1169 .expect("vsock: failed to write to event");
1170 info!("vsock: port: {}: disconnected by the guest", port);
1171 } else {
1172 error!("vsock: Attempted to close unopened port: {}", port);
1173 }
1174 is_open = false;
1175 }
1176 vsock_op::VIRTIO_VSOCK_OP_RW => {
1177 match self.handle_vsock_guest_data(header, data, ex).await {
1178 Ok(()) => {
1179 if self
1180 .check_free_buffer_threshold(header)
1181 .await
1182 .unwrap_or(false)
1183 {
1184 // Send a credit update if we're below the minimum free
1185 // buffer size. We skip this if the connection is closed,
1186 // which could've happened if we were closed on the other
1187 // end.
1188 info!(
1189 "vsock: port {}: Buffer below threshold; sending credit update.",
1190 port
1191 );
1192 self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
1193 .await;
1194 }
1195 }
1196 Err(e) => {
1197 error!("vsock: port {}: resetting connection: {}", port, e);
1198 self.send_vsock_reset(send_queue, rx_queue_evt, header)
1199 .await;
1200 is_open = false;
1201 }
1202 }
1203 }
1204 // An update from our peer with their buffer state, which they are sending
1205 // (probably) due to a credit request *we* made.
1206 vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
1207 let port = PortPair::from_tx_header(&header);
1208 let mut connections = self.connections.lock().await;
1209 if let Some(connection) = connections.get_mut(&port) {
1210 connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
1211 connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
1212 } else {
1213 error!("vsock: port {}: got credit update on unknown port", port);
1214 is_open = false;
1215 }
1216 }
1217 // A request from our peer to get *our* buffer state. We reply to the RX queue.
1218 vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
1219 info!(
1220 "vsock: port {}: Got credit request from peer; sending credit update.",
1221 port,
1222 );
1223 self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
1224 .await;
1225 }
1226 _ => {
1227 error!(
1228 "vsock: port {}: unknown operation requested, dropping packet",
1229 port
1230 );
1231 }
1232 }
1233 is_open
1234 }
1235
1236 // Checks whether the amount of free buffer space our peer thinks *we* have available
1237 // is below our threshold percentage. If the connection is closed, returns `None`.
1238 async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
1239 let mut connections = self.connections.lock().await;
1240 let port = PortPair::from_tx_header(&header);
1241 connections.get_mut(&port).map(|connection| {
1242 let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
1243 connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
1244 })
1245 }
1246
1247 async fn send_vsock_credit_update(
1248 &self,
1249 send_queue: &Arc<RwLock<Queue>>,
1250 rx_queue_evt: &mut EventAsync,
1251 header: virtio_vsock_hdr,
1252 ) {
1253 let mut connections = self.connections.lock().await;
1254 let port = PortPair::from_tx_header(&header);
1255
1256 if let Some(connection) = connections.get_mut(&port) {
1257 let mut response = virtio_vsock_hdr {
1258 src_cid: { header.dst_cid },
1259 dst_cid: { header.src_cid },
1260 src_port: { header.dst_port },
1261 dst_port: { header.src_port },
1262 len: 0.into(),
1263 r#type: TYPE_STREAM_SOCKET.into(),
1264 op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
1265 buf_alloc: Le32::from(connection.buf_alloc as u32),
1266 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1267 ..Default::default()
1268 };
1269
1270 connection.prev_recv_cnt = connection.recv_cnt;
1271
1272 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1273 // to bytes
1274 self.write_bytes_to_queue(
1275 &mut *send_queue.lock().await,
1276 rx_queue_evt,
1277 response.as_bytes_mut(),
1278 )
1279 .await
1280 .unwrap_or_else(|_| panic!("vsock: port {}: failed to write to queue", port));
1281 } else {
1282 error!(
1283 "vsock: port {}: error sending credit update on unknown port",
1284 port
1285 );
1286 }
1287 }
1288
1289 async fn send_vsock_reset(
1290 &self,
1291 send_queue: &Arc<RwLock<Queue>>,
1292 rx_queue_evt: &mut EventAsync,
1293 header: virtio_vsock_hdr,
1294 ) {
1295 let mut connections = self.connections.lock().await;
1296 let port = PortPair::from_tx_header(&header);
1297 if let Some(connection) = connections.remove(&port) {
1298 let mut response = virtio_vsock_hdr {
1299 src_cid: { header.dst_cid },
1300 dst_cid: { header.src_cid },
1301 src_port: { header.dst_port },
1302 dst_port: { header.src_port },
1303 len: 0.into(),
1304 r#type: TYPE_STREAM_SOCKET.into(),
1305 op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1306 buf_alloc: Le32::from(connection.buf_alloc as u32),
1307 fwd_cnt: Le32::from(connection.recv_cnt as u32),
1308 ..Default::default()
1309 };
1310
1311 // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1312 // to bytes
1313 self.write_bytes_to_queue(
1314 &mut *send_queue.lock().await,
1315 rx_queue_evt,
1316 response.as_bytes_mut(),
1317 )
1318 .await
1319 .expect("failed to write to queue");
1320 } else {
1321 error!("vsock: port {}: error closing unknown port", port);
1322 }
1323 }
1324
1325 async fn write_bytes_to_queue(
1326 &self,
1327 queue: &mut Queue,
1328 queue_evt: &mut EventAsync,
1329 bytes: &[u8],
1330 ) -> Result<()> {
1331 let mut avail_desc = match queue.next_async(queue_evt).await {
1332 Ok(d) => d,
1333 Err(e) => {
1334 error!("vsock: failed to read descriptor {}", e);
1335 return Err(VsockError::AwaitQueue(e));
1336 }
1337 };
1338 self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1339 }
1340
1341 async fn write_bytes_to_queue_interruptable(
1342 &self,
1343 queue: &mut Queue,
1344 queue_evt: &mut EventAsync,
1345 bytes: &[u8],
1346 mut stop_rx: &mut oneshot::Receiver<()>,
1347 ) -> Result<()> {
1348 let mut avail_desc = match queue.next_async_interruptable(queue_evt, stop_rx).await {
1349 Ok(d) => match d {
1350 Some(desc) => desc,
1351 None => return Ok(()),
1352 },
1353 Err(e) => {
1354 error!("vsock: failed to read descriptor {}", e);
1355 return Err(VsockError::AwaitQueue(e));
1356 }
1357 };
1358 self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1359 }
1360
1361 fn write_bytes_to_queue_inner(
1362 &self,
1363 queue: &mut Queue,
1364 mut desc_chain: DescriptorChain,
1365 bytes: &[u8],
1366 ) -> Result<()> {
1367 let writer = &mut desc_chain.writer;
1368 let res = writer.write_all(bytes);
1369
1370 if let Err(e) = res {
1371 error!(
1372 "vsock: failed to write {} bytes to queue, err: {:?}",
1373 bytes.len(),
1374 e
1375 );
1376 return Err(VsockError::WriteQueue(e));
1377 }
1378
1379 let bytes_written = writer.bytes_written() as u32;
1380 if bytes_written > 0 {
1381 queue.add_used(desc_chain, bytes_written);
1382 queue.trigger_interrupt();
1383 Ok(())
1384 } else {
1385 error!("vsock: failed to write bytes to queue");
1386 Err(VsockError::WriteQueue(std::io::Error::new(
1387 std::io::ErrorKind::Other,
1388 "failed to write bytes to queue",
1389 )))
1390 }
1391 }
1392
1393 async fn process_event_queue(
1394 &self,
1395 mut queue: Queue,
1396 mut queue_evt: EventAsync,
1397 mut stop_rx: oneshot::Receiver<()>,
1398 mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,
1399 ) -> Result<Queue> {
1400 loop {
1401 let vsock_event = select_biased! {
1402 vsock_event = vsock_event_receiver.next() => {
1403 vsock_event
1404 }
1405 _ = stop_rx => {
1406 break;
1407 }
1408 };
1409 let vsock_event = match vsock_event {
1410 Some(event) => event,
1411 None => break,
1412 };
1413 self.write_bytes_to_queue_interruptable(
1414 &mut queue,
1415 &mut queue_evt,
1416 vsock_event.as_bytes(),
1417 &mut stop_rx,
1418 )
1419 .await?;
1420 }
1421 Ok(queue)
1422 }
1423
1424 fn run(
1425 mut self,
1426 rx_queue: Queue,
1427 tx_queue: Queue,
1428 event_queue: Queue,
1429 kill_evt: Event,
1430 ) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {
1431 let rx_queue_evt = rx_queue
1432 .event()
1433 .try_clone()
1434 .map_err(VsockError::CloneDescriptor)?;
1435
1436 // Note that this mutex won't ever be contended because the HandleExecutor is single
1437 // threaded. We need the mutex for compile time correctness, but technically it is not
1438 // actually providing mandatory locking, at least not at the moment. If we later use a
1439 // multi-threaded executor, then this lock will be important.
1440 let rx_queue_arc = Arc::new(RwLock::new(rx_queue));
1441
1442 // Run executor / create futures in a scope, preventing extra reference to `rx_queue_arc`.
1443 let res = {
1444 let ex = Executor::new().unwrap();
1445
1446 let rx_evt_async = EventAsync::new(
1447 rx_queue_evt
1448 .try_clone()
1449 .map_err(VsockError::CloneDescriptor)?,
1450 &ex,
1451 )
1452 .expect("Failed to set up the rx queue event");
1453 let mut stop_queue_oneshots = Vec::new();
1454
1455 let vsock_event_receiver = self
1456 .device_event_queue_rx
1457 .take()
1458 .expect("event queue rx must be present");
1459
1460 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1461 let rx_handler =
1462 self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);
1463 let rx_handler = rx_handler.fuse();
1464 pin_mut!(rx_handler);
1465
1466 let (send, recv) = mpsc::channel(CHANNEL_SIZE);
1467
1468 let tx_evt_async = EventAsync::new(
1469 tx_queue
1470 .event()
1471 .try_clone()
1472 .map_err(VsockError::CloneDescriptor)?,
1473 &ex,
1474 )
1475 .expect("Failed to set up the tx queue event");
1476 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1477 let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
1478 let tx_handler = tx_handler.fuse();
1479 pin_mut!(tx_handler);
1480
1481 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1482 let packet_handler =
1483 self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);
1484 let packet_handler = packet_handler.fuse();
1485 pin_mut!(packet_handler);
1486
1487 let event_evt_async = EventAsync::new(
1488 event_queue
1489 .event()
1490 .try_clone()
1491 .map_err(VsockError::CloneDescriptor)?,
1492 &ex,
1493 )
1494 .expect("Failed to set up the event queue event");
1495 let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1496 let event_handler = self.process_event_queue(
1497 event_queue,
1498 event_evt_async,
1499 stop_rx,
1500 vsock_event_receiver,
1501 );
1502 let event_handler = event_handler.fuse();
1503 pin_mut!(event_handler);
1504
1505 // Process any requests to resample the irq value.
1506 let resample_handler = async_utils::handle_irq_resample(&ex, self.interrupt.clone());
1507 let resample_handler = resample_handler.fuse();
1508 pin_mut!(resample_handler);
1509
1510 let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
1511 let kill_handler = kill_evt.next_val();
1512 pin_mut!(kill_handler);
1513
1514 let mut device_event_queue_tx = self.device_event_queue_tx.clone();
1515 if self.send_protocol_reset {
1516 ex.run_until(async move { device_event_queue_tx.send(
1517 virtio_vsock_event {
1518 id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET
1519 .into(),
1520 }).await
1521 }).expect("failed to write to empty mpsc queue.");
1522 }
1523
1524 ex.run_until(async {
1525 select! {
1526 _ = kill_handler.fuse() => (),
1527 _ = rx_handler => return Err(anyhow!("rx_handler stopped unexpectedly")),
1528 _ = tx_handler => return Err(anyhow!("tx_handler stopped unexpectedly")),
1529 _ = packet_handler => return Err(anyhow!("packet_handler stopped unexpectedly")),
1530 _ = event_handler => return Err(anyhow!("event_handler stopped unexpectedly")),
1531 _ = resample_handler => return Err(anyhow!("resample_handler stopped unexpectedly")),
1532 }
1533 // kill_evt has fired
1534
1535 for stop_tx in stop_queue_oneshots {
1536 if stop_tx.send(()).is_err() {
1537 return Err(anyhow!("failed to request stop for queue future"));
1538 }
1539 }
1540
1541 rx_handler.await.context("Failed to stop rx handler.")?;
1542 packet_handler.await;
1543
1544 Ok((
1545 tx_handler.await.context("Failed to stop tx handler.")?,
1546 event_handler
1547 .await
1548 .context("Failed to stop event handler.")?,
1549 ))
1550 })
1551 };
1552
1553 // At this point, a request to stop this worker has been sent or an error has happened in
1554 // one of the futures, which will stop this worker anyways.
1555
1556 let queues_and_connections = match res {
1557 Ok(main_future_res) => match main_future_res {
1558 Ok((tx_queue, event_handler_queue)) => {
1559 let rx_queue = match Arc::try_unwrap(rx_queue_arc) {
1560 Ok(queue_rw_lock) => queue_rw_lock.into_inner(),
1561 Err(_) => panic!("failed to recover queue from worker"),
1562 };
1563 let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);
1564 Some((paused_queues, self.connections))
1565 }
1566 Err(e) => {
1567 error!("Error happened in a vsock future: {}", e);
1568 None
1569 }
1570 },
1571 Err(e) => {
1572 error!("error happened in executor: {}", e);
1573 None
1574 }
1575 };
1576
1577 Ok(queues_and_connections)
1578 }
1579 }
1580
1581 /// Queues & events for the vsock device.
1582 struct VsockQueues {
1583 rx: Queue,
1584 tx: Queue,
1585 event: Queue,
1586 }
1587
1588 impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
1589 type Error = anyhow::Error;
1590 fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
1591 if queues.len() < 3 {
1592 anyhow::bail!(
1593 "{} queues were found, but an activated vsock must have at 3 active queues.",
1594 queues.len()
1595 );
1596 }
1597
1598 Ok(VsockQueues {
1599 rx: queues.remove(&0).context("the rx queue is required.")?,
1600 tx: queues.remove(&1).context("the tx queue is required.")?,
1601 event: queues.remove(&2).context("the event queue is required.")?,
1602 })
1603 }
1604 }
1605
1606 impl From<PausedQueues> for BTreeMap<usize, Queue> {
1607 fn from(queues: PausedQueues) -> Self {
1608 let mut ret = BTreeMap::new();
1609 ret.insert(0, queues.rx_queue);
1610 ret.insert(1, queues.tx_queue);
1611 ret.insert(2, queues.event_queue);
1612 ret
1613 }
1614 }
1615
1616 struct PausedQueues {
1617 rx_queue: Queue,
1618 tx_queue: Queue,
1619 event_queue: Queue,
1620 }
1621
1622 impl PausedQueues {
1623 fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {
1624 PausedQueues {
1625 rx_queue,
1626 tx_queue,
1627 event_queue,
1628 }
1629 }
1630 }
1631
1632 fn get_pipe_name(guid: &str, pipe: u32) -> String {
1633 format!("\\\\.\\pipe\\{}\\vsock-{}", guid, pipe)
1634 }
1635
1636 async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
1637 // This doesn't reset the event since we have to call GetOverlappedResult
1638 // on the OVERLAPPED struct first before resetting it.
1639 let _ = evt.get_io_source_ref().wait_for_handle().await;
1640 pair
1641 }
1642
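// A minimal sanity-check sketch (not part of the original device code; assumes the standard
// Rust test harness). It exercises the pipe name format and `PortPair` display defined above;
// the GUID and port values are placeholders chosen for illustration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pipe_name_matches_expected_format() {
        assert_eq!(
            get_pipe_name("00000000-0000-0000-0000-000000000000", 5000),
            "\\\\.\\pipe\\00000000-0000-0000-0000-000000000000\\vsock-5000"
        );
    }

    #[test]
    fn port_pair_display_is_human_readable() {
        let pair = PortPair { host: 1, guest: 2 };
        assert_eq!(pair.to_string(), "(host port: 1, guest port: 2)");
    }
}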