1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 *      http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 //! Module providing an async abstraction around a quiche HTTP/3 connection
17 
18 use crate::boot_time::BootTime;
19 use crate::connection::driver::Cause;
20 use crate::connection::driver::HandshakeInfo;
21 use crate::network::ServerInfo;
22 use crate::network::SocketTagger;
23 use log::{debug, error, warn};
24 use quiche::h3;
25 use std::future::Future;
26 use std::io;
27 use std::net::SocketAddr;
28 use thiserror::Error;
29 use tokio::net::UdpSocket;
30 use tokio::sync::{mpsc, oneshot, watch};
31 use tokio::task;
32 
33 pub mod driver;
34 
35 pub use driver::Stream;
36 use driver::{drive, Request};
37 
38 #[derive(Debug, Clone)]
39 pub enum Status {
40     QUIC,
41     H3,
42     Dead {
43         /// The session of the closed connection.
44         session: Option<Vec<u8>>,
45     },
46 }
47 
48 /// Quiche HTTP/3 connection
49 pub struct Connection {
50     request_tx: mpsc::Sender<Request>,
51     status_rx: watch::Receiver<Status>,
52 }
53 
new_scid() -> [u8; quiche::MAX_CONN_ID_LEN]54 fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
55     use ring::rand::{SecureRandom, SystemRandom};
56     let mut scid = [0; quiche::MAX_CONN_ID_LEN];
57     SystemRandom::new().fill(&mut scid).unwrap();
58     scid
59 }
60 
mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()>61 fn mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()> {
62     use std::os::unix::io::AsRawFd;
63     let fd = socket.as_raw_fd();
64     // SAFETY: libc::setsockopt is a wrapper function calling into bionic setsockopt.
65     // The only pointer being passed in is &socket_mark, which is valid by virtue of being a
66     // reference, and the foreign function doesn't take ownership or a reference to that memory
67     // after completion.
68     if unsafe {
69         libc::setsockopt(
70             fd,
71             libc::SOL_SOCKET,
72             libc::SO_MARK,
73             &socket_mark as *const _ as *const libc::c_void,
74             std::mem::size_of::<u32>() as libc::socklen_t,
75         )
76     } == 0
77     {
78         Ok(())
79     } else {
80         Err(io::Error::last_os_error())
81     }
82 }
83 
build_socket( peer_addr: SocketAddr, socket_mark: u32, tag_socket: &SocketTagger, ) -> io::Result<UdpSocket>84 async fn build_socket(
85     peer_addr: SocketAddr,
86     socket_mark: u32,
87     tag_socket: &SocketTagger,
88 ) -> io::Result<UdpSocket> {
89     let bind_addr = match peer_addr {
90         SocketAddr::V4(_) => "0.0.0.0:0",
91         SocketAddr::V6(_) => "[::]:0",
92     };
93 
94     let socket = UdpSocket::bind(bind_addr).await?;
95     let std_socket = socket.into_std()?;
96     mark_socket(&std_socket, socket_mark)
97         .unwrap_or_else(|e| error!("Unable to mark socket : {:?}", e));
98     tag_socket(&std_socket).await;
99     let socket = UdpSocket::from_std(std_socket)?;
100     socket.connect(peer_addr).await?;
101     Ok(socket)
102 }
103 
104 /// Error type for HTTP/3 connection
105 #[derive(Debug, Error)]
106 pub enum Error {
107     /// QUIC protocol error
108     #[error("QUIC error: {0}")]
109     Quic(#[from] quiche::Error),
110     /// HTTP/3 protocol error
111     #[error("HTTP/3 error: {0}")]
112     H3(#[from] h3::Error),
113     /// Unable to send the request to the driver. This likely means the
114     /// backing task has died.
115     #[error("Unable to send request")]
116     SendRequest(#[from] mpsc::error::SendError<Request>),
117     /// IO failed. This is most likely to occur while trying to set up the
118     /// UDP socket for use by the connection.
119     #[error("IO error: {0}")]
120     Io(#[from] io::Error),
121     /// The request is no longer being serviced. This could mean that the
122     /// request was dropped for an unspecified reason, or that the connection
123     /// was closed prematurely and it can no longer be serviced.
124     #[error("Driver dropped request")]
125     RecvResponse(#[from] oneshot::error::RecvError),
126 }
127 
128 /// Common result type for working with a HTTP/3 connection
129 pub type Result<T> = std::result::Result<T, Error>;
130 
131 impl Connection {
132     const MAX_PENDING_REQUESTS: usize = 10;
133     /// Create a new connection with a background task handling IO.
new( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Self>134     pub async fn new(
135         info: &ServerInfo,
136         tag_socket: &SocketTagger,
137         config: &mut quiche::Config,
138         session: Option<Vec<u8>>,
139         cause: Cause,
140     ) -> Result<Self> {
141         let server_name = info.domain.as_deref();
142         let to = info.peer_addr;
143         let socket_mark = info.sk_mark;
144         let net_id = info.net_id;
145         let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
146         let (status_tx, status_rx) = watch::channel(Status::QUIC);
147         let scid = new_scid();
148         let socket = build_socket(to, socket_mark, tag_socket).await?;
149         let from = socket.local_addr()?;
150 
151         let mut quiche_conn =
152             quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), from, to, config)?;
153 
154         // We will fall back to a full handshake if the session is expired.
155         if let Some(session) = session {
156             debug!("Setting session");
157             quiche_conn.set_session(&session)?;
158         }
159 
160         let handshake_info = HandshakeInfo {
161             cause,
162             sent_bytes: 0,
163             recv_bytes: 0,
164             elapsed: 0,
165             quic_version: 0,
166             network_type: info.network_type,
167             private_dns_mode: info.private_dns_mode,
168             session_hit_checker: quiche_conn.session().is_some(),
169         };
170 
171         let driver = async move {
172             let result =
173                 drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).await;
174             if let Err(ref e) = result {
175                 warn!("Connection driver returns some Err: {:?}", e);
176             }
177             result
178         };
179         task::spawn(driver);
180         Ok(Self { request_tx, status_rx })
181     }
182 
183     /// Waits until we're either fully alive or dead
wait_for_live(&mut self) -> bool184     pub async fn wait_for_live(&mut self) -> bool {
185         // Once sc-mainline-prod updates to modern tokio, use
186         // borrow_and_update here.
187         match &*self.status_rx.borrow() {
188             Status::H3 => return true,
189             Status::Dead { .. } => return false,
190             Status::QUIC => (),
191         }
192         if self.status_rx.changed().await.is_err() {
193             // status_tx is gone, we're dead
194             return false;
195         }
196         if matches!(*self.status_rx.borrow(), Status::H3) {
197             return true;
198         }
199         // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this.
200         match self.status_rx.changed().await {
201             // status_tx is gone, we're dead
202             Err(_) => false,
203             // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
204             _ => matches!(*self.status_rx.borrow(), Status::H3),
205         }
206     }
207 
session(&self) -> Option<Vec<u8>>208     pub fn session(&self) -> Option<Vec<u8>> {
209         match &*self.status_rx.borrow() {
210             Status::Dead { session } => session.clone(),
211             _ => None,
212         }
213     }
214 
215     /// Send a query, produce a future which will provide a response.
216     /// The future is separately returned rather than awaited to allow it to be waited on without
217     /// keeping the `Connection` itself borrowed.
query( &self, headers: Vec<h3::Header>, expiry: Option<BootTime>, ) -> Result<impl Future<Output = Option<Stream>>>218     pub async fn query(
219         &self,
220         headers: Vec<h3::Header>,
221         expiry: Option<BootTime>,
222     ) -> Result<impl Future<Output = Option<Stream>>> {
223         let (response_tx, response_rx) = oneshot::channel();
224         self.request_tx.send(Request { headers, response_tx, expiry }).await?;
225         Ok(async move { response_rx.await.ok() })
226     }
227 }
228