diff --git a/protobuf/Makefile b/protobuf/Makefile index 117b6672..6f143a0c 100644 --- a/protobuf/Makefile +++ b/protobuf/Makefile @@ -1,3 +1,3 @@ all: - protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto PubSubMessage.proto StoredMessage.proto + protoc --java_out=../src/main/java/ OutgoingMessageSignal.proto PubSubMessage.proto \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 10195ba8..df4027c6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -67,10 +67,11 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private DirectoryConfiguration directory; - @NotNull @Valid + @NotNull @JsonProperty - private MessageStoreConfiguration messageStore; + private DataSourceFactory messageStore; + @Valid @JsonProperty @@ -135,7 +136,7 @@ public class WhisperServerConfiguration extends Configuration { return directory; } - public MessageStoreConfiguration getMessageStoreConfiguration() { + public DataSourceFactory getMessageStoreConfiguration() { return messageStore; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 5e1978bf..1efca0b1 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -67,18 +67,20 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; import org.whispersystems.textsecuregcm.storage.DirectoryManager; import org.whispersystems.textsecuregcm.storage.Keys; +import org.whispersystems.textsecuregcm.storage.Messages; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PendingAccounts; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; import org.whispersystems.textsecuregcm.storage.PendingDevices; import org.whispersystems.textsecuregcm.storage.PendingDevicesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.UrlSigner; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.ProvisioningConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.DirectoryCommand; +import org.whispersystems.textsecuregcm.liquibase.NameableMigrationsBundle; import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; import org.whispersystems.websocket.setup.WebSocketEnvironment; @@ -96,7 +98,6 @@ import io.dropwizard.client.JerseyClientBuilder; import io.dropwizard.db.DataSourceFactory; import io.dropwizard.jdbi.DBIFactory; import io.dropwizard.metrics.graphite.GraphiteReporterFactory; -import io.dropwizard.migrations.MigrationsBundle; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; import redis.clients.jedis.JedisPool; @@ -111,12 +112,19 @@ public class WhisperServerService extends Application bootstrap) { bootstrap.addCommand(new DirectoryCommand()); bootstrap.addCommand(new VacuumCommand()); - bootstrap.addBundle(new MigrationsBundle() { + bootstrap.addBundle(new NameableMigrationsBundle("accountdb", "accountsdb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { return configuration.getDataSourceFactory(); } }); + + bootstrap.addBundle(new NameableMigrationsBundle("messagedb", "messagedb.xml") { + @Override + public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { + return configuration.getMessageStoreConfiguration(); + } + }); } @Override @@ -132,16 +140,17 @@ public class WhisperServerService extends Application authorizationKey; @@ -77,7 +76,7 @@ public class AccountController { AccountsManager accounts, RateLimiters rateLimiters, SmsSender smsSenderFactory, - StoredMessages storedMessages, + MessagesManager messagesManager, TimeProvider timeProvider, Optional authorizationKey) { @@ -85,7 +84,7 @@ public class AccountController { this.accounts = accounts; this.rateLimiters = rateLimiters; this.smsSender = smsSenderFactory; - this.storedMessages = storedMessages; + this.messagesManager = messagesManager; this.timeProvider = timeProvider; this.authorizationKey = authorizationKey; } @@ -257,7 +256,7 @@ public class AccountController { account.addDevice(device); accounts.create(account); - storedMessages.clear(new WebsocketAddress(number, Device.MASTER_ID)); + messagesManager.clear(number); pendingAccounts.remove(number); logger.debug("Stored device..."); diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/AbstractLiquibaseCommand.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/AbstractLiquibaseCommand.java new file mode 100644 index 00000000..8e7a06d7 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/AbstractLiquibaseCommand.java @@ -0,0 +1,65 @@ +package org.whispersystems.textsecuregcm.liquibase; + +import com.codahale.metrics.MetricRegistry; +import net.sourceforge.argparse4j.inf.Namespace; + +import java.sql.SQLException; + +import io.dropwizard.Configuration; +import io.dropwizard.cli.ConfiguredCommand; +import io.dropwizard.db.DataSourceFactory; +import io.dropwizard.db.DatabaseConfiguration; +import io.dropwizard.db.ManagedDataSource; +import io.dropwizard.setup.Bootstrap; +import liquibase.Liquibase; +import liquibase.exception.LiquibaseException; +import liquibase.exception.ValidationFailedException; + +public abstract class AbstractLiquibaseCommand extends ConfiguredCommand { + + private final DatabaseConfiguration strategy; + private final Class configurationClass; + private final String migrations; + + protected AbstractLiquibaseCommand(String name, + String description, + String migrations, + DatabaseConfiguration strategy, + Class configurationClass) { + super(name, description); + this.migrations = migrations; + this.strategy = strategy; + this.configurationClass = configurationClass; + } + + @Override + protected Class getConfigurationClass() { + return configurationClass; + } + + @Override + @SuppressWarnings("UseOfSystemOutOrSystemErr") + protected void run(Bootstrap bootstrap, Namespace namespace, T configuration) throws Exception { + final DataSourceFactory dbConfig = strategy.getDataSourceFactory(configuration); + dbConfig.setMaxSize(1); + dbConfig.setMinSize(1); + dbConfig.setInitialSize(1); + + try (final CloseableLiquibase liquibase = openLiquibase(dbConfig, namespace)) { + run(namespace, liquibase); + } catch (ValidationFailedException e) { + e.printDescriptiveError(System.err); + throw e; + } + } + + private CloseableLiquibase openLiquibase(final DataSourceFactory dataSourceFactory, final Namespace namespace) + throws ClassNotFoundException, SQLException, LiquibaseException + { + final ManagedDataSource dataSource = dataSourceFactory.build(new MetricRegistry(), "liquibase"); + return new CloseableLiquibase(dataSource, migrations); + } + + protected abstract void run(Namespace namespace, Liquibase liquibase) throws Exception; + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/CloseableLiquibase.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/CloseableLiquibase.java new file mode 100644 index 00000000..f54bc50b --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/CloseableLiquibase.java @@ -0,0 +1,28 @@ +package org.whispersystems.textsecuregcm.liquibase; + +import java.sql.SQLException; + +import io.dropwizard.db.ManagedDataSource; +import liquibase.Liquibase; +import liquibase.database.jvm.JdbcConnection; +import liquibase.exception.LiquibaseException; +import liquibase.resource.ClassLoaderResourceAccessor; + + +public class CloseableLiquibase extends Liquibase implements AutoCloseable { + private final ManagedDataSource dataSource; + + public CloseableLiquibase(ManagedDataSource dataSource, String migrations) + throws LiquibaseException, ClassNotFoundException, SQLException + { + super(migrations, + new ClassLoaderResourceAccessor(), + new JdbcConnection(dataSource.getConnection())); + this.dataSource = dataSource; + } + + @Override + public void close() throws Exception { + dataSource.stop(); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbMigrateCommand.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbMigrateCommand.java new file mode 100644 index 00000000..eed6c020 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbMigrateCommand.java @@ -0,0 +1,72 @@ +package org.whispersystems.textsecuregcm.liquibase; + + +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; + +import java.io.OutputStreamWriter; +import java.util.List; + +import io.dropwizard.Configuration; +import io.dropwizard.db.DatabaseConfiguration; +import liquibase.Liquibase; + +public class DbMigrateCommand extends AbstractLiquibaseCommand { + + public DbMigrateCommand(String migration, DatabaseConfiguration strategy, Class configurationClass) { + super("migrate", "Apply all pending change sets.", migration, strategy, configurationClass); + } + + @Override + public void configure(Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("-n", "--dry-run") + .action(Arguments.storeTrue()) + .dest("dry-run") + .setDefault(Boolean.FALSE) + .help("output the DDL to stdout, don't run it"); + + subparser.addArgument("-c", "--count") + .type(Integer.class) + .dest("count") + .help("only apply the next N change sets"); + + subparser.addArgument("-i", "--include") + .action(Arguments.append()) + .dest("contexts") + .help("include change sets from the given context"); + } + + @Override + @SuppressWarnings("UseOfSystemOutOrSystemErr") + public void run(Namespace namespace, Liquibase liquibase) throws Exception { + final String context = getContext(namespace); + final Integer count = namespace.getInt("count"); + final Boolean dryRun = namespace.getBoolean("dry-run"); + if (count != null) { + if (dryRun) { + liquibase.update(count, context, new OutputStreamWriter(System.out, Charsets.UTF_8)); + } else { + liquibase.update(count, context); + } + } else { + if (dryRun) { + liquibase.update(context, new OutputStreamWriter(System.out, Charsets.UTF_8)); + } else { + liquibase.update(context); + } + } + } + + private String getContext(Namespace namespace) { + final List contexts = namespace.getList("contexts"); + if (contexts == null) { + return ""; + } + return Joiner.on(',').join(contexts); + } +} \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbStatusCommand.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbStatusCommand.java new file mode 100644 index 00000000..045e863a --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/DbStatusCommand.java @@ -0,0 +1,51 @@ +package org.whispersystems.textsecuregcm.liquibase; + +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; + +import java.io.OutputStreamWriter; +import java.util.List; + +import io.dropwizard.Configuration; +import io.dropwizard.db.DatabaseConfiguration; +import liquibase.Liquibase; + +public class DbStatusCommand extends AbstractLiquibaseCommand { + + public DbStatusCommand(String migrations, DatabaseConfiguration strategy, Class configurationClass) { + super("status", "Check for pending change sets.", migrations, strategy, configurationClass); + } + + @Override + public void configure(Subparser subparser) { + super.configure(subparser); + + subparser.addArgument("-v", "--verbose") + .action(Arguments.storeTrue()) + .dest("verbose") + .help("Output verbose information"); + subparser.addArgument("-i", "--include") + .action(Arguments.append()) + .dest("contexts") + .help("include change sets from the given context"); + } + + @Override + @SuppressWarnings("UseOfSystemOutOrSystemErr") + public void run(Namespace namespace, Liquibase liquibase) throws Exception { + liquibase.reportStatus(namespace.getBoolean("verbose"), + getContext(namespace), + new OutputStreamWriter(System.out, Charsets.UTF_8)); + } + + private String getContext(Namespace namespace) { + final List contexts = namespace.getList("contexts"); + if (contexts == null) { + return ""; + } + return Joiner.on(',').join(contexts); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableDbCommand.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableDbCommand.java new file mode 100644 index 00000000..e188b764 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableDbCommand.java @@ -0,0 +1,44 @@ +package org.whispersystems.textsecuregcm.liquibase; + +import com.google.common.collect.Maps; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; + +import java.util.SortedMap; + +import io.dropwizard.Configuration; +import io.dropwizard.db.DatabaseConfiguration; +import liquibase.Liquibase; + +public class NameableDbCommand extends AbstractLiquibaseCommand { + private static final String COMMAND_NAME_ATTR = "subcommand"; + private final SortedMap> subcommands; + + public NameableDbCommand(String name, String migrations, DatabaseConfiguration strategy, Class configurationClass) { + super(name, "Run database migrations tasks", migrations, strategy, configurationClass); + this.subcommands = Maps.newTreeMap(); + addSubcommand(new DbMigrateCommand<>(migrations, strategy, configurationClass)); + addSubcommand(new DbStatusCommand<>(migrations, strategy, configurationClass)); + } + + private void addSubcommand(AbstractLiquibaseCommand subcommand) { + subcommands.put(subcommand.getName(), subcommand); + } + + @Override + public void configure(Subparser subparser) { + for (AbstractLiquibaseCommand subcommand : subcommands.values()) { + final Subparser cmdParser = subparser.addSubparsers() + .addParser(subcommand.getName()) + .setDefault(COMMAND_NAME_ATTR, subcommand.getName()) + .description(subcommand.getDescription()); + subcommand.configure(cmdParser); + } + } + + @Override + public void run(Namespace namespace, Liquibase liquibase) throws Exception { + final AbstractLiquibaseCommand subcommand = subcommands.get(namespace.getString(COMMAND_NAME_ATTR)); + subcommand.run(namespace, liquibase); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableMigrationsBundle.java b/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableMigrationsBundle.java new file mode 100644 index 00000000..64e277f2 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/liquibase/NameableMigrationsBundle.java @@ -0,0 +1,27 @@ +package org.whispersystems.textsecuregcm.liquibase; + +import io.dropwizard.Bundle; +import io.dropwizard.Configuration; +import io.dropwizard.db.DatabaseConfiguration; +import io.dropwizard.setup.Bootstrap; +import io.dropwizard.setup.Environment; +import io.dropwizard.util.Generics; + +public abstract class NameableMigrationsBundle implements Bundle, DatabaseConfiguration { + + private final String name; + private final String migrations; + + public NameableMigrationsBundle(String name, String migrations) { + this.name = name; + this.migrations = migrations; + } + + public final void initialize(Bootstrap bootstrap) { + Class klass = Generics.getTypeParameter(this.getClass(), Configuration.class); + bootstrap.addCommand(new NameableDbCommand(name, migrations, this, klass)); + } + + public final void run(Environment environment) { + } +} \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 74f5b950..64499dde 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -22,6 +22,7 @@ import org.whispersystems.textsecuregcm.entities.ApnMessage; import org.whispersystems.textsecuregcm.entities.CryptoEncodingException; import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage; import org.whispersystems.textsecuregcm.entities.GcmMessage; +import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; @@ -31,7 +32,7 @@ public class PushSender { private final Logger logger = LoggerFactory.getLogger(PushSender.class); - private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"alert\":{\"loc-key\":\"APN_Message\"},\"content-available\":1,\"category\":\"Signal_Message\"}}"; + private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}"; private final PushServiceClient pushServiceClient; private final WebsocketSender webSocketSender; @@ -75,11 +76,11 @@ public class PushSender { private void sendApnMessage(Account account, Device device, OutgoingMessageSignal outgoingMessage) throws TransientPushFailureException { - boolean online = webSocketSender.sendMessage(account, device, outgoingMessage, true); + DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, true); - if (!online && outgoingMessage.getType() != OutgoingMessageSignal.Type.RECEIPT_VALUE) { - ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), - (int)device.getId(), APN_PAYLOAD); + if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != OutgoingMessageSignal.Type.RECEIPT_VALUE) { + ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(), + String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth())); pushServiceClient.send(apnMessage); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java index 4ea4f3e5..d3adece6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/WebsocketSender.java @@ -24,8 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.websocket.ProvisioningAddress; import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; @@ -49,15 +49,15 @@ public class WebsocketSender { private final Meter provisioningOnlineMeter = metricRegistry.meter(name(getClass(), "provisioning_online" )); private final Meter provisioningOfflineMeter = metricRegistry.meter(name(getClass(), "provisioning_offline")); - private final StoredMessages storedMessages; - private final PubSubManager pubSubManager; + private final MessagesManager messagesManager; + private final PubSubManager pubSubManager; - public WebsocketSender(StoredMessages storedMessages, PubSubManager pubSubManager) { - this.storedMessages = storedMessages; - this.pubSubManager = pubSubManager; + public WebsocketSender(MessagesManager messagesManager, PubSubManager pubSubManager) { + this.messagesManager = messagesManager; + this.pubSubManager = pubSubManager; } - public boolean sendMessage(Account account, Device device, OutgoingMessageSignal message, boolean apn) { + public DeliveryStatus sendMessage(Account account, Device device, OutgoingMessageSignal message, boolean apn) { WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId()); PubSubMessage pubSubMessage = PubSubMessage.newBuilder() .setType(PubSubMessage.Type.DELIVER) @@ -68,17 +68,17 @@ public class WebsocketSender { if (apn) apnOnlineMeter.mark(); else websocketOnlineMeter.mark(); - return true; + return new DeliveryStatus(true, 0); } else { if (apn) apnOfflineMeter.mark(); else websocketOfflineMeter.mark(); - storedMessages.insert(address, message); + int queueDepth = messagesManager.insert(account.getNumber(), device.getId(), message); pubSubManager.publish(address, PubSubMessage.newBuilder() .setType(PubSubMessage.Type.QUERY_DB) .build()); - return false; + return new DeliveryStatus(false, queueDepth); } } @@ -96,4 +96,23 @@ public class WebsocketSender { return false; } } + + public static class DeliveryStatus { + + private final boolean delivered; + private final int messageQueueDepth; + + public DeliveryStatus(boolean delivered, int messageQueueDepth) { + this.delivered = delivered; + this.messageQueueDepth = messageQueueDepth; + } + + public boolean isDelivered() { + return delivered; + } + + public int getMessageQueueDepth() { + return messageQueueDepth; + } + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java new file mode 100644 index 00000000..6ac08ee7 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -0,0 +1,99 @@ +package org.whispersystems.textsecuregcm.storage; + +import com.google.protobuf.ByteString; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.sqlobject.Bind; +import org.skife.jdbi.v2.sqlobject.Binder; +import org.skife.jdbi.v2.sqlobject.BinderFactory; +import org.skife.jdbi.v2.sqlobject.BindingAnnotation; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.SqlUpdate; +import org.skife.jdbi.v2.sqlobject.customizers.Mapper; +import org.skife.jdbi.v2.tweak.ResultSetMapper; +import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import org.whispersystems.textsecuregcm.util.Pair; + +import java.lang.annotation.Annotation; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +public abstract class Messages { + + private static final String ID = "id"; + private static final String TYPE = "type"; + private static final String RELAY = "relay"; + private static final String TIMESTAMP = "timestamp"; + private static final String SOURCE = "source"; + private static final String SOURCE_DEVICE = "source_device"; + private static final String DESTINATION = "destination"; + private static final String DESTINATION_DEVICE = "destination_device"; + private static final String MESSAGE = "message"; + + @SqlQuery("INSERT INTO messages (" + TYPE + ", " + RELAY + ", " + TIMESTAMP + ", " + SOURCE + ", " + SOURCE_DEVICE + ", " + DESTINATION + ", " + DESTINATION_DEVICE + ", " + MESSAGE + ") " + + "VALUES (:type, :relay, :timestamp, :source, :source_device, :destination, :destination_device, :message) " + + "RETURNING (SELECT COUNT(id) FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device AND " + TYPE + " != " + OutgoingMessageSignal.Type.RECEIPT_VALUE + ")") + abstract int store(@MessageBinder OutgoingMessageSignal message, + @Bind("destination") String destination, + @Bind("destination_device") long destinationDevice); + + @Mapper(MessageMapper.class) + @SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device") + abstract List> load(@Bind("destination") String destination, + @Bind("destination_device") long destinationDevice); + + @SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id") + abstract void remove(@Bind("id") long id); + + @SqlUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination") + abstract void clear(@Bind("destination") String destination); + + public static class MessageMapper implements ResultSetMapper> { + @Override + public Pair map(int i, ResultSet resultSet, StatementContext statementContext) + throws SQLException + { + return new Pair<>(resultSet.getLong(ID), + OutgoingMessageSignal.newBuilder() + .setType(resultSet.getInt(TYPE)) + .setRelay(resultSet.getString(RELAY)) + .setTimestamp(resultSet.getLong(TIMESTAMP)) + .setSource(resultSet.getString(SOURCE)) + .setSourceDevice(resultSet.getInt(SOURCE_DEVICE)) + .setMessage(ByteString.copyFrom(resultSet.getBytes(MESSAGE))) + .build()); + } + } + + @BindingAnnotation(MessageBinder.AccountBinderFactory.class) + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.PARAMETER}) + public @interface MessageBinder { + public static class AccountBinderFactory implements BinderFactory { + @Override + public Binder build(Annotation annotation) { + return new Binder() { + @Override + public void bind(SQLStatement sql, + MessageBinder accountBinder, + OutgoingMessageSignal message) + { + sql.bind(TYPE, message.getType()); + sql.bind(RELAY, message.getRelay()); + sql.bind(TIMESTAMP, message.getTimestamp()); + sql.bind(SOURCE, message.getSource()); + sql.bind(SOURCE_DEVICE, message.getSourceDevice()); + sql.bind(MESSAGE, message.getMessage().toByteArray()); + } + }; + } + } + } + + +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java new file mode 100644 index 00000000..0f429520 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesManager.java @@ -0,0 +1,32 @@ +package org.whispersystems.textsecuregcm.storage; + + +import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; +import org.whispersystems.textsecuregcm.util.Pair; + +import java.util.List; + +public class MessagesManager { + + private final Messages messages; + + public MessagesManager(Messages messages) { + this.messages = messages; + } + + public int insert(String destination, long destinationDevice, OutgoingMessageSignal message) { + return this.messages.store(message, destination, destinationDevice) + 1; + } + + public List> getMessagesForDevice(String destination, long destinationDevice) { + return this.messages.load(destination, destinationDevice); + } + + public void clear(String destination) { + this.messages.clear(destination); + } + + public void delete(long id) { + this.messages.remove(id); + } +} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java deleted file mode 100644 index ab9ca5ed..00000000 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessageProtos.java +++ /dev/null @@ -1,624 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: StoredMessage.proto - -package org.whispersystems.textsecuregcm.storage; - -public final class StoredMessageProtos { - private StoredMessageProtos() {} - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - } - public interface StoredMessageOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // optional .textsecure.StoredMessage.Type type = 1; - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - boolean hasType(); - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType(); - - // optional bytes content = 2; - /** - * optional bytes content = 2; - */ - boolean hasContent(); - /** - * optional bytes content = 2; - */ - com.google.protobuf.ByteString getContent(); - } - /** - * Protobuf type {@code textsecure.StoredMessage} - */ - public static final class StoredMessage extends - com.google.protobuf.GeneratedMessage - implements StoredMessageOrBuilder { - // Use StoredMessage.newBuilder() to construct. - private StoredMessage(com.google.protobuf.GeneratedMessage.Builder builder) { - super(builder); - this.unknownFields = builder.getUnknownFields(); - } - private StoredMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } - - private static final StoredMessage defaultInstance; - public static StoredMessage getDefaultInstance() { - return defaultInstance; - } - - public StoredMessage getDefaultInstanceForType() { - return defaultInstance; - } - - private final com.google.protobuf.UnknownFieldSet unknownFields; - @java.lang.Override - public final com.google.protobuf.UnknownFieldSet - getUnknownFields() { - return this.unknownFields; - } - private StoredMessage( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - initFields(); - int mutable_bitField0_ = 0; - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder(); - try { - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - done = true; - } - break; - } - case 8: { - int rawValue = input.readEnum(); - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.valueOf(rawValue); - if (value == null) { - unknownFields.mergeVarintField(1, rawValue); - } else { - bitField0_ |= 0x00000001; - type_ = value; - } - break; - } - case 18: { - bitField0_ |= 0x00000002; - content_ = input.readBytes(); - break; - } - } - } - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - throw e.setUnfinishedMessage(this); - } catch (java.io.IOException e) { - throw new com.google.protobuf.InvalidProtocolBufferException( - e.getMessage()).setUnfinishedMessage(this); - } finally { - this.unknownFields = unknownFields.build(); - makeExtensionsImmutable(); - } - } - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class); - } - - public static com.google.protobuf.Parser PARSER = - new com.google.protobuf.AbstractParser() { - public StoredMessage parsePartialFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return new StoredMessage(input, extensionRegistry); - } - }; - - @java.lang.Override - public com.google.protobuf.Parser getParserForType() { - return PARSER; - } - - /** - * Protobuf enum {@code textsecure.StoredMessage.Type} - */ - public enum Type - implements com.google.protobuf.ProtocolMessageEnum { - /** - * UNKNOWN = 0; - */ - UNKNOWN(0, 0), - /** - * MESSAGE = 1; - */ - MESSAGE(1, 1), - ; - - /** - * UNKNOWN = 0; - */ - public static final int UNKNOWN_VALUE = 0; - /** - * MESSAGE = 1; - */ - public static final int MESSAGE_VALUE = 1; - - - public final int getNumber() { return value; } - - public static Type valueOf(int value) { - switch (value) { - case 0: return UNKNOWN; - case 1: return MESSAGE; - default: return null; - } - } - - public static com.google.protobuf.Internal.EnumLiteMap - internalGetValueMap() { - return internalValueMap; - } - private static com.google.protobuf.Internal.EnumLiteMap - internalValueMap = - new com.google.protobuf.Internal.EnumLiteMap() { - public Type findValueByNumber(int number) { - return Type.valueOf(number); - } - }; - - public final com.google.protobuf.Descriptors.EnumValueDescriptor - getValueDescriptor() { - return getDescriptor().getValues().get(index); - } - public final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptorForType() { - return getDescriptor(); - } - public static final com.google.protobuf.Descriptors.EnumDescriptor - getDescriptor() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDescriptor().getEnumTypes().get(0); - } - - private static final Type[] VALUES = values(); - - public static Type valueOf( - com.google.protobuf.Descriptors.EnumValueDescriptor desc) { - if (desc.getType() != getDescriptor()) { - throw new java.lang.IllegalArgumentException( - "EnumValueDescriptor is not for this type."); - } - return VALUES[desc.getIndex()]; - } - - private final int index; - private final int value; - - private Type(int index, int value) { - this.index = index; - this.value = value; - } - - // @@protoc_insertion_point(enum_scope:textsecure.StoredMessage.Type) - } - - private int bitField0_; - // optional .textsecure.StoredMessage.Type type = 1; - public static final int TYPE_FIELD_NUMBER = 1; - private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_; - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public boolean hasType() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() { - return type_; - } - - // optional bytes content = 2; - public static final int CONTENT_FIELD_NUMBER = 2; - private com.google.protobuf.ByteString content_; - /** - * optional bytes content = 2; - */ - public boolean hasContent() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - /** - * optional bytes content = 2; - */ - public com.google.protobuf.ByteString getContent() { - return content_; - } - - private void initFields() { - type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; - content_ = com.google.protobuf.ByteString.EMPTY; - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeEnum(1, type_.getNumber()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, content_); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeEnumSize(1, type_.getNumber()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, content_); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom(java.io.InputStream input) - throws java.io.IOException { - return PARSER.parseFrom(input); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return PARSER.parseFrom(input, extensionRegistry); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - return PARSER.parseDelimitedFrom(input); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return PARSER.parseDelimitedFrom(input, extensionRegistry); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return PARSER.parseFrom(input); - } - public static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return PARSER.parseFrom(input, extensionRegistry); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - /** - * Protobuf type {@code textsecure.StoredMessage} - */ - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessageOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.class, org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Builder.class); - } - - // Construct using org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; - bitField0_ = (bitField0_ & ~0x00000001); - content_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.internal_static_textsecure_StoredMessage_descriptor; - } - - public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage getDefaultInstanceForType() { - return org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance(); - } - - public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage build() { - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage buildPartial() { - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage result = new org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - result.type_ = type_; - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - result.content_ = content_; - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) { - return mergeFrom((org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage other) { - if (other == org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.getDefaultInstance()) return this; - if (other.hasType()) { - setType(other.getType()); - } - if (other.hasContent()) { - setContent(other.getContent()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage parsedMessage = null; - try { - parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); - } catch (com.google.protobuf.InvalidProtocolBufferException e) { - parsedMessage = (org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage) e.getUnfinishedMessage(); - throw e; - } finally { - if (parsedMessage != null) { - mergeFrom(parsedMessage); - } - } - return this; - } - private int bitField0_; - - // optional .textsecure.StoredMessage.Type type = 1; - private org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public boolean hasType() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type getType() { - return type_; - } - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public Builder setType(org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - type_ = value; - onChanged(); - return this; - } - /** - * optional .textsecure.StoredMessage.Type type = 1; - */ - public Builder clearType() { - bitField0_ = (bitField0_ & ~0x00000001); - type_ = org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage.Type.UNKNOWN; - onChanged(); - return this; - } - - // optional bytes content = 2; - private com.google.protobuf.ByteString content_ = com.google.protobuf.ByteString.EMPTY; - /** - * optional bytes content = 2; - */ - public boolean hasContent() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - /** - * optional bytes content = 2; - */ - public com.google.protobuf.ByteString getContent() { - return content_; - } - /** - * optional bytes content = 2; - */ - public Builder setContent(com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - content_ = value; - onChanged(); - return this; - } - /** - * optional bytes content = 2; - */ - public Builder clearContent() { - bitField0_ = (bitField0_ & ~0x00000002); - content_ = getDefaultInstance().getContent(); - onChanged(); - return this; - } - - // @@protoc_insertion_point(builder_scope:textsecure.StoredMessage) - } - - static { - defaultInstance = new StoredMessage(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:textsecure.StoredMessage) - } - - private static com.google.protobuf.Descriptors.Descriptor - internal_static_textsecure_StoredMessage_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_textsecure_StoredMessage_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - private static com.google.protobuf.Descriptors.FileDescriptor - descriptor; - static { - java.lang.String[] descriptorData = { - "\n\023StoredMessage.proto\022\ntextsecure\"p\n\rSto" + - "redMessage\022,\n\004type\030\001 \001(\0162\036.textsecure.St" + - "oredMessage.Type\022\017\n\007content\030\002 \001(\014\" \n\004Typ" + - "e\022\013\n\007UNKNOWN\020\000\022\013\n\007MESSAGE\020\001B?\n(org.whisp" + - "ersystems.textsecuregcm.storageB\023StoredM" + - "essageProtos" - }; - com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = - new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { - public com.google.protobuf.ExtensionRegistry assignDescriptors( - com.google.protobuf.Descriptors.FileDescriptor root) { - descriptor = root; - internal_static_textsecure_StoredMessage_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_textsecure_StoredMessage_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_textsecure_StoredMessage_descriptor, - new java.lang.String[] { "Type", "Content", }); - return null; - } - }; - com.google.protobuf.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new com.google.protobuf.Descriptors.FileDescriptor[] { - }, assigner); - } - - // @@protoc_insertion_point(outer_class_scope) -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java deleted file mode 100644 index d72f0348..00000000 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/StoredMessages.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright (C) 2014 Open WhisperSystems - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.whispersystems.textsecuregcm.storage; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SharedMetricRegistries; -import com.google.protobuf.InvalidProtocolBufferException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.util.Constants; -import org.whispersystems.textsecuregcm.websocket.WebsocketAddress; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static com.codahale.metrics.MetricRegistry.name; -import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal; -import static org.whispersystems.textsecuregcm.storage.StoredMessageProtos.StoredMessage; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; - -public class StoredMessages { - - private static final Logger logger = LoggerFactory.getLogger(StoredMessages.class); - - private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); - private final Histogram queueSizeHistogram = metricRegistry.histogram(name(getClass(), "queue_size")); - - private static final String QUEUE_PREFIX = "msgs"; - - private final JedisPool jedisPool; - - public StoredMessages(JedisPool jedisPool) { - this.jedisPool = jedisPool; - } - - public void clear(WebsocketAddress address) { - try (Jedis jedis = jedisPool.getResource()) { - jedis.del(getKey(address)); - } - } - - public void insert(WebsocketAddress address, OutgoingMessageSignal message) { - try (Jedis jedis = jedisPool.getResource()) { - byte[] queue = getKey(address); - StoredMessage storedMessage = StoredMessage.newBuilder() - .setType(StoredMessage.Type.MESSAGE) - .setContent(message.toByteString()) - .build(); - - long queueSize = jedis.lpush(queue, storedMessage.toByteArray()); - queueSizeHistogram.update(queueSize); - - jedis.expireAt(queue, (System.currentTimeMillis() / 1000) + TimeUnit.DAYS.toSeconds(30)); - - if (queueSize > 1000) { - jedis.ltrim(getKey(address), 0, 999); - } - } - } - - public List getMessagesForDevice(WebsocketAddress address) { - List messages = new LinkedList<>(); - - try (Jedis jedis = jedisPool.getResource()) { - byte[] message; - - while ((message = jedis.rpop(getKey(address))) != null) { - try { - StoredMessage storedMessage = StoredMessage.parseFrom(message); - - if (storedMessage.getType().getNumber() == StoredMessage.Type.MESSAGE_VALUE) { - messages.add(OutgoingMessageSignal.parseFrom(storedMessage.getContent())); - } else { - logger.warn("Unkown stored message type: " + storedMessage.getType().getNumber()); - } - - } catch (InvalidProtocolBufferException e) { - logger.warn("Error parsing protobuf", e); - } - } - - return messages; - } - } - - private byte[] getKey(WebsocketAddress address) { - return (QUEUE_PREFIX + ":" + address.serialize()).getBytes(); - } - -} \ No newline at end of file diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java index eb48748b..0d443ebd 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/AuthenticatedConnectListener.java @@ -7,8 +7,8 @@ import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Util; import org.whispersystems.websocket.session.WebSocketSessionContext; import org.whispersystems.websocket.setup.WebSocketConnectListener; @@ -19,15 +19,15 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { private final AccountsManager accountsManager; private final PushSender pushSender; - private final StoredMessages storedMessages; + private final MessagesManager messagesManager; private final PubSubManager pubSubManager; public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender, - StoredMessages storedMessages, PubSubManager pubSubManager) + MessagesManager messagesManager, PubSubManager pubSubManager) { this.accountsManager = accountsManager; this.pushSender = pushSender; - this.storedMessages = storedMessages; + this.messagesManager = messagesManager; this.pubSubManager = pubSubManager; } @@ -55,7 +55,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener { } final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender, - storedMessages, pubSubManager, + messagesManager, pubSubManager, account.get(), device.get(), context.getClient()); diff --git a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java index cd91f286..18ffd25b 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java +++ b/src/main/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnection.java @@ -15,9 +15,10 @@ import org.whispersystems.textsecuregcm.push.TransientPushFailureException; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PubSubListener; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; +import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.websocket.WebSocketClient; import org.whispersystems.websocket.messages.WebSocketResponseMessage; @@ -34,7 +35,7 @@ public class WebSocketConnection implements PubSubListener { private final AccountsManager accountsManager; private final PushSender pushSender; - private final StoredMessages storedMessages; + private final MessagesManager messagesManager; private final PubSubManager pubSubManager; private final Account account; @@ -44,7 +45,7 @@ public class WebSocketConnection implements PubSubListener { public WebSocketConnection(AccountsManager accountsManager, PushSender pushSender, - StoredMessages storedMessages, + MessagesManager messagesManager, PubSubManager pubSubManager, Account account, Device device, @@ -52,7 +53,7 @@ public class WebSocketConnection implements PubSubListener { { this.accountsManager = accountsManager; this.pushSender = pushSender; - this.storedMessages = storedMessages; + this.messagesManager = messagesManager; this.pubSubManager = pubSubManager; this.account = account; this.device = device; @@ -77,7 +78,7 @@ public class WebSocketConnection implements PubSubListener { processStoredMessages(); break; case PubSubMessage.Type.DELIVER_VALUE: - sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent())); + sendMessage(OutgoingMessageSignal.parseFrom(pubSubMessage.getContent()), Optional.absent()); break; default: logger.warn("Unknown pubsub message: " + pubSubMessage.getType().getNumber()); @@ -87,7 +88,9 @@ public class WebSocketConnection implements PubSubListener { } } - private void sendMessage(final OutgoingMessageSignal message) { + private void sendMessage(final OutgoingMessageSignal message, + final Optional storedMessageId) + { try { EncryptedOutgoingMessage encryptedMessage = new EncryptedOutgoingMessage(message, device.getSignalingKey()); Optional body = Optional.fromNullable(encryptedMessage.toByteArray()); @@ -98,16 +101,17 @@ public class WebSocketConnection implements PubSubListener { public void onSuccess(@Nullable WebSocketResponseMessage response) { boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE; - if (isSuccessResponse(response) && !isReceipt) { - sendDeliveryReceiptFor(message); - } else if (!isSuccessResponse(response)) { + if (isSuccessResponse(response)) { + if (storedMessageId.isPresent()) messagesManager.delete(storedMessageId.get()); + if (!isReceipt) sendDeliveryReceiptFor(message); + } else if (!isSuccessResponse(response) & !storedMessageId.isPresent()) { requeueMessage(message); } } @Override public void onFailure(@Nonnull Throwable throwable) { - requeueMessage(message); + if (!storedMessageId.isPresent()) requeueMessage(message); } private boolean isSuccessResponse(WebSocketResponseMessage response) { @@ -124,7 +128,7 @@ public class WebSocketConnection implements PubSubListener { pushSender.sendMessage(account, device, message); } catch (NotPushRegisteredException | TransientPushFailureException e) { logger.warn("requeueMessage", e); - storedMessages.insert(address, message); + messagesManager.insert(account.getNumber(), device.getId(), message); } } @@ -153,10 +157,11 @@ public class WebSocketConnection implements PubSubListener { } private void processStoredMessages() { - List messages = storedMessages.getMessagesForDevice(address); + List> messages = messagesManager.getMessagesForDevice(account.getNumber(), + device.getId()); - for (OutgoingMessageSignal message : messages) { - sendMessage(message); + for (Pair message : messages) { + sendMessage(message.second(), Optional.of(message.first())); } } } diff --git a/src/main/resources/migrations.xml b/src/main/resources/accountsdb.xml similarity index 100% rename from src/main/resources/migrations.xml rename to src/main/resources/accountsdb.xml diff --git a/src/main/resources/messagedb.xml b/src/main/resources/messagedb.xml new file mode 100644 index 00000000..47b8b6dd --- /dev/null +++ b/src/main/resources/messagedb.xml @@ -0,0 +1,60 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java index 060f8375..ee2c0d5a 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/controllers/AccountControllerTest.java @@ -15,8 +15,9 @@ import org.whispersystems.textsecuregcm.providers.TimeProvider; import org.whispersystems.textsecuregcm.sms.SmsSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PendingAccountsManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; +//import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.tests.util.AuthHelper; import javax.ws.rs.core.MediaType; @@ -35,7 +36,7 @@ public class AccountControllerTest { private RateLimiters rateLimiters = mock(RateLimiters.class ); private RateLimiter rateLimiter = mock(RateLimiter.class ); private SmsSender smsSender = mock(SmsSender.class ); - private StoredMessages storedMessages = mock(StoredMessages.class ); + private MessagesManager storedMessages = mock(MessagesManager.class ); private TimeProvider timeProvider = mock(TimeProvider.class ); private static byte[] authorizationKey = decodeHex("3a078586eea8971155f5c1ebd73c8c923cbec1c3ed22a54722e4e88321dc749f"); diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java index 65cd33ee..02c565f7 100644 --- a/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/websocket/WebSocketConnectionTest.java @@ -12,9 +12,10 @@ import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.MessagesManager; import org.whispersystems.textsecuregcm.storage.PubSubManager; -import org.whispersystems.textsecuregcm.storage.StoredMessages; import org.whispersystems.textsecuregcm.util.Base64; +import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener; import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.websocket.WebSocketConnection; @@ -58,9 +59,9 @@ public class WebSocketConnectionTest { @Test public void testCredentials() throws Exception { - StoredMessages storedMessages = mock(StoredMessages.class); + MessagesManager storedMessages = mock(MessagesManager.class); WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator); - AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, storedMessages, pubSubManager); + AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, storedMessages, pubSubManager); WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class); when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD)))) @@ -113,12 +114,12 @@ public class WebSocketConnectionTest { @Test public void testOpen() throws Exception { - StoredMessages storedMessages = mock(StoredMessages.class); + MessagesManager storedMessages = mock(MessagesManager.class); - List outgoingMessages = new LinkedList() {{ - add(createMessage("sender1", 1111, false, "first")); - add(createMessage("sender1", 2222, false, "second")); - add(createMessage("sender2", 3333, false, "third")); + List> outgoingMessages = new LinkedList> () {{ + add(new Pair<>(1L, createMessage("sender1", 1111, false, "first"))); + add(new Pair<>(2L, createMessage("sender1", 2222, false, "second"))); + add(new Pair<>(3L, createMessage("sender2", 3333, false, "third"))); }}; when(device.getId()).thenReturn(2L); @@ -139,7 +140,7 @@ public class WebSocketConnectionTest { when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1)); when(accountsManager.get("sender2")).thenReturn(Optional.absent()); - when(storedMessages.getMessagesForDevice(new WebsocketAddress(account.getNumber(), device.getId()))) + when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId())) .thenReturn(outgoingMessages); final List> futures = new LinkedList<>(); @@ -177,7 +178,7 @@ public class WebSocketConnectionTest { add(createMessage("sender2", 3333, false, "third")); }}; - verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(OutgoingMessageSignal.class)); +// verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(OutgoingMessageSignal.class)); verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class)); connection.onConnectionLost();