diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java index 46f1c068..deec76ea 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesDynamoDb.java @@ -26,6 +26,8 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Predicate; +import java.util.stream.Stream; + import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,7 +137,14 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { } public Publisher load(final UUID destinationAccountUuid, final Device device, final Integer limit) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, device); + return Flux.concat( + Stream.of(convertPartitionKeyDeprecated(destinationAccountUuid, device), convertPartitionKey(destinationAccountUuid, device)) + .distinct() + .map(pk -> load(limit, pk)) + .toList()); + } + + public Publisher load(final Integer limit, final AttributeValue partitionKey) { QueryRequest.Builder queryRequestBuilder = QueryRequest.builder() .tableName(tableName) .consistentRead(true) @@ -165,7 +174,19 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public CompletableFuture> deleteMessageByDestinationAndGuid( final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); + return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), + convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) + .distinct() + .map(pk -> deleteMessageByDestinationAndGuid(pk, messageUuid)) + // this combines the futures by producing a future that returns an arbitrary nonempty + // result if there is one, which should be OK because only one of the keys + // should produce a nonempty result for any given message uuid + .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) + .get(); + } + + public CompletableFuture> deleteMessageByDestinationAndGuid( + final AttributeValue partitionKey, final UUID messageUuid) { final QueryRequest queryRequest = QueryRequest.builder() .tableName(tableName) .indexName(LOCAL_INDEX_MESSAGE_UUID_NAME) @@ -207,7 +228,18 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { public CompletableFuture> deleteMessage(final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { - final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice); + return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice), + convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice)) + .distinct() + .map(pk -> deleteMessage(pk, destinationDevice, messageUuid, serverTimestamp)) + // this combines the futures by producing a future that returns an arbitrary nonempty + // result if there is one, which should be OK because only one of the keys + // should produce a nonempty result for any given message uuid + .reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b))) + .get(); + } + + public CompletableFuture> deleteMessage(final AttributeValue partitionKey, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) { final AttributeValue sortKey = convertSortKey(destinationDevice.getId(), serverTimestamp, messageUuid); DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) @@ -247,6 +279,14 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore { return AttributeValues.fromByteBuffer(byteBuffer.flip()); } + private static AttributeValue convertPartitionKeyDeprecated(final UUID destinationAccountUuid, final Device destinationDevice) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(24); + byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits()); + byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits()); + byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId()); + return AttributeValues.fromByteBuffer(byteBuffer.flip()); + } + private static AttributeValue convertSortKey(final byte destinationDeviceId, final long serverTimestamp, final UUID messageUuid) { final ByteBuffer byteBuffer = ByteBuffer.allocate(24); byteBuffer.putLong(serverTimestamp);