mirror of
https://github.com/signalapp/Signal-Server.git
synced 2024-09-19 19:42:18 +02:00
remove support for deprecated messages DynamoDB key schema
This commit is contained in:
parent
bd57c1c7e7
commit
020c21f4ef
@ -118,38 +118,24 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
|
public CompletableFuture<Boolean> mayHaveMessages(final UUID accountIdentifier, final Device device) {
|
||||||
final List<CompletableFuture<Boolean>> mayHaveMessagesFutures =
|
return
|
||||||
Stream.of(convertPartitionKeyDeprecated(accountIdentifier, device), convertPartitionKey(accountIdentifier, device))
|
dbAsyncClient.query(QueryRequest.builder()
|
||||||
.distinct()
|
.tableName(tableName)
|
||||||
.map(partitionKey -> dbAsyncClient.query(QueryRequest.builder()
|
.consistentRead(false)
|
||||||
.tableName(tableName)
|
.limit(1)
|
||||||
.consistentRead(false)
|
.keyConditionExpression("#part = :part")
|
||||||
.limit(1)
|
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
|
||||||
.keyConditionExpression("#part = :part")
|
.expressionAttributeValues(Map.of(":part", convertPartitionKey(accountIdentifier, device))).build())
|
||||||
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
|
.thenApply(queryResponse -> queryResponse.count() > 0);
|
||||||
.expressionAttributeValues(Map.of(":part", partitionKey)).build())
|
|
||||||
.thenApply(queryResponse -> queryResponse.count() > 0))
|
|
||||||
.toList();
|
|
||||||
|
|
||||||
return CompletableFuture.allOf(mayHaveMessagesFutures.toArray(EMPTY_FUTURE_ARRAY))
|
|
||||||
.thenApply(ignored -> mayHaveMessagesFutures.stream().anyMatch(CompletableFuture::join));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
|
public Publisher<MessageProtos.Envelope> load(final UUID destinationAccountUuid, final Device device, final Integer limit) {
|
||||||
return Flux.concat(
|
|
||||||
Stream.of(convertPartitionKeyDeprecated(destinationAccountUuid, device), convertPartitionKey(destinationAccountUuid, device))
|
|
||||||
.distinct()
|
|
||||||
.map(pk -> load(limit, pk))
|
|
||||||
.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Publisher<MessageProtos.Envelope> load(final Integer limit, final AttributeValue partitionKey) {
|
|
||||||
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
|
QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
|
||||||
.tableName(tableName)
|
.tableName(tableName)
|
||||||
.consistentRead(true)
|
.consistentRead(true)
|
||||||
.keyConditionExpression("#part = :part")
|
.keyConditionExpression("#part = :part")
|
||||||
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
|
.expressionAttributeNames(Map.of("#part", KEY_PARTITION))
|
||||||
.expressionAttributeValues(Map.of(":part", partitionKey));
|
.expressionAttributeValues(Map.of(":part", convertPartitionKey(destinationAccountUuid, device)));
|
||||||
|
|
||||||
if (limit != null) {
|
if (limit != null) {
|
||||||
// some callers don’t take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise,
|
// some callers don’t take advantage of reactive streams, so we want to support limiting the fetch size. Otherwise,
|
||||||
@ -173,19 +159,7 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||||||
|
|
||||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
|
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
|
||||||
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
|
final UUID destinationAccountUuid, final Device destinationDevice, final UUID messageUuid) {
|
||||||
return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice),
|
final AttributeValue partitionKey = convertPartitionKey(destinationAccountUuid, destinationDevice);
|
||||||
convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice))
|
|
||||||
.distinct()
|
|
||||||
.map(pk -> deleteMessageByDestinationAndGuid(pk, messageUuid))
|
|
||||||
// this combines the futures by producing a future that returns an arbitrary nonempty
|
|
||||||
// result if there is one, which should be OK because only one of the keys
|
|
||||||
// should produce a nonempty result for any given message uuid
|
|
||||||
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
|
|
||||||
.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessageByDestinationAndGuid(
|
|
||||||
final AttributeValue partitionKey, final UUID messageUuid) {
|
|
||||||
final QueryRequest queryRequest = QueryRequest.builder()
|
final QueryRequest queryRequest = QueryRequest.builder()
|
||||||
.tableName(tableName)
|
.tableName(tableName)
|
||||||
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
|
.indexName(LOCAL_INDEX_MESSAGE_UUID_NAME)
|
||||||
@ -227,22 +201,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||||||
|
|
||||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
|
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final UUID destinationAccountUuid,
|
||||||
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
|
final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
|
||||||
return Stream.of(convertPartitionKey(destinationAccountUuid, destinationDevice),
|
|
||||||
convertPartitionKeyDeprecated(destinationAccountUuid, destinationDevice))
|
|
||||||
.distinct()
|
|
||||||
.map(pk -> deleteMessage(pk, destinationDevice, messageUuid, serverTimestamp))
|
|
||||||
// this combines the futures by producing a future that returns an arbitrary nonempty
|
|
||||||
// result if there is one, which should be OK because only one of the keys
|
|
||||||
// should produce a nonempty result for any given message uuid
|
|
||||||
.reduce((f, g) -> f.thenCombine(g, (a, b) -> a.or(() -> b)))
|
|
||||||
.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Optional<MessageProtos.Envelope>> deleteMessage(final AttributeValue partitionKey, final Device destinationDevice, final UUID messageUuid, final long serverTimestamp) {
|
|
||||||
final AttributeValue sortKey = convertSortKey(serverTimestamp, messageUuid);
|
|
||||||
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
|
DeleteItemRequest.Builder deleteItemRequest = DeleteItemRequest.builder()
|
||||||
.tableName(tableName)
|
.tableName(tableName)
|
||||||
.key(Map.of(KEY_PARTITION, partitionKey, KEY_SORT, sortKey))
|
.key(Map.of(KEY_PARTITION, convertPartitionKey(destinationAccountUuid, destinationDevice), KEY_SORT, convertSortKey(serverTimestamp, messageUuid)))
|
||||||
.returnValues(ReturnValue.ALL_OLD);
|
.returnValues(ReturnValue.ALL_OLD);
|
||||||
|
|
||||||
return dbAsyncClient.deleteItem(deleteItemRequest.build())
|
return dbAsyncClient.deleteItem(deleteItemRequest.build())
|
||||||
@ -278,14 +239,6 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
|
|||||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AttributeValue convertPartitionKeyDeprecated(final UUID destinationAccountUuid, final Device destinationDevice) {
|
|
||||||
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
|
|
||||||
byteBuffer.putLong(destinationAccountUuid.getMostSignificantBits());
|
|
||||||
byteBuffer.putLong(destinationAccountUuid.getLeastSignificantBits());
|
|
||||||
byteBuffer.putLong(destinationDevice.getCreated() & ~0x7f + destinationDevice.getId());
|
|
||||||
return AttributeValues.fromByteBuffer(byteBuffer.flip());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static AttributeValue convertSortKey(final long serverTimestamp, final UUID messageUuid) {
|
private static AttributeValue convertSortKey(final long serverTimestamp, final UUID messageUuid) {
|
||||||
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
|
final ByteBuffer byteBuffer = ByteBuffer.allocate(24);
|
||||||
byteBuffer.putLong(serverTimestamp);
|
byteBuffer.putLong(serverTimestamp);
|
||||||
|
Loading…
Reference in New Issue
Block a user