Author: btellier Date: Mon Jun 29 08:33:09 2015 New Revision: 1688127 URL: http://svn.apache.org/r1688127 Log: MAILBOX-242 Add event listener on messages - patch contributed by Antoine Duprat
Added: james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java Modified: james/mailbox/trunk/elasticsearch/pom.xml james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeProvider.java james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml Modified: james/mailbox/trunk/elasticsearch/pom.xml URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/pom.xml?rev=1688127&r1=1688126&r2=1688127&view=diff ============================================================================== --- james/mailbox/trunk/elasticsearch/pom.xml (original) +++ james/mailbox/trunk/elasticsearch/pom.xml Mon Jun 29 08:33:09 2015 @@ -97,6 +97,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.james</groupId> + <artifactId>apache-james-mailbox-store</artifactId> + </dependency> + <dependency> <groupId>org.apache.lucene</groupId> <artifactId>lucene-core</artifactId> <version>4.10.4</version> @@ -109,6 +113,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.3.1</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>1.5.2</version> Modified: james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeProvider.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeProvider.java?rev=1688127&r1=1688126&r2=1688127&view=diff ============================================================================== --- james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeProvider.java (original) +++ james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeProvider.java Mon Jun 29 08:33:09 2015 @@ -33,7 +33,7 @@ public class NodeProvider { .clusterName(clusterName) .settings(ImmutableSettings.builder() .put(GLOBAL_NETWORK_HOST_SETTING, masterHost) - .put(SCRIPT_DISABLE_DYNAMIC, true)) + .put(SCRIPT_DISABLE_DYNAMIC, false)) .client(true) .node() .start(); Added: james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java?rev=1688127&view=auto ============================================================================== --- james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java (added) +++ james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java Mon Jun 29 08:33:09 2015 @@ -0,0 +1,106 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.mailbox.elasticsearch.events; + +import java.util.Iterator; + +import javax.mail.Flags; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; +import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.MessageRange.Type; +import org.apache.james.mailbox.model.SearchQuery; +import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; +import org.apache.james.mailbox.store.mail.MessageMapperFactory; +import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.mailbox.store.mail.model.MailboxId; +import org.apache.james.mailbox.store.mail.model.Message; +import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ElasticSearchListeningMessageSearchIndex<Id extends MailboxId> extends ListeningMessageSearchIndex<Id> { + + private final static Logger LOGGER = LoggerFactory.getLogger(ElasticSearchListeningMessageSearchIndex.class); + private final static int NO_LIMIT = -1; + private final static String ID_SEPARATOR = ":"; + + private final ElasticSearchIndexer indexer; + private final MessageToElasticSearchJson messageToElasticSearchJson; + + public ElasticSearchListeningMessageSearchIndex(MessageMapperFactory<Id> factory, + ElasticSearchIndexer indexer, MessageToElasticSearchJson messageToElasticSearchJson) { + super(factory); + this.indexer = indexer; + this.messageToElasticSearchJson = messageToElasticSearchJson; + } + + @Override + public Iterator<Long> search(MailboxSession session, Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException { + throw new NotImplementedException(); + } + + @Override + public void add(MailboxSession session, Mailbox<Id> mailbox, Message<Id> message) throws MailboxException { + try { + indexer.indexMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJson(message)); + } catch (Exception e) { + LOGGER.error("Error when indexing message " + message.getUid(), e); + } + } + + @Override + public void delete(MailboxSession session, Mailbox<Id> mailbox, MessageRange range) throws MailboxException { + if (range.getType() == Type.ALL) { + indexer.deleteAllWithIdStarting(mailbox.getMailboxId() + ID_SEPARATOR); + } else { + range.forEach(messageId -> { + try { + indexer.deleteMessage(indexIdFor(mailbox, messageId)); + } catch (Exception e) { + LOGGER.error("Error when deleting index for message " + messageId, e); + } + }); + } + } + + @Override + public void update(MailboxSession session, Mailbox<Id> mailbox, MessageRange range, Flags flags) throws MailboxException { + getFactory().getMessageMapper(session) + .findInMailbox(mailbox, range, FetchType.Full, NO_LIMIT) + .forEachRemaining(message -> { + try { + message.setFlags(flags); + indexer.updateMessage(indexIdFor(mailbox, message.getUid()), messageToElasticSearchJson.convertToJson(message)); + } catch (Exception e) { + LOGGER.error("Error when updating index for message " + message.getUid(), e); + } + }); + } + + private String indexIdFor(Mailbox<Id> mailbox, long messageId) { + return String.join(ID_SEPARATOR, mailbox.getMailboxId().serialize(), String.valueOf(messageId)); + } + +} Modified: james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml?rev=1688127&r1=1688126&r2=1688127&view=diff ============================================================================== --- james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml (original) +++ james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml Mon Jun 29 08:33:09 2015 @@ -22,14 +22,28 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> + <property name="ignoreUnresolvablePlaceholders" value="true"/> + <property name ="location" value="classpath:elasticsearch.properties"/> + </bean> + + <bean id="elasticsearch-listener" class="org.apache.james.mailbox.elasticsearch.events.ElasticSearchListeningMessageSearchIndex"> + <constructor-arg index="0" ref="elasticsearch-node"/> + <constructor-arg index="1" ref="elasticsearch-indexer"/> + <constructor-arg index="2" ref="elasticsearch-json"/> + </bean> <bean id="elasticsearch-indexer" class="org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer"> <constructor-arg index="0" ref="elasticsearch-node"/> </bean> - <bean id="elasticsearch-node" class="org.apache.james.mailbox.cassandra.elasticsearch.NodeProvider" factory-method="createNodeForClusterName"> - <constructor-arg index="0" ref="${elasticsearch.clusterName}"/> - <constructor-arg index="1" ref="${elasticsearch.masterHost}"/> + <bean id="elasticsearch-node" class="org.apache.james.mailbox.elasticsearch.NodeProvider" factory-method="createNodeForClusterName"> + <constructor-arg index="0" value="${elasticsearch.clusterName}"/> + <constructor-arg index="1" value="${elasticsearch.masterHost}"/> + </bean> + + <bean id="elasticsearch-json" class="org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson"> </bean> </beans> Added: james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java?rev=1688127&view=auto ============================================================================== --- james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java (added) +++ james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java Mon Jun 29 08:33:09 2015 @@ -0,0 +1,307 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.mailbox.elasticsearch.events; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.LongStream; + +import javax.mail.Flags; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; +import org.apache.james.mailbox.elasticsearch.json.MessageToElasticSearchJson; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.store.TestId; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; +import org.apache.james.mailbox.store.mail.MessageMapperFactory; +import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.mailbox.store.mail.model.Message; +import org.easymock.IMocksControl; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.update.UpdateResponse; +import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; + +public class ElasticSearchListeningMessageSearchIndexTest { + + private IMocksControl control; + + private MessageMapperFactory<TestId> mapperFactory; + private ElasticSearchIndexer indexer; + private MessageToElasticSearchJson messageToElasticSearchJson; + + private ElasticSearchListeningMessageSearchIndex<TestId> testee; + + @Before + @SuppressWarnings("unchecked") + public void setup() throws JsonProcessingException { + control = createControl(); + + mapperFactory = control.createMock(MessageMapperFactory.class); + indexer = control.createMock(ElasticSearchIndexer.class); + messageToElasticSearchJson = control.createMock(MessageToElasticSearchJson.class); + expect(messageToElasticSearchJson.convertToJson(anyObject(Message.class))).andReturn("json content").anyTimes(); + + testee = new ElasticSearchListeningMessageSearchIndex<TestId>(mapperFactory, indexer, messageToElasticSearchJson); + } + + @Test(expected=NotImplementedException.class) + public void searchShouldThrow() throws Exception { + control.replay(); + testee.search(null, null, null); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void addShouldIndex() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + + long messageId = 1; + TestId mailboxId = TestId.of(12); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + Message<TestId> message = mockedMessage(messageId, mailboxId); + + IndexResponse expectedIndexResponse = control.createMock(IndexResponse.class); + expect(indexer.indexMessage(eq(mailboxId.serialize() + ":" + messageId), anyString())) + .andReturn(expectedIndexResponse); + + control.replay(); + testee.add(session, mailbox, message); + control.verify(); + } + + @SuppressWarnings("unchecked") + private Message<TestId> mockedMessage(long messageId, TestId mailboxId) throws IOException { + Message<TestId> message = control.createMock(Message.class); + expect(message.getUid()).andReturn(messageId).anyTimes(); + return message; + } + + @Test + @SuppressWarnings("unchecked") + public void addShouldNotPropagateExceptionWhenExceptionOccurs() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + + long messageId = 1; + TestId mailboxId = TestId.of(12); + Message<TestId> message = mockedMessage(messageId, mailboxId); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + expect(indexer.indexMessage(eq(mailboxId.serialize() + ":" + messageId), anyString())) + .andThrow(new ElasticsearchException("")); + + control.replay(); + testee.add(session, mailbox, message); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void deleteShouldWork() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + long messageId = 1; + MessageRange messageRange = MessageRange.one(messageId); + TestId mailboxId = TestId.of(12); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + DeleteResponse expectedDeleteResponse = control.createMock(DeleteResponse.class); + expect(indexer.deleteMessage(mailboxId.serialize() + ":" + messageId)) + .andReturn(expectedDeleteResponse); + + control.replay(); + testee.delete(session, mailbox, messageRange); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void deleteShouldWorkWhenMultipleMessageIds() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + long firstMessageId = 1; + long lastMessageId = 10; + MessageRange messageRange = MessageRange.range(firstMessageId, lastMessageId); + TestId mailboxId = TestId.of(12); + expect(mailbox.getMailboxId()).andReturn(mailboxId).times(10); + + LongStream.rangeClosed(firstMessageId, lastMessageId) + .forEach(messageId -> { + DeleteResponse expectedDeleteResponse = control.createMock(DeleteResponse.class); + expect(indexer.deleteMessage(mailboxId.serialize() + ":" + messageId)) + .andReturn(expectedDeleteResponse); + }); + + control.replay(); + testee.delete(session, mailbox, messageRange); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void deleteShouldNotPropagateExceptionWhenExceptionOccurs() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + long messageId = 1; + MessageRange messageRange = MessageRange.one(messageId); + TestId mailboxId = TestId.of(12); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + expect(indexer.deleteMessage(mailboxId.serialize() + ":" + messageId)) + .andThrow(new ElasticsearchException("")); + + control.replay(); + testee.delete(session, mailbox, messageRange); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void updateShouldWork() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + Flags flags = control.createMock(Flags.class); + long messageId = 1; + TestId mailboxId = TestId.of(12); + MessageRange messageRange = MessageRange.one(messageId); + Message<TestId> message = mockedMessage(messageId, mailboxId); + + MessageMapper<TestId> messageMapper = control.createMock(MessageMapper.class); + expect(mapperFactory.getMessageMapper(session)) + .andReturn(messageMapper); + expect(messageMapper.findInMailbox(mailbox, messageRange, FetchType.Full, -1)) + .andReturn(ImmutableList.of(message).iterator()); + + message.setFlags(flags); + expectLastCall(); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + UpdateResponse expectedUpdateResponse = control.createMock(UpdateResponse.class); + expect(indexer.updateMessage(eq(mailboxId.serialize() + ":" + messageId), anyString())) + .andReturn(expectedUpdateResponse); + + control.replay(); + testee.update(session, mailbox, messageRange, flags); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void updateShouldWorkWhenMultipleMessageIds() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + Flags flags = control.createMock(Flags.class); + long firstMessageId = 1; + long lastMessageId = 10; + MessageRange messageRange = MessageRange.range(firstMessageId, lastMessageId); + + MessageMapper<TestId> messageMapper = control.createMock(MessageMapper.class); + expect(mapperFactory.getMessageMapper(session)) + .andReturn(messageMapper); + + TestId mailboxId = TestId.of(12); + Message<TestId> message1 = mockedMessage(1, mailboxId); + Message<TestId> message2 = mockedMessage(2, mailboxId); + Message<TestId> message3 = mockedMessage(3, mailboxId); + Message<TestId> message4 = mockedMessage(4, mailboxId); + Message<TestId> message5 = mockedMessage(5, mailboxId); + Message<TestId> message6 = mockedMessage(6, mailboxId); + Message<TestId> message7 = mockedMessage(7, mailboxId); + Message<TestId> message8 = mockedMessage(8, mailboxId); + Message<TestId> message9 = mockedMessage(9, mailboxId); + Message<TestId> message10 = mockedMessage(10, mailboxId); + + List<Message<TestId>> messages = ImmutableList.of(message1, message2, message3, message4, message5, message6, message7, message8, message9, message10); + expect(messageMapper.findInMailbox(mailbox, messageRange, FetchType.Full, -1)) + .andReturn(messages.iterator()); + + AtomicLong messageId = new AtomicLong(0); + messages + .forEach(message -> { + try { + message.setFlags(flags); + expectLastCall(); + + long messageIdAsLong = messageId.incrementAndGet(); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + UpdateResponse expectedUpdateResponse = control.createMock(UpdateResponse.class); + expect(indexer.updateMessage(eq(mailboxId.serialize() + ":" + messageIdAsLong), anyString())) + .andReturn(expectedUpdateResponse); + } catch (Exception e) { + Throwables.propagate(e); + } finally { + } + }); + + + control.replay(); + testee.update(session, mailbox, messageRange, flags); + control.verify(); + } + + @Test + @SuppressWarnings("unchecked") + public void updateShouldNotPropagateExceptionWhenExceptionOccurs() throws Exception { + MailboxSession session = control.createMock(MailboxSession.class); + Mailbox<TestId> mailbox = control.createMock(Mailbox.class); + Flags flags = control.createMock(Flags.class); + long messageId = 1; + TestId mailboxId = TestId.of(12); + MessageRange messageRange = MessageRange.one(messageId); + Message<TestId> message = mockedMessage(messageId, mailboxId); + + MessageMapper<TestId> messageMapper = control.createMock(MessageMapper.class); + expect(mapperFactory.getMessageMapper(session)) + .andReturn(messageMapper); + expect(messageMapper.findInMailbox(mailbox, messageRange, FetchType.Full, -1)) + .andReturn(ImmutableList.of(message).iterator()); + + message.setFlags(flags); + expectLastCall(); + expect(mailbox.getMailboxId()).andReturn(mailboxId); + + expect(indexer.updateMessage(eq(mailboxId.serialize() + ":" + messageId), anyString())) + .andThrow(new ElasticsearchException("")); + + control.replay(); + testee.update(session, mailbox, messageRange, flags); + control.verify(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org