This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push:
new 35bd908641 JAMES-2586 Implement BlobReferenceSource(s) for postgres-app
35bd908641 is described below
commit 35bd90864133354ff2b0810839ee005e26292b4c
Author: hung phan <[email protected]>
AuthorDate: Thu Jan 4 17:11:37 2024 +0700
JAMES-2586 Implement BlobReferenceSource(s) for postgres-app
---
.../backends/postgres/PostgresConfiguration.java | 177 ++++++++----
.../utils/JamesPostgresConnectionFactory.java | 1 +
.../backends/postgres/utils/PostgresExecutor.java | 1 +
.../postgres/PostgresConfigurationTest.java | 115 ++++----
.../james/backends/postgres/PostgresExtension.java | 21 +-
.../mail/PostgresMessageBlobReferenceSource.java | 25 +-
.../postgres/mail/dao/PostgresMessageDAO.java | 11 +-
.../PostgresMessageBlobReferenceSourceTest.java | 100 +++++++
.../sample-configuration/postgres.properties | 21 +-
.../org/apache/james/PostgresJamesServerMain.java | 8 -
.../modules/mailbox/PostgresMailboxModule.java | 8 +
.../james/modules/data/PostgresCommonModule.java | 45 ++-
.../modules/data/PostgresMailRepositoryModule.java | 7 +
.../postgres/PostgresMailRepository.java | 301 +--------------------
.../PostgresMailRepositoryBlobReferenceSource.java | 24 +-
....java => PostgresMailRepositoryContentDAO.java} | 78 +++---
.../postgres/PostgresMailRepositoryFactory.java | 2 +-
...tgresMailRepositoryBlobReferenceSourceTest.java | 94 +++++++
.../postgres/PostgresMailRepositoryTest.java | 2 +-
19 files changed, 554 insertions(+), 487 deletions(-)
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
index 7ffeb8be40..82683044ff 100644
---
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
+++
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
@@ -19,31 +19,34 @@
package org.apache.james.backends.postgres;
-import java.net.URI;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.configuration2.Configuration;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
public class PostgresConfiguration {
- public static final String URL = "url";
public static final String DATABASE_NAME = "database.name";
public static final String DATABASE_NAME_DEFAULT_VALUE = "postgres";
public static final String DATABASE_SCHEMA = "database.schema";
public static final String DATABASE_SCHEMA_DEFAULT_VALUE = "public";
+ public static final String HOST = "database.host";
+ public static final String HOST_DEFAULT_VALUE = "localhost";
+ public static final String PORT = "database.port";
+ public static final int PORT_DEFAULT_VALUE = 5432;
+ public static final String USERNAME = "database.username";
+ public static final String PASSWORD = "database.password";
+ public static final String NON_RLS_USERNAME = "database.non-rls.username";
+ public static final String NON_RLS_PASSWORD = "database.non-rls.password";
public static final String RLS_ENABLED = "row.level.security.enabled";
public static class Credential {
private final String username;
private final String password;
- Credential(String username, String password) {
+
+ public Credential(String username, String password) {
this.username = username;
this.password = password;
}
@@ -58,16 +61,16 @@ public class PostgresConfiguration {
}
public static class Builder {
- private Optional<String> url = Optional.empty();
private Optional<String> databaseName = Optional.empty();
private Optional<String> databaseSchema = Optional.empty();
+ private Optional<String> host = Optional.empty();
+ private Optional<Integer> port = Optional.empty();
+ private Optional<String> username = Optional.empty();
+ private Optional<String> password = Optional.empty();
+ private Optional<String> nonRLSUser = Optional.empty();
+ private Optional<String> nonRLSPassword = Optional.empty();
private Optional<Boolean> rowLevelSecurityEnabled = Optional.empty();
- public Builder url(String url) {
- this.url = Optional.of(url);
- return this;
- }
-
public Builder databaseName(String databaseName) {
this.databaseName = Optional.of(databaseName);
return this;
@@ -88,6 +91,66 @@ public class PostgresConfiguration {
return this;
}
+ public Builder host(String host) {
+ this.host = Optional.of(host);
+ return this;
+ }
+
+ public Builder host(Optional<String> host) {
+ this.host = host;
+ return this;
+ }
+
+ public Builder port(Integer port) {
+ this.port = Optional.of(port);
+ return this;
+ }
+
+ public Builder port(Optional<Integer> port) {
+ this.port = port;
+ return this;
+ }
+
+ public Builder username(String username) {
+ this.username = Optional.of(username);
+ return this;
+ }
+
+ public Builder username(Optional<String> username) {
+ this.username = username;
+ return this;
+ }
+
+ public Builder password(String password) {
+ this.password = Optional.of(password);
+ return this;
+ }
+
+ public Builder password(Optional<String> password) {
+ this.password = password;
+ return this;
+ }
+
+ public Builder nonRLSUser(String nonRLSUser) {
+ this.nonRLSUser = Optional.of(nonRLSUser);
+ return this;
+ }
+
+ public Builder nonRLSUser(Optional<String> nonRLSUser) {
+ this.nonRLSUser = nonRLSUser;
+ return this;
+ }
+
+ public Builder nonRLSPassword(String nonRLSPassword) {
+ this.nonRLSPassword = Optional.of(nonRLSPassword);
+ return this;
+ }
+
+ public Builder nonRLSPassword(Optional<String> nonRLSPassword) {
+ this.nonRLSPassword = nonRLSPassword;
+ return this;
+ }
+
public Builder rowLevelSecurityEnabled(boolean rlsEnabled) {
this.rowLevelSecurityEnabled = Optional.of(rlsEnabled);
return this;
@@ -99,36 +162,22 @@ public class PostgresConfiguration {
}
public PostgresConfiguration build() {
- Preconditions.checkArgument(url.isPresent() &&
!url.get().isBlank(), "You need to specify Postgres URI");
- URI postgresURI = asURI(url.get());
+ Preconditions.checkArgument(username.isPresent() &&
!username.get().isBlank(), "You need to specify username");
+ Preconditions.checkArgument(password.isPresent() &&
!password.get().isBlank(), "You need to specify password");
+
+ if (rowLevelSecurityEnabled.isPresent() &&
rowLevelSecurityEnabled.get()) {
+ Preconditions.checkArgument(nonRLSUser.isPresent() &&
!nonRLSUser.get().isBlank(), "You need to specify nonRLSUser");
+ Preconditions.checkArgument(nonRLSPassword.isPresent() &&
!nonRLSPassword.get().isBlank(), "You need to specify nonRLSPassword");
+ }
- return new PostgresConfiguration(postgresURI,
- parseCredential(postgresURI),
+ return new PostgresConfiguration(host.orElse(HOST_DEFAULT_VALUE),
+ port.orElse(PORT_DEFAULT_VALUE),
databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE),
databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE),
+ new Credential(username.get(), password.get()),
+ new Credential(nonRLSUser.orElse(username.get()),
nonRLSPassword.orElse(password.get())),
rowLevelSecurityEnabled.orElse(false));
}
-
- private Credential parseCredential(URI postgresURI) {
- Preconditions.checkArgument(postgresURI.getUserInfo() != null,
"Postgres URI need to contains user credential");
-
Preconditions.checkArgument(postgresURI.getUserInfo().contains(":"), "User info
needs a password part");
-
- List<String> parts = Splitter.on(':')
- .splitToList(postgresURI.getUserInfo());
- ImmutableList<String> passwordParts = parts.stream()
- .skip(1)
- .collect(ImmutableList.toImmutableList());
-
- return new Credential(parts.get(0),
Joiner.on(':').join(passwordParts));
- }
-
- private URI asURI(String uri) {
- try {
- return URI.create(uri);
- } catch (Exception e) {
- throw new IllegalArgumentException("You need to specify a
valid Postgres URI", e);
- }
- }
}
public static Builder builder() {
@@ -137,33 +186,43 @@ public class PostgresConfiguration {
public static PostgresConfiguration from(Configuration
propertiesConfiguration) {
return builder()
- .url(propertiesConfiguration.getString(URL, null))
.databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME)))
.databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA)))
+ .host(Optional.ofNullable(propertiesConfiguration.getString(HOST)))
+ .port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE))
+
.username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME)))
+
.password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD)))
+
.nonRLSUser(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_USERNAME)))
+
.nonRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_PASSWORD)))
.rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false))
.build();
}
- private final URI uri;
- private final Credential credential;
+ private final String host;
+ private final int port;
private final String databaseName;
private final String databaseSchema;
+ private final Credential credential;
+ private final Credential nonRLSCredential;
private final boolean rowLevelSecurityEnabled;
- private PostgresConfiguration(URI uri, Credential credential, String
databaseName, String databaseSchema, boolean rowLevelSecurityEnabled) {
- this.uri = uri;
- this.credential = credential;
+ private PostgresConfiguration(String host, int port, String databaseName,
String databaseSchema,
+ Credential credential, Credential
nonRLSCredential, boolean rowLevelSecurityEnabled) {
+ this.host = host;
+ this.port = port;
this.databaseName = databaseName;
this.databaseSchema = databaseSchema;
+ this.credential = credential;
+ this.nonRLSCredential = nonRLSCredential;
this.rowLevelSecurityEnabled = rowLevelSecurityEnabled;
}
- public URI getUri() {
- return uri;
+ public String getHost() {
+ return host;
}
- public Credential getCredential() {
- return credential;
+ public int getPort() {
+ return port;
}
public String getDatabaseName() {
@@ -174,26 +233,36 @@ public class PostgresConfiguration {
return databaseSchema;
}
+ public Credential getCredential() {
+ return credential;
+ }
+
+ public Credential getNonRLSCredential() {
+ return nonRLSCredential;
+ }
+
public boolean rowLevelSecurityEnabled() {
return rowLevelSecurityEnabled;
}
+ @Override
+ public final int hashCode() {
+ return Objects.hash(host, port, databaseName, databaseSchema,
credential, nonRLSCredential, rowLevelSecurityEnabled);
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof PostgresConfiguration) {
PostgresConfiguration that = (PostgresConfiguration) o;
return Objects.equals(this.rowLevelSecurityEnabled,
that.rowLevelSecurityEnabled)
- && Objects.equals(this.uri, that.uri)
+ && Objects.equals(this.host, that.host)
+ && Objects.equals(this.port, that.port)
&& Objects.equals(this.credential, that.credential)
+ && Objects.equals(this.nonRLSCredential, that.nonRLSCredential)
&& Objects.equals(this.databaseName, that.databaseName)
&& Objects.equals(this.databaseSchema, that.databaseSchema);
}
return false;
}
-
- @Override
- public final int hashCode() {
- return Objects.hash(uri, credential, databaseName, databaseSchema,
rowLevelSecurityEnabled);
- }
}
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
index 8d8391e209..c196f80642 100644
---
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
@@ -28,6 +28,7 @@ import reactor.core.publisher.Mono;
public interface JamesPostgresConnectionFactory {
String DOMAIN_ATTRIBUTE = "app.current_domain";
+ String NON_RLS_INJECT = "non_rls";
default Mono<Connection> getConnection(Domain domain) {
return getConnection(Optional.ofNullable(domain));
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 67f6c2067b..268e14a08a 100644
---
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -47,6 +47,7 @@ import reactor.util.retry.Retry;
public class PostgresExecutor {
public static final String DEFAULT_INJECT = "default";
+ public static final String NON_RLS_INJECT = "non_rls";
public static final int MAX_RETRY_ATTEMPTS = 5;
public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
index 248eb0dd66..b47f66abe4 100644
---
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
@@ -22,89 +22,98 @@ package org.apache.james.backends.postgres;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.Test;
class PostgresConfigurationTest {
@Test
- void shouldThrowWhenMissingPostgresURI() {
+ void shouldReturnCorrespondingProperties() {
+ PostgresConfiguration configuration = PostgresConfiguration.builder()
+ .host("1.1.1.1")
+ .port(1111)
+ .databaseName("db")
+ .databaseSchema("sc")
+ .username("james")
+ .password("1")
+ .nonRLSUser("nonrlsjames")
+ .nonRLSPassword("2")
+ .rowLevelSecurityEnabled()
+ .build();
+
+ assertThat(configuration.getHost()).isEqualTo("1.1.1.1");
+ assertThat(configuration.getPort()).isEqualTo(1111);
+ assertThat(configuration.getDatabaseName()).isEqualTo("db");
+ assertThat(configuration.getDatabaseSchema()).isEqualTo("sc");
+
assertThat(configuration.getCredential().getUsername()).isEqualTo("james");
+ assertThat(configuration.getCredential().getPassword()).isEqualTo("1");
+
assertThat(configuration.getNonRLSCredential().getUsername()).isEqualTo("nonrlsjames");
+
assertThat(configuration.getNonRLSCredential().getPassword()).isEqualTo("2");
+ assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(true);
+ }
+
+ @Test
+ void shouldUseDefaultValues() {
+ PostgresConfiguration configuration = PostgresConfiguration.builder()
+ .username("james")
+ .password("1")
+ .build();
+
+
assertThat(configuration.getHost()).isEqualTo(PostgresConfiguration.HOST_DEFAULT_VALUE);
+
assertThat(configuration.getPort()).isEqualTo(PostgresConfiguration.PORT_DEFAULT_VALUE);
+
assertThat(configuration.getDatabaseName()).isEqualTo(PostgresConfiguration.DATABASE_NAME_DEFAULT_VALUE);
+
assertThat(configuration.getDatabaseSchema()).isEqualTo(PostgresConfiguration.DATABASE_SCHEMA_DEFAULT_VALUE);
+
assertThat(configuration.getNonRLSCredential().getUsername()).isEqualTo("james");
+
assertThat(configuration.getNonRLSCredential().getPassword()).isEqualTo("1");
+ assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(false);
+ }
+
+ @Test
+ void shouldThrowWhenMissingUsername() {
assertThatThrownBy(() -> PostgresConfiguration.builder()
.build())
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("You need to specify Postgres URI");
+ .hasMessage("You need to specify username");
}
@Test
- void shouldThrowWhenInvalidURI() {
+ void shouldThrowWhenMissingPassword() {
assertThatThrownBy(() -> PostgresConfiguration.builder()
- .url(":invalid")
+ .username("james")
.build())
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("You need to specify a valid Postgres URI");
+ .hasMessage("You need to specify password");
}
@Test
- void shouldThrowWhenURIMissingCredential() {
+ void shouldThrowWhenMissingNonRLSUserAndRLSIsEnabled() {
assertThatThrownBy(() -> PostgresConfiguration.builder()
- .url("postgresql://localhost:5432")
+ .username("james")
+ .password("1")
+ .rowLevelSecurityEnabled()
.build())
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Postgres URI need to contains user credential");
+ .hasMessage("You need to specify nonRLSUser");
}
@Test
- void shouldParseValidURI() {
- PostgresConfiguration configuration = PostgresConfiguration.builder()
- .url("postgresql://username:password@postgreshost:5672")
- .build();
-
- assertThat(configuration.getUri().getHost()).isEqualTo("postgreshost");
- assertThat(configuration.getUri().getPort()).isEqualTo(5672);
-
assertThat(configuration.getCredential().getUsername()).isEqualTo("username");
-
assertThat(configuration.getCredential().getPassword()).isEqualTo("password");
+ void shouldThrowWhenMissingNonRLSPasswordAndRLSIsEnabled() {
+ assertThatThrownBy(() -> PostgresConfiguration.builder()
+ .username("james")
+ .password("1")
+ .nonRLSUser("nonrlsjames")
+ .rowLevelSecurityEnabled()
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("You need to specify nonRLSPassword");
}
@Test
void rowLevelSecurityShouldBeDisabledByDefault() {
PostgresConfiguration configuration = PostgresConfiguration.builder()
- .url("postgresql://username:password@postgreshost:5672")
+ .username("james")
+ .password("1")
.build();
assertThat(configuration.rowLevelSecurityEnabled()).isFalse();
}
-
- @Test
- void databaseNameShouldFallbackToDefaultWhenNotSet() {
- PostgresConfiguration configuration = PostgresConfiguration.builder()
- .url("postgresql://username:password@postgreshost:5672")
- .build();
-
- assertThat(configuration.getDatabaseName()).isEqualTo("postgres");
- }
-
- @Test
- void databaseSchemaShouldFallbackToDefaultWhenNotSet() {
- PostgresConfiguration configuration = PostgresConfiguration.builder()
- .url("postgresql://username:password@postgreshost:5672")
- .build();
-
- assertThat(configuration.getDatabaseSchema()).isEqualTo("public");
- }
-
- @Test
- void shouldReturnCorrespondingProperties() {
- PostgresConfiguration configuration = PostgresConfiguration.builder()
- .url("postgresql://username:password@postgreshost:5672")
- .rowLevelSecurityEnabled()
- .databaseName("databaseName")
- .databaseSchema("databaseSchema")
- .build();
-
- SoftAssertions.assertSoftly(softly -> {
-
softly.assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(true);
-
softly.assertThat(configuration.getDatabaseName()).isEqualTo("databaseName");
-
softly.assertThat(configuration.getDatabaseSchema()).isEqualTo("databaseSchema");
- });
- }
}
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 672a770d6e..2a2c6b9a33 100644
---
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -25,9 +25,9 @@ import static
org.apache.james.backends.postgres.PostgresFixture.Database.ROW_LE
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.http.client.utils.URIBuilder;
import org.apache.james.GuiceModuleTestExtension;
import
org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
@@ -114,23 +114,22 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
PG_CONTAINER.execInContainer("psql", "-U", selectedDatabase.dbUser(),
selectedDatabase.dbName(), "-c", String.format("CREATE EXTENSION IF NOT EXISTS
hstore SCHEMA %s;", selectedDatabase.schema()));
}
- private void initPostgresSession() throws URISyntaxException {
+ private void initPostgresSession() {
postgresConfiguration = PostgresConfiguration.builder()
- .url(new URIBuilder()
- .setScheme("postgresql")
- .setHost(getHost())
- .setPort(getMappedPort())
- .setUserInfo(selectedDatabase.dbUser(),
selectedDatabase.dbPassword())
- .build()
- .toString())
.databaseName(selectedDatabase.dbName())
.databaseSchema(selectedDatabase.schema())
+ .host(getHost())
+ .port(getMappedPort())
+ .username(selectedDatabase.dbUser())
+ .password(selectedDatabase.dbPassword())
+ .nonRLSUser(DEFAULT_DATABASE.dbUser())
+ .nonRLSPassword(DEFAULT_DATABASE.dbPassword())
.rowLevelSecurityEnabled(rlsEnabled)
.build();
connectionFactory = new
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
- .host(postgresConfiguration.getUri().getHost())
- .port(postgresConfiguration.getUri().getPort())
+ .host(postgresConfiguration.getHost())
+ .port(postgresConfiguration.getPort())
.username(postgresConfiguration.getCredential().getUsername())
.password(postgresConfiguration.getCredential().getPassword())
.database(postgresConfiguration.getDatabaseName())
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
similarity index 64%
copy from
backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
copy to
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
index 8d8391e209..d4136a081e 100644
---
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
@@ -17,21 +17,26 @@
* under the License. *
****************************************************************/
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.mailbox.postgres.mail;
-import java.util.Optional;
+import javax.inject.Inject;
-import org.apache.james.core.Domain;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
-public interface JamesPostgresConnectionFactory {
- String DOMAIN_ATTRIBUTE = "app.current_domain";
+public class PostgresMessageBlobReferenceSource implements BlobReferenceSource
{
+ private PostgresMessageDAO postgresMessageDAO;
- default Mono<Connection> getConnection(Domain domain) {
- return getConnection(Optional.ofNullable(domain));
+ @Inject
+ public PostgresMessageBlobReferenceSource(PostgresMessageDAO
postgresMessageDAO) {
+ this.postgresMessageDAO = postgresMessageDAO;
}
- Mono<Connection> getConnection(Optional<Domain> domain);
+ @Override
+ public Flux<BlobId> listReferencedBlobs() {
+ return postgresMessageDAO.listBlobs();
+ }
}
diff --git
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
index c68b0e3792..d4aca8b5a9 100644
---
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
@@ -46,6 +46,7 @@ import java.time.LocalDateTime;
import java.util.Optional;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
@@ -60,6 +61,7 @@ import
org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.jooq.Record;
import org.jooq.postgres.extensions.types.Hstore;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -85,7 +87,8 @@ public class PostgresMessageDAO {
private final PostgresExecutor postgresExecutor;
private final BlobId.Factory blobIdFactory;
- public PostgresMessageDAO(PostgresExecutor postgresExecutor,
BlobId.Factory blobIdFactory) {
+ @Inject
+ public PostgresMessageDAO(@Named(PostgresExecutor.NON_RLS_INJECT)
PostgresExecutor postgresExecutor, BlobId.Factory blobIdFactory) {
this.postgresExecutor = postgresExecutor;
this.blobIdFactory = blobIdFactory;
}
@@ -144,4 +147,10 @@ public class PostgresMessageDAO {
.map(record -> blobIdFactory.from(record.get(BODY_BLOB_ID)));
}
+ public Flux<BlobId> listBlobs() {
+ return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(BODY_BLOB_ID)
+ .from(TABLE_NAME)))
+ .map(record -> blobIdFactory.from(record.get(BODY_BLOB_ID)));
+ }
+
}
diff --git
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
new file mode 100644
index 0000000000..37b5a91117
--- /dev/null
+++
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.UUID;
+
+import javax.mail.Flags;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.ByteContent;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PostgresMessageBlobReferenceSourceTest {
+ private static final int BODY_START = 16;
+ private static final PostgresMailboxId MAILBOX_ID =
PostgresMailboxId.generate();
+ private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+ private static final String CONTENT_2 = "Subject: Test3 \n\nBody23\n.\n";
+ private static final MessageUid MESSAGE_UID = MessageUid.of(1);
+
+ @RegisterExtension
+ static PostgresExtension postgresExtension =
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
+
+ PostgresMessageBlobReferenceSource blobReferenceSource;
+ PostgresMessageDAO postgresMessageDAO;
+
+ @BeforeEach
+ void beforeEach() {
+ postgresMessageDAO = new
PostgresMessageDAO(postgresExtension.getPostgresExecutor(), new
HashBlobId.Factory());
+ blobReferenceSource = new
PostgresMessageBlobReferenceSource(postgresMessageDAO);
+ }
+
+ @Test
+ void blobReferencesShouldBeEmptyByDefault() {
+
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
+ .isEmpty();
+ }
+
+ @Test
+ void blobReferencesShouldReturnAllBlobs() {
+ MessageId messageId1 = PostgresMessageId.Factory.of(UUID.randomUUID());
+ SimpleMailboxMessage message = createMessage(messageId1,
ThreadId.fromBaseMessageId(messageId1), CONTENT, BODY_START, new
PropertyBuilder());
+ MessageId messageId2 = PostgresMessageId.Factory.of(UUID.randomUUID());
+ MailboxMessage message2 = createMessage(messageId2,
ThreadId.fromBaseMessageId(messageId2), CONTENT_2, BODY_START, new
PropertyBuilder());
+ postgresMessageDAO.insert(message, "1") .block();
+ postgresMessageDAO.insert(message2, "2") .block();
+
+
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
+ .hasSize(2);
+ }
+
+ private SimpleMailboxMessage createMessage(MessageId messageId, ThreadId
threadId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
+ return SimpleMailboxMessage.builder()
+ .messageId(messageId)
+ .threadId(threadId)
+ .mailboxId(MAILBOX_ID)
+ .uid(MESSAGE_UID)
+ .internalDate(new Date())
+ .bodyStartOctet(bodyStart)
+ .size(content.length())
+ .content(new ByteContent(content.getBytes(StandardCharsets.UTF_8)))
+ .flags(new Flags())
+ .properties(propertyBuilder)
+ .build();
+ }
+
+}
diff --git a/server/apps/postgres-app/sample-configuration/postgres.properties
b/server/apps/postgres-app/sample-configuration/postgres.properties
index 0bfe376f4d..b93071532e 100644
--- a/server/apps/postgres-app/sample-configuration/postgres.properties
+++ b/server/apps/postgres-app/sample-configuration/postgres.properties
@@ -1,11 +1,26 @@
-# String. Required. PostgreSQL URI in the format
postgresql://username:password@host:port
-url=postgresql://james:secret1@postgres:5432
-
# String. Optional, default to 'postgres'. Database name.
database.name=james
# String. Optional, default to 'public'. Database schema.
database.schema=public
+# String. Optional, default to 'localhost'. Database host.
+database.host=postgres
+
+# Integer. Optional, default to 5432. Database port.
+database.port=5432
+
+# String. Required. Database username.
+database.username=james
+
+# String. Required. Database password of the user.
+database.password=secret1
+
+# String. It is required when row.level.security.enabled is true. Database
username with the permission of bypassing RLS.
+database.non-rls.username=nonrlsjames
+
+# String. It is required when row.level.security.enabled is true. Database
password of non-rls user.
+database.non-rls.password=secret1
+
# Boolean. Optional, default to false. Whether to enable row level security.
row.level.security.enabled=true
diff --git
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 5da7524604..76f6244de2 100644
---
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -21,7 +21,6 @@ package org.apache.james;
import java.util.List;
-import org.apache.james.blob.api.BlobReferenceSource;
import org.apache.james.data.UsersRepositoryModuleChooser;
import org.apache.james.modules.BlobExportMechanismModule;
import org.apache.james.modules.MailboxModule;
@@ -63,12 +62,10 @@ import org.apache.james.modules.server.TaskManagerModule;
import
org.apache.james.modules.server.WebAdminReIndexingTaskSerializationModule;
import org.apache.james.modules.server.WebAdminServerModule;
import org.apache.james.modules.vault.DeletedMessageVaultRoutesModule;
-import org.apache.james.server.blob.deduplication.StorageStrategy;
import org.apache.james.vault.VaultConfiguration;
import com.google.common.collect.ImmutableList;
import com.google.inject.Module;
-import com.google.inject.multibindings.Multibinder;
import com.google.inject.util.Modules;
public class PostgresJamesServerMain implements JamesServerMain {
@@ -145,11 +142,6 @@ public class PostgresJamesServerMain implements
JamesServerMain {
.addAll(BlobStoreModulesChooser.chooseModules(configuration.blobStoreConfiguration()))
.add(new BlobStoreCacheModulesChooser.CacheDisabledModule());
- // should remove this after
https://github.com/linagora/james-project/issues/4998
- if
(configuration.blobStoreConfiguration().storageStrategy().equals(StorageStrategy.DEDUPLICATION))
{
- builder.add(binder -> Multibinder.newSetBinder(binder,
BlobReferenceSource.class));
- }
-
return builder.build();
}
diff --git
a/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
b/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
index 97f4716a4a..b1a955f6ac 100644
---
a/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
+++
b/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
@@ -29,6 +29,7 @@ import
org.apache.james.adapter.mailbox.QuotaUsernameChangeTaskStep;
import org.apache.james.adapter.mailbox.UserRepositoryAuthenticator;
import org.apache.james.adapter.mailbox.UserRepositoryAuthorizator;
import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobReferenceSource;
import org.apache.james.events.EventListener;
import org.apache.james.mailbox.AttachmentContentLoader;
import org.apache.james.mailbox.Authenticator;
@@ -49,6 +50,8 @@ import org.apache.james.mailbox.postgres.PostgresMailboxId;
import org.apache.james.mailbox.postgres.PostgresMailboxSessionMapperFactory;
import org.apache.james.mailbox.postgres.PostgresMessageId;
import org.apache.james.mailbox.postgres.mail.PostgresMailboxManager;
+import
org.apache.james.mailbox.postgres.mail.PostgresMessageBlobReferenceSource;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
import org.apache.james.mailbox.store.MailboxManagerConfiguration;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.NoMailboxPathLocker;
@@ -117,6 +120,8 @@ public class PostgresMailboxModule extends AbstractModule {
bind(ReIndexer.class).to(ReIndexerImpl.class);
+ bind(PostgresMessageDAO.class).in(Scopes.SINGLETON);
+
Multibinder.newSetBinder(binder(),
MailboxManagerDefinition.class).addBinding().to(PostgresMailboxManagerDefinition.class);
Multibinder.newSetBinder(binder(),
EventListener.GroupEventListener.class)
@@ -141,6 +146,9 @@ public class PostgresMailboxModule extends AbstractModule {
Multibinder<DeleteUserDataTaskStep> deleteUserDataTaskStepMultibinder
= Multibinder.newSetBinder(binder(), DeleteUserDataTaskStep.class);
deleteUserDataTaskStepMultibinder.addBinding().to(MailboxUserDeletionTaskStep.class);
+
+ Multibinder<BlobReferenceSource> blobReferenceSourceMultibinder =
Multibinder.newSetBinder(binder(), BlobReferenceSource.class);
+
blobReferenceSourceMultibinder.addBinding().to(PostgresMessageBlobReferenceSource.class);
}
@Singleton
diff --git
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
index e5f849cebb..5a2950e484 100644
---
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
+++
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
@@ -73,13 +73,24 @@ public class PostgresCommonModule extends AbstractModule {
@Provides
@Singleton
- JamesPostgresConnectionFactory
provideJamesPostgresConnectionFactory(PostgresConfiguration
postgresConfiguration, ConnectionFactory connectionFactory) {
+ JamesPostgresConnectionFactory
provideJamesPostgresConnectionFactory(PostgresConfiguration
postgresConfiguration,
+
ConnectionFactory connectionFactory,
+
@Named(JamesPostgresConnectionFactory.NON_RLS_INJECT)
JamesPostgresConnectionFactory singlePostgresConnectionFactory) {
if (postgresConfiguration.rowLevelSecurityEnabled()) {
LOGGER.info("PostgreSQL row level security enabled");
LOGGER.info("Implementation for PostgreSQL connection factory:
{}", DomainImplPostgresConnectionFactory.class.getName());
return new DomainImplPostgresConnectionFactory(connectionFactory);
}
LOGGER.info("Implementation for PostgreSQL connection factory: {}",
SinglePostgresConnectionFactory.class.getName());
+ return singlePostgresConnectionFactory;
+ }
+
+ @Provides
+ @Named(JamesPostgresConnectionFactory.NON_RLS_INJECT)
+ @Singleton
+ JamesPostgresConnectionFactory
provideJamesPostgresConnectionFactoryWithRLSBypass(PostgresConfiguration
postgresConfiguration,
+
@Named(JamesPostgresConnectionFactory.NON_RLS_INJECT) ConnectionFactory
connectionFactory) {
+ LOGGER.info("Implementation for PostgreSQL connection factory: {}",
SinglePostgresConnectionFactory.class.getName());
return new
SinglePostgresConnectionFactory(Mono.from(connectionFactory.create()).block());
}
@@ -87,8 +98,8 @@ public class PostgresCommonModule extends AbstractModule {
@Singleton
ConnectionFactory postgresqlConnectionFactory(PostgresConfiguration
postgresConfiguration) {
return new
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
- .host(postgresConfiguration.getUri().getHost())
- .port(postgresConfiguration.getUri().getPort())
+ .host(postgresConfiguration.getHost())
+ .port(postgresConfiguration.getPort())
.username(postgresConfiguration.getCredential().getUsername())
.password(postgresConfiguration.getCredential().getPassword())
.database(postgresConfiguration.getDatabaseName())
@@ -96,6 +107,20 @@ public class PostgresCommonModule extends AbstractModule {
.build());
}
+ @Provides
+ @Named(JamesPostgresConnectionFactory.NON_RLS_INJECT)
+ @Singleton
+ ConnectionFactory
postgresqlConnectionFactoryRLSBypass(PostgresConfiguration
postgresConfiguration) {
+ return new
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
+ .host(postgresConfiguration.getHost())
+ .port(postgresConfiguration.getPort())
+
.username(postgresConfiguration.getNonRLSCredential().getUsername())
+
.password(postgresConfiguration.getNonRLSCredential().getPassword())
+ .database(postgresConfiguration.getDatabaseName())
+ .schema(postgresConfiguration.getDatabaseSchema())
+ .build());
+ }
+
@Provides
@Singleton
PostgresModule composePostgresDataDefinitions(Set<PostgresModule> modules)
{
@@ -110,6 +135,13 @@ public class PostgresCommonModule extends AbstractModule {
return new PostgresTableManager(postgresExecutor, postgresModule,
postgresConfiguration);
}
+ @Provides
+ @Named(PostgresExecutor.NON_RLS_INJECT)
+ @Singleton
+ PostgresExecutor.Factory
postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT)
JamesPostgresConnectionFactory singlePostgresConnectionFactory) {
+ return new PostgresExecutor.Factory(singlePostgresConnectionFactory);
+ }
+
@Provides
@Named(DEFAULT_INJECT)
@Singleton
@@ -117,6 +149,13 @@ public class PostgresCommonModule extends AbstractModule {
return factory.create();
}
+ @Provides
+ @Named(PostgresExecutor.NON_RLS_INJECT)
+ @Singleton
+ PostgresExecutor
postgresExecutorWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT)
PostgresExecutor.Factory factory) {
+ return factory.create();
+ }
+
@Provides
@Singleton
PostgresExecutor postgresExecutor(@Named(DEFAULT_INJECT) PostgresExecutor
postgresExecutor) {
diff --git
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
index f0bbbfa3ae..550fb7c8cf 100644
---
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
+++
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
@@ -21,11 +21,14 @@ package org.apache.james.modules.data;
import org.apache.commons.configuration2.BaseHierarchicalConfiguration;
import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobReferenceSource;
import org.apache.james.mailrepository.api.MailRepositoryFactory;
import org.apache.james.mailrepository.api.MailRepositoryUrlStore;
import org.apache.james.mailrepository.api.Protocol;
import org.apache.james.mailrepository.memory.MailRepositoryStoreConfiguration;
import org.apache.james.mailrepository.postgres.PostgresMailRepository;
+import
org.apache.james.mailrepository.postgres.PostgresMailRepositoryBlobReferenceSource;
+import
org.apache.james.mailrepository.postgres.PostgresMailRepositoryContentDAO;
import org.apache.james.mailrepository.postgres.PostgresMailRepositoryFactory;
import org.apache.james.mailrepository.postgres.PostgresMailRepositoryUrlStore;
@@ -37,6 +40,7 @@ import com.google.inject.multibindings.Multibinder;
public class PostgresMailRepositoryModule extends AbstractModule {
@Override
protected void configure() {
+ bind(PostgresMailRepositoryContentDAO.class).in(Scopes.SINGLETON);
bind(PostgresMailRepositoryUrlStore.class).in(Scopes.SINGLETON);
bind(MailRepositoryUrlStore.class).to(PostgresMailRepositoryUrlStore.class);
@@ -51,5 +55,8 @@ public class PostgresMailRepositoryModule extends
AbstractModule {
.addBinding().to(PostgresMailRepositoryFactory.class);
Multibinder.newSetBinder(binder(), PostgresModule.class)
.addBinding().toInstance(org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.MODULE);
+
+ Multibinder<BlobReferenceSource> blobReferenceSourceMultibinder =
Multibinder.newSetBinder(binder(), BlobReferenceSource.class);
+
blobReferenceSourceMultibinder.addBinding().to(PostgresMailRepositoryBlobReferenceSource.class);
}
}
diff --git
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
index 241fb21536..1f9da8f4c7 100644
---
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
+++
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
@@ -19,344 +19,67 @@
package org.apache.james.mailrepository.postgres;
-import static
org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME;
-import static
org.apache.james.backends.postgres.PostgresCommons.LOCAL_DATE_TIME_DATE_FUNCTION;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.ATTRIBUTES;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.BODY_BLOB_ID;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.ERROR;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.HEADER_BLOB_ID;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.KEY;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.LAST_UPDATED;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.PER_RECIPIENT_SPECIFIC_HEADERS;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.RECIPIENTS;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.REMOTE_ADDRESS;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.REMOTE_HOST;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.SENDER;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.STATE;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.TABLE_NAME;
-import static
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.URL;
-import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
-
-import java.time.LocalDateTime;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
import javax.inject.Inject;
import javax.mail.MessagingException;
-import javax.mail.internet.MimeMessage;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.postgres.utils.PostgresExecutor;
-import org.apache.james.backends.postgres.utils.PostgresUtils;
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.Store;
-import org.apache.james.blob.mail.MimeMessagePartsId;
-import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.core.MailAddress;
-import org.apache.james.core.MaybeSender;
import org.apache.james.mailrepository.api.MailKey;
import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
-import org.apache.james.server.core.MailImpl;
-import org.apache.james.server.core.MimeMessageWrapper;
-import org.apache.james.util.AuditTrail;
-import org.apache.mailet.Attribute;
-import org.apache.mailet.AttributeName;
-import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
-import org.apache.mailet.PerRecipientHeaders;
-import org.jooq.Record;
-import org.jooq.postgres.extensions.types.Hstore;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class PostgresMailRepository implements MailRepository {
- private static final String HEADERS_SEPARATOR = "; ";
-
- private final PostgresExecutor postgresExecutor;
private final MailRepositoryUrl url;
- private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
- private final BlobId.Factory blobIdFactory;
+ private final PostgresMailRepositoryContentDAO
postgresMailRepositoryContentDAO;
@Inject
- public PostgresMailRepository(PostgresExecutor postgresExecutor,
- MailRepositoryUrl url,
- MimeMessageStore.Factory
mimeMessageStoreFactory,
- BlobId.Factory blobIdFactory) {
- this.postgresExecutor = postgresExecutor;
+ public PostgresMailRepository(MailRepositoryUrl url,
+ PostgresMailRepositoryContentDAO
postgresMailRepositoryContentDAO) {
this.url = url;
- this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
- this.blobIdFactory = blobIdFactory;
+ this.postgresMailRepositoryContentDAO =
postgresMailRepositoryContentDAO;
}
@Override
public long size() throws MessagingException {
- return sizeReactive().block();
+ return postgresMailRepositoryContentDAO.size(url);
}
@Override
public Mono<Long> sizeReactive() {
- return postgresExecutor.executeCount(context ->
Mono.from(context.selectCount()
- .from(TABLE_NAME)
- .where(URL.eq(url.asString()))))
- .map(Integer::longValue);
+ return postgresMailRepositoryContentDAO.sizeReactive(url);
}
@Override
public MailKey store(Mail mail) throws MessagingException {
- MailKey mailKey = MailKey.forMail(mail);
-
- return storeMailBlob(mail)
- .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey,
mimeMessagePartsId)
- .doOnSuccess(auditTrailStoredMail(mail))
-
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e ->
Mono.from(mimeMessageStore.delete(mimeMessagePartsId))
- .thenReturn(mailKey)))
- .block();
- }
-
- private Mono<MimeMessagePartsId> storeMailBlob(Mail mail) throws
MessagingException {
- return mimeMessageStore.save(mail.getMessage());
- }
-
- private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey,
MimeMessagePartsId mimeMessagePartsId) {
- return postgresExecutor.executeVoid(context ->
Mono.from(context.insertInto(TABLE_NAME)
- .set(URL, url.asString())
- .set(KEY, mailKey.asString())
- .set(HEADER_BLOB_ID,
mimeMessagePartsId.getHeaderBlobId().asString())
- .set(BODY_BLOB_ID,
mimeMessagePartsId.getBodyBlobId().asString())
- .set(STATE, mail.getState())
- .set(ERROR, mail.getErrorMessage())
- .set(SENDER, mail.getMaybeSender().asString())
- .set(RECIPIENTS, asStringArray(mail.getRecipients()))
- .set(REMOTE_ADDRESS, mail.getRemoteAddr())
- .set(REMOTE_HOST, mail.getRemoteHost())
- .set(LAST_UPDATED,
DATE_TO_LOCAL_DATE_TIME.apply(mail.getLastUpdated()))
- .set(ATTRIBUTES, asHstore(mail.attributes()))
- .set(PER_RECIPIENT_SPECIFIC_HEADERS,
asHstore(mail.getPerRecipientSpecificHeaders().getHeadersByRecipient()))
- .onConflict(URL, KEY)
- .doUpdate()
- .set(HEADER_BLOB_ID,
mimeMessagePartsId.getHeaderBlobId().asString())
- .set(BODY_BLOB_ID,
mimeMessagePartsId.getBodyBlobId().asString())
- .set(STATE, mail.getState())
- .set(ERROR, mail.getErrorMessage())
- .set(SENDER, mail.getMaybeSender().asString())
- .set(RECIPIENTS, asStringArray(mail.getRecipients()))
- .set(REMOTE_ADDRESS, mail.getRemoteAddr())
- .set(REMOTE_HOST, mail.getRemoteHost())
- .set(LAST_UPDATED,
DATE_TO_LOCAL_DATE_TIME.apply(mail.getLastUpdated()))
- .set(ATTRIBUTES, asHstore(mail.attributes()))
- .set(PER_RECIPIENT_SPECIFIC_HEADERS,
asHstore(mail.getPerRecipientSpecificHeaders().getHeadersByRecipient()))
- ))
- .thenReturn(mailKey);
- }
-
- private Consumer<MailKey> auditTrailStoredMail(Mail mail) {
- return Throwing.consumer(any -> AuditTrail.entry()
- .protocol("mailrepository")
- .action("store")
- .parameters(Throwing.supplier(() -> ImmutableMap.of("mailId",
mail.getName(),
- "mimeMessageId", Optional.ofNullable(mail.getMessage())
- .map(Throwing.function(MimeMessage::getMessageID))
- .orElse(""),
- "sender", mail.getMaybeSender().asString(),
- "recipients", StringUtils.join(mail.getRecipients()))))
- .log("PostgresMailRepository stored mail."));
- }
-
- private String[] asStringArray(Collection<MailAddress> mailAddresses) {
- return mailAddresses.stream()
- .map(MailAddress::asString)
- .toArray(String[]::new);
- }
-
- private Hstore asHstore(Multimap<MailAddress, PerRecipientHeaders.Header>
multimap) {
- return Hstore.hstore(multimap
- .asMap()
- .entrySet()
- .stream()
- .map(recipientToHeaders ->
Pair.of(recipientToHeaders.getKey().asString(),
- asString(recipientToHeaders.getValue())))
- .collect(ImmutableMap.toImmutableMap(Pair::getLeft,
Pair::getRight)));
- }
-
- private String asString(Collection<PerRecipientHeaders.Header> headers) {
- return StringUtils.join(headers.stream()
- .map(PerRecipientHeaders.Header::asString)
- .collect(ImmutableList.toImmutableList()), HEADERS_SEPARATOR);
- }
-
- private Hstore asHstore(Stream<Attribute> attributes) {
- return Hstore.hstore(attributes
- .flatMap(attribute -> attribute.getValue()
- .toJson()
- .map(JsonNode::toString)
- .map(value -> Pair.of(attribute.getName().asString(),
value)).stream())
- .collect(ImmutableMap.toImmutableMap(Pair::getLeft,
Pair::getRight)));
+ return postgresMailRepositoryContentDAO.store(mail, url);
}
@Override
public Iterator<MailKey> list() throws MessagingException {
- return listMailKeys()
- .toStream()
- .iterator();
- }
-
- private Flux<MailKey> listMailKeys() {
- return postgresExecutor.executeRows(context ->
Flux.from(context.select(KEY)
- .from(TABLE_NAME)
- .where(URL.eq(url.asString()))))
- .map(record -> new MailKey(record.get(KEY)));
+ return postgresMailRepositoryContentDAO.list(url);
}
@Override
public Mail retrieve(MailKey key) {
- return postgresExecutor.executeRow(context ->
Mono.from(context.select()
- .from(TABLE_NAME)
- .where(URL.eq(url.asString()))
- .and(KEY.eq(key.asString()))))
- .flatMap(this::toMail)
- .blockOptional()
- .orElse(null);
- }
-
- private Mono<Mail> toMail(Record record) {
- return mimeMessageStore.read(toMimeMessagePartsId(record))
- .map(Throwing.function(mimeMessage -> toMail(record,
mimeMessage)));
- }
-
- private Mail toMail(Record record, MimeMessage mimeMessage) throws
MessagingException {
- List<MailAddress> recipients = Arrays.stream(record.get(RECIPIENTS))
- .map(Throwing.function(MailAddress::new))
- .collect(ImmutableList.toImmutableList());
-
- PerRecipientHeaders perRecipientHeaders =
getPerRecipientHeaders(record);
-
- List<Attribute> attributes = Hstore.hstore(record.get(ATTRIBUTES,
LinkedHashMap.class))
- .data()
- .entrySet()
- .stream()
- .map(Throwing.function(entry -> new
Attribute(AttributeName.of(entry.getKey()),
- AttributeValue.fromJsonString(entry.getValue()))))
- .collect(ImmutableList.toImmutableList());
-
- MailImpl mail = MailImpl.builder()
- .name(record.get(KEY))
- .sender(MaybeSender.getMailSender(record.get(SENDER)))
- .addRecipients(recipients)
-
.lastUpdated(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(LAST_UPDATED,
LocalDateTime.class)))
- .errorMessage(record.get(ERROR))
- .remoteHost(record.get(REMOTE_HOST))
- .remoteAddr(record.get(REMOTE_ADDRESS))
- .state(record.get(STATE))
- .addAllHeadersForRecipients(perRecipientHeaders)
- .addAttributes(attributes)
- .build();
-
- if (mimeMessage instanceof MimeMessageWrapper) {
- mail.setMessageNoCopy((MimeMessageWrapper) mimeMessage);
- } else {
- mail.setMessage(mimeMessage);
- }
-
- return mail;
- }
-
- private PerRecipientHeaders getPerRecipientHeaders(Record record) {
- PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
-
- Hstore.hstore(record.get(PER_RECIPIENT_SPECIFIC_HEADERS,
LinkedHashMap.class))
- .data()
- .entrySet()
- .stream()
- .flatMap(this::recipientToHeaderStream)
- .forEach(recipientToHeaderPair ->
perRecipientHeaders.addHeaderForRecipient(
- recipientToHeaderPair.getRight(),
- recipientToHeaderPair.getLeft()));
-
- return perRecipientHeaders;
- }
-
- private Stream<Pair<MailAddress, PerRecipientHeaders.Header>>
recipientToHeaderStream(Map.Entry<String, String> recipientToHeadersString) {
- List<String> headers = Splitter.on(HEADERS_SEPARATOR)
- .splitToList(recipientToHeadersString.getValue());
-
- return headers
- .stream()
- .map(headerAsString -> Pair.of(
- asMailAddress(recipientToHeadersString.getKey()),
- PerRecipientHeaders.Header.fromString(headerAsString)));
- }
-
- private MailAddress asMailAddress(String mailAddress) {
- return Throwing.supplier(() -> new MailAddress(mailAddress))
- .get();
- }
-
- private MimeMessagePartsId toMimeMessagePartsId(Record record) {
- return MimeMessagePartsId.builder()
- .headerBlobId(blobIdFactory.from(record.get(HEADER_BLOB_ID)))
- .bodyBlobId(blobIdFactory.from(record.get(BODY_BLOB_ID)))
- .build();
+ return postgresMailRepositoryContentDAO.retrieve(key, url);
}
@Override
public void remove(MailKey key) {
- removeReactive(key).block();
- }
-
- private Mono<Void> removeReactive(MailKey key) {
- return getMimeMessagePartsId(key)
- .flatMap(mimeMessagePartsId -> deleteMailMetadata(key)
- .then(deleteMailBlob(mimeMessagePartsId)));
- }
-
- private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key) {
- return postgresExecutor.executeRow(context ->
Mono.from(context.select(HEADER_BLOB_ID, BODY_BLOB_ID)
- .from(TABLE_NAME)
- .where(URL.eq(url.asString()))
- .and(KEY.eq(key.asString()))))
- .map(this::toMimeMessagePartsId);
- }
-
- private Mono<Void> deleteMailMetadata(MailKey key) {
- return postgresExecutor.executeVoid(context ->
Mono.from(context.deleteFrom(TABLE_NAME)
- .where(URL.eq(url.asString()))
- .and(KEY.eq(key.asString()))));
- }
-
- private Mono<Void> deleteMailBlob(MimeMessagePartsId mimeMessagePartsId) {
- return Mono.from(mimeMessageStore.delete(mimeMessagePartsId));
+ postgresMailRepositoryContentDAO.remove(key, url);
}
@Override
public void remove(Collection<MailKey> keys) {
- Flux.fromIterable(keys)
- .concatMap(this::removeReactive)
- .then()
- .block();
+ postgresMailRepositoryContentDAO.remove(keys, url);
}
@Override
public void removeAll() {
- listMailKeys()
- .flatMap(this::removeReactive, DEFAULT_CONCURRENCY)
- .then()
- .block();
+ postgresMailRepositoryContentDAO.removeAll(url);
}
}
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
similarity index 62%
copy from
backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
copy to
server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
index 8d8391e209..bd5a39f8f3 100644
---
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
@@ -17,21 +17,25 @@
* under the License. *
****************************************************************/
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.mailrepository.postgres;
-import java.util.Optional;
+import javax.inject.Inject;
-import org.apache.james.core.Domain;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
-public interface JamesPostgresConnectionFactory {
- String DOMAIN_ATTRIBUTE = "app.current_domain";
+public class PostgresMailRepositoryBlobReferenceSource implements
BlobReferenceSource {
+ private final PostgresMailRepositoryContentDAO
postgresMailRepositoryContentDAO;
- default Mono<Connection> getConnection(Domain domain) {
- return getConnection(Optional.ofNullable(domain));
+ @Inject
+ public
PostgresMailRepositoryBlobReferenceSource(PostgresMailRepositoryContentDAO
postgresMailRepositoryContentDAO) {
+ this.postgresMailRepositoryContentDAO =
postgresMailRepositoryContentDAO;
}
- Mono<Connection> getConnection(Optional<Domain> domain);
+ @Override
+ public Flux<BlobId> listReferencedBlobs() {
+ return postgresMailRepositoryContentDAO.listBlobs();
+ }
}
diff --git
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
similarity index 87%
copy from
server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
copy to
server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
index 241fb21536..2a52d4cb60 100644
---
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
+++
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
@@ -63,7 +63,6 @@ import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.core.MailAddress;
import org.apache.james.core.MaybeSender;
import org.apache.james.mailrepository.api.MailKey;
-import org.apache.james.mailrepository.api.MailRepository;
import org.apache.james.mailrepository.api.MailRepositoryUrl;
import org.apache.james.server.core.MailImpl;
import org.apache.james.server.core.MimeMessageWrapper;
@@ -86,44 +85,38 @@ import com.google.common.collect.Multimap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public class PostgresMailRepository implements MailRepository {
+public class PostgresMailRepositoryContentDAO {
private static final String HEADERS_SEPARATOR = "; ";
private final PostgresExecutor postgresExecutor;
- private final MailRepositoryUrl url;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final BlobId.Factory blobIdFactory;
@Inject
- public PostgresMailRepository(PostgresExecutor postgresExecutor,
- MailRepositoryUrl url,
- MimeMessageStore.Factory
mimeMessageStoreFactory,
- BlobId.Factory blobIdFactory) {
+ public PostgresMailRepositoryContentDAO(PostgresExecutor postgresExecutor,
+ MimeMessageStore.Factory
mimeMessageStoreFactory,
+ BlobId.Factory blobIdFactory) {
this.postgresExecutor = postgresExecutor;
- this.url = url;
this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
this.blobIdFactory = blobIdFactory;
}
- @Override
- public long size() throws MessagingException {
- return sizeReactive().block();
+ public long size(MailRepositoryUrl url) throws MessagingException {
+ return sizeReactive(url).block();
}
- @Override
- public Mono<Long> sizeReactive() {
+ public Mono<Long> sizeReactive(MailRepositoryUrl url) {
return postgresExecutor.executeCount(context ->
Mono.from(context.selectCount()
.from(TABLE_NAME)
.where(URL.eq(url.asString()))))
.map(Integer::longValue);
}
- @Override
- public MailKey store(Mail mail) throws MessagingException {
+ public MailKey store(Mail mail, MailRepositoryUrl url) throws
MessagingException {
MailKey mailKey = MailKey.forMail(mail);
return storeMailBlob(mail)
- .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey,
mimeMessagePartsId)
+ .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey,
mimeMessagePartsId, url)
.doOnSuccess(auditTrailStoredMail(mail))
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e ->
Mono.from(mimeMessageStore.delete(mimeMessagePartsId))
.thenReturn(mailKey)))
@@ -134,7 +127,7 @@ public class PostgresMailRepository implements
MailRepository {
return mimeMessageStore.save(mail.getMessage());
}
- private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey,
MimeMessagePartsId mimeMessagePartsId) {
+ private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey,
MimeMessagePartsId mimeMessagePartsId, MailRepositoryUrl url) {
return postgresExecutor.executeVoid(context ->
Mono.from(context.insertInto(TABLE_NAME)
.set(URL, url.asString())
.set(KEY, mailKey.asString())
@@ -210,22 +203,20 @@ public class PostgresMailRepository implements
MailRepository {
.collect(ImmutableMap.toImmutableMap(Pair::getLeft,
Pair::getRight)));
}
- @Override
- public Iterator<MailKey> list() throws MessagingException {
- return listMailKeys()
+ public Iterator<MailKey> list(MailRepositoryUrl url) throws
MessagingException {
+ return listMailKeys(url)
.toStream()
.iterator();
}
- private Flux<MailKey> listMailKeys() {
+ private Flux<MailKey> listMailKeys(MailRepositoryUrl url) {
return postgresExecutor.executeRows(context ->
Flux.from(context.select(KEY)
.from(TABLE_NAME)
.where(URL.eq(url.asString()))))
.map(record -> new MailKey(record.get(KEY)));
}
- @Override
- public Mail retrieve(MailKey key) {
+ public Mail retrieve(MailKey key, MailRepositoryUrl url) {
return postgresExecutor.executeRow(context ->
Mono.from(context.select()
.from(TABLE_NAME)
.where(URL.eq(url.asString()))
@@ -247,8 +238,7 @@ public class PostgresMailRepository implements
MailRepository {
PerRecipientHeaders perRecipientHeaders =
getPerRecipientHeaders(record);
- List<Attribute> attributes = Hstore.hstore(record.get(ATTRIBUTES,
LinkedHashMap.class))
- .data()
+ List<Attribute> attributes = ((LinkedHashMap<String, String>)
record.get(ATTRIBUTES, LinkedHashMap.class))
.entrySet()
.stream()
.map(Throwing.function(entry -> new
Attribute(AttributeName.of(entry.getKey()),
@@ -280,8 +270,7 @@ public class PostgresMailRepository implements
MailRepository {
private PerRecipientHeaders getPerRecipientHeaders(Record record) {
PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
- Hstore.hstore(record.get(PER_RECIPIENT_SPECIFIC_HEADERS,
LinkedHashMap.class))
- .data()
+ ((LinkedHashMap<String, String>)
record.get(PER_RECIPIENT_SPECIFIC_HEADERS, LinkedHashMap.class))
.entrySet()
.stream()
.flatMap(this::recipientToHeaderStream)
@@ -299,7 +288,7 @@ public class PostgresMailRepository implements
MailRepository {
return headers
.stream()
.map(headerAsString -> Pair.of(
- asMailAddress(recipientToHeadersString.getKey()),
+ asMailAddress(recipientToHeadersString.getKey()),
PerRecipientHeaders.Header.fromString(headerAsString)));
}
@@ -315,18 +304,17 @@ public class PostgresMailRepository implements
MailRepository {
.build();
}
- @Override
- public void remove(MailKey key) {
- removeReactive(key).block();
+ public void remove(MailKey key, MailRepositoryUrl url) {
+ removeReactive(key, url).block();
}
- private Mono<Void> removeReactive(MailKey key) {
- return getMimeMessagePartsId(key)
- .flatMap(mimeMessagePartsId -> deleteMailMetadata(key)
+ private Mono<Void> removeReactive(MailKey key, MailRepositoryUrl url) {
+ return getMimeMessagePartsId(key, url)
+ .flatMap(mimeMessagePartsId -> deleteMailMetadata(key, url)
.then(deleteMailBlob(mimeMessagePartsId)));
}
- private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key) {
+ private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key,
MailRepositoryUrl url) {
return postgresExecutor.executeRow(context ->
Mono.from(context.select(HEADER_BLOB_ID, BODY_BLOB_ID)
.from(TABLE_NAME)
.where(URL.eq(url.asString()))
@@ -334,7 +322,7 @@ public class PostgresMailRepository implements
MailRepository {
.map(this::toMimeMessagePartsId);
}
- private Mono<Void> deleteMailMetadata(MailKey key) {
+ private Mono<Void> deleteMailMetadata(MailKey key, MailRepositoryUrl url) {
return postgresExecutor.executeVoid(context ->
Mono.from(context.deleteFrom(TABLE_NAME)
.where(URL.eq(url.asString()))
.and(KEY.eq(key.asString()))));
@@ -344,19 +332,23 @@ public class PostgresMailRepository implements
MailRepository {
return Mono.from(mimeMessageStore.delete(mimeMessagePartsId));
}
- @Override
- public void remove(Collection<MailKey> keys) {
+ public void remove(Collection<MailKey> keys, MailRepositoryUrl url) {
Flux.fromIterable(keys)
- .concatMap(this::removeReactive)
+ .concatMap(mailKey -> removeReactive(mailKey, url))
.then()
.block();
}
- @Override
- public void removeAll() {
- listMailKeys()
- .flatMap(this::removeReactive, DEFAULT_CONCURRENCY)
+ public void removeAll(MailRepositoryUrl url) {
+ listMailKeys(url)
+ .flatMap(mailKey -> removeReactive(mailKey, url),
DEFAULT_CONCURRENCY)
.then()
.block();
}
+
+ public Flux<BlobId> listBlobs() {
+ return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(HEADER_BLOB_ID, BODY_BLOB_ID)
+ .from(TABLE_NAME)))
+ .flatMapIterable(record ->
ImmutableList.of(blobIdFactory.from(record.get(HEADER_BLOB_ID)),
blobIdFactory.from(record.get(BODY_BLOB_ID))));
+ }
}
diff --git
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
index d947775d9b..5b85e7b043 100644
---
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
+++
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
@@ -47,6 +47,6 @@ public class PostgresMailRepositoryFactory implements
MailRepositoryFactory {
@Override
public MailRepository create(MailRepositoryUrl url) {
- return new PostgresMailRepository(executor, url,
mimeMessageStoreFactory, blobIdFactory);
+ return new PostgresMailRepository(url, new
PostgresMailRepositoryContentDAO(executor, mimeMessageStoreFactory,
blobIdFactory));
}
}
diff --git
a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java
b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java
new file mode 100644
index 0000000000..93b6fa513a
--- /dev/null
+++
b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java
@@ -0,0 +1,94 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import javax.mail.MessagingException;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.blob.memory.MemoryBlobStoreFactory;
+import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.mailrepository.api.MailKey;
+import org.apache.james.mailrepository.api.MailRepositoryPath;
+import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.mailrepository.api.Protocol;
+import org.apache.james.server.core.MailImpl;
+import org.apache.mailet.Attribute;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PostgresMailRepositoryBlobReferenceSourceTest {
+ @RegisterExtension
+ static PostgresExtension postgresExtension =
PostgresExtension.withoutRowLevelSecurity(PostgresModule.aggregateModules(PostgresMailRepositoryModule.MODULE));
+
+ private static final MailRepositoryUrl URL =
MailRepositoryUrl.fromPathAndProtocol(new Protocol("postgres"),
MailRepositoryPath.from("testrepo"));
+
+ PostgresMailRepositoryContentDAO postgresMailRepositoryContentDAO;
+ PostgresMailRepositoryBlobReferenceSource
postgresMailRepositoryBlobReferenceSource;
+
+ @BeforeEach
+ void beforeEach() {
+ BlobId.Factory factory = new HashBlobId.Factory();
+ BlobStore blobStore = MemoryBlobStoreFactory.builder()
+ .blobIdFactory(factory)
+ .defaultBucketName()
+ .passthrough();
+ postgresMailRepositoryContentDAO = new
PostgresMailRepositoryContentDAO(postgresExtension.getPostgresExecutor(),
MimeMessageStore.factory(blobStore), factory);
+ postgresMailRepositoryBlobReferenceSource = new
PostgresMailRepositoryBlobReferenceSource(postgresMailRepositoryContentDAO);
+ }
+
+ @Test
+ void blobReferencesShouldBeEmptyByDefault() {
+
assertThat(postgresMailRepositoryBlobReferenceSource.listReferencedBlobs().collectList().block())
+ .isEmpty();
+ }
+
+ @Test
+ void blobReferencesShouldReturnAllBlobs() throws Exception {
+ postgresMailRepositoryContentDAO.store(createMail(new
MailKey("mail1")), URL);
+ postgresMailRepositoryContentDAO.store(createMail(new
MailKey("mail2")), URL);
+
+
assertThat(postgresMailRepositoryBlobReferenceSource.listReferencedBlobs().collectList().block())
+ .hasSize(4);
+ }
+
+ private MailImpl createMail(MailKey key) throws MessagingException {
+ return MailImpl.builder()
+ .name(key.asString())
+ .sender("sender@localhost")
+ .addRecipient("[email protected]")
+ .addRecipient("[email protected]")
+ .addAttribute(Attribute.convertToAttribute("testAttribute",
"testValue"))
+ .mimeMessage(MimeMessageBuilder
+ .mimeMessageBuilder()
+ .setSubject("test")
+ .setText("original body")
+ .build())
+ .build();
+ }
+
+}
diff --git
a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
index 71ba41f5de..35a17357d9 100644
---
a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
+++
b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
@@ -58,6 +58,6 @@ public class PostgresMailRepositoryTest implements
MailRepositoryContract {
.blobIdFactory(BLOB_ID_FACTORY)
.defaultBucketName()
.passthrough();
- return new
PostgresMailRepository(postgresExtension.getPostgresExecutor(), url,
MimeMessageStore.factory(blobStore), BLOB_ID_FACTORY);
+ return new PostgresMailRepository(url, new
PostgresMailRepositoryContentDAO(postgresExtension.getPostgresExecutor(),
MimeMessageStore.factory(blobStore), BLOB_ID_FACTORY));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]