From cb50b44d8f4f5e066b9602af3f9ec387a4224c16 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Thu, 10 Nov 2022 12:53:52 -0500 Subject: [PATCH] Allow the account cleaner to operate on multiple accounts in parallel --- .../textsecuregcm/WhisperServerService.java | 3 +- .../textsecuregcm/storage/AccountCleaner.java | 36 +++++++++++++++---- .../storage/AccountCleanerTest.java | 15 +++++++- 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 27f5b8d1..10131462 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -403,6 +403,7 @@ public class WhisperServerService extends Application fromUuid, List chunkAccounts) { + final List> deletionFutures = new ArrayList<>(); + for (Account account : chunkAccounts) { if (isExpired(account) || needsExplicitRemoval(account)) { final String deletionReason = needsExplicitRemoval(account) ? "newlyExpired" : "previouslyExpired"; - try { - accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED); - Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, DELETION_REASON_TAG_NAME, deletionReason).increment(); - } catch (final Exception e) { - log.warn("Failed to delete account {}", account.getUuid(), e); - } + deletionFutures.add(CompletableFuture.runAsync(() -> { + try { + accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED); + } catch (final InterruptedException e) { + throw new CompletionException(e); + } + }, deletionExecutor) + .whenComplete((ignored, throwable) -> { + if (throwable != null) { + log.warn("Failed to delete account {}", account.getUuid(), throwable); + } else { + Metrics.counter(DELETED_ACCOUNT_COUNTER_NAME, DELETION_REASON_TAG_NAME, deletionReason).increment(); + } + })); } } + + try { + CompletableFuture.allOf(deletionFutures.toArray(new CompletableFuture[0])).join(); + } catch (final Exception e) { + log.debug("Failed to delete one or more accounts in chunk", e); + } } private boolean needsExplicitRemoval(Account account) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java index ef64e4a8..0100983f 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/AccountCleanerTest.java @@ -15,7 +15,10 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason; @@ -32,6 +35,8 @@ class AccountCleanerTest { private final Device undeletedDisabledDevice = mock(Device.class ); private final Device undeletedEnabledDevice = mock(Device.class ); + private ExecutorService deletionExecutor; + @BeforeEach void setup() { @@ -61,11 +66,19 @@ class AccountCleanerTest { when(undeletedEnabledAccount.getNumber()).thenReturn("+14153333333"); when(undeletedEnabledAccount.getLastSeen()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(364)); when(undeletedEnabledAccount.getUuid()).thenReturn(UUID.randomUUID()); + + deletionExecutor = Executors.newFixedThreadPool(2); + } + + @AfterEach + void tearDown() throws InterruptedException { + deletionExecutor.shutdown(); + deletionExecutor.awaitTermination(2, TimeUnit.SECONDS); } @Test void testAccounts() throws AccountDatabaseCrawlerRestartException, InterruptedException { - AccountCleaner accountCleaner = new AccountCleaner(accountsManager); + AccountCleaner accountCleaner = new AccountCleaner(accountsManager, deletionExecutor); accountCleaner.onCrawlStart(); accountCleaner.timeAndProcessCrawlChunk(Optional.empty(), Arrays.asList(deletedDisabledAccount, undeletedDisabledAccount, undeletedEnabledAccount)); accountCleaner.onCrawlEnd(Optional.empty());