1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 //! Module providing an async abstraction around a quiche HTTP/3 connection
17
18 use crate::boot_time::BootTime;
19 use crate::connection::driver::Cause;
20 use crate::connection::driver::HandshakeInfo;
21 use crate::network::ServerInfo;
22 use crate::network::SocketTagger;
23 use log::{debug, error, warn};
24 use quiche::h3;
25 use std::future::Future;
26 use std::io;
27 use std::net::SocketAddr;
28 use thiserror::Error;
29 use tokio::net::UdpSocket;
30 use tokio::sync::{mpsc, oneshot, watch};
31 use tokio::task;
32
33 pub mod driver;
34
35 pub use driver::Stream;
36 use driver::{drive, Request};
37
/// Lifecycle state of a [`Connection`], observed through a `watch` channel whose
/// sending half is handed to the connection driver.
#[derive(Clone, Debug)]
pub enum Status {
    /// State a connection is created in. [`Connection::wait_for_live`] treats it as
    /// "not decided yet" and keeps waiting.
    QUIC,
    /// An HTTP/3 connection is available; [`Connection::wait_for_live`] reports this
    /// state as live.
    H3,
    /// The connection is closed; [`Connection::wait_for_live`] reports this state as
    /// not live.
    Dead {
        /// The session of the closed connection, if there is one. Exposed to callers
        /// through [`Connection::session`].
        session: Option<Vec<u8>>,
    },
}
47
48 /// Quiche HTTP/3 connection
49 pub struct Connection {
50 request_tx: mpsc::Sender<Request>,
51 status_rx: watch::Receiver<Status>,
52 }
53
new_scid() -> [u8; quiche::MAX_CONN_ID_LEN]54 fn new_scid() -> [u8; quiche::MAX_CONN_ID_LEN] {
55 use ring::rand::{SecureRandom, SystemRandom};
56 let mut scid = [0; quiche::MAX_CONN_ID_LEN];
57 SystemRandom::new().fill(&mut scid).unwrap();
58 scid
59 }
60
mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()>61 fn mark_socket(socket: &std::net::UdpSocket, socket_mark: u32) -> io::Result<()> {
62 use std::os::unix::io::AsRawFd;
63 let fd = socket.as_raw_fd();
64 // SAFETY: libc::setsockopt is a wrapper function calling into bionic setsockopt.
65 // The only pointer being passed in is &socket_mark, which is valid by virtue of being a
66 // reference, and the foreign function doesn't take ownership or a reference to that memory
67 // after completion.
68 if unsafe {
69 libc::setsockopt(
70 fd,
71 libc::SOL_SOCKET,
72 libc::SO_MARK,
73 &socket_mark as *const _ as *const libc::c_void,
74 std::mem::size_of::<u32>() as libc::socklen_t,
75 )
76 } == 0
77 {
78 Ok(())
79 } else {
80 Err(io::Error::last_os_error())
81 }
82 }
83
build_socket( peer_addr: SocketAddr, socket_mark: u32, tag_socket: &SocketTagger, ) -> io::Result<UdpSocket>84 async fn build_socket(
85 peer_addr: SocketAddr,
86 socket_mark: u32,
87 tag_socket: &SocketTagger,
88 ) -> io::Result<UdpSocket> {
89 let bind_addr = match peer_addr {
90 SocketAddr::V4(_) => "0.0.0.0:0",
91 SocketAddr::V6(_) => "[::]:0",
92 };
93
94 let socket = UdpSocket::bind(bind_addr).await?;
95 let std_socket = socket.into_std()?;
96 mark_socket(&std_socket, socket_mark)
97 .unwrap_or_else(|e| error!("Unable to mark socket : {:?}", e));
98 tag_socket(&std_socket).await;
99 let socket = UdpSocket::from_std(std_socket)?;
100 socket.connect(peer_addr).await?;
101 Ok(socket)
102 }
103
104 /// Error type for HTTP/3 connection
105 #[derive(Debug, Error)]
106 pub enum Error {
107 /// QUIC protocol error
108 #[error("QUIC error: {0}")]
109 Quic(#[from] quiche::Error),
110 /// HTTP/3 protocol error
111 #[error("HTTP/3 error: {0}")]
112 H3(#[from] h3::Error),
113 /// Unable to send the request to the driver. This likely means the
114 /// backing task has died.
115 #[error("Unable to send request")]
116 SendRequest(#[from] mpsc::error::SendError<Request>),
117 /// IO failed. This is most likely to occur while trying to set up the
118 /// UDP socket for use by the connection.
119 #[error("IO error: {0}")]
120 Io(#[from] io::Error),
121 /// The request is no longer being serviced. This could mean that the
122 /// request was dropped for an unspecified reason, or that the connection
123 /// was closed prematurely and it can no longer be serviced.
124 #[error("Driver dropped request")]
125 RecvResponse(#[from] oneshot::error::RecvError),
126 }
127
128 /// Common result type for working with a HTTP/3 connection
129 pub type Result<T> = std::result::Result<T, Error>;
130
131 impl Connection {
132 const MAX_PENDING_REQUESTS: usize = 10;
133 /// Create a new connection with a background task handling IO.
new( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut quiche::Config, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Self>134 pub async fn new(
135 info: &ServerInfo,
136 tag_socket: &SocketTagger,
137 config: &mut quiche::Config,
138 session: Option<Vec<u8>>,
139 cause: Cause,
140 ) -> Result<Self> {
141 let server_name = info.domain.as_deref();
142 let to = info.peer_addr;
143 let socket_mark = info.sk_mark;
144 let net_id = info.net_id;
145 let (request_tx, request_rx) = mpsc::channel(Self::MAX_PENDING_REQUESTS);
146 let (status_tx, status_rx) = watch::channel(Status::QUIC);
147 let scid = new_scid();
148 let socket = build_socket(to, socket_mark, tag_socket).await?;
149 let from = socket.local_addr()?;
150
151 let mut quiche_conn =
152 quiche::connect(server_name, &quiche::ConnectionId::from_ref(&scid), from, to, config)?;
153
154 // We will fall back to a full handshake if the session is expired.
155 if let Some(session) = session {
156 debug!("Setting session");
157 quiche_conn.set_session(&session)?;
158 }
159
160 let handshake_info = HandshakeInfo {
161 cause,
162 sent_bytes: 0,
163 recv_bytes: 0,
164 elapsed: 0,
165 quic_version: 0,
166 network_type: info.network_type,
167 private_dns_mode: info.private_dns_mode,
168 session_hit_checker: quiche_conn.session().is_some(),
169 };
170
171 let driver = async move {
172 let result =
173 drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).await;
174 if let Err(ref e) = result {
175 warn!("Connection driver returns some Err: {:?}", e);
176 }
177 result
178 };
179 task::spawn(driver);
180 Ok(Self { request_tx, status_rx })
181 }
182
183 /// Waits until we're either fully alive or dead
wait_for_live(&mut self) -> bool184 pub async fn wait_for_live(&mut self) -> bool {
185 // Once sc-mainline-prod updates to modern tokio, use
186 // borrow_and_update here.
187 match &*self.status_rx.borrow() {
188 Status::H3 => return true,
189 Status::Dead { .. } => return false,
190 Status::QUIC => (),
191 }
192 if self.status_rx.changed().await.is_err() {
193 // status_tx is gone, we're dead
194 return false;
195 }
196 if matches!(*self.status_rx.borrow(), Status::H3) {
197 return true;
198 }
199 // Since we're stuck on legacy tokio due to mainline, we need to try one more time in case there was an outstanding change notification. Using borrow_and_update avoids this.
200 match self.status_rx.changed().await {
201 // status_tx is gone, we're dead
202 Err(_) => false,
203 // If there's an HTTP/3 connection now we're alive, otherwise we're stuck/dead
204 _ => matches!(*self.status_rx.borrow(), Status::H3),
205 }
206 }
207
session(&self) -> Option<Vec<u8>>208 pub fn session(&self) -> Option<Vec<u8>> {
209 match &*self.status_rx.borrow() {
210 Status::Dead { session } => session.clone(),
211 _ => None,
212 }
213 }
214
215 /// Send a query, produce a future which will provide a response.
216 /// The future is separately returned rather than awaited to allow it to be waited on without
217 /// keeping the `Connection` itself borrowed.
query( &self, headers: Vec<h3::Header>, expiry: Option<BootTime>, ) -> Result<impl Future<Output = Option<Stream>>>218 pub async fn query(
219 &self,
220 headers: Vec<h3::Header>,
221 expiry: Option<BootTime>,
222 ) -> Result<impl Future<Output = Option<Stream>>> {
223 let (response_tx, response_rx) = oneshot::channel();
224 self.request_tx.send(Request { headers, response_tx, expiry }).await?;
225 Ok(async move { response_rx.await.ok() })
226 }
227 }
228