mirror of
https://github.com/signalapp/libsignal.git
synced 2024-09-20 03:52:17 +02:00
Fix CompletableFuture error handling
Fix the behavior of CompletableFuture.thenApply so that if the applied function throws an exception, the pending future receives the exception (instead of never completing). Add tests.
This commit is contained in:
parent
05b88ad1d1
commit
1c6e8e512d
@ -9,104 +9,9 @@ import static org.junit.Assert.*;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Function;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FutureTest {
|
||||
@Test
|
||||
public void testInitialState() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
assertFalse(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
assertThrows(TimeoutException.class, () -> future.get(1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccess() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
future.complete(42);
|
||||
assertTrue(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
assertEquals(42, (int) future.get());
|
||||
assertEquals(42, (int) future.get(1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailure() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("oh no");
|
||||
future.completeExceptionally(exception);
|
||||
assertTrue(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplySuccess() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
assertFalse(chained.isDone());
|
||||
future.complete(21);
|
||||
assertTrue(chained.isDone());
|
||||
assertEquals(false, chained.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyFailure() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
Exception exception = new RuntimeException("error!");
|
||||
assertFalse(chained.isDone());
|
||||
future.completeExceptionally(exception);
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
private class CountingFunction<T, U> implements Function<T, U> {
|
||||
public CountingFunction(Function<T, U> f) {
|
||||
this.f = f;
|
||||
}
|
||||
|
||||
public U apply(T value) {
|
||||
this.applicationCount++;
|
||||
return this.f.apply(value);
|
||||
}
|
||||
|
||||
public long getApplicationCount() {
|
||||
return this.applicationCount;
|
||||
}
|
||||
|
||||
private long applicationCount = 0;
|
||||
private Function<T, U> f;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyOnceFirstCompletionOnly() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CountingFunction<Integer, Boolean> function = new CountingFunction<>((Integer i) -> (i == 0));
|
||||
CompletableFuture<Boolean> chained = future.thenApply(function);
|
||||
|
||||
assertFalse(chained.isDone());
|
||||
future.complete(55);
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
future.complete(33);
|
||||
future.complete(0);
|
||||
|
||||
assertEquals(false, chained.get());
|
||||
assertEquals(function.getApplicationCount(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessFromRust() throws Exception {
|
||||
Future<Integer> future = Native.TESTING_FutureSuccess(1, 21);
|
||||
@ -119,43 +24,4 @@ public class FutureTest {
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
|
||||
// These multi-threaded tests are inherently racy in whether they actually have one thread wait()
|
||||
// and the other notify(). The observable behavior shouldn't be different, though.
|
||||
|
||||
@Test
|
||||
public void testSuccessMultiThreaded() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
|
||||
new Thread(
|
||||
() -> {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
future.complete(42);
|
||||
})
|
||||
.start();
|
||||
|
||||
assertEquals(42, (int) future.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureMultiThreaded() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("oh no");
|
||||
|
||||
new Thread(
|
||||
() -> {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
future.completeExceptionally(exception);
|
||||
})
|
||||
.start();
|
||||
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
}
|
||||
|
@ -103,12 +103,36 @@ public class CompletableFuture<T> implements Future<T> {
|
||||
return get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a future that will complete with the applied function applied to this future's
|
||||
* completion value.
|
||||
*
|
||||
* <p>If this future completes exceptionally, the exception will be propagated to the returned
|
||||
* future. If this future completes normally but the applied function throws, the returned future
|
||||
* will complete exceptionally with the thrown exception.
|
||||
*/
|
||||
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
|
||||
CompletableFuture<U> future = new CompletableFuture<>();
|
||||
ThenApplyCompleter completer = new ThenApplyCompleter(future, fn);
|
||||
|
||||
T result;
|
||||
Throwable exception;
|
||||
synchronized (this) {
|
||||
this.consumers.add(completer);
|
||||
if (!this.completed) {
|
||||
this.consumers.add(completer);
|
||||
return future;
|
||||
}
|
||||
result = this.result;
|
||||
exception = this.exception;
|
||||
}
|
||||
|
||||
// If this future has already completed, perform the appropriate action now.
|
||||
// This is done outside of the synchronized block to prevent deadlocks and
|
||||
// holding the lock for a potentially long period.
|
||||
if (exception != null) {
|
||||
completer.completeExceptionally.accept(exception);
|
||||
} else {
|
||||
completer.complete.accept(result);
|
||||
}
|
||||
|
||||
return future;
|
||||
@ -119,7 +143,14 @@ public class CompletableFuture<T> implements Future<T> {
|
||||
CompletableFuture<U> future, Function<? super T, ? extends U> fn) {
|
||||
this.complete =
|
||||
(T value) -> {
|
||||
future.complete(fn.apply(value));
|
||||
U output;
|
||||
try {
|
||||
output = fn.apply(value);
|
||||
} catch (Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
return;
|
||||
}
|
||||
future.complete(output);
|
||||
};
|
||||
this.completeExceptionally =
|
||||
(Throwable throwable) -> {
|
||||
|
@ -0,0 +1,220 @@
|
||||
//
|
||||
// Copyright 2023 Signal Messenger, LLC.
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
//
|
||||
|
||||
package org.signal.libsignal.internal;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Function;
|
||||
import org.junit.Test;
|
||||
|
||||
public class CompletableFutureTest {
|
||||
private class CountingFunction<T, U> implements Function<T, U> {
|
||||
public CountingFunction(Function<T, U> f) {
|
||||
this.f = f;
|
||||
}
|
||||
|
||||
public U apply(T value) {
|
||||
this.applicationCount++;
|
||||
return this.f.apply(value);
|
||||
}
|
||||
|
||||
public long getApplicationCount() {
|
||||
return this.applicationCount;
|
||||
}
|
||||
|
||||
private long applicationCount = 0;
|
||||
private Function<T, U> f;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInitialState() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
assertFalse(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeout() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
assertThrows(TimeoutException.class, () -> future.get(1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuccess() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
future.complete(42);
|
||||
assertTrue(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
assertEquals(42, (int) future.get());
|
||||
assertEquals(42, (int) future.get(1, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailure() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("oh no");
|
||||
future.completeExceptionally(exception);
|
||||
assertTrue(future.isDone());
|
||||
assertFalse(future.isCancelled());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplySuccess() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
assertFalse(chained.isDone());
|
||||
future.complete(21);
|
||||
assertTrue(chained.isDone());
|
||||
assertEquals(false, chained.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyFailure() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
Exception exception = new RuntimeException("error!");
|
||||
assertFalse(chained.isDone());
|
||||
future.completeExceptionally(exception);
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyFunctionThrows() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
RuntimeException exception = new RuntimeException("error!");
|
||||
CompletableFuture<Boolean> chained =
|
||||
future.thenApply(
|
||||
(Integer i) -> {
|
||||
throw exception;
|
||||
});
|
||||
|
||||
assertFalse(chained.isDone());
|
||||
future.complete(21);
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyAfterCompletion() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
future.complete(21);
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
assertTrue(chained.isDone());
|
||||
assertEquals(false, chained.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyAfterExceptionalCompletion() {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("error!");
|
||||
future.completeExceptionally(exception);
|
||||
CompletableFuture<Boolean> chained = future.thenApply((Integer i) -> (i == 0));
|
||||
assertTrue(chained.isDone());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyAfterCompletionFunctionThrows() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
future.complete(21);
|
||||
|
||||
RuntimeException exception = new RuntimeException("error!");
|
||||
CompletableFuture<Boolean> chained =
|
||||
future.thenApply(
|
||||
(Integer i) -> {
|
||||
throw exception;
|
||||
});
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyAfterExceptionalCompletionFunctionThrows() {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("future error!");
|
||||
future.completeExceptionally(exception);
|
||||
|
||||
CompletableFuture<Boolean> chained =
|
||||
future.thenApply(
|
||||
(Integer i) -> {
|
||||
throw new RuntimeException("apply function error!");
|
||||
});
|
||||
assertTrue(chained.isDone());
|
||||
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> chained.get());
|
||||
// The function application error never gets thrown.
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThenApplyOnceFirstCompletionOnly() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
CountingFunction<Integer, Boolean> function = new CountingFunction<>((Integer i) -> (i == 0));
|
||||
CompletableFuture<Boolean> chained = future.thenApply(function);
|
||||
|
||||
assertFalse(chained.isDone());
|
||||
future.complete(55);
|
||||
|
||||
assertTrue(chained.isDone());
|
||||
future.complete(33);
|
||||
future.complete(0);
|
||||
|
||||
assertEquals(false, chained.get());
|
||||
assertEquals(function.getApplicationCount(), 1);
|
||||
}
|
||||
|
||||
// These multi-threaded tests are inherently racy in whether they actually have one thread wait()
|
||||
// and the other notify(). The observable behavior shouldn't be different, though.
|
||||
|
||||
@Test
|
||||
public void testSuccessMultiThreaded() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
|
||||
new Thread(
|
||||
() -> {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
future.complete(42);
|
||||
})
|
||||
.start();
|
||||
|
||||
assertEquals(42, (int) future.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureMultiThreaded() throws Exception {
|
||||
CompletableFuture<Integer> future = new CompletableFuture<>();
|
||||
Exception exception = new RuntimeException("oh no");
|
||||
|
||||
new Thread(
|
||||
() -> {
|
||||
try {
|
||||
Thread.sleep(200);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
future.completeExceptionally(exception);
|
||||
})
|
||||
.start();
|
||||
|
||||
ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
|
||||
assertEquals(exception, e.getCause());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user