From cf8f2a34630d6145c34c5d3ba7eba26ae0a669e0 Mon Sep 17 00:00:00 2001 From: ravi-signal <99042880+ravi-signal@users.noreply.github.com> Date: Wed, 31 Jan 2024 14:29:15 -0600 Subject: [PATCH] remove synchronized locks that may be held while blocking --- .../textsecuregcm/storage/MessagesCache.java | 31 ++++++++++++++----- .../session/WebSocketSessionContext.java | 31 ++++++++++++++----- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index e335ae6c..bdcb0139 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -75,6 +76,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private final ClusterLuaScript removeQueueScript; private final ClusterLuaScript getQueuesToPersistScript; + private final ReentrantLock messageListenersLock = new ReentrantLock(); private final Map messageListenersByQueueName = new HashMap<>(); private final Map queueNamesByMessageListener = new IdentityHashMap<>(); @@ -146,8 +148,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final Set queueNames; - synchronized (messageListenersByQueueName) { + messageListenersLock.lock(); + try { queueNames = new HashSet<>(messageListenersByQueueName.keySet()); + } finally { + messageListenersLock.unlock(); } for (final String queueName : queueNames) { @@ -402,11 +407,14 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp final String queueName = getQueueName(destinationUuid, deviceId); final CompletableFuture subscribeFuture; - synchronized (messageListenersByQueueName) { + messageListenersLock.lock(); + try { messageListenersByQueueName.put(queueName, listener); queueNamesByMessageListener.put(listener, queueName); - // Submit to the Redis queue within the synchronized block, but don’t wait until exiting + // Submit to the Redis queue while holding the lock, but don’t wait until exiting subscribeFuture = subscribeForKeyspaceNotifications(queueName); + } finally { + messageListenersLock.unlock(); } subscribeFuture.join(); @@ -414,22 +422,28 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp public void removeMessageAvailabilityListener(final MessageAvailabilityListener listener) { @Nullable final String queueName; - synchronized (messageListenersByQueueName) { + messageListenersLock.lock(); + try { queueName = queueNamesByMessageListener.get(listener); + } finally { + messageListenersLock.unlock(); } if (queueName != null) { final CompletableFuture unsubscribeFuture; - synchronized (messageListenersByQueueName) { + messageListenersLock.lock(); + try { queueNamesByMessageListener.remove(listener); if (messageListenersByQueueName.remove(queueName, listener)) { - // Submit to the Redis queue within the synchronized block, but don’t wait until exiting + // Submit to the Redis queue holding the lock, but don’t wait until exiting unsubscribeFuture = unsubscribeFromKeyspaceNotifications(queueName); } else { messageAvailabilityListenerRemovedAfterAddCounter.increment(); unsubscribeFuture = CompletableFuture.completedFuture(null); } + } finally { + messageListenersLock.unlock(); } unsubscribeFuture.join(); @@ -507,8 +521,11 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp private Optional findListener(final String keyspaceChannel) { final String queueName = getQueueNameFromKeyspaceChannel(keyspaceChannel); - synchronized (messageListenersByQueueName) { + messageListenersLock.lock(); + try { return Optional.ofNullable(messageListenersByQueueName.get(queueName)); + } finally { + messageListenersLock.unlock(); } } diff --git a/websocket-resources/src/main/java/org/whispersystems/websocket/session/WebSocketSessionContext.java b/websocket-resources/src/main/java/org/whispersystems/websocket/session/WebSocketSessionContext.java index a83b4107..8c7caebc 100644 --- a/websocket-resources/src/main/java/org/whispersystems/websocket/session/WebSocketSessionContext.java +++ b/websocket-resources/src/main/java/org/whispersystems/websocket/session/WebSocketSessionContext.java @@ -6,6 +6,7 @@ package org.whispersystems.websocket.session; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; import org.whispersystems.websocket.WebSocketClient; @@ -13,6 +14,8 @@ public class WebSocketSessionContext { private final List closeListeners = new LinkedList<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final WebSocketClient webSocketClient; private Object authenticated; @@ -39,21 +42,33 @@ public class WebSocketSessionContext { return authenticated; } - public synchronized void addWebsocketClosedListener(WebSocketEventListener listener) { - if (!closed) this.closeListeners.add(listener); - else listener.onWebSocketClose(this, 1000, "Closed"); + public void addWebsocketClosedListener(WebSocketEventListener listener) { + lock.lock(); + try { + if (!closed) + this.closeListeners.add(listener); + else + listener.onWebSocketClose(this, 1000, "Closed"); + } finally { + lock.unlock(); + } } public WebSocketClient getClient() { return webSocketClient; } - public synchronized void notifyClosed(int statusCode, String reason) { - for (WebSocketEventListener listener : closeListeners) { - listener.onWebSocketClose(this, statusCode, reason); - } + public void notifyClosed(int statusCode, String reason) { + lock.lock(); + try { + for (WebSocketEventListener listener : closeListeners) { + listener.onWebSocketClose(this, statusCode, reason); + } - closed = true; + closed = true; + } finally { + lock.unlock(); + } } public interface WebSocketEventListener {