From 16012e6ffe0365235f84ec49f93d3c9f341b7b78 Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Mon, 18 Sep 2023 10:49:27 -0500 Subject: [PATCH] Remove obsolete `ManagedPeriodicWork` --- .../storage/ManagedPeriodicWork.java | 118 ------------ .../storage/ManagedPeriodicWorkLock.java | 35 ---- .../resources/lua/periodic_worker/unlock.lua | 8 - .../storage/ManagedPeriodicWorkTest.java | 169 ------------------ 4 files changed, 330 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkLock.java delete mode 100644 service/src/main/resources/lua/periodic_worker/unlock.lua delete mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java deleted file mode 100644 index 9f82db96..00000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWork.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.storage; - -import static com.codahale.metrics.MetricRegistry.name; - -import io.dropwizard.lifecycle.Managed; -import io.micrometer.core.instrument.Metrics; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Util; - -public abstract class ManagedPeriodicWork implements Managed { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private static final String FUTURE_DONE_GAUGE_NAME = "futureDone"; - - private final ManagedPeriodicWorkLock lock; - private final Duration workerTtl; - private final Duration runInterval; - private final String workerId; - private final ScheduledExecutorService executorService; - - private Duration sleepDurationAfterUnexpectedException = Duration.ofSeconds(10); - - - @Nullable - private ScheduledFuture scheduledFuture; - private AtomicReference> activeExecutionFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); - - public ManagedPeriodicWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, final ScheduledExecutorService scheduledExecutorService) { - this.lock = lock; - this.workerTtl = workerTtl; - this.runInterval = runInterval; - this.workerId = UUID.randomUUID().toString(); - this.executorService = scheduledExecutorService; - } - - abstract protected void doPeriodicWork() throws Exception; - - @Override - public synchronized void start() throws Exception { - - if (scheduledFuture != null) { - return; - } - - scheduledFuture = executorService.scheduleAtFixedRate(() -> { - try { - execute(); - } catch (final Exception e) { - logger.warn("Error in execution", e); - - // wait a bit, in case the error is caused by external instability - Util.sleep(sleepDurationAfterUnexpectedException.toMillis()); - } - }, 0, runInterval.getSeconds(), TimeUnit.SECONDS); - - Metrics.gauge(name(getClass(), FUTURE_DONE_GAUGE_NAME), scheduledFuture, future -> future.isDone() ? 1 : 0); - } - - @Override - public synchronized void stop() throws Exception { - - if (scheduledFuture != null) { - - scheduledFuture.cancel(false); - - try { - activeExecutionFuture.get().join(); - } catch (final Exception e) { - logger.warn("error while awaiting final execution", e); - } - } - } - - public void setSleepDurationAfterUnexpectedException(final Duration sleepDurationAfterUnexpectedException) { - this.sleepDurationAfterUnexpectedException = sleepDurationAfterUnexpectedException; - } - - private void execute() { - - if (lock.claimActiveWork(workerId, workerTtl)) { - try { - - activeExecutionFuture.set(new CompletableFuture<>()); - - logger.info("Starting execution"); - doPeriodicWork(); - logger.info("Execution complete"); - - } catch (final Exception e) { - logger.warn("Periodic work failed", e); - - // wait a bit, in case the error is caused by external instability - Util.sleep(sleepDurationAfterUnexpectedException.toMillis()); - - } finally { - try { - lock.releaseActiveWork(workerId); - } finally { - activeExecutionFuture.get().complete(null); - } - } - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkLock.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkLock.java deleted file mode 100644 index 5ccbfae1..00000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkLock.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2013-2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.storage; - -import io.lettuce.core.ScriptOutputType; -import io.lettuce.core.SetArgs; -import org.whispersystems.textsecuregcm.redis.ClusterLuaScript; -import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster; -import java.io.IOException; -import java.time.Duration; -import java.util.List; - -public class ManagedPeriodicWorkLock { - - private final String activeWorkerKey; - - private final FaultTolerantRedisCluster cacheCluster; - private final ClusterLuaScript unlockClusterScript; - - public ManagedPeriodicWorkLock(final String activeWorkerKey, final FaultTolerantRedisCluster cacheCluster) throws IOException { - this.activeWorkerKey = activeWorkerKey; - this.cacheCluster = cacheCluster; - this.unlockClusterScript = ClusterLuaScript.fromResource(cacheCluster, "lua/periodic_worker/unlock.lua", ScriptOutputType.INTEGER); - } - - public boolean claimActiveWork(String workerId, Duration ttl) { - return "OK".equals(cacheCluster.withCluster(connection -> connection.sync().set(activeWorkerKey, workerId, SetArgs.Builder.nx().px(ttl.toMillis())))); - } - - public void releaseActiveWork(String workerId) { - unlockClusterScript.execute(List.of(activeWorkerKey), List.of(workerId)); - } -} diff --git a/service/src/main/resources/lua/periodic_worker/unlock.lua b/service/src/main/resources/lua/periodic_worker/unlock.lua deleted file mode 100644 index b95d15d6..00000000 --- a/service/src/main/resources/lua/periodic_worker/unlock.lua +++ /dev/null @@ -1,8 +0,0 @@ --- keys: lock_key --- argv: lock_value - -if redis.call("GET", KEYS[1]) == ARGV[1] then - return redis.call("DEL", KEYS[1]) -else - return 0 -end diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java deleted file mode 100644 index 21d1cf8b..00000000 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/ManagedPeriodicWorkTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2013-2021 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.storage; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.time.Duration; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.whispersystems.textsecuregcm.util.Util; - -class ManagedPeriodicWorkTest { - - private ScheduledExecutorService scheduledExecutorService; - private ManagedPeriodicWorkLock lock; - private TestWork testWork; - - @BeforeEach - void setup() { - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - lock = mock(ManagedPeriodicWorkLock.class); - - testWork = new TestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), - scheduledExecutorService); - } - - @AfterEach - void teardown() throws Exception { - scheduledExecutorService.shutdown(); - - assertTrue(scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS)); - } - - @Test - void test() throws Exception { - when(lock.claimActiveWork(any(), any())).thenReturn(true); - - testWork.start(); - - synchronized (testWork) { - Util.wait(testWork); - } - - testWork.stop(); - - verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class)); - verify(lock, atLeastOnce()).releaseActiveWork(anyString()); - - assertTrue(1 <= testWork.getCount()); - } - - @Test - void testSlowWorkShutdown() throws Exception { - - when(lock.claimActiveWork(any(), any())).thenReturn(true); - - testWork.setWorkSleepDuration(Duration.ofSeconds(1)); - - testWork.start(); - - synchronized (testWork) { - Util.wait(testWork); - } - - long startMillis = System.currentTimeMillis(); - - testWork.stop(); - - long runMillis = System.currentTimeMillis() - startMillis; - - assertTrue(runMillis > 500); - - verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class)); - verify(lock, atLeastOnce()).releaseActiveWork(anyString()); - - assertTrue(1 <= testWork.getCount()); - } - - @Test - void testWorkExceptionReleasesLock() throws Exception { - when(lock.claimActiveWork(any(), any())).thenReturn(true); - - testWork = new ExceptionalTestWork(lock, Duration.ofMinutes(5), Duration.ofMinutes(5), scheduledExecutorService); - - testWork.setSleepDurationAfterUnexpectedException(Duration.ZERO); - - testWork.start(); - - synchronized (testWork) { - Util.wait(testWork); - } - - testWork.stop(); - - verify(lock, atLeastOnce()).claimActiveWork(anyString(), any(Duration.class)); - verify(lock, atLeastOnce()).releaseActiveWork(anyString()); - - assertEquals(0, testWork.getCount()); - } - - - private static class TestWork extends ManagedPeriodicWork { - - private final AtomicInteger workCounter = new AtomicInteger(); - private Duration workSleepDuration = Duration.ZERO; - - public TestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, - final ScheduledExecutorService scheduledExecutorService) { - super(lock, workerTtl, runInterval, scheduledExecutorService); - } - - @Override - protected void doPeriodicWork() throws Exception { - - notifyStarted(); - - if (!workSleepDuration.isZero()) { - Util.sleep(workSleepDuration.toMillis()); - } - - workCounter.incrementAndGet(); - } - - synchronized void notifyStarted() { - notifyAll(); - } - - int getCount() { - return workCounter.get(); - } - - void setWorkSleepDuration(final Duration workSleepDuration) { - this.workSleepDuration = workSleepDuration; - } - } - - private static class ExceptionalTestWork extends TestWork { - - - public ExceptionalTestWork(final ManagedPeriodicWorkLock lock, final Duration workerTtl, final Duration runInterval, - final ScheduledExecutorService scheduledExecutorService) { - super(lock, workerTtl, runInterval, scheduledExecutorService); - } - - @Override - protected void doPeriodicWork() throws Exception { - - notifyStarted(); - - throw new RuntimeException(); - } - } - -}