Author: btellier
Date: Fri Jul 3 14:37:56 2015
New Revision: 1689022

URL: http://svn.apache.org/r1689022
Log:
MAILBOX-208 Refactor flags update in Cassandra Message Mapper
Added: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java?rev=1689022&r1=1689021&r2=1689022&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java Fri Jul 3 14:37:56 2015 @@ -90,8 +90,7 @@ public class CassandraTableManager { .addColumn(CassandraMessageTable.Flag.RECENT, cboolean()) .addColumn(CassandraMessageTable.Flag.SEEN, cboolean()) .addColumn(CassandraMessageTable.Flag.USER, cboolean()) - .addUDTListColumn(CassandraMessageTable.PROPERTIES, SchemaBuilder.frozen(CassandraTypesProvider.TYPE.Property.getName())) - .addColumn(CassandraMessageTable.FLAG_VERSION, bigint())), + .addUDTListColumn(CassandraMessageTable.PROPERTIES, SchemaBuilder.frozen(CassandraTypesProvider.TYPE.Property.getName()))), Subscription(CassandraSubscriptionTable.TABLE_NAME, SchemaBuilder.createTable(CassandraSubscriptionTable.TABLE_NAME) .ifNotExists() Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1689022&r1=1689021&r2=1689022&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java Fri Jul 3 14:37:56 2015 @@ -19,7 +19,6 @@ package org.apache.james.mailbox.cassandra.mail; -import static com.datastax.driver.core.querybuilder.QueryBuilder.asc; import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.decr; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -34,7 +33,6 @@ import static org.apache.james.mailbox.c import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_OCTECTS; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FIELDS; -import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FLAG_VERSION; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FULL_CONTENT_OCTETS; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADER_CONTENT; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.IMAP_UID; @@ -60,6 +58,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -68,12 +67,15 @@ import javax.mail.Flags; import javax.mail.Flags.Flag; import javax.mail.util.SharedByteArrayInputStream; +import 
com.google.common.base.Throwables; import org.apache.james.mailbox.FlagsBuilder; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.CassandraConstants; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.cassandra.CassandraTypesProvider; import org.apache.james.mailbox.cassandra.CassandraTypesProvider.TYPE; +import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry; +import org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUpdate; import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable; import org.apache.james.mailbox.cassandra.table.CassandraMessageTable; import org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Properties; @@ -102,8 +104,6 @@ import com.datastax.driver.core.querybui import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select.Where; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; @@ -114,18 +114,7 @@ public class CassandraMessageMapper impl private final MailboxSession mailboxSession; private final UidProvider<CassandraId> uidProvider; private final CassandraTypesProvider typesProvider; - - private int maxRetries; - - private final static int DEFAULT_MAX_RETRIES = 10000; - - public CassandraMessageMapper(Session session, UidProvider<CassandraId> uidProvider, ModSeqProvider<CassandraId> modSeqProvider, CassandraTypesProvider typesProvider) { - this(session, uidProvider, modSeqProvider, null, DEFAULT_MAX_RETRIES, typesProvider); - } - - public CassandraMessageMapper(Session session, UidProvider<CassandraId> uidProvider, ModSeqProvider<CassandraId> modSeqProvider, MailboxSession mailboxSession, CassandraTypesProvider typesProvider) { - this(session, uidProvider, 
modSeqProvider, mailboxSession, DEFAULT_MAX_RETRIES, typesProvider); - } + private final int maxRetries; public CassandraMessageMapper(Session session, UidProvider<CassandraId> uidProvider, ModSeqProvider<CassandraId> modSeqProvider, MailboxSession mailboxSession, int maxRetries, CassandraTypesProvider typesProvider) { this.session = session; @@ -190,10 +179,7 @@ public class CassandraMessageMapper impl } private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) { - session.execute( - update(CassandraMailboxCountersTable.TABLE_NAME) - .with(operation) - .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); + session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); } @Override @@ -215,9 +201,7 @@ public class CassandraMessageMapper impl return Arrays.stream(CassandraMessageTable.Flag.ALL) .filter(row::getBool) .map(JAVAX_MAIL_FLAG::get) - .reduce( - new Flags(), - (flags, flag) -> { + .reduce(new Flags(), (flags, flag) -> { flags.add(flag); return flags; }, (flags1, flags2) -> { @@ -237,7 +221,7 @@ public class CassandraMessageMapper impl private Message<CassandraId> message(Row row) { SimpleMessage<CassandraId> message = - new SimpleMessage<CassandraId>( + new SimpleMessage<>( row.getDate(INTERNAL_DATE), row.getInt(FULL_CONTENT_OCTETS), row.getInt(BODY_START_OCTET), @@ -246,6 +230,7 @@ public class CassandraMessageMapper impl getPropertyBuilder(row), CassandraId.of(row.getUUID(MAILBOX_ID))); message.setUid(row.getLong(IMAP_UID)); + message.setModSeq(row.getLong(MOD_SEQ)); return message; } @@ -375,8 +360,7 @@ public class CassandraMessageMapper impl .setString(Properties.NAME, x.getLocalName()) .setString(Properties.VALUE, x.getValue())) .collect(Collectors.toList())) - .value(TEXTUAL_LINE_COUNT, message.getTextualLineCount()) - .value(FLAG_VERSION, 0); + .value(TEXTUAL_LINE_COUNT, 
message.getTextualLineCount()); PreparedStatement preparedStatement = session.prepare(query.toString()); @@ -388,7 +372,7 @@ public class CassandraMessageMapper impl } } - private boolean conditionalSave(Mailbox<CassandraId> mailbox, Message<CassandraId> message, long flagVersion) throws MailboxException { + private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) { ResultSet resultSet = session.execute( update(TABLE_NAME) .with(set(ANSWERED, message.isAnswered())) @@ -398,11 +382,10 @@ public class CassandraMessageMapper impl .and(set(RECENT, message.isRecent())) .and(set(SEEN, message.isSeen())) .and(set(USER, message.createFlags().contains(Flag.USER))) - .and(set(FLAG_VERSION, flagVersion + 1)) + .and(set(MOD_SEQ, message.getModSeq())) .where(eq(IMAP_UID, message.getUid())) .and(eq(MAILBOX_ID, message.getMailboxId().asUuid())) - .onlyIf(eq(FLAG_VERSION, flagVersion)) - ); + .onlyIf(eq(MOD_SEQ, oldModSeq))); return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); } @@ -412,60 +395,13 @@ public class CassandraMessageMapper impl @Override public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) throws MailboxException { - ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder(); - for (Row row : session.execute(buildQuery(mailbox, set))) { - updateMessage(mailbox, flagsUpdateCalculator, result, row); - } - return result.build().iterator(); - } - - private void updateMessage(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagsUpdateCalculator, ImmutableList.Builder<UpdatedFlags> result, Row row) throws MailboxException { - // Get the message and basic information about it - Message<CassandraId> message = message(row); - long flagVersion = row.getLong(FLAG_VERSION); - long uid = message.getUid(); - // update flags - Flags originFlags = message.createFlags(); - Flags updatedFlags = 
flagsUpdateCalculator.buildNewFlags(originFlags); - message.setFlags(updatedFlags); - // Update the ModSeq - long previousModSeq = message.getModSeq(); - long modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox); - message.setModSeq(modSeq); - // Try a first update - if(!conditionalSave(mailbox, message, flagVersion)) { - int tries = 0; - // It fails. Someone updated the flag before us. - do { - tries++; - // Retrieve the message from uid - Row newRow = findMessageByUid(mailbox, uid); - if(newRow == null) { - // Someone deleted this result while we were doing other stuff - // Skip it - break; - } - message = message(newRow); - flagVersion = newRow.getLong(FLAG_VERSION); - // update flags - originFlags = message.createFlags(); - updatedFlags = flagsUpdateCalculator.buildNewFlags(originFlags); - message.setFlags(updatedFlags); - // Update ModSeq - if (previousModSeq != message.getModSeq()) { - // Here someone updated the ModSeq, so we can not used the previously generated value - previousModSeq = message.getModSeq(); - modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox); - } - message.setModSeq(modSeq); - // and retry - } while (!conditionalSave(mailbox, message, flagVersion) && tries < maxRetries); - if(tries == maxRetries) { - throw new MailboxException("Max retries reached when asking an update of flags on message " + uid + " for mailbox " + mailbox.getMailboxId()); - } - } - manageUnseenMessageCounts(mailbox, originFlags, updatedFlags); - result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), originFlags, updatedFlags)); + return convertToStream(session.execute(buildQuery(mailbox, set))) + .map((row) -> updateFlagsOnMessage(mailbox, flagsUpdateCalculator, row)) + .filter(Optional::isPresent) + .map(Optional::get) + .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags())) + .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning 
result + .iterator(); } private void manageUnseenMessageCounts(Mailbox<CassandraId> mailbox, Flags oldFlags, Flags newFlags) { @@ -477,12 +413,46 @@ public class CassandraMessageMapper impl } } - private Row findMessageByUid(Mailbox<CassandraId> mailbox, long uid) { - ResultSet resultSet = session.execute(selectMessage(mailbox, uid)); - if ( resultSet.isExhausted() ) { - return null; + private Optional<UpdatedFlags> updateFlagsOnMessage(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, Row row) { + return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(row)) + .map(Optional::of) + .orElse(handleRetries(mailbox, flagUpdateCalculator, row.getLong(IMAP_UID))); + } + + private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox<CassandraId> mailbox, Message<CassandraId> message) { + try { + long oldModSeq = message.getModSeq(); + Flags oldFlags = message.createFlags(); + Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); + message.setFlags(newFlags); + message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox)); + if (conditionalSave(message, oldModSeq)) { + return Optional.of(new UpdatedFlags(message.getUid(), message.getModSeq(), oldFlags, newFlags)); + } else { + return Optional.empty(); + } + } catch (MailboxException e) { + throw Throwables.propagate(e); + } + } + + private Optional<UpdatedFlags> handleRetries(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, long uid) { + try { + return Optional.of( + new FunctionRunnerWithRetry(maxRetries) + .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox, uid, flagUpdateCalculator))); + } catch (MessageDeletedDuringFlagsUpdate e) { + mailboxSession.getLog().warn(e.getMessage()); + return Optional.empty(); + } catch (MailboxException e) { + throw Throwables.propagate(e); } - return resultSet.one(); + } + + private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox<CassandraId> 
mailbox, long uid, FlagsUpdateCalculator flagUpdateCalculator) { + return Optional.ofNullable(session.execute(selectMessage(mailbox, uid)).one()) + .map((row) -> tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(row))) + .orElseThrow(() -> new MessageDeletedDuringFlagsUpdate(mailbox.getMailboxId(), uid)); } @Override Added: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java?rev=1689022&view=auto ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java (added) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java Fri Jul 3 14:37:56 2015 @@ -0,0 +1,29 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.utils; + +import org.apache.james.mailbox.cassandra.CassandraId; + +public class MessageDeletedDuringFlagsUpdate extends RuntimeException { + + public MessageDeletedDuringFlagsUpdate(CassandraId id, long uid) { + super("Can not perform flag update as message was deleted for " + id.serialize() + " : " + uid); + } +} Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java?rev=1689022&r1=1689021&r2=1689022&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java Fri Jul 3 14:37:56 2015 @@ -36,8 +36,7 @@ public interface CassandraMessageTable { String BODY_CONTENT = "bodyContent"; String HEADER_CONTENT = "headerContent"; String PROPERTIES = "properties"; - String FLAG_VERSION = "flagVersion"; - String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES, FLAG_VERSION }; + String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES }; interface Flag { String ANSWERED = "flagAnswered"; --------------------------------------------------------------------- To 
unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org