0
0
mirror of https://github.com/signalapp/libsignal.git synced 2024-09-19 19:42:19 +02:00

svr3: Make partial migration easier and add a test

This commit is contained in:
moiseev-signal 2024-07-15 16:27:16 -07:00 committed by GitHub
parent eea07a5638
commit 854343294d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 853 additions and 268 deletions

View File

@ -12,9 +12,8 @@ use rand::rngs::OsRng;
use libsignal_bridge_macros::{bridge_fn, bridge_io};
use libsignal_bridge_types::net::Svr3Clients;
use libsignal_net::auth::Auth;
use libsignal_net::svr3::{
self, migrate_backup, restore_with_fallback, OpaqueMaskedShareSet, Svr3Client as _,
};
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::{self, migrate_backup, restore_with_fallback, OpaqueMaskedShareSet};
pub use libsignal_bridge_types::net::{ConnectionManager, Environment, TokioAsyncContext};
@ -108,7 +107,7 @@ async fn Svr3Migrate(
let clients = Svr3Clients::new(connection_manager, username, enclave_password);
let share_set = migrate_backup(
(clients.previous, clients.current),
(&clients.previous, &clients.current),
&password,
secret,
max_tries.into_inner(),
@ -134,7 +133,7 @@ async fn Svr3Restore(
// `DataMissing` error, similarly to how the actual migrated-from environment
// would.
let restored_secret = restore_with_fallback(
(clients.current, clients.previous),
(&clients.current, &clients.previous),
&password,
share_set,
&mut rng,

View File

@ -20,9 +20,8 @@ use libsignal_net::infra::tcp_ssl::{
};
use libsignal_net::infra::{make_ws_config, EndpointConnection};
use libsignal_net::svr::SvrConnection;
use libsignal_net::svr3::{
Error, OpaqueMaskedShareSet, Svr3Client as Svr3ClientTrait, Svr3Connect,
};
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::{Error, OpaqueMaskedShareSet};
use libsignal_net::timeouts::ONE_ROUTE_CONNECTION_TIMEOUT;
use libsignal_svr3::EvaluationResult;
use std::marker::PhantomData;
@ -219,7 +218,7 @@ impl<'a> Svr3Connect for Svr3Client<'a, CurrentVersion> {
}
#[async_trait]
impl<'a> Svr3ClientTrait for Svr3Client<'a, PreviousVersion> {
impl<'a> Backup for Svr3Client<'a, PreviousVersion> {
async fn backup(
&self,
_password: &str,
@ -229,7 +228,10 @@ impl<'a> Svr3ClientTrait for Svr3Client<'a, PreviousVersion> {
) -> Result<OpaqueMaskedShareSet, Error> {
empty_env::backup().await
}
}
#[async_trait]
impl<'a> Restore for Svr3Client<'a, PreviousVersion> {
async fn restore(
&self,
_password: &str,
@ -238,11 +240,17 @@ impl<'a> Svr3ClientTrait for Svr3Client<'a, PreviousVersion> {
) -> Result<EvaluationResult, Error> {
empty_env::restore().await
}
}
#[async_trait]
impl<'a> Remove for Svr3Client<'a, PreviousVersion> {
async fn remove(&self) -> Result<(), Error> {
empty_env::remove().await
}
}
#[async_trait]
impl<'a> Query for Svr3Client<'a, PreviousVersion> {
async fn query(&self) -> Result<u32, Error> {
empty_env::query().await
}
@ -251,7 +259,7 @@ impl<'a> Svr3ClientTrait for Svr3Client<'a, PreviousVersion> {
// These functions define the behavior of the empty `PreviousVersion`
// when there is no migration going on.
// When there _is_ migration both current and previous clients should instead
// implement `Svr3Connect` and use the blanket implementation of `Svr3Client`.
// implement `Svr3Connect` and use the blanket implementations of the traits.
mod empty_env {
use super::*;

View File

@ -22,9 +22,8 @@ use libsignal_net::enclave::PpssSetup;
use libsignal_net::env::Svr3Env;
use libsignal_net::infra::tcp_ssl::DirectConnector;
use libsignal_net::infra::TransportConnector;
use libsignal_net::svr3::{
simple_svr3_connect, Error, OpaqueMaskedShareSet, Svr3Client as _, Svr3Connect,
};
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::{Error, OpaqueMaskedShareSet};
#[derive(Parser, Debug)]
struct Args {
@ -51,7 +50,7 @@ impl Svr3Connect for Svr3Client {
async fn connect(
&self,
) -> Result<<Svr3Env as PpssSetup<Stream>>::Connections, libsignal_net::enclave::Error> {
simple_svr3_connect(&self.env, &self.auth).await
self.env.connect_directly(&self.auth).await
}
}

View File

@ -31,7 +31,8 @@ use libsignal_net::infra::dns::DnsResolver;
use libsignal_net::infra::tcp_ssl::DirectConnector as TcpSslTransportConnector;
use libsignal_net::infra::TransportConnector;
use libsignal_net::svr::SvrConnection;
use libsignal_net::svr3::{OpaqueMaskedShareSet, Svr3Client as _, Svr3Connect};
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::OpaqueMaskedShareSet;
const TEST_SERVER_CERT: RootCertificates = RootCertificates::FromDer(Cow::Borrowed(
include_bytes!("../res/sgx_test_server_cert.cer"),

View File

@ -17,9 +17,8 @@ use libsignal_net::auth::Auth;
use libsignal_net::enclave::{self, PpssSetup};
use libsignal_net::env::Svr3Env;
use libsignal_net::infra::ws::DefaultStream;
use libsignal_net::svr3::{
simple_svr3_connect, Error, OpaqueMaskedShareSet, Svr3Client, Svr3Connect,
};
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::{Error, OpaqueMaskedShareSet};
use libsignal_svr3::EvaluationResult;
use support::*;
@ -326,7 +325,7 @@ impl Svr3Connect for Client<'_> {
log::info!("💤 to avoid throttling...");
tokio::time::sleep(duration).await;
}
simple_svr3_connect(self.env, &self.auth).await
self.env.connect_directly(&self.auth).await
}
}

View File

@ -44,7 +44,7 @@ impl<'a> AsRaftConfig<'a> for &'a RaftConfig {
}
pub trait EnclaveKind {
type RaftConfigType: AsRaftConfig<'static> + Clone + Sync;
type RaftConfigType: AsRaftConfig<'static> + Clone + Sync + Send;
fn url_path(enclave: &[u8]) -> PathAndQuery;
}

View File

@ -393,17 +393,17 @@ pub struct Svr3Env<'a>(
impl<'a> Svr3Env<'a> {
#[inline]
pub fn sgx(&self) -> &EnclaveEndpoint<'a, Sgx> {
pub const fn sgx(&self) -> &EnclaveEndpoint<'a, Sgx> {
&self.0
}
#[inline]
pub fn nitro(&self) -> &EnclaveEndpoint<'a, Nitro> {
pub const fn nitro(&self) -> &EnclaveEndpoint<'a, Nitro> {
&self.1
}
#[inline]
pub fn tpm2snp(&self) -> &EnclaveEndpoint<'a, Tpm2Snp> {
pub const fn tpm2snp(&self) -> &EnclaveEndpoint<'a, Tpm2Snp> {
&self.2
}
}

View File

@ -51,7 +51,7 @@ pub struct TcpSslConnectorStream(
>,
);
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct DirectConnector {
pub dns_resolver: DnsResolver,
}

View File

@ -3,28 +3,27 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
use std::num::NonZeroU32;
use std::time::Duration;
use async_trait::async_trait;
use bincode::Options as _;
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use libsignal_svr3::{EvaluationResult, MaskedShareSet};
use crate::auth::Auth;
use crate::enclave::{self, EnclaveEndpointConnection, Nitro, PpssSetup, Sgx, Tpm2Snp};
use crate::enclave::{self, PpssSetup};
use crate::env::Svr3Env;
use crate::infra::dns::DnsResolver;
use crate::infra::errors::LogSafeDisplay;
use crate::infra::tcp_ssl::DirectConnector;
use crate::infra::ws::{
AttestedConnectionError, DefaultStream, WebSocketConnectError, WebSocketServiceError,
};
use crate::infra::AsyncDuplexStream;
use crate::svr::SvrConnection;
use bincode::Options as _;
use direct::DirectConnect;
use libsignal_svr3::{EvaluationResult, MaskedShareSet};
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU32;
use thiserror::Error;
mod ppss_ops;
pub mod direct;
pub mod traits;
use traits::*;
const MASKED_SHARE_SET_FORMAT: u8 = 0;
@ -206,184 +205,6 @@ impl From<AttestedConnectionError> for Error {
}
}
/// High level data operations on instances of `PpssSetup`
///
/// These functions are useful if we ever want to perform multiple operations
/// on the same set of open connections, as opposed to having to connect for
/// each individual operation, as implied by `Svr3Client` trait.
mod ppss_ops {
use super::{Error, OpaqueMaskedShareSet};
use crate::enclave::{IntoConnections, PpssSetup};
use crate::infra::ws::{run_attested_interaction, NextOrClose};
use crate::infra::AsyncDuplexStream;
use futures_util::future::try_join_all;
use libsignal_svr3::{Backup, EvaluationResult, Query, Restore};
use rand_core::CryptoRngCore;
use std::num::NonZeroU32;
pub async fn do_backup<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error> {
let server_ids = Env::server_ids();
let backup = Backup::new(server_ids.as_ref(), password, secret, max_tries, rng)?;
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(&backup.requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results, addresses)?;
let share_set = backup.finalize(rng, &responses)?;
Ok(OpaqueMaskedShareSet::new(share_set))
}
pub async fn do_restore<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error> {
let restore = Restore::new(password, share_set.into_inner(), rng)?;
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(&restore.requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results, addresses)?;
Ok(restore.finalize(&responses)?)
}
pub async fn do_remove<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
) -> Result<(), Error> {
let requests = std::iter::repeat(libsignal_svr3::make_remove_request());
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
// RemoveResponse's are empty, safe to ignore as long as they came
let _responses = collect_responses(results, addresses)?;
Ok(())
}
pub async fn do_query<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
) -> Result<u32, Error> {
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(Query::requests())
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results, addresses)?;
Ok(Query::finalize(&responses)?)
}
fn collect_responses<'a>(
results: impl IntoIterator<Item = NextOrClose<Vec<u8>>>,
addresses: impl IntoIterator<Item = &'a url::Host>,
) -> Result<Vec<Vec<u8>>, Error> {
results
.into_iter()
.zip(addresses)
.map(|(next_or_close, address)| {
next_or_close.next_or(Error::Protocol(format!("no response from {}", address)))
})
.collect()
}
}
#[async_trait]
pub trait Svr3Client {
async fn backup(
&self,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error>;
async fn restore(
&self,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error>;
async fn remove(&self) -> Result<(), Error>;
async fn query(&self) -> Result<u32, Error>;
}
#[async_trait]
pub trait Svr3Connect {
// Stream is needed for the blanket implementation,
// otherwise S would be an unconstrained generic parameter.
type Stream;
type Env: PpssSetup<Self::Stream>;
async fn connect(
&self,
) -> Result<<Self::Env as PpssSetup<Self::Stream>>::Connections, enclave::Error>;
}
#[async_trait]
impl<T> Svr3Client for T
where
T: Svr3Connect + Sync,
T::Stream: AsyncDuplexStream + 'static,
{
async fn backup(
&self,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error> {
ppss_ops::do_backup::<T::Stream, T::Env>(
self.connect().await?,
password,
secret,
max_tries,
rng,
)
.await
}
async fn restore(
&self,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error> {
ppss_ops::do_restore::<T::Stream, T::Env>(self.connect().await?, password, share_set, rng)
.await
}
async fn remove(&self) -> Result<(), Error> {
ppss_ops::do_remove::<T::Stream, T::Env>(self.connect().await?).await
}
async fn query(&self) -> Result<u32, Error> {
ppss_ops::do_query::<T::Stream, T::Env>(self.connect().await?).await
}
}
/// Attempt a restore from a pair of SVR3 instances.
///
/// The function is meant to be used in the registration flow, when the client
@ -398,14 +219,14 @@ where
/// respectively, "next" and "current", but ordering of parameters and actions in
/// the body of the function make "primary" and "fallback" a better fit.
pub async fn restore_with_fallback<Primary, Fallback>(
clients: (Primary, Fallback),
clients: (&Primary, &Fallback),
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error>
where
Primary: Svr3Client + Sync,
Fallback: Svr3Client + Sync,
Primary: Restore + Sync,
Fallback: Restore + Sync,
{
let (primary_conn, fallback_conn) = clients;
@ -416,33 +237,37 @@ where
fallback_conn.restore(password, share_set, rng).await
}
/// Move the backup from `From` to `To`, representing current and next SVR3
/// environments, respectively.
/// Move the backup from `RemoveFrom` to `BackupTo`, representing previous and
/// current SVR3 environments, respectively.
///
/// Despite the name, no data is _read_ from `From`, and instead must be
/// provided by the caller just like for an ordinary `backup` call.
/// No data is _read_ from `RemoveFrom` (types guarantee that), and instead must
/// be provided by the caller just like for an ordinary `backup` call.
///
/// Moving includes _attempting_ deletion from `From` that can fail, in which
/// case the error will be ignored. The other alternative implementations could
/// be:
/// - Do not attempt deleting from `From`.
/// Moving includes _attempting_ deletion from `RemoveFrom` that can fail, in
/// which case the error will be ignored. The other alternative implementations
/// could be:
/// - Do not attempt deleting from `RemoveFrom`.
/// This would leave the data for harvesting longer than necessary, even
/// though the migration period is expected to be relatively short, and the
/// set of `From` enclaves would have been deleted in the end.
/// - Ignore the successful write to `To`.
/// set of `RemoveFrom` enclaves would have been deleted in the end.
/// - Ignore the successful write to `BackupTo`.
/// Despite sounding like a better option, it would make `restore_with_fallback`
/// more complicated, as the data may have been written to `To`, thus
/// more complicated, as the data may have been written to `BackupTo`, thus
/// rendering it impossible to be used for all restores unconditionally.
pub async fn migrate_backup<From, To>(
clients: (From, To),
///
/// Using fine-grained SVR3 traits `Remove` and `Backup` guarantees that only
/// those operations will possibly happen, that is, no removes will happen from
/// `BackupTo` client, and no backups to `RemoveFrom`.
pub async fn migrate_backup<RemoveFrom, BackupTo>(
clients: (&RemoveFrom, &BackupTo),
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error>
where
From: Svr3Client + Sync,
To: Svr3Client + Sync,
RemoveFrom: Remove + Sync,
BackupTo: Backup + Sync,
{
let (from_client, to_client) = clients;
let share_set = to_client.backup(password, secret, max_tries, rng).await?;
@ -450,26 +275,16 @@ where
Ok(share_set)
}
/// Simplest way to connect to an SVR3 Environment in integration tests, command
/// line tools, and examples.
pub async fn simple_svr3_connect(
env: &Svr3Env<'static>,
auth: &Auth,
) -> Result<<Svr3Env<'static> as PpssSetup<DefaultStream>>::Connections, enclave::Error> {
let connector = DirectConnector::new(DnsResolver::default());
let sgx_connection = EnclaveEndpointConnection::new(env.sgx(), Duration::from_secs(10));
let a =
SvrConnection::<Sgx, _>::connect(auth.clone(), &sgx_connection, connector.clone()).await?;
let nitro_connection = EnclaveEndpointConnection::new(env.nitro(), Duration::from_secs(10));
let b = SvrConnection::<Nitro, _>::connect(auth.clone(), &nitro_connection, connector.clone())
.await?;
let tpm2snp_connection = EnclaveEndpointConnection::new(env.tpm2snp(), Duration::from_secs(10));
let c =
SvrConnection::<Tpm2Snp, _>::connect(auth.clone(), &tpm2snp_connection, connector).await?;
Ok((a, b, c))
impl Svr3Env<'static> {
/// Simplest way to connect to an SVR3 Environment in integration tests, command
/// line tools, and examples.
pub async fn connect_directly(
&self,
auth: &Auth,
) -> Result<<Self as PpssSetup<DefaultStream>>::Connections, enclave::Error> {
let endpoints = (self.sgx(), self.nitro(), self.tpm2snp());
endpoints.connect(auth).await
}
}
#[cfg(test)]
@ -477,6 +292,7 @@ mod test {
use super::*;
use assert_matches::assert_matches;
use async_trait::async_trait;
use nonzero_ext::nonzero;
use rand_core::{OsRng, RngCore};
@ -534,7 +350,7 @@ mod test {
}
#[async_trait]
impl Svr3Client for TestSvr3Client {
impl Backup for TestSvr3Client {
async fn backup(
&self,
_password: &str,
@ -544,7 +360,17 @@ mod test {
) -> Result<OpaqueMaskedShareSet, Error> {
(self.backup_fn)()
}
}
#[async_trait]
impl Remove for TestSvr3Client {
async fn remove(&self) -> Result<(), Error> {
(self.remove_fn)()
}
}
#[async_trait]
impl Restore for TestSvr3Client {
async fn restore(
&self,
_password: &str,
@ -553,11 +379,10 @@ mod test {
) -> Result<EvaluationResult, Error> {
(self.restore_fn)()
}
}
async fn remove(&self) -> Result<(), Error> {
(self.remove_fn)()
}
#[async_trait]
impl Query for TestSvr3Client {
async fn query(&self) -> Result<u32, Error> {
unreachable!()
}
@ -590,7 +415,7 @@ mod test {
let mut rng = OsRng;
let result =
restore_with_fallback((primary, fallback), "", new_empty_share_set(), &mut rng).await;
restore_with_fallback((&primary, &fallback), "", new_empty_share_set(), &mut rng).await;
assert_matches!(result, Ok(evaluation_result) => assert_eq!(evaluation_result, test_evaluation_result()));
}
@ -607,7 +432,7 @@ mod test {
let mut rng = OsRng;
let result =
restore_with_fallback((primary, fallback), "", new_empty_share_set(), &mut rng).await;
restore_with_fallback((&primary, &fallback), "", new_empty_share_set(), &mut rng).await;
assert_matches!(result, Err(Error::ConnectionTimedOut));
}
@ -623,7 +448,7 @@ mod test {
};
let mut rng = OsRng;
let result =
restore_with_fallback((primary, fallback), "", new_empty_share_set(), &mut rng).await;
restore_with_fallback((&primary, &fallback), "", new_empty_share_set(), &mut rng).await;
assert_matches!(result, Err(Error::RestoreFailed(31415)));
}
@ -639,7 +464,7 @@ mod test {
};
let mut rng = OsRng;
let result =
restore_with_fallback((primary, fallback), "", new_empty_share_set(), &mut rng).await;
restore_with_fallback((&primary, &fallback), "", new_empty_share_set(), &mut rng).await;
assert_matches!(result, Ok(evaluation_result) => assert_eq!(evaluation_result, test_evaluation_result()));
}
@ -651,7 +476,7 @@ mod test {
};
let mut rng = OsRng;
let result = migrate_backup(
(TestSvr3Client::default(), destination),
(&TestSvr3Client::default(), &destination),
"",
make_secret(),
nonzero!(42u32),
@ -673,7 +498,7 @@ mod test {
};
let mut rng = OsRng;
let result = migrate_backup(
(source, destination),
(&source, &destination),
"",
make_secret(),
nonzero!(42u32),
@ -695,7 +520,7 @@ mod test {
};
let mut rng = OsRng;
let result = migrate_backup(
(source, destination),
(&source, &destination),
"",
make_secret(),
nonzero!(42u32),

104
rust/net/src/svr3/direct.rs Normal file
View File

@ -0,0 +1,104 @@
//
// Copyright 2024 Signal Messenger, LLC.
// SPDX-License-Identifier: AGPL-3.0-only
//
use std::time::Duration;
use async_trait::async_trait;
use crate::auth::Auth;
use crate::enclave;
use crate::enclave::{EnclaveEndpoint, EnclaveEndpointConnection, NewHandshake, Svr3Flavor};
use crate::infra::tcp_ssl::DirectConnector;
use crate::infra::ws::DefaultStream;
use crate::infra::TransportConnector;
use crate::svr::SvrConnection;
const DIRECT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
/// This trait helps create direct SVR3 connections for various combinations of
/// enclaves kinds.
#[async_trait]
pub trait DirectConnect {
type Connections;
async fn connect(&self, auth: &Auth) -> Result<Self::Connections, enclave::Error>;
}
#[async_trait]
impl<A> DirectConnect for EnclaveEndpoint<'static, A>
where
A: Svr3Flavor + NewHandshake + Sized + Send,
{
type Connections = SvrConnection<A, DefaultStream>;
async fn connect(&self, auth: &Auth) -> Result<Self::Connections, enclave::Error> {
connect_one(self, auth, DirectConnector::default()).await
}
}
#[async_trait]
impl<A, B> DirectConnect for (&EnclaveEndpoint<'static, A>, &EnclaveEndpoint<'static, B>)
where
A: Svr3Flavor + NewHandshake + Sized + Send,
B: Svr3Flavor + NewHandshake + Sized + Send,
{
type Connections = (
SvrConnection<A, DefaultStream>,
SvrConnection<B, DefaultStream>,
);
async fn connect(&self, auth: &Auth) -> Result<Self::Connections, enclave::Error> {
let transport = DirectConnector::default();
let (a, b) = futures_util::future::join(
connect_one(self.0, auth, transport.clone()),
connect_one(self.1, auth, transport),
)
.await;
Ok((a?, b?))
}
}
#[async_trait]
impl<A, B, C> DirectConnect
for (
&EnclaveEndpoint<'static, A>,
&EnclaveEndpoint<'static, B>,
&EnclaveEndpoint<'static, C>,
)
where
A: Svr3Flavor + NewHandshake + Sized + Send,
B: Svr3Flavor + NewHandshake + Sized + Send,
C: Svr3Flavor + NewHandshake + Sized + Send,
{
type Connections = (
SvrConnection<A, DefaultStream>,
SvrConnection<B, DefaultStream>,
SvrConnection<C, DefaultStream>,
);
async fn connect(&self, auth: &Auth) -> Result<Self::Connections, enclave::Error> {
let transport = DirectConnector::default();
let (a, b, c) = futures_util::future::join3(
connect_one(self.0, auth, transport.clone()),
connect_one(self.1, auth, transport.clone()),
connect_one(self.2, auth, transport),
)
.await;
Ok((a?, b?, c?))
}
}
async fn connect_one<Enclave, Transport>(
endpoint: &EnclaveEndpoint<'static, Enclave>,
auth: &Auth,
connector: Transport,
) -> Result<SvrConnection<Enclave, DefaultStream>, enclave::Error>
where
Enclave: Svr3Flavor + NewHandshake + Sized,
Transport: TransportConnector<Stream = DefaultStream>,
{
let ep_connection = EnclaveEndpointConnection::new(endpoint, DIRECT_CONNECTION_TIMEOUT);
SvrConnection::connect(auth.clone(), &ep_connection, connector).await
}

View File

@ -0,0 +1,104 @@
//
// Copyright 2024 Signal Messenger, LLC.
// SPDX-License-Identifier: AGPL-3.0-only
//
//! High level data operations on instances of `PpssSetup`
//!
//! These functions are useful if we ever want to perform multiple operations
//! on the same set of open connections, as opposed to having to connect for
//! each individual operation, as implied by `Svr3Client` trait.
use super::{Error, OpaqueMaskedShareSet};
use crate::enclave::{IntoConnections, PpssSetup};
use crate::infra::ws::{run_attested_interaction, NextOrClose};
use crate::infra::AsyncDuplexStream;
use futures_util::future::try_join_all;
use libsignal_svr3::{Backup, EvaluationResult, Query, Restore};
use rand_core::CryptoRngCore;
use std::num::NonZeroU32;
pub async fn do_backup<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error> {
let server_ids = Env::server_ids();
let backup = Backup::new(server_ids.as_ref(), password, secret, max_tries, rng)?;
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(&backup.requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results?, addresses)?;
let share_set = backup.finalize(rng, &responses)?;
Ok(OpaqueMaskedShareSet::new(share_set))
}
pub async fn do_restore<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error> {
let restore = Restore::new(password, share_set.into_inner(), rng)?;
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(&restore.requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results, addresses)?;
Ok(restore.finalize(&responses)?)
}
pub async fn do_remove<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
) -> Result<(), Error> {
let requests = std::iter::repeat(libsignal_svr3::make_remove_request());
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(requests)
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
// RemoveResponse's are empty, safe to ignore as long as they came
let _responses = collect_responses(results, addresses)?;
Ok(())
}
pub async fn do_query<S: AsyncDuplexStream + 'static, Env: PpssSetup<S>>(
connections: Env::Connections,
) -> Result<u32, Error> {
let mut connections = connections.into_connections();
let futures = connections
.as_mut()
.iter_mut()
.zip(Query::requests())
.map(|(connection, request)| run_attested_interaction(connection, request));
let results = try_join_all(futures).await?;
let addresses = connections.as_ref().iter().map(|c| c.remote_address());
let responses = collect_responses(results, addresses)?;
Ok(Query::finalize(&responses)?)
}
fn collect_responses<'a>(
results: impl IntoIterator<Item = NextOrClose<Vec<u8>>>,
addresses: impl IntoIterator<Item = &'a url::Host>,
) -> Result<Vec<Vec<u8>>, Error> {
results
.into_iter()
.zip(addresses)
.map(|(next_or_close, address)| {
next_or_close.next_or(Error::Protocol(format!("no response from {}", address)))
})
.collect()
}

126
rust/net/src/svr3/traits.rs Normal file
View File

@ -0,0 +1,126 @@
//
// Copyright 2024 Signal Messenger, LLC.
// SPDX-License-Identifier: AGPL-3.0-only
//
//! Most of the traits in this module are likely to be used together
//! therefore the module exists as a sort of a "prelude" to make importing them
//! all in bulk easier.
use std::num::NonZeroU32;
use async_trait::async_trait;
use rand_core::CryptoRngCore;
use libsignal_svr3::EvaluationResult;
use crate::enclave;
use crate::enclave::PpssSetup;
use crate::infra::AsyncDuplexStream;
use super::{ppss_ops, Error, OpaqueMaskedShareSet};
#[async_trait]
pub trait Backup {
async fn backup(
&self,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error>;
}
#[async_trait]
pub trait Restore {
async fn restore(
&self,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error>;
}
#[async_trait]
pub trait Query {
async fn query(&self) -> Result<u32, Error>;
}
#[async_trait]
pub trait Remove {
async fn remove(&self) -> Result<(), Error>;
}
#[async_trait]
pub trait Svr3Connect {
// Stream is needed for the blanket implementation,
// otherwise S would be an unconstrained generic parameter.
type Stream;
type Env: PpssSetup<Self::Stream>;
async fn connect(
&self,
) -> Result<<Self::Env as PpssSetup<Self::Stream>>::Connections, enclave::Error>;
}
#[async_trait]
impl<T> Backup for T
where
T: Svr3Connect + Sync,
T::Stream: AsyncDuplexStream + 'static,
{
async fn backup(
&self,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, Error> {
ppss_ops::do_backup::<T::Stream, T::Env>(
self.connect().await?,
password,
secret,
max_tries,
rng,
)
.await
}
}
#[async_trait]
impl<T> Restore for T
where
T: Svr3Connect + Sync,
T::Stream: AsyncDuplexStream + 'static,
{
async fn restore(
&self,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, Error> {
ppss_ops::do_restore::<T::Stream, T::Env>(self.connect().await?, password, share_set, rng)
.await
}
}
#[async_trait]
impl<T> Remove for T
where
T: Svr3Connect + Sync,
T::Stream: AsyncDuplexStream + 'static,
{
async fn remove(&self) -> Result<(), Error> {
ppss_ops::do_remove::<T::Stream, T::Env>(self.connect().await?).await
}
}
#[async_trait]
impl<T> Query for T
where
T: Svr3Connect + Sync,
T::Stream: AsyncDuplexStream + 'static,
{
async fn query(&self) -> Result<u32, Error> {
ppss_ops::do_query::<T::Stream, T::Env>(self.connect().await?).await
}
}

View File

@ -0,0 +1,420 @@
//
// Copyright 2024 Signal Messenger, LLC.
// SPDX-License-Identifier: AGPL-3.0-only
//
//! These tests demonstrate both full and partial SVR3 migration.
//! (The tests require a LIBSIGNAL_TESTING_ENCLAVE_SECRET environment variable
//! to be set. Similar to the integration tests of SVR3 APIs in Java, etc. If the
//! variable is not set, the tests will just silently succeed.)
//!
//! Full migration means all three enclaves are being updated, whereas partial
//! only updates <3 enclaves.
//!
//! Partial migration is of a special interest. Let's consider a scenario of
//! migrating only one enclave (SGX in this test). Prev environment will consist
//! of (Sgx, Nitro, Tpm2Snp) enclaves, and the Current one will contain
//! (Sgx', Nitro, Tpm2Snp). Note the "prime" on Sgx. The migration function will
//! first write the data to the Current, and then remove it from the Prev. If
//! done naively, using "whole" environments, it will leave us with only Sgx'
//! surviving with any data, and no chance of ever restoring the secret.
//! This explains the existence of a partial environment. It will be passed into
//! the `migrate_backup` function to guarantee that only the Sgx (no prime) data
//! will be removed during migration.
//!
//! The trick to test any migration with only one actual SVR3 environment is to
//! utilize the fact that user-id is an implicit argument to all the SVR3
//! operations. Thus, `Env::STAGING.Svr3 + UID1` and `Env::STAGING.Svr3 + UID2`
//! will effectively be two different, non-overlapping, environments.
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use assert_matches::assert_matches;
use async_trait::async_trait;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use nonzero_ext::nonzero;
use rand_core::{CryptoRngCore, OsRng};
use libsignal_net::auth::Auth;
use libsignal_net::enclave::{EnclaveEndpoint, EnclaveKind, Error, PpssSetup, Sgx};
use libsignal_net::env::Svr3Env;
use libsignal_net::infra::tcp_ssl::DirectConnector;
use libsignal_net::infra::TransportConnector;
use libsignal_net::svr::SvrConnection;
use libsignal_net::svr3::direct::DirectConnect;
use libsignal_net::svr3::traits::*;
use libsignal_net::svr3::{migrate_backup, restore_with_fallback, OpaqueMaskedShareSet};
use libsignal_svr3::EvaluationResult;
const PASS: &str = "password";
const TRIES: NonZeroU32 = nonzero!(10u32);
const PREV_ENV: Svr3Env = libsignal_net::env::STAGING.svr3;
const REM_ENV: SingletonEnv<'static, Sgx> = SingletonEnv(PREV_ENV.sgx());
type Stream = <DirectConnector as TransportConnector>::Stream;
#[derive(Clone)]
struct FullClient {
auth: Auth,
}
#[async_trait]
impl Svr3Connect for FullClient {
type Stream = Stream;
type Env = Svr3Env<'static>;
async fn connect(&self) -> Result<<Self::Env as PpssSetup<Self::Stream>>::Connections, Error> {
PREV_ENV.connect_directly(&self.auth).await
}
}
#[derive(Clone)]
struct PartialClient {
auth: Auth,
}
/// Single-enclave environment. Allows to connect to each of the SVR3 enclaves individually.
struct SingletonEnv<'a, E: EnclaveKind>(&'a EnclaveEndpoint<'a, E>);
// This will be our "removing" setup.
impl<S: Send> PpssSetup<S> for SingletonEnv<'_, Sgx> {
type Stream = S;
type Connections = SvrConnection<Sgx, S>;
type ServerIds = [u64; 1];
fn server_ids() -> Self::ServerIds {
[1]
}
}
#[async_trait]
impl Svr3Connect for PartialClient {
type Stream = Stream;
type Env = SingletonEnv<'static, Sgx>;
async fn connect(&self) -> Result<<Self::Env as PpssSetup<Self::Stream>>::Connections, Error> {
REM_ENV.0.connect(&self.auth).await
}
}
#[derive(Clone)]
struct ValidatingClient<T> {
inner: T,
backup_calls: Arc<AtomicUsize>,
restore_calls: Arc<AtomicUsize>,
query_calls: Arc<AtomicUsize>,
remove_calls: Arc<AtomicUsize>,
}
impl<T> ValidatingClient<T> {
fn new(inner: T) -> Self {
Self {
inner,
backup_calls: Arc::default(),
restore_calls: Arc::default(),
query_calls: Arc::default(),
remove_calls: Arc::default(),
}
}
fn backup_calls(&self) -> usize {
self.backup_calls.load(Ordering::Relaxed)
}
fn restore_calls(&self) -> usize {
self.restore_calls.load(Ordering::Relaxed)
}
fn query_calls(&self) -> usize {
self.query_calls.load(Ordering::Relaxed)
}
fn remove_calls(&self) -> usize {
self.remove_calls.load(Ordering::Relaxed)
}
// This is to simplify assertions
// Counters are in the same order as fields: (backup, restore, query, remove)
fn get_counts(&self) -> (usize, usize, usize, usize) {
(
self.backup_calls(),
self.restore_calls(),
self.query_calls(),
self.remove_calls(),
)
}
}
#[async_trait]
impl<T: Backup + Sync + Send> Backup for ValidatingClient<T> {
async fn backup(
&self,
password: &str,
secret: [u8; 32],
max_tries: NonZeroU32,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<OpaqueMaskedShareSet, libsignal_net::svr3::Error> {
self.backup_calls.fetch_add(1, Ordering::Relaxed);
self.inner.backup(password, secret, max_tries, rng).await
}
}
#[async_trait]
impl<T: Restore + Sync + Send> Restore for ValidatingClient<T> {
async fn restore(
&self,
password: &str,
share_set: OpaqueMaskedShareSet,
rng: &mut (impl CryptoRngCore + Send),
) -> Result<EvaluationResult, libsignal_net::svr3::Error> {
self.restore_calls.fetch_add(1, Ordering::Relaxed);
self.inner.restore(password, share_set, rng).await
}
}
#[async_trait]
impl<T: Query + Sync + Send> Query for ValidatingClient<T> {
async fn query(&self) -> Result<u32, libsignal_net::svr3::Error> {
self.query_calls.fetch_add(1, Ordering::Relaxed);
self.inner.query().await
}
}
#[async_trait]
impl<T: Remove + Sync + Send> Remove for ValidatingClient<T> {
async fn remove(&self) -> Result<(), libsignal_net::svr3::Error> {
self.remove_calls.fetch_add(1, Ordering::Relaxed);
self.inner.remove().await
}
}
#[tokio::test]
async fn svr3_single_enclave_migration() {
init_logger();
let Some(enclave_secret) = get_enclave_secret() else {
log::info!(
"LIBSIGNAL_TESTING_ENCLAVE_SECRET environment variable is not set. The test will be ignored."
);
return;
};
let mut rng = OsRng;
let secret = random_bytes(&mut rng);
log::info!("Secret to be stored: {}", &hex::encode(secret));
log::info!("Creating clients...");
let prev_uid = random_bytes(&mut rng);
let prev_auth = Auth::from_uid_and_secret(prev_uid, enclave_secret);
let prev_client = ValidatingClient::new(FullClient {
auth: prev_auth.clone(),
});
let current_uid = random_bytes(&mut rng);
let current_auth = Auth::from_uid_and_secret(current_uid, enclave_secret);
let current_client = ValidatingClient::new(FullClient { auth: current_auth });
assert_ne!(&prev_uid, &current_uid);
let removing_client = ValidatingClient::new(PartialClient { auth: prev_auth });
log::info!("DONE");
log::info!("Writing the initial backup...");
let share_set = prev_client
.backup(PASS, secret, TRIES, &mut rng)
.await
.expect("can backup");
log::info!("DONE");
log::info!("Validating the initial backup...");
let restored = restore_with_fallback(
(&current_client, &prev_client),
PASS,
share_set.clone(),
&mut rng,
)
.await
.expect("can restore");
assert_eq!(&restored.value, &secret);
log::info!("OK");
log::info!("Checking the current environment pre-migration...");
assert_matches!(
current_client.query().await,
Err(libsignal_net::svr3::Error::DataMissing)
);
log::info!("DONE");
log::info!("Checking the previous sgx pre-migration...");
// Removing client refers to the Sgx part of the "previous" environment.
// So there should be data there.
removing_client
.clone()
.query()
.await
.expect("Prev SGX should have data");
log::info!("DONE");
log::info!("Migrating...");
let new_share_set = migrate_backup(
(&removing_client, &current_client),
PASS,
secret,
TRIES,
&mut rng,
)
.await
.expect("can migrate");
log::info!("DONE");
log::info!("Validating the final state...");
log::info!("- Data should be gone from the prev sgx");
assert_matches!(
removing_client.query().await,
Err(libsignal_net::svr3::Error::DataMissing)
);
log::info!("- Query/restore from prev env should fail with DataMissing");
assert_matches!(
prev_client.query().await,
Err(libsignal_net::svr3::Error::DataMissing)
);
log::info!("- Can restore from the current env with the right remaining_tries");
let restored = restore_with_fallback(
(&current_client, &prev_client),
PASS,
new_share_set.clone(),
&mut rng,
)
.await
.expect("can restore after migration");
assert_eq!(restored.tries_remaining, TRIES.get() - 1);
assert_eq!(&restored.value, &secret);
assert_eq!((1, 1, 1, 0), prev_client.get_counts());
assert_eq!((1, 2, 1, 0), current_client.get_counts());
assert_eq!((0, 0, 2, 1), removing_client.get_counts());
log::info!("OK");
log::info!("Cleaning up...");
// Need to overwrite the values first in order to remove from all enclaves,
// otherwise the error from SGX will terminate other remove requests.
let _ = prev_client.backup(PASS, secret, TRIES, &mut rng).await;
let _ = prev_client.remove().await;
let _ = current_client.remove().await;
log::info!("DONE");
}
#[tokio::test]
async fn svr3_full_migration() {
init_logger();
let Some(enclave_secret) = get_enclave_secret() else {
log::info!(
"LIBSIGNAL_TESTING_ENCLAVE_SECRET environment variable is not set. The test will be ignored."
);
return;
};
let mut rng = OsRng;
let secret = random_bytes(&mut rng);
log::info!("Secret to be stored: {}", &hex::encode(secret));
log::info!("Creating clients...");
let prev_uid = random_bytes(&mut rng);
let prev_auth = Auth::from_uid_and_secret(prev_uid, enclave_secret);
let prev_client = FullClient {
auth: prev_auth.clone(),
};
let current_uid = random_bytes(&mut rng);
let current_auth = Auth::from_uid_and_secret(current_uid, enclave_secret);
let current_client = FullClient { auth: current_auth };
assert_ne!(&prev_uid, &current_uid);
log::info!("DONE");
log::info!("Writing the initial backup...");
let share_set = prev_client
.backup(PASS, secret, TRIES, &mut rng)
.await
.expect("can backup");
log::info!("DONE");
log::info!("Validating the initial backup...");
let restored = prev_client
.restore(PASS, share_set.clone(), &mut rng)
.await
.expect("can restore");
assert_eq!(&restored.value, &secret);
log::info!("OK");
log::info!("Checking the current environment pre-migration...");
assert_matches!(
current_client.query().await,
Err(libsignal_net::svr3::Error::DataMissing)
);
log::info!("DONE");
log::info!("Migrating...");
let new_share_set = migrate_backup(
(&prev_client, &current_client),
PASS,
secret,
TRIES,
&mut rng,
)
.await
.expect("can migrate");
log::info!("DONE");
log::info!("Validating the final state...");
log::info!("- Query/restore from prev env should fail with DataMissing");
assert_matches!(
prev_client.query().await,
Err(libsignal_net::svr3::Error::DataMissing)
);
log::info!("- Can restore from the current env with the right remaining_tries");
let restored = current_client
.restore(PASS, new_share_set.clone(), &mut rng)
.await
.expect("can restore after migration");
assert_eq!(restored.tries_remaining, TRIES.get() - 1);
assert_eq!(&restored.value, &secret);
log::info!("OK");
log::info!("Cleaning up...");
let _ = current_client.remove().await;
log::info!("DONE");
}
fn random_bytes<const N: usize>(rng: &mut impl CryptoRngCore) -> [u8; N] {
let mut bytes = [0u8; N];
rng.fill_bytes(&mut bytes[..]);
bytes
}
fn parse_auth_secret(b64: &str) -> [u8; 32] {
BASE64_STANDARD
.decode(b64)
.expect("valid b64")
.try_into()
.expect("secret is 32 bytes")
}
fn init_logger() {
let _ = env_logger::builder().is_test(true).try_init();
}
fn get_enclave_secret() -> Option<[u8; 32]> {
std::env::var("LIBSIGNAL_TESTING_ENCLAVE_SECRET")
.map(|b64| parse_auth_secret(&b64))
.ok()
}