diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java index 0606de55..35170555 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/AbstractSinglePassCrawlAccountsCommand.java @@ -70,16 +70,31 @@ public abstract class AbstractSinglePassCrawlAccountsCommand extends Environment final int segments = Objects.requireNonNull(namespace.getInt(SEGMENT_COUNT)); + environment.lifecycle().manage(new CommandStopListener(configuration.getCommandStopListener())); + logger.info("Crawling accounts with {} segments and {} processors", segments, Runtime.getRuntime().availableProcessors()); - final CommandStopListener commandStopListener = new CommandStopListener(configuration.getCommandStopListener()); try { - commandStopListener.start(); + environment.lifecycle().getManagedObjects().forEach(managedObject -> { + try { + managedObject.start(); + } catch (final Exception e) { + logger.error("Failed to start managed object", e); + throw new RuntimeException(e); + } + }); + crawlAccounts(commandDependencies.accountsManager().streamAllFromDynamo(segments, Schedulers.parallel())); } finally { - commandStopListener.stop(); + environment.lifecycle().getManagedObjects().forEach(managedObject -> { + try { + managedObject.stop(); + } catch (final Exception e) { + logger.error("Failed to stop managed object", e); + } + }); } }