MAILBOX-266 Implement a delete by query using Scroll + Bulk queries and use it
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6ba31a66 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6ba31a66 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6ba31a66 Branch: refs/heads/master Commit: 6ba31a6680d7023ed3052b5783fc9fc54dc36165 Parents: ebeafcf Author: Benoit Tellier <btell...@linagora.com> Authored: Wed Mar 23 16:24:31 2016 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Apr 6 16:18:45 2016 +0700 ---------------------------------------------------------------------- .../elasticsearch/DeleteByQueryPerformer.java | 81 ++++++++++++++++++++ .../elasticsearch/ElasticSearchIndexer.java | 10 ++- ...lasticSearchListeningMessageSearchIndex.java | 6 +- .../elasticsearch/ElasticSearchIndexerTest.java | 35 +++++---- .../ElasticSearchIntegrationTest.java | 3 +- .../host/ElasticSearchHostSystem.java | 4 +- .../james/JamesCapabilitiesServerTest.java | 1 - .../james/modules/CommonServicesModule.java | 6 +- .../server/AsyncTasksExecutorModule.java | 45 +++++++++++ 9 files changed, 164 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java new file mode 100644 index 0000000..9d266ce --- /dev/null +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java @@ -0,0 +1,81 @@ +/**************************************************************** + * Licensed to the Apache Software 
Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.elasticsearch; + +import java.util.concurrent.ExecutorService; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.apache.james.mailbox.elasticsearch.search.ScrollIterable; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; + /** Deletes every document of ElasticSearchIndexer.MAILBOX_INDEX (type MESSAGE_TYPE) matching a query: matching ids are fetched through a scroll in pages of BATCH_SIZE and each page is removed with one bulk request, on the executor injected as "AsyncExecutor". */ +public class DeleteByQueryPerformer { + public static final int BATCH_SIZE = 100; + public static final TimeValue TIMEOUT = new TimeValue(60000); /* scroll keep-alive passed to setScroll (60000 ms) */ + + private final ClientProvider clientProvider; + private final ExecutorService executor; + + @Inject + public DeleteByQueryPerformer(ClientProvider clientProvider, @Named("AsyncExecutor") ExecutorService executor) { + this.clientProvider = clientProvider; + this.executor = executor; + } + /** Hands the deletion of all documents matching queryBuilder to the executor and returns without waiting for it. */ + public void perform(QueryBuilder
queryBuilder) { + /* NOTE(review): execute() gives the caller no handle, so it can neither wait for completion nor observe an exception thrown by doDeleteByQuery; consider returning a Future (e.g. submit or CompletableFuture.runAsync with this executor). */ executor.execute(() -> doDeleteByQuery(queryBuilder)); + } + /** Iterates the scroll pages of documents matching queryBuilder (via ScrollIterable) and bulk-deletes the hits of each page. The Void/null return looks like a leftover; a plain void method would do. */ + private Void doDeleteByQuery(QueryBuilder queryBuilder) { + try (Client client = clientProvider.get()) { + ScrollIterable scrollIterable = new ScrollIterable(client, + client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) + .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) + .setScroll(TIMEOUT) + .setNoFields() /* only hit ids are used below */ + .setQuery(queryBuilder) + .setSize(BATCH_SIZE)); + for (SearchResponse searchResponse : scrollIterable) { + /* NOTE(review): the bulk future returned here is discarded, so try-with-resources may close the client while bulk deletes are still in flight, and bulk failures go unnoticed; consider waiting on it (actionGet()) and checking hasFailures(). */ deleteRetrievedIds(client, searchResponse); + } + return null; + } + } + /** Sends one asynchronous bulk request deleting every hit of searchResponse from the mailbox index. NOTE(review): a page with zero hits yields a bulk request with no actions, which Elasticsearch rejects at validation; presumably ScrollIterable never yields such a page - TODO confirm. */ + private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + for (SearchHit hit : searchResponse.getHits()) { + bulkRequestBuilder.add(client.prepareDelete() + .setIndex(ElasticSearchIndexer.MAILBOX_INDEX) + .setType(ElasticSearchIndexer.MESSAGE_TYPE) + .setId(hit.getId())); + } + return bulkRequestBuilder.execute(); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java index 630abb6..1b6017d 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java @@ -20,11 +20,11 @@ package org.apache.james.mailbox.elasticsearch; import javax.inject.Inject; -import org.apache.commons.lang.NotImplementedException; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import
org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; import com.google.common.base.Preconditions; @@ -34,10 +34,12 @@ public class ElasticSearchIndexer { public static final String MESSAGE_TYPE = "message"; private final ClientProvider clientProvider; + private final DeleteByQueryPerformer deleteByQueryPerformer; @Inject - public ElasticSearchIndexer(ClientProvider clientProvider) { + public ElasticSearchIndexer(ClientProvider clientProvider, DeleteByQueryPerformer deleteByQueryPerformer) { this.clientProvider = clientProvider; + this.deleteByQueryPerformer = deleteByQueryPerformer; } public IndexResponse indexMessage(String id, String content) { @@ -65,8 +67,8 @@ public class ElasticSearchIndexer { } } - public void deleteAllWithIdStarting(String idStart) { - throw new NotImplementedException(); + public void deleteAllMatchingQuery(QueryBuilder queryBuilder) { + deleteByQueryPerformer.perform(queryBuilder); } private void checkArgument(String content) { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index 30146c0..b7f34fe 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -18,6 +18,8 @@ ****************************************************************/ package 
org.apache.james.mailbox.elasticsearch.events; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + import java.util.Iterator; import javax.inject.Inject; @@ -25,6 +27,7 @@ import javax.mail.Flags; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; +import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson; import org.apache.james.mailbox.elasticsearch.search.ElasticSearchSearcher; import org.apache.james.mailbox.exception.MailboxException; @@ -39,7 +42,6 @@ import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ElasticSearchListeningMessageSearchIndex<Id extends MailboxId> extends ListeningMessageSearchIndex<Id> { private final static Logger LOGGER = LoggerFactory.getLogger(ElasticSearchListeningMessageSearchIndex.class); @@ -80,7 +82,7 @@ public class ElasticSearchListeningMessageSearchIndex<Id extends MailboxId> exte @Override public void delete(MailboxSession session, Mailbox<Id> mailbox, MessageRange range) throws MailboxException { if (range.getType() == Type.ALL) { - indexer.deleteAllWithIdStarting(mailbox.getMailboxId() + ID_SEPARATOR); + indexer.deleteAllMatchingQuery(termQuery(JsonMessageConstants.MAILBOX_ID, mailbox.getMailboxId().serialize())); } else { range.forEach(messageId -> { try { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java index 850494f..1e4ce1a 100644 --- 
a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java @@ -20,21 +20,25 @@ package org.apache.james.mailbox.elasticsearch; import static org.assertj.core.api.Assertions.assertThat; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; import java.io.IOException; +import java.util.concurrent.Executors; +import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; import org.apache.james.mailbox.elasticsearch.utils.TestingClientProvider; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.node.Node; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; +import com.google.common.collect.Lists; + public class ElasticSearchIndexerTest { private TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -45,11 +49,14 @@ public class ElasticSearchIndexerTest { private Node node; private ElasticSearchIndexer testee; + private DeleteByQueryPerformer deleteByQueryPerformer; @Before public void setup() throws IOException { node = embeddedElasticSearch.getNode(); - testee = new ElasticSearchIndexer(new TestingClientProvider(node)); + TestingClientProvider clientProvider = new TestingClientProvider(node); + deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor()); + testee = new ElasticSearchIndexer(clientProvider, deleteByQueryPerformer); } @Test @@ -113,16 +120,15 @@ public class ElasticSearchIndexerTest { testee.updateMessage("1", null); } - @Ignore @Test - public void deleteAllWithIdStarting() throws Exception { + public void deleteByQueryShouldWorkOnSingleMessage() throws Exception { String messageId = 
"1:2"; - String content = "{\"message\": \"trying out Elasticsearch\"}"; + String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}"; testee.indexMessage(messageId, content); embeddedElasticSearch.awaitForElasticSearch(); - testee.deleteAllWithIdStarting("1:"); + testee.deleteAllMatchingQuery(termQuery(JsonMessageConstants.MAILBOX_ID, "1")); embeddedElasticSearch.awaitForElasticSearch(); try (Client client = node.client()) { @@ -134,28 +140,25 @@ public class ElasticSearchIndexerTest { } } - @Ignore @Test - public void deleteAllWithIdStartingWhenMultipleMessages() throws Exception { - String messageId = "1:2"; - String content = "{\"message\": \"trying out Elasticsearch\"}"; + public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception { + String messageId = "1:1"; + String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}"; testee.indexMessage(messageId, content); - embeddedElasticSearch.awaitForElasticSearch(); String messageId2 = "1:2"; - String content2 = "{\"message\": \"trying out Elasticsearch 2\"}"; + String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}"; testee.indexMessage(messageId2, content2); - embeddedElasticSearch.awaitForElasticSearch(); String messageId3 = "2:3"; - String content3 = "{\"message\": \"trying out Elasticsearch 3\"}"; + String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}"; testee.indexMessage(messageId3, content3); embeddedElasticSearch.awaitForElasticSearch(); - - testee.deleteAllWithIdStarting("1:"); + + testee.deleteAllMatchingQuery(termQuery(JsonMessageConstants.MAILBOX_ID, "1")); embeddedElasticSearch.awaitForElasticSearch(); try (Client client = node.client()) { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java 
---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java index e27f5af..2f5962e 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.ZoneId; import java.util.Date; +import java.util.concurrent.Executors; import javax.mail.Flags; @@ -167,7 +168,7 @@ public class ElasticSearchIntegrationTest { ); MailboxSessionMapperFactory<InMemoryId> mapperFactory = new InMemoryMailboxSessionMapperFactory(); elasticSearchListeningMessageSearchIndex = new ElasticSearchListeningMessageSearchIndex<>(mapperFactory, - new ElasticSearchIndexer(clientProvider), + new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor())), new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter())), new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"))); storeMailboxManager = new StoreMailboxManager<>( http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java ---------------------------------------------------------------------- diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java index d804786..d90329b 100644 
--- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java +++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java @@ -21,6 +21,7 @@ package org.apache.james.mpt.imapmailbox.elasticsearch.host; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; import org.apache.james.imap.api.process.ImapProcessor; @@ -32,6 +33,7 @@ import org.apache.james.mailbox.acl.MailboxACLResolver; import org.apache.james.mailbox.acl.SimpleGroupMembershipResolver; import org.apache.james.mailbox.acl.UnionMailboxACLResolver; import org.apache.james.mailbox.elasticsearch.ClientProvider; +import org.apache.james.mailbox.elasticsearch.DeleteByQueryPerformer; import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch; import org.apache.james.mailbox.elasticsearch.IndexCreationFactory; @@ -102,7 +104,7 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem { ElasticSearchListeningMessageSearchIndex<InMemoryId> searchIndex = new ElasticSearchListeningMessageSearchIndex<>( factory, - new ElasticSearchIndexer(clientProvider), + new ElasticSearchIndexer(clientProvider, new DeleteByQueryPerformer(clientProvider, Executors.newSingleThreadExecutor())), new ElasticSearchSearcher<>(clientProvider, new QueryConverter(new CriterionConverter())), new MessageToElasticSearchJson(new DefaultTextExtractor())); http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesCapabilitiesServerTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesCapabilitiesServerTest.java 
b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesCapabilitiesServerTest.java index 761d1fd..b39f3af 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesCapabilitiesServerTest.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/JamesCapabilitiesServerTest.java @@ -88,7 +88,6 @@ public class JamesCapabilitiesServerTest { bind(MailboxManager.class).toInstance(mailboxManager); } - @SuppressWarnings("unused") @Provides @Singleton Session provideSession(CassandraModule cassandraModule) { http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/server/container/guice/guice-common/src/main/java/org/apache/james/modules/CommonServicesModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/CommonServicesModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/CommonServicesModule.java index 175a92b..029362c 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/CommonServicesModule.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/CommonServicesModule.java @@ -30,6 +30,7 @@ import org.apache.james.core.filesystem.FileSystemImpl; import org.apache.james.filesystem.api.FileSystem; import org.apache.james.filesystem.api.JamesDirectoriesProvider; import org.apache.james.mailbox.store.mail.model.MailboxId; +import org.apache.james.modules.server.AsyncTasksExecutorModule; import org.apache.james.modules.server.ConfigurationProviderModule; import org.apache.james.modules.server.DNSServiceModule; import org.apache.james.utils.ConfigurationProvider; @@ -56,18 +57,19 @@ public class CommonServicesModule<Id extends MailboxId> extends AbstractModule { install(new ConfigurationProviderModule()); install(new PreDestroyModule()); install(new DNSServiceModule()); + install(new 
AsyncTasksExecutorModule()); bind(FileSystem.class).to(FileSystemImpl.class); bind(ConfigurationProvider.class).to(FileConfigurationProvider.class); TypeLiteral<GuiceServerProbe<Id>> serverProbe = guiceGenericType.newGenericType(GuiceServerProbe.class); bind(serverProbe).in(Singleton.class); } - + @Provides @Singleton @Named(CONFIGURATION_PATH) public String configurationPath() { return FileSystem.FILE_PROTOCOL_AND_CONF; } - + @Provides @Singleton public JamesDirectoriesProvider directories() throws MissingArgumentException { String rootDirectory = Optional http://git-wip-us.apache.org/repos/asf/james-project/blob/6ba31a66/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java new file mode 100644 index 0000000..5b49d8b --- /dev/null +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/AsyncTasksExecutorModule.java @@ -0,0 +1,45 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.modules.server; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.inject.Named; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; + /** Guice module exposing one singleton ExecutorService, bound under the name "AsyncExecutor", for background tasks. */ +public class AsyncTasksExecutorModule extends AbstractModule { + /* Number of worker threads in the "AsyncExecutor" pool. */ + public static final int THREAD_POOL_SIZE = 8; + + @Override + protected void configure() { + /* Intentionally empty: the executor binding is contributed by the @Provides method of this module. */ + } + + @Provides + @Singleton + @Named("AsyncExecutor") + public ExecutorService provideAsyncExecutorService() { /* NOTE(review): nothing in this module shuts the pool down. Threads of Executors.newFixedThreadPool live until shutdown() and, with the default thread factory, are non-daemon, so they can keep the JVM alive after the container stops; confirm a shutdown/@PreDestroy hook exists elsewhere. */ + return Executors.newFixedThreadPool(THREAD_POOL_SIZE); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org