1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //! Provides a backing task to implement a network
18 
19 use crate::boot_time::{timeout, BootTime, Duration};
20 use crate::config::Config;
21 use crate::connection::driver::Cause;
22 use crate::connection::Connection;
23 use crate::dispatcher::{QueryError, Response};
24 use crate::encoding;
25 use anyhow::{anyhow, bail, Result};
26 use log::{debug, info};
27 use std::sync::Arc;
28 use tokio::sync::{mpsc, watch};
29 use tokio::task;
30 
31 use super::{Query, ServerInfo, SocketTagger, ValidationReporter};
32 
/// State owned by the task that drives a single network.
///
/// The driver receives [`Command`]s over a channel, executes them against its
/// connection, and publishes the resulting [`Status`] over a watch channel.
pub struct Driver {
    /// Description of the server; the driver reads its peer address, network id,
    /// URL and session-resumption flag.
    info: ServerInfo,
    /// Configuration handed to every connection attempt.
    config: Config,
    /// Connection currently used for probes and queries; replaced on reconnect.
    connection: Connection,
    /// Incoming commands; the drive loop ends once this yields `None`.
    command_rx: mpsc::Receiver<Command>,
    /// Publishes the current network status to watchers.
    status_tx: watch::Sender<Status>,
    /// Callback invoked with the server info and the probe outcome.
    validation: ValidationReporter,
    /// Passed through to connection construction.
    // NOTE(review): presumably tags the underlying socket - confirm against SocketTagger.
    tag_socket: SocketTagger,
}
42 
#[derive(Debug)]
/// Requests the network driver can handle.
///
/// Commands are delivered through the `mpsc` channel returned by `Driver::new`
/// and are processed one at a time, in the order they are received.
pub enum Command {
    /// Send a DNS query to the network. Failures to send are logged by the
    /// driver and do not change the network status.
    Query(Query),
    /// Run a probe to check the health of the network. The argument is the
    /// timeout allowed for the probe.
    Probe(Duration),
}
51 
#[derive(Clone, Debug)]
/// Current network status.
///
/// State transitions:
/// * `Unprobed` or `Failed` can move to `Live` or `Failed` via a probe.
/// * Currently there is no way to go from `Live` to `Failed`: probing a live
///   network short-circuits to reporting it as valid, and query failures do not
///   declare the network failed.
pub enum Status {
    /// Network has not been probed, it may or may not work
    Unprobed,
    /// Network is believed to be working
    Live,
    /// Network is broken, reason as argument
    // NOTE(review): the lint is presumably silenced because the payload is never
    // read directly in this file (it is only matched with `_`) - confirm.
    #[allow(dead_code)]
    Failed(Arc<anyhow::Error>),
}
66 
67 impl Status {
is_live(&self) -> bool68     pub fn is_live(&self) -> bool {
69         matches!(self, Self::Live)
70     }
is_failed(&self) -> bool71     pub fn is_failed(&self) -> bool {
72         matches!(self, Self::Failed(_))
73     }
74 }
75 
build_connection( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut Config, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Connection>76 async fn build_connection(
77     info: &ServerInfo,
78     tag_socket: &SocketTagger,
79     config: &mut Config,
80     session: Option<Vec<u8>>,
81     cause: Cause,
82 ) -> Result<Connection> {
83     use std::ops::DerefMut;
84     Ok(Connection::new(info, tag_socket, config.take().await.deref_mut(), session, cause).await?)
85 }
86 
87 impl Driver {
88     const MAX_BUFFERED_COMMANDS: usize = 50;
89 
new( info: ServerInfo, mut config: Config, validation: ValidationReporter, tag_socket: SocketTagger, ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)>90     pub async fn new(
91         info: ServerInfo,
92         mut config: Config,
93         validation: ValidationReporter,
94         tag_socket: SocketTagger,
95     ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)> {
96         let (command_tx, command_rx) = mpsc::channel(Self::MAX_BUFFERED_COMMANDS);
97         let (status_tx, status_rx) = watch::channel(Status::Unprobed);
98         let connection =
99             build_connection(&info, &tag_socket, &mut config, None, Cause::Probe).await?;
100         Ok((
101             Self { info, config, connection, status_tx, command_rx, validation, tag_socket },
102             command_tx,
103             status_rx,
104         ))
105     }
106 
drive(mut self) -> Result<()>107     pub async fn drive(mut self) -> Result<()> {
108         while let Some(cmd) = self.command_rx.recv().await {
109             match cmd {
110                 Command::Probe(duration) => {
111                     if let Err(e) = self.probe(duration).await {
112                         self.status_tx.send(Status::Failed(Arc::new(e)))?
113                     }
114                 }
115                 Command::Query(query) => {
116                     if let Err(e) = self.send_query(query).await {
117                         info!("Unable to send query: {:?}", e)
118                     }
119                 }
120             };
121         }
122         Ok(())
123     }
124 
probe(&mut self, probe_timeout: Duration) -> Result<()>125     async fn probe(&mut self, probe_timeout: Duration) -> Result<()> {
126         if self.status_tx.borrow().is_failed() {
127             debug!("Network is currently failed, reconnecting");
128             // If our network is currently failed, it may be due to issues with the connection.
129             // Re-establish before re-probing
130             self.connection = build_connection(
131                 &self.info,
132                 &self.tag_socket,
133                 &mut self.config,
134                 None,
135                 Cause::Retry,
136             )
137             .await?;
138             self.status_tx.send(Status::Unprobed)?;
139         }
140         if self.status_tx.borrow().is_live() {
141             // If we're already validated, short circuit
142             (self.validation)(&self.info, true).await;
143             return Ok(());
144         }
145         self.force_probe(probe_timeout).await
146     }
147 
force_probe(&mut self, probe_timeout: Duration) -> Result<()>148     async fn force_probe(&mut self, probe_timeout: Duration) -> Result<()> {
149         info!("Sending probe to server {} on Network {}", self.info.peer_addr, self.info.net_id);
150         let probe = encoding::probe_query()?;
151         let dns_request = encoding::dns_request(&probe, &self.info.url)?;
152         let expiry = BootTime::now().checked_add(probe_timeout);
153         let request = async {
154             match self.connection.query(dns_request, expiry).await {
155                 Err(e) => self.status_tx.send(Status::Failed(Arc::new(anyhow!(e)))),
156                 Ok(rsp) => {
157                     if let Some(_stream) = rsp.await {
158                         // TODO verify stream contents
159                         self.status_tx.send(Status::Live)
160                     } else {
161                         self.status_tx.send(Status::Failed(Arc::new(anyhow!("Empty response"))))
162                     }
163                 }
164             }
165         };
166         match timeout(probe_timeout, request).await {
167             // Timed out
168             Err(time) => self.status_tx.send(Status::Failed(Arc::new(anyhow!(
169                 "Probe timed out after {:?} (timeout={:?})",
170                 time,
171                 probe_timeout
172             )))),
173             // Query completed
174             Ok(r) => r,
175         }?;
176         let valid = self.status_tx.borrow().is_live();
177         (self.validation)(&self.info, valid).await;
178         Ok(())
179     }
180 
send_query(&mut self, query: Query) -> Result<()>181     async fn send_query(&mut self, query: Query) -> Result<()> {
182         // If the associated receiver has been closed, meaning that the request has already
183         // timed out, just drop it. This check helps drain the channel quickly in the case
184         // where the network is stalled.
185         if query.response.is_closed() {
186             bail!("Abandoning expired DNS request")
187         }
188 
189         if !self.connection.wait_for_live().await {
190             let session =
191                 if self.info.use_session_resumption { self.connection.session() } else { None };
192             // Try reconnecting
193             self.connection = build_connection(
194                 &self.info,
195                 &self.tag_socket,
196                 &mut self.config,
197                 session,
198                 Cause::Reconnect,
199             )
200             .await?;
201         }
202         let request = encoding::dns_request(&query.query, &self.info.url)?;
203         let stream_fut = self.connection.query(request, Some(query.expiry)).await?;
204         task::spawn(async move {
205             let stream = match stream_fut.await {
206                 Some(stream) => stream,
207                 None => {
208                     info!("Connection died while processing request");
209                     // We don't care if the response is gone
210                     let _ =
211                         query.response.send(Response::Error { error: QueryError::ConnectionError });
212                     return;
213                 }
214             };
215             // We don't care if the response is gone.
216             let _ = if let Some(err) = stream.error {
217                 query.response.send(Response::Error { error: QueryError::Reset(err) })
218             } else {
219                 query.response.send(Response::Success { answer: stream.data })
220             };
221         });
222         Ok(())
223     }
224 }
225