0
0
mirror of https://github.com/signalapp/Signal-Server.git synced 2024-09-19 19:42:18 +02:00

Remove migration paths for lazy message deletion

This commit is contained in:
Jonathan Klabunde Tomer 2024-07-23 14:07:19 -07:00 committed by GitHub
parent 6eed458ceb
commit f12a6ff73f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 34 additions and 345 deletions

View File

@ -397,7 +397,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getMessages().getTableName(),
config.getDynamoDbTables().getMessages().getExpiration(),
dynamicConfigurationManager,
messageDeletionAsyncExecutor);
RemoteConfigs remoteConfigs = new RemoteConfigs(dynamoDbClient,
config.getDynamoDbTables().getRemoteConfig().getTableName());

View File

@ -29,8 +29,6 @@ import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration.DynamoKeyScheme;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import reactor.core.publisher.Flux;
@ -69,27 +67,22 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private final Timer storeTimer = timer(name(getClass(), "store"));
private final String DELETE_BY_ACCOUNT_TIMER_NAME = name(getClass(), "delete", "account");
private final String DELETE_BY_DEVICE_TIMER_NAME = name(getClass(), "delete", "device");
private final String MESSAGES_STORED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesStored");
private final String MESSAGES_LOADED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesLoaded");
private final String MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME = name(getClass(), "messagesDeleted");
private final DynamoDbAsyncClient dbAsyncClient;
private final String tableName;
private final Duration timeToLive;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfig;
private final ExecutorService messageDeletionExecutor;
private final Scheduler messageDeletionScheduler;
private static final Logger logger = LoggerFactory.getLogger(MessagesDynamoDb.class);
public MessagesDynamoDb(DynamoDbClient dynamoDb, DynamoDbAsyncClient dynamoDbAsyncClient, String tableName,
Duration timeToLive, DynamicConfigurationManager<DynamicConfiguration> dynamicConfig, ExecutorService messageDeletionExecutor) {
Duration timeToLive, ExecutorService messageDeletionExecutor) {
super(dynamoDb);
this.dbAsyncClient = dynamoDbAsyncClient;
this.tableName = tableName;
this.timeToLive = timeToLive;
this.dynamicConfig = dynamicConfig;
this.messageDeletionExecutor = messageDeletionExecutor;
this.messageDeletionScheduler = Schedulers.fromExecutor(messageDeletionExecutor);
@ -102,20 +95,18 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private void storeBatch(final List<MessageProtos.Envelope> messages, final UUID destinationAccountUuid,
final Device destinationDevice) {
final byte destinationDeviceId = destinationDevice.getId();
if (messages.size() > DYNAMO_DB_MAX_BATCH_SIZE) {
throw new IllegalArgumentException("Maximum batch size of " + DYNAMO_DB_MAX_BATCH_SIZE + " exceeded with " + messages.size() + " messages");
}
final DynamoKeyScheme scheme = dynamicConfig.getConfiguration().getMessagesConfiguration().writeKeyScheme();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
List<WriteRequest> writeItems = new ArrayList<>();
for (MessageProtos.Envelope message : messages) {
final UUID messageUuid = UUID.fromString(message.getServerGuid());
final ImmutableMap.Builder<String, AttributeValue> item = ImmutableMap.<String, AttributeValue>builder()
.put(KEY_PARTITION, partitionKey)
.put(KEY_SORT, convertSortKey(destinationDevice.getId(), message.getServerTimestamp(), messageUuid, scheme))
.put(KEY_SORT, convertSortKey(destinationDevice.getId(), message.getServerTimestamp(), messageUuid))
.put(LOCAL_INDEX_MESSAGE_UUID_KEY_SORT, convertLocalIndexMessageUuidSortKey(messageUuid))
.put(KEY_TTL, AttributeValues.fromLong(getTtlForMessage(message)))
.put(KEY_ENVELOPE_BYTES, AttributeValue.builder().b(SdkBytes.fromByteArray(message.toByteArray())).build());
@ -126,77 +117,31 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}
executeTableWriteItemsUntilComplete(Map.of(tableName, writeItems));
Metrics.counter(MESSAGES_STORED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment(writeItems.size());
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
return Flux.fromIterable(dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes())
.flatMap(scheme -> mayHaveMessages(accountIdentifier, device, scheme))
.any(mayHaveMessages -> mayHaveMessages)
.toFuture();
}
private Mono<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device, final DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device, scheme);
final AttributeValue partitionKey = convertPartitionKey(accountIdentifier, device);
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(false)
.limit(1);
.limit(1)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
queryRequestBuilder = switch (scheme) {
case TRADITIONAL -> queryRequestBuilder
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(device.getId(), scheme)));
case LAZY_DELETION -> queryRequestBuilder
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
};
return Mono.fromFuture(dbAsyncClient.query(queryRequestBuilder.build())
.thenApply(queryResponse -> queryResponse.count() > 0));
return dbAsyncClient.query(queryRequestBuilder.build())
.thenApply(queryResponse -> queryResponse.count() > 0);
}
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
return Flux.concat(
dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> load(destinationAccountUuid, device, limit, scheme))
.toList())
.map(messageAndScheme -> {
Metrics.counter(MESSAGES_LOADED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", messageAndScheme.getT2().name())).increment();
return messageAndScheme.getT1();
});
}
private Publisher<Tuple2<MessageProtos.Envelope, DynamoKeyScheme>> load(final UUID destinationAccountUuid, final Device device, final Integer limit, final DynamoKeyScheme scheme) {
final byte destinationDeviceId = device.getId();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device, scheme);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device);
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
.tableName(tableName)
.consistentRead(true);
queryRequestBuilder = switch (scheme) {
case TRADITIONAL -> queryRequestBuilder
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, scheme)));
case LAZY_DELETION -> queryRequestBuilder
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
};
.consistentRead(true)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey));
if (limit != null) {
// some callers dont take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise,
@ -215,25 +160,12 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return null;
}
})
.filter(Predicate.not(Objects::isNull))
.map(m -> Tuples.of(m, scheme));
.filter(Predicate.not(Objects::isNull));
}
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> deleteMessageByDestinationAndGuid(destinationAccountUuid, destinationDevice, messageUuid, scheme))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keying schemes
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.get(); // there is always at least one scheme
}
private CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
final QueryRequest queryRequest = QueryRequest.builder()
.tableName(tableName)
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
@ -260,7 +192,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.mapNotNull(deleteItemResponse -> {
try {
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
Metrics.counter(MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment();
return convertItemToEnvelope(deleteItemResponse.attributes());
}
} catch (final InvalidProtocolBufferException e) {
@ -276,20 +207,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
return dynamicConfig.getConfiguration().getMessagesConfiguration().dynamoKeySchemes()
.stream()
.map(scheme -> deleteMessage(destinationAccountUuid, destinationDevice, messageUuid, serverTimestamp, scheme))
// this combines the futures by producing a future that returns an arbitrary nonempty
// result if there is one, which should be OK because only one of the keying schemes
// should produce a nonempty result for any given message uuid
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
.orElseThrow(); // there is always at least one scheme
}
private CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp, final DynamoKeyScheme scheme) {
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice, scheme);
final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid, scheme);
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid);
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey))
@ -299,7 +218,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
.thenApplyAsync(deleteItemResponse -> {
if (deleteItemResponse.attributes() != null && deleteItemResponse.attributes().containsKey(KEY_PARTITION)) {
try {
Metrics.counter(MESSAGES_DELETED_BY_SCHEME_COUNTER_NAME, Tags.of("scheme", scheme.name())).increment();
return Optional.of(convertItemToEnvelope(deleteItemResponse.attributes()));
} catch (final InvalidProtocolBufferException e) {
logger.error("Failed to parse envelope", e);
@ -310,69 +228,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
}, messageDeletionExecutor);
}
// Deletes all messages stored for the supplied account that were stored under the traditional (uuid+device id) keying scheme.
// Messages stored under the lazy-message-deletion keying scheme will not be affected.
public CompletableFuture<Void> deleteAllMessagesForAccount(final UUID destinationAccountUuid) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.projectionExpression(KEY_SORT)
.consistentRead(true)
.keyConditionExpression("#part = :part")
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
.expressionAttributeValues(Map.of(":part", partitionKey))
.build())
.items())
.flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT)))
.build())),
DYNAMO_DB_MAX_BATCH_SIZE)
.then()
.doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "error")))
.toFuture();
}
// Deletes all messages stored for the supplied account and device that were stored under the
// traditional (uuid+device id) keying scheme. Messages stored under the lazy-message-deletion
// keying scheme will not be affected.
public CompletableFuture<Void> deleteAllMessagesForDevice(final UUID destinationAccountUuid,
final byte destinationDeviceId) {
final Timer.Sample sample = Timer.start();
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, null, DynamoKeyScheme.TRADITIONAL);
return Flux.from(dbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#part = :part AND begins_with ( #sort , :sortprefix )")
.expressionAttributeNames(Map.of(
"#part", KEY_PARTITION,
"#sort", KEY_SORT))
.expressionAttributeValues(Map.of(
":part", partitionKey,
":sortprefix", convertDestinationDeviceIdToSortKeyPrefix(destinationDeviceId, DynamoKeyScheme.TRADITIONAL)))
.projectionExpression(KEY_SORT)
.consistentRead(true)
.build())
.items())
.flatMap(item -> Mono.fromFuture(() -> dbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_PARTITION, partitionKey,
KEY_SORT, item.get(KEY_SORT)))
.build())),
DYNAMO_DB_MAX_BATCH_SIZE)
.then()
.doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "error")))
.toFuture();
}
@VisibleForTesting
static MessageProtos.Envelope convertItemToEnvelope(final Map<String, AttributeValue> item)
throws InvalidProtocolBufferException {
@ -384,40 +239,22 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
return message.getServerTimestamp() / 1000 + timeToLive.getSeconds();
}
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid, final Device destinationDevice, final DynamoKeyScheme scheme) {
return switch (scheme) {
case TRADITIONAL -> AttributeValues.fromUUID(destinationAccountUuid);
case LAZY_DELETION -> {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits());
byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits());
byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId());
yield AttributeValues.fromByteBuffer(byteBuffer.flip());
}
};
private static AttributeValue convertPartitionKey(final UUID destinationAccountUuid, final Device destinationDevice) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits());
byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits());
byteBuffer.putLong((destinationDevice.getCreated() & ~0x7f) + destinationDevice.getId());
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp,
final UUID messageUuid, final DynamoKeyScheme scheme) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(32);
if (scheme == DynamoKeyScheme.TRADITIONAL) {
// for compatibility - destinationDeviceId was previously `long`
byteBuffer.putLong(destinationDeviceId);
}
private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, final UUID messageUuid) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
byteBuffer.putLong(serverTimestamp);
byteBuffer.putLong(messageUuid.getMostSignificantBits());
byteBuffer.putLong(messageUuid.getLeastSignificantBits());
return AttributeValues.fromByteBuffer(byteBuffer.flip());
}
private static AttributeValue convertDestinationDeviceIdToSortKeyPrefix(final byte destinationDeviceId, final DynamoKeyScheme scheme) {
return switch (scheme) {
case TRADITIONAL -> AttributeValues.fromByteBuffer(ByteBuffer.allocate(8).putLong(destinationDeviceId).flip());
case LAZY_DELETION -> AttributeValues.b(new byte[0]);
};
}
private static AttributeValue convertLocalIndexMessageUuidSortKey(final UUID messageUuid) {
return AttributeValues.fromUUID(messageUuid);
}

View File

@ -107,15 +107,11 @@ public class MessagesManager {
}
public CompletableFuture<Void> clear(UUID destinationUuid) {
return CompletableFuture.allOf(
messagesCache.clear(destinationUuid),
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid));
return messagesCache.clear(destinationUuid);
}
public CompletableFuture<Void> clear(UUID destinationUuid, byte deviceId) {
return CompletableFuture.allOf(
messagesCache.clear(destinationUuid, deviceId),
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, deviceId));
return messagesCache.clear(destinationUuid, deviceId);
}
public CompletableFuture<Optional<Envelope>> delete(UUID destinationUuid, Device destinationDevice, UUID guid,

View File

@ -169,7 +169,6 @@ record CommandDependencies(
MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
dynamicConfigurationManager,
messageDeletionExecutor);
FaultTolerantRedisCluster messagesCluster = configuration.getMessageCacheConfiguration()
.getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder);

View File

@ -76,7 +76,7 @@ class MessagePersisterIntegrationTest {
messageDeletionExecutorService = Executors.newSingleThreadExecutor();
final MessagesDynamoDb messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(14),
dynamicConfigurationManager, messageDeletionExecutorService);
messageDeletionExecutorService);
final AccountsManager accountsManager = mock(AccountsManager.class);
notificationExecutorService = Executors.newSingleThreadExecutor();

View File

@ -25,11 +25,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables;
import org.whispersystems.textsecuregcm.tests.util.DevicesHelper;
@ -80,7 +77,6 @@ class MessagesDynamoDbTest {
}
private ExecutorService messageDeletionExecutorService;
private DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private MessagesDynamoDb messagesDynamoDb;
@RegisterExtension
@ -89,11 +85,9 @@ class MessagesDynamoDbTest {
@BeforeEach
void setup() {
messageDeletionExecutorService = Executors.newSingleThreadExecutor();
dynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(14),
dynamicConfigurationManager, messageDeletionExecutorService);
messageDeletionExecutorService);
}
@AfterEach
@ -195,61 +189,6 @@ class MessagesDynamoDbTest {
.verify();
}
@Test
void testDeleteForDestination() {
final UUID destinationUuid = UUID.randomUUID();
final UUID secondDestinationUuid = UUID.randomUUID();
final Device primary = DevicesHelper.createDevice((byte) 1);
final Device device2 = DevicesHelper.createDevice((byte) 2);
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary);
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, primary);
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, device2);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).isEqualTo(MESSAGE1);
assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).isEqualTo(MESSAGE3);
assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).join();
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().isEmpty();
assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
}
@Test
void testDeleteForDestinationDevice() {
final UUID destinationUuid = UUID.randomUUID();
final UUID secondDestinationUuid = UUID.randomUUID();
final Device primary = DevicesHelper.createDevice((byte) 1);
final Device device2 = DevicesHelper.createDevice((byte) 2);
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary);
messagesDynamoDb.store(List.of(MESSAGE2), secondDestinationUuid, primary);
messagesDynamoDb.store(List.of(MESSAGE3), destinationUuid, device2);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).isEqualTo(MESSAGE1);
assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1)
.element(0).isEqualTo(MESSAGE3);
assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, device2.getId()).join();
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull().hasSize(1)
.element(0).isEqualTo(MESSAGE1);
assertThat(load(destinationUuid, device2, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.isEmpty();
assertThat(load(secondDestinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).isNotNull()
.hasSize(1).element(0).isEqualTo(MESSAGE2);
}
@Test
void testDeleteMessageByDestinationAndGuid() throws Exception {
final UUID destinationUuid = UUID.randomUUID();
@ -330,66 +269,12 @@ class MessagesDynamoDbTest {
.block();
}
@Test
void testMessageKeySchemeMigration() throws Exception {
final UUID destinationUuid = UUID.randomUUID();
final Device primary = DevicesHelper.createDevice((byte) 1);
// store message 1 in old scheme
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- TRADITIONAL
""", DynamicConfiguration.class));
messagesDynamoDb.store(List.of(MESSAGE1), destinationUuid, primary);
// store message 2 in new scheme during migration
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- TRADITIONAL
- LAZY_DELETION
""", DynamicConfiguration.class));
messagesDynamoDb.store(List.of(MESSAGE2), destinationUuid, primary);
// in old scheme, we should only get message 1 back (we would never actually do this, it's just a way to prove we used the new scheme for message 2)
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- TRADITIONAL
""", DynamicConfiguration.class));
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE1);
// during migration we should get both messages back in order
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- TRADITIONAL
- LAZY_DELETION
""", DynamicConfiguration.class));
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE1, MESSAGE2);
// after migration we would only get message 2 back (we shouldn't do this either in practice)
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- LAZY_DELETION
""", DynamicConfiguration.class));
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE)).containsExactly(MESSAGE2);
}
@Test
void testLazyMessageDeletion() throws Exception {
final UUID destinationUuid = UUID.randomUUID();
final Device primary = DevicesHelper.createDevice((byte) 1);
primary.setCreated(System.currentTimeMillis());
when(dynamicConfigurationManager.getConfiguration()).thenReturn(SystemMapper.yamlMapper().readValue("""
messagesConfiguration:
dynamoKeySchemes:
- LAZY_DELETION
""", DynamicConfiguration.class));
messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, primary);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.as("load should return all messages stored").containsOnly(MESSAGE1, MESSAGE2, MESSAGE3);
@ -404,45 +289,22 @@ class MessagesDynamoDbTest {
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.as("deleting message by guid and timestamp should work").containsExactly(MESSAGE3);
messagesDynamoDb.deleteAllMessagesForDevice(destinationUuid, (byte) 1).get(1, TimeUnit.SECONDS);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.as("deleting all messages for device should do nothing").containsExactly(MESSAGE3);
messagesDynamoDb.deleteAllMessagesForAccount(destinationUuid).get(1, TimeUnit.SECONDS);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.as("deleting all messages for account should do nothing").containsExactly(MESSAGE3);
primary.setCreated(primary.getCreated() + 1000);
assertThat(load(destinationUuid, primary, MessagesDynamoDb.RESULT_SET_CHUNK_SIZE))
.as("devices with the same id but different create timestamps should see no messages")
.isEmpty();
}
@ParameterizedTest
@MethodSource
void mayHaveMessages(final List<DynamicMessagesConfiguration.DynamoKeyScheme> schemes) {
@Test
void mayHaveMessages() {
final UUID destinationUuid = UUID.randomUUID();
final byte destinationDeviceId = (byte) (random.nextInt(Device.MAXIMUM_DEVICE_ID) + 1);
final Device destinationDevice = DevicesHelper.createDevice(destinationDeviceId);
final DynamicConfiguration dynamicConfiguration = mock(DynamicConfiguration.class);
when(dynamicConfiguration.getMessagesConfiguration()).thenReturn(new DynamicMessagesConfiguration(schemes));
when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration);
assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isFalse();
messagesDynamoDb.store(List.of(MESSAGE1, MESSAGE2, MESSAGE3), destinationUuid, destinationDevice);
assertThat(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice).join()).isTrue();
}
private static List<List<DynamicMessagesConfiguration.DynamoKeyScheme>> mayHaveMessages() {
return List.of(
List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL),
List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION),
List.of(DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL, DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION),
List.of(DynamicMessagesConfiguration.DynamoKeyScheme.LAZY_DELETION, DynamicMessagesConfiguration.DynamoKeyScheme.TRADITIONAL)
);
}
}

View File

@ -44,7 +44,6 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AuthenticatedAccount;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
@ -88,9 +87,6 @@ class WebSocketConnectionIntegrationTest {
@BeforeEach
void setUp() throws Exception {
final DynamicConfigurationManager<DynamicConfiguration> mockDynamicConfigurationManager = mock(DynamicConfigurationManager.class);
when(mockDynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration());
sharedExecutorService = Executors.newSingleThreadExecutor();
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
@ -98,7 +94,7 @@ class WebSocketConnectionIntegrationTest {
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(),
DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7),
mockDynamicConfigurationManager, sharedExecutorService);
sharedExecutorService);
reportMessageManager = mock(ReportMessageManager.class);
account = mock(Account.class);
device = mock(Device.class);