0
0
mirror of https://github.com/signalapp/libsignal.git synced 2024-09-20 03:52:17 +02:00

net: Pass "fatal" connection errors up through chat connect*()

Applies 38a5f01f to the chat server connection methods as well, so
that the errors added in 9a8429da actually get recognized.
This commit is contained in:
Jordan Rose 2024-06-11 15:00:05 -07:00
parent 408e470724
commit 0a96d32813
5 changed files with 153 additions and 32 deletions

View File

@ -24,7 +24,8 @@ where
C: ServiceConnector + Send + Sync + 'static,
C::Service: ChatService + Clone + Sync + Send + 'static,
C::Channel: Send + Sync,
C::ConnectError: Send + Sync + Debug + LogSafeDisplay + ErrorClassifier,
C::ConnectError:
Send + Sync + Debug + LogSafeDisplay + ErrorClassifier + Into<ChatServiceError>,
C::StartError: Send + Sync + Debug + LogSafeDisplay,
{
async fn send(&self, msg: Request, timeout: Duration) -> Result<Response, ChatServiceError> {
@ -47,7 +48,8 @@ where
C: ServiceConnector + Send + Sync + 'static,
C::Service: ChatService + RemoteAddressInfo + Clone + Sync + Send + 'static,
C::Channel: Send + Sync,
C::ConnectError: Send + Sync + Debug + LogSafeDisplay + ErrorClassifier,
C::ConnectError:
Send + Sync + Debug + LogSafeDisplay + ErrorClassifier + Into<ChatServiceError>,
C::StartError: Send + Sync + Debug + LogSafeDisplay,
{
async fn send_and_debug(

View File

@ -3,9 +3,10 @@
// SPDX-License-Identifier: AGPL-3.0-only
//
use crate::infra::errors::LogSafeDisplay;
use crate::infra::connection_manager::{ErrorClass, ErrorClassifier};
use crate::infra::errors::{LogSafeDisplay, TransportConnectError};
use crate::infra::reconnect;
use crate::infra::ws::WebSocketServiceError;
use crate::infra::ws::{WebSocketConnectError, WebSocketServiceError};
#[derive(Debug, thiserror::Error, displaydoc::Display)]
pub enum ChatServiceError {
@ -41,20 +42,57 @@ impl LogSafeDisplay for ChatServiceError {}
impl From<WebSocketServiceError> for ChatServiceError {
fn from(e: WebSocketServiceError) -> Self {
Self::WebSocket(e)
}
}
impl From<WebSocketConnectError> for ChatServiceError {
fn from(e: WebSocketConnectError) -> Self {
if !matches!(e.classify(), ErrorClass::Fatal) {
log::warn!(
"intermittent WebSocketConnectError should be retried, not returned as a ChatServiceError ({e})"
);
}
match e {
WebSocketServiceError::Http(response) if response.status() == 499 => Self::AppExpired,
WebSocketServiceError::Http(response) if response.status() == 403 => {
// Technically this only applies to identified sockets,
// but unidentified sockets should never produce a 403 anyway.
Self::DeviceDeregistered
WebSocketConnectError::Transport(e) => match e {
TransportConnectError::InvalidConfiguration => {
WebSocketServiceError::Other("invalid configuration")
}
TransportConnectError::TcpConnectionFailed => {
WebSocketServiceError::Other("TCP connection failed")
}
TransportConnectError::DnsError => WebSocketServiceError::Other("DNS error"),
TransportConnectError::SslError(_)
| TransportConnectError::SslFailedHandshake(_) => {
WebSocketServiceError::Other("TLS failure")
}
TransportConnectError::CertError => {
WebSocketServiceError::Other("failed to load certificates")
}
}
.into(),
WebSocketConnectError::Timeout => Self::Timeout,
WebSocketConnectError::WebSocketError(e) => {
match e {
tungstenite::Error::Http(response) if response.status() == 499 => {
Self::AppExpired
}
tungstenite::Error::Http(response) if response.status() == 403 => {
// Technically this only applies to identified sockets,
// but unidentified sockets should never produce a 403 anyway.
Self::DeviceDeregistered
}
_ => Self::WebSocket(e.into()),
}
}
e => Self::WebSocket(e),
}
}
}
impl From<reconnect::ReconnectError> for ChatServiceError {
fn from(e: reconnect::ReconnectError) -> Self {
impl<E: LogSafeDisplay + Into<ChatServiceError>> From<reconnect::ReconnectError<E>>
for ChatServiceError
{
fn from(e: reconnect::ReconnectError<E>) -> Self {
match e {
reconnect::ReconnectError::Timeout { attempts } => {
Self::TimeoutEstablishingConnection { attempts }
@ -62,6 +100,7 @@ impl From<reconnect::ReconnectError> for ChatServiceError {
reconnect::ReconnectError::AllRoutesFailed { attempts } => {
Self::AllConnectionRoutesFailed { attempts }
}
reconnect::ReconnectError::RejectedByServer(e) => e.into(),
reconnect::ReconnectError::Inactive => Self::ServiceInactive,
}
}

View File

@ -579,7 +579,14 @@ mod test {
let route_1_healthy = route_1_attempt.fetch_add(1, Ordering::Relaxed)
== route_1_attempts_until_cooldown;
simulate_connect(connection_params, route_1_healthy)
simulate_connect(
connection_params,
if route_1_healthy {
None
} else {
Some(TestError::Expected)
},
)
})
.await;
@ -632,7 +639,9 @@ mod test {
let multi_route_manager =
MultiRouteConnectionManager::new(vec![first_manager.clone(), second_manager.clone()]);
let res = multi_route_manager
.connect_or_wait(|connection_params| simulate_connect(connection_params, false))
.connect_or_wait(|connection_params| {
simulate_connect(connection_params, Some(TestError::Expected))
})
.await;
assert_matches!(res, ConnectionAttemptOutcome::WaitUntil(_));
assert_eq!(
@ -645,6 +654,37 @@ mod test {
);
}
#[tokio::test(start_paused = true)]
async fn multi_route_manager_propagates_post_connection_failure() {
let connection_params = example_connection_params(ROUTE_1);
let first_manager_attempts_until_cooldown = 5;
let second_manager_attempts_until_cooldown = 3;
let first_manager = CooldownAfterSomeAttempts::new(
first_manager_attempts_until_cooldown,
connection_params.clone(),
);
let second_manager = CooldownAfterSomeAttempts::new(
second_manager_attempts_until_cooldown,
connection_params,
);
let multi_route_manager =
MultiRouteConnectionManager::new(vec![first_manager.clone(), second_manager.clone()]);
let res = multi_route_manager
.connect_or_wait(|connection_params| {
simulate_connect(
connection_params,
Some(ClassifiableTestError(ErrorClass::Fatal)),
)
})
.await;
assert_matches!(
res,
ConnectionAttemptOutcome::Attempted(Err(ClassifiableTestError(ErrorClass::Fatal)))
);
assert_eq!(1, first_manager.attempts_made.load(Ordering::Relaxed));
assert_eq!(0, second_manager.attempts_made.load(Ordering::Relaxed));
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn multi_route_manager_all_in_cooldown() {
const SHORT_DELAY: Duration = Duration::from_secs(5);
@ -656,7 +696,9 @@ mod test {
let now = Instant::now();
let attempt_outcome: ConnectionAttemptOutcome<&str, TestError> = multi_route_manager
.connect_or_wait(|connection_params| simulate_connect(connection_params, true))
.connect_or_wait(|connection_params| {
simulate_connect(connection_params, Some(TestError::Expected))
})
.await;
let wait_until =
assert_matches!(attempt_outcome, ConnectionAttemptOutcome::WaitUntil(t) => t);
@ -670,7 +712,15 @@ mod test {
) {
let attempt_outcome: ConnectionAttemptOutcome<&str, TestError> = multi_route_manager
.connect_or_wait(|connection_params| async move {
simulate_connect(connection_params, route1_healthy).await
simulate_connect(
connection_params,
if route1_healthy {
None
} else {
Some(TestError::Expected)
},
)
.await
})
.await;
assert_matches!(
@ -679,13 +729,13 @@ mod test {
);
}
async fn simulate_connect(
async fn simulate_connect<E>(
connection_params: &ConnectionParams,
route1_healthy: bool,
) -> Result<&str, TestError> {
let route1_response = match route1_healthy {
true => Ok(ROUTE_1),
false => Err(TestError::Expected),
route1_error: Option<E>,
) -> Result<&str, E> {
let route1_response = match route1_error {
None => Ok(ROUTE_1),
Some(err) => Err(err),
};
match connection_params.host.borrow() {
ROUTE_1 => future::ready(route1_response).await,
@ -694,7 +744,7 @@ mod test {
tokio::time::sleep(LONG_CONNECTION_TIME).await;
future::ready(Ok(ROUTE_THAT_TIMES_OUT)).await
}
_ => future::ready(Err(TestError::Unexpected("not configured for the route"))).await,
_ => panic!("not configured for the route"),
}
}

View File

@ -261,18 +261,25 @@ pub(crate) struct ServiceWithReconnect<C: ServiceConnector, M> {
}
#[derive(Debug, Display)]
pub(crate) enum ReconnectError {
pub(crate) enum ReconnectError<E: LogSafeDisplay> {
/// Operation timed out
Timeout { attempts: u16 },
/// All attempted routes failed to connect
AllRoutesFailed { attempts: u16 },
/// Rejected by server: {0}
RejectedByServer(E),
/// Service is in the inactive state
Inactive,
}
impl ErrorClassifier for ReconnectError {
impl<E: LogSafeDisplay> ErrorClassifier for ReconnectError<E> {
fn classify(&self) -> ErrorClass {
ErrorClass::Intermittent
match self {
ReconnectError::Timeout { .. } | ReconnectError::AllRoutesFailed { .. } => {
ErrorClass::Intermittent
}
ReconnectError::RejectedByServer(_) | ReconnectError::Inactive => ErrorClass::Fatal,
}
}
}
@ -339,15 +346,17 @@ where
self.data.reconnect_count.load(Ordering::Relaxed)
}
pub(crate) async fn reconnect_if_active(&self) -> Result<(), ReconnectError> {
pub(crate) async fn reconnect_if_active(&self) -> Result<(), ReconnectError<C::ConnectError>> {
self.connect(true).await
}
pub(crate) async fn connect_from_inactive(&self) -> Result<(), ReconnectError> {
pub(crate) async fn connect_from_inactive(
&self,
) -> Result<(), ReconnectError<C::ConnectError>> {
self.connect(false).await
}
async fn connect(&self, respect_inactive: bool) -> Result<(), ReconnectError> {
async fn connect(&self, respect_inactive: bool) -> Result<(), ReconnectError<C::ConnectError>> {
let mut attempts: u16 = 0;
let start_of_connection_process = Instant::now();
let deadline = start_of_connection_process + self.data.connection_timeout;
@ -407,13 +416,33 @@ where
}
}
ServiceState::Error(e) => {
// short-circuiting mechanism is responsibility of the `ConnectionManager`,
// so here we're just going to keep trying until we get into
// one of the non-retryable states, `Cooldown` or time out.
if attempts > 0 {
// Only log about errors that happened on *this* connect attempt.
log::info!("Connection attempt resulted in an error: {}", e);
}
match e.classify() {
ErrorClass::Intermittent => {
// short-circuiting mechanism is responsibility of the `ConnectionManager`,
// so here we're just going to keep trying until we get into
// one of the non-retryable states, `Cooldown` or time out.
}
ErrorClass::RetryAt(next_attempt_time) => {
*guard = ServiceState::Cooldown(next_attempt_time);
continue;
}
ErrorClass::Fatal => {
if !respect_inactive {
return Err(ReconnectError::AllRoutesFailed { attempts });
}
let state = std::mem::replace(&mut *guard, ServiceState::Inactive);
let ServiceState::Error(e) = state else {
unreachable!("we checked this above, matching on &*guard");
};
return Err(ReconnectError::RejectedByServer(e));
}
}
}
};

View File

@ -60,6 +60,7 @@ impl<T: TransportConnector, E> WebSocketClientConnector<T, E> {
}
}
/// A simplified version of [`tungstenite::Error`] that supports [`LogSafeDisplay`].
#[derive(Debug, thiserror::Error)]
pub enum WebSocketServiceError {
ChannelClosed,