0
0
mirror of https://github.com/signalapp/libsignal.git synced 2024-09-20 03:52:17 +02:00

libsignal-net: happy eyeballs

This commit is contained in:
Sergey Skrobotov 2024-01-24 16:08:17 -08:00
parent b92da3a15c
commit 40b0965dd6
14 changed files with 318 additions and 60 deletions

19
Cargo.lock generated
View File

@ -666,6 +666,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "const-str"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
[[package]]
name = "convert_case"
version = "0.4.0"
@ -1635,6 +1641,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.10"
@ -1885,6 +1900,7 @@ dependencies = [
"boring",
"bytes",
"clap",
"const-str",
"derive-where",
"displaydoc",
"env_logger",
@ -1896,6 +1912,7 @@ dependencies = [
"http 1.0.0",
"http-body-util",
"hyper 1.1.0",
"itertools 0.12.0",
"lazy_static",
"libsignal-core",
"libsignal-svr3",
@ -1958,7 +1975,7 @@ dependencies = [
"hkdf",
"hmac",
"indexmap 2.1.0",
"itertools 0.10.5",
"itertools 0.12.0",
"libsignal-core",
"log",
"num_enum",

View File

@ -45,7 +45,7 @@
<h2>Overview of licenses:</h2>
<ul class="licenses-overview">
<li><a href="#MIT">MIT License</a> (281)</li>
<li><a href="#MIT">MIT License</a> (282)</li>
<li><a href="#AGPL-3.0">GNU Affero General Public License v3.0</a> (24)</li>
<li><a href="#Apache-2.0">Apache License 2.0</a> (11)</li>
<li><a href="#BSD-3-Clause">BSD 3-Clause &quot;New&quot; or &quot;Revised&quot; License</a> (8)</li>
@ -3118,8 +3118,8 @@ THE SOFTWARE.
<h4>Used by:</h4>
<ul class="license-used-by">
<li><a href="https://github.com/bluss/either">either 1.9.0</a></li>
<li><a href="https://github.com/rust-itertools/itertools">itertools 0.10.5</a></li>
<li><a href="https://github.com/rust-itertools/itertools">itertools 0.11.0</a></li>
<li><a href="https://github.com/rust-itertools/itertools">itertools 0.12.0</a></li>
<li><a href="https://github.com/petgraph/petgraph">petgraph 0.6.4</a></li>
</ul>
<pre class="license-text">Copyright (c) 2015
@ -6057,6 +6057,7 @@ SOFTWARE.
<h4>Used by:</h4>
<ul class="license-used-by">
<li><a href="https://github.com/emk/cesu8-rs">cesu8 1.1.0</a></li>
<li><a href="https://github.com/Nugine/const-str">const-str 0.5.6</a></li>
<li><a href="https://github.com/dalek-cryptography/curve25519-dalek">curve25519-dalek-derive 0.1.0</a></li>
<li><a href="https://github.com/starkat99/half-rs">half 1.8.2</a></li>
<li><a href="https://crates.io/crates/pqcrypto-internals">pqcrypto-internals 0.2.5</a></li>

View File

@ -2918,7 +2918,7 @@ THE SOFTWARE.
```
## either 1.9.0, itertools 0.10.5, itertools 0.11.0, petgraph 0.6.4
## either 1.9.0, itertools 0.11.0, itertools 0.12.0, petgraph 0.6.4
```
Copyright (c) 2015
@ -5620,7 +5620,7 @@ SOFTWARE.
```
## cesu8 1.1.0, curve25519-dalek-derive 0.1.0, half 1.8.2, pqcrypto-internals 0.2.5, pqcrypto-kyber 0.7.9, pqcrypto-kyber 0.8.0, pqcrypto-traits 0.3.5, protobuf-parse 3.3.0, protobuf-support 3.3.0
## cesu8 1.1.0, const-str 0.5.6, curve25519-dalek-derive 0.1.0, half 1.8.2, pqcrypto-internals 0.2.5, pqcrypto-kyber 0.7.9, pqcrypto-kyber 0.8.0, pqcrypto-traits 0.3.5, protobuf-parse 3.3.0, protobuf-support 3.3.0
```
MIT License

View File

@ -3107,7 +3107,7 @@ DEALINGS IN THE SOFTWARE.
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>either 1.9.0, itertools 0.10.5, itertools 0.11.0, petgraph 0.6.4</string>
<string>either 1.9.0, itertools 0.11.0, itertools 0.12.0, petgraph 0.6.4</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
@ -6165,7 +6165,7 @@ THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRES
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>cesu8 1.1.0, curve25519-dalek-derive 0.1.0, half 1.8.2, pqcrypto-internals 0.2.5, pqcrypto-kyber 0.7.9, pqcrypto-kyber 0.8.0, pqcrypto-traits 0.3.5, protobuf-parse 3.3.0, protobuf-support 3.3.0</string>
<string>cesu8 1.1.0, const-str 0.5.6, curve25519-dalek-derive 0.1.0, half 1.8.2, pqcrypto-internals 0.2.5, pqcrypto-kyber 0.7.9, pqcrypto-kyber 0.8.0, pqcrypto-traits 0.3.5, protobuf-parse 3.3.0, protobuf-support 3.3.0</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>

View File

@ -5,6 +5,7 @@
use std::convert::TryInto as _;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use libsignal_bridge_macros::{bridge_fn, bridge_io};
@ -69,6 +70,7 @@ impl Environment {
}
fn cdsi_fallback_connection_params(self) -> Vec<ConnectionParams> {
let dns_resolver = Arc::new(DnsResolver::default());
match self {
Environment::Prod => vec![
ConnectionParams {
@ -77,7 +79,7 @@ impl Environment {
port: 443,
http_request_decorator: HttpRequestDecorator::PathPrefix("/service").into(),
certs: RootCertificates::Native,
dns_resolver: DnsResolver::System,
dns_resolver: dns_resolver.clone(),
},
ConnectionParams {
sni: "pintrest.com".into(),
@ -85,7 +87,7 @@ impl Environment {
port: 443,
http_request_decorator: HttpRequestDecoratorSeq::default(),
certs: RootCertificates::Native,
dns_resolver: DnsResolver::System,
dns_resolver: dns_resolver.clone(),
},
],
Environment::Staging => vec![ConnectionParams {
@ -94,7 +96,7 @@ impl Environment {
port: 443,
http_request_decorator: HttpRequestDecorator::PathPrefix("/service-staging").into(),
certs: RootCertificates::Native,
dns_resolver: DnsResolver::System,
dns_resolver,
}],
}
}

View File

@ -13,6 +13,7 @@ base64 = "0.21"
bincode = "1.0"
boring = { git = "https://github.com/signalapp/boring", branch = "libsignal" }
bytes = "1.4.0"
const-str = { version = "0.5.6", features = ["std"] }
derive-where = "1.2.7"
displaydoc = "0.2"
futures-util = "0.3.7"
@ -23,6 +24,7 @@ hmac = "0.12"
http = "1.0.0"
http-body-util = "0.1.0-rc.3"
hyper = { version = "1.0.0-rc.4", features = ["http1", "http2", "client"] }
itertools = "0.12.0"
lazy_static = "1.4.0"
libsignal-core = { path = "../core" }
log = "0.4.19"

View File

@ -397,7 +397,7 @@ pub(crate) mod test {
443,
Default::default(),
RootCertificates::Signal,
DnsResolver::System,
DnsResolver::default().into(),
);
SingleRouteThrottlingConnectionManager::new(connection_params, TIMEOUT_DURATION)
}

View File

@ -6,19 +6,21 @@
use std::str::FromStr;
use std::string::ToString;
use std::sync::Arc;
use std::time::Duration;
use ::http::uri::PathAndQuery;
use ::http::Uri;
use async_trait::async_trait;
use boring::ssl::{SslConnector, SslConnectorBuilder, SslMethod};
use futures_util::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_boring::SslStream;
use crate::infra::certs::RootCertificates;
use crate::infra::dns::DnsResolver;
use crate::infra::dns::DnsResolver::System;
use crate::infra::errors::NetError;
use crate::utils::first_ok;
pub mod certs;
pub mod connection_manager;
@ -30,6 +32,8 @@ pub(crate) mod tokio_executor;
pub(crate) mod tokio_io;
pub(crate) mod ws;
const CONNECTION_ATTEMPT_DELAY: Duration = Duration::from_millis(200);
/// A collection of commonly used decorators for HTTP requests.
#[derive(Clone, Debug)]
pub enum HttpRequestDecorator {
@ -69,7 +73,7 @@ pub struct ConnectionParams {
pub port: u16,
pub http_request_decorator: HttpRequestDecoratorSeq,
pub certs: RootCertificates,
pub dns_resolver: DnsResolver,
pub dns_resolver: Arc<DnsResolver>,
}
impl ConnectionParams {
@ -80,7 +84,7 @@ impl ConnectionParams {
port: u16,
http_request_decorator: HttpRequestDecoratorSeq,
certs: RootCertificates,
dns_resolver: DnsResolver,
dns_resolver: Arc<DnsResolver>,
) -> Self {
Self {
sni: Arc::from(sni),
@ -111,7 +115,7 @@ impl ConnectionParams {
port: 443,
http_request_decorator: Default::default(),
certs: RootCertificates::Signal,
dns_resolver: System,
dns_resolver: DnsResolver::default().into(),
}
}
}
@ -215,13 +219,36 @@ pub(crate) async fn connect_tcp(
.lookup_ip(host)
.await
.map_err(|_| NetError::DnsError)?;
for ip in dns_lookup.iter() {
match TcpStream::connect((*ip, port)).await {
Ok(tcp_stream) => return Ok(tcp_stream),
Err(_) => continue,
}
if dns_lookup.is_empty() {
return Err(NetError::DnsError);
}
Err(NetError::TcpConnectionFailed)
// The idea is to go through the list of candidate IP addresses
// and to attempt a connection to each of them, giving each one a `CONNECTION_ATTEMPT_DELAY` headstart
// before moving on to the next candidate.
// The process stops once we have a successful connection.
// First, for each resolved IP address, constructing a future
// that incorporates the delay based on its position in the list.
// This way we can start all futures at once and simply wait for the first one to complete successfully.
let staggered_futures = dns_lookup.into_iter().enumerate().map(|(idx, ip)| {
let delay = CONNECTION_ATTEMPT_DELAY * idx.try_into().unwrap();
async move {
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
TcpStream::connect((ip, port))
.inspect_err(|e| {
log::debug!("failed to connect to IP [{}] with an error: {:?}", ip, e)
})
.await
}
});
first_ok(staggered_futures)
.await
.ok_or(NetError::TcpConnectionFailed)
}
#[cfg(test)]

View File

@ -500,7 +500,7 @@ mod test {
443,
HttpRequestDecoratorSeq::default(),
RootCertificates::Signal,
DnsResolver::System,
DnsResolver::default().into(),
)
}
}

View File

@ -3,12 +3,18 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
use std::sync::Arc;
use const_str::ip_addr;
use itertools::{Either, Itertools};
use std::collections::HashMap;
use std::iter::Map;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use std::vec::IntoIter;
use futures_util::future::BoxFuture;
use crate::utils;
pub type LookupResult = Vec<IpAddr>;
const RESOLUTION_TIMEOUT: Duration = Duration::from_secs(1);
const SIGNAL_DOMAIN_SUFFIX: &str = ".signal.org";
#[derive(displaydoc::Display, Debug, thiserror::Error)]
pub enum Error {
@ -16,39 +22,186 @@ pub enum Error {
LookupFailed,
}
pub type ResolveFn = fn(&str) -> BoxFuture<Result<LookupResult, Error>>;
#[derive(Clone, Debug)]
pub enum DnsResolver {
Static,
System,
GenericAsync(Arc<ResolveFn>),
#[derive(Debug, Default, Clone)]
pub struct LookupResult {
ipv4: Vec<Ipv4Addr>,
ipv6: Vec<Ipv6Addr>,
}
impl DnsResolver {
pub async fn lookup_ip(&self, host: &str) -> Result<LookupResult, Error> {
match self {
DnsResolver::Static => match host {
"chat.staging.signal.org" => Ok(vec![
IpAddr::V4(Ipv4Addr::new(76, 223, 72, 142)),
IpAddr::V4(Ipv4Addr::new(3, 248, 206, 115)),
]),
"cdsi.staging.signal.org" => Ok(vec![IpAddr::V4(Ipv4Addr::new(104, 43, 162, 137))]),
"chat.signal.org" => Ok(vec![
IpAddr::V4(Ipv4Addr::new(76, 223, 92, 165)),
IpAddr::V4(Ipv4Addr::new(13, 248, 212, 111)),
]),
"cdsi.signal.org" => Ok(vec![IpAddr::V4(Ipv4Addr::new(40, 122, 45, 194))]),
_ => Err(Error::LookupFailed),
},
DnsResolver::GenericAsync(async_resolver) => async_resolver(host).await,
DnsResolver::System => format!("{}:443", host)
.to_socket_addrs()
.map(|addrs| {
let c: Vec<IpAddr> = addrs.map(|sa| sa.ip()).collect();
c
})
.map_err(|_| Error::LookupFailed),
impl IntoIterator for LookupResult {
type Item = IpAddr;
type IntoIter = itertools::Interleave<
Map<IntoIter<Ipv6Addr>, fn(Ipv6Addr) -> IpAddr>,
Map<IntoIter<Ipv4Addr>, fn(Ipv4Addr) -> IpAddr>,
>;
fn into_iter(self) -> Self::IntoIter {
let v6_into_ipaddr: fn(Ipv6Addr) -> IpAddr = IpAddr::V6;
let v4_into_ipaddr: fn(Ipv4Addr) -> IpAddr = IpAddr::V4;
itertools::interleave(
self.ipv6.into_iter().map(v6_into_ipaddr),
self.ipv4.into_iter().map(v4_into_ipaddr),
)
}
}
impl LookupResult {
pub fn new(ipv4: Vec<Ipv4Addr>, ipv6: Vec<Ipv6Addr>) -> Self {
Self { ipv4, ipv6 }
}
pub(crate) fn is_empty(&self) -> bool {
self.ipv4.is_empty() && self.ipv6.is_empty()
}
}
#[derive(Debug)]
pub struct DnsResolver {
static_map: HashMap<&'static str, LookupResult>,
}
impl Default for DnsResolver {
fn default() -> Self {
Self {
static_map: static_dns_map(),
}
}
}
impl DnsResolver {
pub fn new_with_static_fallback(static_map: HashMap<&'static str, LookupResult>) -> Self {
Self { static_map }
}
pub async fn lookup_ip(&self, hostname: &str) -> Result<LookupResult, Error> {
utils::timeout(
RESOLUTION_TIMEOUT,
Error::LookupFailed,
self.dns_lookup(hostname),
)
.await
.or_else(|e| {
if hostname.ends_with(SIGNAL_DOMAIN_SUFFIX) {
log::warn!(
"DNS lookup failed for [{}], falling back to static map. Error: {:?}",
hostname,
e
)
}
self.static_map
.get(hostname)
.ok_or(Error::LookupFailed)
.cloned()
})
}
async fn dns_lookup<'a>(&self, hostname: &'a str) -> Result<LookupResult, Error> {
let lookup_result = tokio::net::lookup_host((hostname, 443))
.await
.map_err(|_| Error::LookupFailed)?;
let (ipv4s, ipv6s): (Vec<_>, Vec<_>) =
lookup_result.into_iter().partition_map(|ip| match ip {
SocketAddr::V4(v4) => Either::Left(*v4.ip()),
SocketAddr::V6(v6) => Either::Right(*v6.ip()),
});
match LookupResult::new(ipv4s, ipv6s) {
lookup_result if !lookup_result.is_empty() => Ok(lookup_result),
_ => Err(Error::LookupFailed),
}
}
}
pub(crate) fn static_dns_map() -> HashMap<&'static str, LookupResult> {
HashMap::from([
(
"chat.staging.signal.org",
LookupResult::new(
vec![ip_addr!(v4, "76.223.72.142"), ip_addr!(v4, "3.248.206.115")],
vec![],
),
),
(
"cdsi.staging.signal.org",
LookupResult::new(vec![ip_addr!(v4, "104.43.162.137")], vec![]),
),
(
"chat.signal.org",
LookupResult::new(
vec![
ip_addr!(v4, "76.223.92.165"),
ip_addr!(v4, "13.248.212.111"),
],
vec![],
),
),
(
"cdsi.signal.org",
LookupResult::new(vec![ip_addr!(v4, "40.122.45.194")], vec![]),
),
])
}
#[cfg(test)]
mod test {
use crate::infra::dns::LookupResult;
use const_str::ip_addr;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[test]
fn lookup_result_iterates_in_the_right_order() {
let ipv4_1 = ip_addr!(v4, "1.1.1.1");
let ipv4_2 = ip_addr!(v4, "2.2.2.2");
let ipv4_3 = ip_addr!(v4, "3.3.3.3");
let ipv6_1 = ip_addr!(v6, "::1");
let ipv6_2 = ip_addr!(v6, "::2");
let ipv6_3 = ip_addr!(v6, "::3");
validate_expected_order(
vec![ipv4_1, ipv4_2, ipv4_3],
vec![ipv6_1, ipv6_2, ipv6_3],
vec![
IpAddr::V6(ipv6_1),
IpAddr::V4(ipv4_1),
IpAddr::V6(ipv6_2),
IpAddr::V4(ipv4_2),
IpAddr::V6(ipv6_3),
IpAddr::V4(ipv4_3),
],
);
validate_expected_order(
vec![ipv4_1],
vec![ipv6_1, ipv6_2, ipv6_3],
vec![
IpAddr::V6(ipv6_1),
IpAddr::V4(ipv4_1),
IpAddr::V6(ipv6_2),
IpAddr::V6(ipv6_3),
],
);
validate_expected_order(
vec![ipv4_1, ipv4_2, ipv4_3],
vec![ipv6_1],
vec![
IpAddr::V6(ipv6_1),
IpAddr::V4(ipv4_1),
IpAddr::V4(ipv4_2),
IpAddr::V4(ipv4_3),
],
);
validate_expected_order(
vec![ipv4_1, ipv4_2, ipv4_3],
vec![],
vec![IpAddr::V4(ipv4_1), IpAddr::V4(ipv4_2), IpAddr::V4(ipv4_3)],
);
}
fn validate_expected_order(ipv4s: Vec<Ipv4Addr>, ipv6s: Vec<Ipv6Addr>, expected: Vec<IpAddr>) {
let lookup_result = LookupResult::new(ipv4s, ipv6s);
let actual: Vec<IpAddr> = lookup_result.into_iter().collect();
assert_eq!(expected, actual);
}
}

View File

@ -155,7 +155,7 @@ mod test {
port: FAKE_PORT,
http_request_decorator: Default::default(),
certs: crate::infra::certs::RootCertificates::Native,
dns_resolver: crate::infra::dns::DnsResolver::Static
dns_resolver: crate::infra::dns::DnsResolver::default().into()
};
}

View File

@ -401,7 +401,7 @@ mod test {
443,
HttpRequestDecoratorSeq::default(),
RootCertificates::Signal,
DnsResolver::System,
DnsResolver::default().into(),
)
}

View File

@ -4,6 +4,9 @@
//
use base64::prelude::{Engine as _, BASE64_STANDARD};
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::future;
use std::future::Future;
use std::time::Duration;
@ -29,3 +32,56 @@ where
Err(_) => Err(timeout_error),
}
}
/// Takes a series of `Future` objects that all return a `Result<T, E>`
/// and returns when the first of them completes successfully.
///
/// Errors from the failed futures are deliberately ignored by this helper method.
/// If error processing is needed, the caller should pass futures that inspect their errors.
pub async fn first_ok<T, E, F, I>(futures: I) -> Option<T>
where
F: Future<Output = Result<T, E>>,
I: IntoIterator<Item = F>,
{
FuturesUnordered::from_iter(futures)
.filter_map(|result| future::ready(result.ok()))
.next()
.await
}
#[cfg(test)]
mod test {
use crate::utils::first_ok;
use std::time::Duration;
#[tokio::test(start_paused = true)]
async fn first_ok_picks_the_result_from_earliest_finished_future() {
let future_1 = future(30, Ok(1));
let future_2 = future(10, Ok(2));
let future_3 = future(20, Ok(3));
let result = first_ok(vec![future_1, future_2, future_3]).await.unwrap();
assert_eq!(2, result);
}
#[tokio::test(start_paused = true)]
async fn first_ok_ignores_failed_futures() {
let future_1 = future(30, Ok(1));
let future_2 = future(10, Err("error"));
let future_3 = future(20, Ok(3));
let result = first_ok(vec![future_1, future_2, future_3]).await.unwrap();
assert_eq!(3, result);
}
#[tokio::test(start_paused = true)]
async fn first_ok_returns_none_if_all_failed() {
let future_1 = future(30, Err("error 1"));
let future_2 = future(10, Err("error 2"));
let future_3 = future(20, Err("error 3"));
assert!(first_ok(vec![future_1, future_2, future_3]).await.is_none())
}
async fn future(delay: u64, result: Result<u32, &str>) -> Result<u32, &str> {
tokio::time::sleep(Duration::from_millis(delay)).await;
result
}
}

View File

@ -23,7 +23,7 @@ derive-where = "1.2.5"
hkdf = "0.12"
hmac = "0.12"
indexmap = "2.1.0"
itertools = "0.10.5"
itertools = "0.12.0"
prost = "0.12"
rand = "0.8"
rayon = "1.8.0"