Author: btellier
Date: Fri Jul  3 14:37:56 2015
New Revision: 1689022

URL: http://svn.apache.org/r1689022
Log:
MAILBOX-208 Refactor flags update in Cassandra Message Mapper

Added:
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java
Modified:
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
    
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java?rev=1689022&r1=1689021&r2=1689022&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraTableManager.java
 Fri Jul  3 14:37:56 2015
@@ -90,8 +90,7 @@ public class CassandraTableManager {
                 .addColumn(CassandraMessageTable.Flag.RECENT, cboolean())
                 .addColumn(CassandraMessageTable.Flag.SEEN, cboolean())
                 .addColumn(CassandraMessageTable.Flag.USER, cboolean())
-                .addUDTListColumn(CassandraMessageTable.PROPERTIES, 
SchemaBuilder.frozen(CassandraTypesProvider.TYPE.Property.getName()))
-                .addColumn(CassandraMessageTable.FLAG_VERSION, bigint())),
+                .addUDTListColumn(CassandraMessageTable.PROPERTIES, 
SchemaBuilder.frozen(CassandraTypesProvider.TYPE.Property.getName()))),
         Subscription(CassandraSubscriptionTable.TABLE_NAME,
             SchemaBuilder.createTable(CassandraSubscriptionTable.TABLE_NAME)
                 .ifNotExists()

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1689022&r1=1689021&r2=1689022&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 Fri Jul  3 14:37:56 2015
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.decr;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
@@ -34,7 +33,6 @@ import static org.apache.james.mailbox.c
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_OCTECTS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FIELDS;
-import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FLAG_VERSION;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FULL_CONTENT_OCTETS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADER_CONTENT;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.IMAP_UID;
@@ -60,6 +58,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -68,12 +67,15 @@ import javax.mail.Flags;
 import javax.mail.Flags.Flag;
 import javax.mail.util.SharedByteArrayInputStream;
 
+import com.google.common.base.Throwables;
 import org.apache.james.mailbox.FlagsBuilder;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraConstants;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.cassandra.CassandraTypesProvider;
 import org.apache.james.mailbox.cassandra.CassandraTypesProvider.TYPE;
+import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry;
+import 
org.apache.james.mailbox.cassandra.mail.utils.MessageDeletedDuringFlagsUpdate;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
 import 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Properties;
@@ -102,8 +104,6 @@ import com.datastax.driver.core.querybui
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select.Where;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.google.common.primitives.Bytes;
 
@@ -114,18 +114,7 @@ public class CassandraMessageMapper impl
     private final MailboxSession mailboxSession;
     private final UidProvider<CassandraId> uidProvider;
     private final CassandraTypesProvider typesProvider;
-
-    private int maxRetries;
-
-    private final static int DEFAULT_MAX_RETRIES = 10000;
-
-    public CassandraMessageMapper(Session session, UidProvider<CassandraId> 
uidProvider, ModSeqProvider<CassandraId> modSeqProvider, CassandraTypesProvider 
typesProvider) {
-        this(session, uidProvider, modSeqProvider, null, DEFAULT_MAX_RETRIES, 
typesProvider);
-    }
-
-    public CassandraMessageMapper(Session session, UidProvider<CassandraId> 
uidProvider, ModSeqProvider<CassandraId> modSeqProvider, MailboxSession 
mailboxSession, CassandraTypesProvider typesProvider) {
-        this(session, uidProvider, modSeqProvider, mailboxSession, 
DEFAULT_MAX_RETRIES, typesProvider);
-    }
+    private final int maxRetries;
 
     public CassandraMessageMapper(Session session, UidProvider<CassandraId> 
uidProvider, ModSeqProvider<CassandraId> modSeqProvider, MailboxSession 
mailboxSession, int maxRetries, CassandraTypesProvider typesProvider) {
         this.session = session;
@@ -190,10 +179,7 @@ public class CassandraMessageMapper impl
     }
 
     private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment 
operation) {
-        session.execute(
-            update(CassandraMailboxCountersTable.TABLE_NAME)
-                .with(operation)
-                .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, 
mailbox.getMailboxId().asUuid())));
+        
session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID,
 mailbox.getMailboxId().asUuid())));
     }
 
     @Override
@@ -215,9 +201,7 @@ public class CassandraMessageMapper impl
         return Arrays.stream(CassandraMessageTable.Flag.ALL)
             .filter(row::getBool)
             .map(JAVAX_MAIL_FLAG::get)
-            .reduce(
-                new Flags(),
-                (flags, flag) -> {
+            .reduce(new Flags(), (flags, flag) -> {
                     flags.add(flag);
                     return flags;
                 }, (flags1, flags2) -> {
@@ -237,7 +221,7 @@ public class CassandraMessageMapper impl
 
     private Message<CassandraId> message(Row row) {
         SimpleMessage<CassandraId> message =
-            new SimpleMessage<CassandraId>(
+            new SimpleMessage<>(
                 row.getDate(INTERNAL_DATE),
                 row.getInt(FULL_CONTENT_OCTETS),
                 row.getInt(BODY_START_OCTET),
@@ -246,6 +230,7 @@ public class CassandraMessageMapper impl
                 getPropertyBuilder(row),
                 CassandraId.of(row.getUUID(MAILBOX_ID)));
         message.setUid(row.getLong(IMAP_UID));
+        message.setModSeq(row.getLong(MOD_SEQ));
         return message;
     }
 
@@ -375,8 +360,7 @@ public class CassandraMessageMapper impl
                         .setString(Properties.NAME, x.getLocalName())
                         .setString(Properties.VALUE, x.getValue()))
                     .collect(Collectors.toList()))
-                .value(TEXTUAL_LINE_COUNT, message.getTextualLineCount())
-                .value(FLAG_VERSION, 0);
+                .value(TEXTUAL_LINE_COUNT, message.getTextualLineCount());
             PreparedStatement preparedStatement = 
session.prepare(query.toString());
 
 
@@ -388,7 +372,7 @@ public class CassandraMessageMapper impl
         }
     }
 
-    private boolean conditionalSave(Mailbox<CassandraId> mailbox, 
Message<CassandraId> message, long flagVersion) throws MailboxException {
+    private boolean conditionalSave(Message<CassandraId> message, long 
oldModSeq) {
         ResultSet resultSet = session.execute(
             update(TABLE_NAME)
                 .with(set(ANSWERED, message.isAnswered()))
@@ -398,11 +382,10 @@ public class CassandraMessageMapper impl
                 .and(set(RECENT, message.isRecent()))
                 .and(set(SEEN, message.isSeen()))
                 .and(set(USER, message.createFlags().contains(Flag.USER)))
-                .and(set(FLAG_VERSION, flagVersion + 1))
+                .and(set(MOD_SEQ, message.getModSeq()))
                 .where(eq(IMAP_UID, message.getUid()))
                 .and(eq(MAILBOX_ID, message.getMailboxId().asUuid()))
-                .onlyIf(eq(FLAG_VERSION, flagVersion))
-        );
+                .onlyIf(eq(MOD_SEQ, oldModSeq)));
         return 
resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
     }
 
@@ -412,60 +395,13 @@ public class CassandraMessageMapper impl
 
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, 
FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) throws 
MailboxException {
-        ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
-        for (Row row : session.execute(buildQuery(mailbox, set))) {
-            updateMessage(mailbox, flagsUpdateCalculator, result, row);
-        }
-        return result.build().iterator();
-    }
-
-    private void updateMessage(Mailbox<CassandraId> mailbox, 
FlagsUpdateCalculator flagsUpdateCalculator, 
ImmutableList.Builder<UpdatedFlags> result, Row row) throws MailboxException {
-        // Get the message and basic information about it
-        Message<CassandraId> message = message(row);
-        long flagVersion = row.getLong(FLAG_VERSION);
-        long uid = message.getUid();
-        // update flags
-        Flags originFlags = message.createFlags();
-        Flags updatedFlags = flagsUpdateCalculator.buildNewFlags(originFlags);
-        message.setFlags(updatedFlags);
-        // Update the ModSeq
-        long previousModSeq = message.getModSeq();
-        long modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
-        message.setModSeq(modSeq);
-        // Try a first update
-        if(!conditionalSave(mailbox, message, flagVersion)) {
-            int tries = 0;
-            // It fails. Someone updated the flag before us.
-            do {
-                tries++;
-                // Retrieve the message from uid
-                Row newRow = findMessageByUid(mailbox, uid);
-                if(newRow == null) {
-                    // Someone deleted this result while we were doing other 
stuff
-                    // Skip it
-                    break;
-                }
-                message = message(newRow);
-                flagVersion = newRow.getLong(FLAG_VERSION);
-                // update flags
-                originFlags = message.createFlags();
-                updatedFlags = 
flagsUpdateCalculator.buildNewFlags(originFlags);
-                message.setFlags(updatedFlags);
-                // Update ModSeq
-                if (previousModSeq != message.getModSeq()) {
-                    // Here someone updated the ModSeq, so we can not used the 
previously generated value
-                    previousModSeq = message.getModSeq();
-                    modSeq = modSeqProvider.nextModSeq(mailboxSession, 
mailbox);
-                }
-                message.setModSeq(modSeq);
-                // and retry
-            } while (!conditionalSave(mailbox, message, flagVersion) && tries 
< maxRetries);
-            if(tries == maxRetries) {
-                throw new MailboxException("Max retries reached when asking an 
update of flags on message " + uid + " for mailbox " + mailbox.getMailboxId());
-            }
-        }
-        manageUnseenMessageCounts(mailbox, originFlags, updatedFlags);
-        result.add(new UpdatedFlags(message.getUid(), message.getModSeq(), 
originFlags, updatedFlags));
+        return convertToStream(session.execute(buildQuery(mailbox, set)))
+            .map((row) -> updateFlagsOnMessage(mailbox, flagsUpdateCalculator, 
row))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, 
updatedFlags.getOldFlags(), updatedFlags.getNewFlags()))
+            .collect(Collectors.toList()) // This collect is here as we need 
to consume all the stream before returning result
+            .iterator();
     }
 
     private void manageUnseenMessageCounts(Mailbox<CassandraId> mailbox, Flags 
oldFlags, Flags newFlags) {
@@ -477,12 +413,46 @@ public class CassandraMessageMapper impl
         }
     }
 
-    private Row findMessageByUid(Mailbox<CassandraId> mailbox, long uid) {
-        ResultSet resultSet = session.execute(selectMessage(mailbox, uid));
-        if ( resultSet.isExhausted() ) {
-            return null;
+    /**
+     * Applies the flag update to the message held by {@code row}.
+     *
+     * A first attempt is made with the data already present in the row; the
+     * retry path ({@code handleRetries}) is entered only when that attempt
+     * returns empty, i.e. when its conditional save was not applied.
+     *
+     * @param mailbox mailbox containing the message
+     * @param flagUpdateCalculator computes the new flags from the current ones
+     * @param row message row to update
+     * @return the applied update, or empty when {@code handleRetries} returns
+     *         empty (it does so when the message was deleted meanwhile)
+     */
+    private Optional<UpdatedFlags> updateFlagsOnMessage(
+            Mailbox<CassandraId> mailbox,
+            FlagsUpdateCalculator flagUpdateCalculator,
+            Row row) {
+        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(row))
+            .map(Optional::of)
+            // orElseGet, not orElse: orElse evaluates its argument eagerly, which
+            // would run the retry path (extra read, extra mod-seq, second
+            // conditional save) even after a successful first attempt.
+            .orElseGet(() -> handleRetries(mailbox, flagUpdateCalculator,
+                row.getLong(IMAP_UID)));
+    }
+
+    /**
+     * Performs one optimistic attempt at updating the flags of {@code message}.
+     *
+     * The mod-seq read from the message is passed to {@code conditionalSave}
+     * as the expected value, and a freshly generated mod-seq is stored on the
+     * message together with the new flags.
+     *
+     * @param flagUpdateCalculator computes the new flags from the current ones
+     * @param mailbox mailbox used to generate the next mod-seq
+     * @param message message to update; its flags and mod-seq are mutated
+     * @return the applied change, or empty when the conditional save was not applied
+     */
+    private Optional<UpdatedFlags> tryMessageFlagsUpdate(
+            FlagsUpdateCalculator flagUpdateCalculator,
+            Mailbox<CassandraId> mailbox,
+            Message<CassandraId> message) {
+        try {
+            final long expectedModSeq = message.getModSeq();
+            final Flags before = message.createFlags();
+            final Flags after = flagUpdateCalculator.buildNewFlags(before);
+            message.setFlags(after);
+            message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
+            if (!conditionalSave(message, expectedModSeq)) {
+                return Optional.empty();
+            }
+            return Optional.of(
+                new UpdatedFlags(message.getUid(), message.getModSeq(), before, after));
+        } catch (MailboxException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    private Optional<UpdatedFlags> handleRetries(Mailbox<CassandraId> mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, long uid) { // re-runs the flag update for uid through FunctionRunnerWithRetry
+        try {
+            return Optional.of(
+                new FunctionRunnerWithRetry(maxRetries) // runner is configured with this mapper's maxRetries field
+                    .executeAndRetrieveObject(() -> 
retryMessageFlagsUpdate(mailbox, uid, flagUpdateCalculator)));
+        } catch (MessageDeletedDuringFlagsUpdate e) { // message no longer found: log a warning and report no update
+            mailboxSession.getLog().warn(e.getMessage());
+            return Optional.empty();
+        } catch (MailboxException e) { // checked exception is rethrown via Throwables.propagate
+            throw Throwables.propagate(e);
         }
-        return resultSet.one();
+    }
+
+    private Optional<UpdatedFlags> 
retryMessageFlagsUpdate(Mailbox<CassandraId> mailbox, long uid, 
FlagsUpdateCalculator flagUpdateCalculator) { // one retry attempt: reload the row for uid, then try the update again
+        return Optional.ofNullable(session.execute(selectMessage(mailbox, 
uid)).one()) // ofNullable covers one() returning null, i.e. no row for this uid
+            .map((row) -> tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, 
message(row))) // result is empty again when the conditional save was not applied
+            .orElseThrow(() -> new 
MessageDeletedDuringFlagsUpdate(mailbox.getMailboxId(), uid)); // no row: report the message as deleted
     }
 
     @Override

Added: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java?rev=1689022&view=auto
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java
 (added)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/MessageDeletedDuringFlagsUpdate.java
 Fri Jul  3 14:37:56 2015
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import org.apache.james.mailbox.cassandra.CassandraId;
+
+/**
+ * Unchecked exception signalling that a flag update could not be performed
+ * because the message was deleted. The exception message carries the
+ * serialized mailbox id and the message uid.
+ */
+public class MessageDeletedDuringFlagsUpdate extends RuntimeException {
+
+    public MessageDeletedDuringFlagsUpdate(CassandraId id, long uid) {
+        super(describe(id, uid));
+    }
+
+    /** Builds the exception message from the mailbox id and the message uid. */
+    private static String describe(CassandraId mailboxId, long messageUid) {
+        return "Can not perform flag update as message was deleted for "
+            + mailboxId.serialize()
+            + " : "
+            + messageUid;
+    }
+}

Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
URL: 
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java?rev=1689022&r1=1689021&r2=1689022&view=diff
==============================================================================
--- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
 (original)
+++ 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
 Fri Jul  3 14:37:56 2015
@@ -36,8 +36,7 @@ public interface CassandraMessageTable {
     String BODY_CONTENT = "bodyContent";
     String HEADER_CONTENT = "headerContent";
     String PROPERTIES = "properties";
-    String FLAG_VERSION = "flagVersion";
-    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, 
Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, 
BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES, FLAG_VERSION };
+    String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, 
Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, 
BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES };
 
     interface Flag {
         String ANSWERED = "flagAnswered";



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to