Author: eric
Date: Sun Mar 8 12:09:42 2015
New Revision: 1664983
URL: http://svn.apache.org/r1664983
Log:
Data races are possible while updating flags in Cassandra, patch provided by
Benoit Tellier (MAILBOX-206)
Modified:
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
Modified:
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
URL:
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
---
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
(original)
+++
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
Sun Mar 8 12:09:42 2015
@@ -57,7 +57,7 @@ public class CassandraSession implements
session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +
".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +
".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," +
"unseen counter," + "nextModSeq counter" + ");");
session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message
(" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," +
"bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text,"
+ "subType text," + "fullContentOctets int," + "bodyOctets int,"
- + "textualLineCount bigint," + "bodyContent blob," +
"headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," +
"flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," +
"flagFlagged boolean," + "flagUser boolean," + "PRIMARY KEY (mailboxId, uid)" +
");");
+ + "textualLineCount bigint," + "bodyContent blob," +
"headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," +
"flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," +
"flagFlagged boolean," + "flagUser boolean," + "flagVersion bigint," + "PRIMARY
KEY (mailboxId, uid)" + ");");
session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +
".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox,
user)" + ");");
session.close();
}
Modified:
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL:
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
---
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
(original)
+++
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
Sun Mar 8 12:09:42 2015
@@ -29,6 +29,7 @@ import static com.datastax.driver.core.q
import static com.datastax.driver.core.querybuilder.QueryBuilder.lt;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_CONTENT;
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_OCTECTS;
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET;
@@ -51,6 +52,7 @@ import static org.apache.james.mailbox.c
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.RECENT;
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.SEEN;
import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.Flag.USER;
+import static
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FLAG_VERSION;
import java.io.IOException;
import java.io.InputStream;
@@ -64,6 +66,7 @@ import javax.mail.Flags;
import javax.mail.Flags.Flag;
import javax.mail.util.SharedByteArrayInputStream;
+import com.datastax.driver.core.querybuilder.Update;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable;
import org.apache.james.mailbox.cassandra.table.CassandraMessageTable;
@@ -106,15 +109,25 @@ public class CassandraMessageMapper impl
private MailboxSession mailboxSession;
private UidProvider<UUID> uidProvider;
+ private final int applied = 0;
+ private int maxRetries;
+
+ private final static int DEFAULT_MAX_RETRIES = 10000;
+
public CassandraMessageMapper(Session session, CassandraUidProvider
uidProvider, ModSeqProvider<UUID> modSeqProvider) {
+ this(session, uidProvider, modSeqProvider, null, DEFAULT_MAX_RETRIES);
+ }
+
+ public CassandraMessageMapper(Session session, CassandraUidProvider
uidProvider, ModSeqProvider<UUID> modSeqProvider, MailboxSession
mailboxSession) {
+ this(session, uidProvider, modSeqProvider, mailboxSession,
DEFAULT_MAX_RETRIES);
+ }
+
+ public CassandraMessageMapper(Session session, CassandraUidProvider
uidProvider, ModSeqProvider<UUID> modSeqProvider, MailboxSession
mailboxSession, int maxRetries) {
this.session = session;
this.uidProvider = uidProvider;
this.modSeqProvider = modSeqProvider;
- }
-
- public CassandraMessageMapper(Session session, CassandraUidProvider
uidProvider, CassandraModSeqProvider modSeqProvider, MailboxSession
mailboxSession) {
- this(session, uidProvider, modSeqProvider);
this.mailboxSession = mailboxSession;
+ this.maxRetries = maxRetries;
}
@Override
@@ -311,7 +324,7 @@ public class CassandraMessageMapper impl
Insert query = insertInto(TABLE_NAME).value(MAILBOX_ID,
mailbox.getMailboxId()).value(IMAP_UID, message.getUid()).value(MOD_SEQ,
message.getModSeq()).value(INTERNAL_DATE,
message.getInternalDate()).value(MEDIA_TYPE, message.getMediaType())
.value(BODY_START_OCTET, message.getFullContentOctets() -
message.getBodyOctets()).value(SUB_TYPE,
message.getSubType()).value(FULL_CONTENT_OCTETS,
message.getFullContentOctets()).value(BODY_OCTECTS,
message.getBodyOctets()).value(ANSWERED, message.isAnswered())
.value(DELETED, message.isDeleted()).value(DRAFT,
message.isDraft()).value(FLAGGED, message.isFlagged()).value(RECENT,
message.isRecent()).value(SEEN, message.isSeen()).value(USER,
message.createFlags().contains(Flag.USER)).value(BODY_CONTENT, bindMarker())
- .value(HEADER_CONTENT,
bindMarker()).value(TEXTUAL_LINE_COUNT, message.getTextualLineCount());
+ .value(HEADER_CONTENT,
bindMarker()).value(TEXTUAL_LINE_COUNT,
message.getTextualLineCount()).value(FLAG_VERSION, 0);
PreparedStatement preparedStatement =
session.prepare(query.toString());
BoundStatement boundStatement =
preparedStatement.bind(toByteBuffer(message.getBodyContent()),
toByteBuffer(message.getHeaderContent()));
session.execute(boundStatement);
@@ -321,28 +334,106 @@ public class CassandraMessageMapper impl
}
}
+ private boolean conditionalSave(Mailbox<UUID> mailbox, Message<UUID>
message, long flagVersion) throws MailboxException {
+ ResultSet resultSet = session.execute(
+ update(TABLE_NAME)
+ .with(set(ANSWERED, message.isAnswered()))
+ .and(set(DELETED, message.isDeleted()))
+ .and(set(DRAFT, message.isDraft()))
+ .and(set(FLAGGED, message.isFlagged()))
+ .and(set(RECENT, message.isRecent()))
+ .and(set(SEEN, message.isSeen()))
+ .and(set(USER,
message.createFlags().contains(Flag.USER)))
+ .and(set(FLAG_VERSION, flagVersion + 1))
+ .where(eq(IMAP_UID, message.getUid()))
+ .and(eq(MAILBOX_ID, message.getMailboxId()))
+ .onlyIf(eq(FLAG_VERSION, flagVersion))
+ );
+ return resultSet.one().getBool(applied);
+ }
+
private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
}
+ /**
+ *
+ * @param mailbox Mailbox where messages are located
+ * @param flags Flags used for update
+ * @param value True if you want to set provided flags to true, false to
set these flags to false
+ * @param replace true if we want to replace current flags by those
provided
+ * @param set Range of messages to update
+ * @return Updated flag information
+ * @throws MailboxException
+ */
@Override
public Iterator<UpdatedFlags> updateFlags(Mailbox<UUID> mailbox, Flags
flags, boolean value, boolean replace, MessageRange set) throws
MailboxException {
ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
for (Row row : session.execute(buildQuery(mailbox, set))) {
- Message<UUID> message = message(row);
- Flags originFlags = message.createFlags();
- Flags updatedFlags = buildFlags(message, flags, value, replace);
- message.setFlags(updatedFlags);
- message.setModSeq(modSeqProvider.nextModSeq(mailboxSession,
mailbox));
- save(mailbox, message);
- result.add(new UpdatedFlags(message.getUid(), message.getModSeq(),
originFlags, updatedFlags));
+ updateMessage(mailbox, flags, value, replace, result, row);
}
return result.build().iterator();
}
+ private void updateMessage(Mailbox<UUID> mailbox, Flags flags, boolean
value, boolean replace, ImmutableList.Builder<UpdatedFlags> result, Row row)
throws MailboxException {
+ // Get the message and basic information about it
+ Message<UUID> message = message(row);
+ long flagVersion = row.getLong(FLAG_VERSION);
+ long uid = message.getUid();
+ // update flags
+ Flags originFlags = message.createFlags();
+ Flags updatedFlags = buildFlags(message, flags, value, replace);
+ message.setFlags(updatedFlags);
+ // Update the ModSeq
+ long previousModSeq = message.getModSeq();
+ long modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+ message.setModSeq(modSeq);
+ // Try a first update
+ if(!conditionalSave(mailbox, message, flagVersion)) {
+ int tries = 0;
+ // It fails. Someone updated the flag before us.
+ do {
+ tries++;
+ // Retrieve the message from uid
+ Row newRow = findMessageByUid(mailbox, uid);
+ if(newRow == null) {
+ // Someone deleted this result while we were doing other
stuff
+ // Skip it
+ break;
+ }
+ message = message(newRow);
+ flagVersion = newRow.getLong(FLAG_VERSION);
+ // update flags
+ originFlags = message.createFlags();
+ updatedFlags = buildFlags(message, flags, value, replace);
+ message.setFlags(updatedFlags);
+ // Update ModSeq
+ if (previousModSeq != message.getModSeq()) {
+ // Here someone updated the ModSeq, so we cannot use the
previously generated value
+ previousModSeq = message.getModSeq();
+ modSeq = modSeqProvider.nextModSeq(mailboxSession,
mailbox);
+ }
+ message.setModSeq(modSeq);
+ // and retry
+ } while (!conditionalSave(mailbox, message, flagVersion) && tries
< maxRetries);
+ if(tries == maxRetries) {
+ throw new MailboxException("Max retries reached when asking an
update of flags on message " + uid + " for mailbox " + mailbox.getMailboxId());
+ }
+ }
+ result.add(new UpdatedFlags(message.getUid(), message.getModSeq(),
originFlags, updatedFlags));
+ }
+
+ private Row findMessageByUid(Mailbox<UUID> mailbox, long uid) {
+ ResultSet resultSet = session.execute(selectMessage(mailbox, uid));
+ if ( resultSet.isExhausted() ) {
+ return null;
+ }
+ return resultSet.one();
+ }
+
private Flags buildFlags(Message<UUID> message, Flags flags, boolean
value, boolean replace) {
if (replace) {
- return message.createFlags();
+ return flags;
} else {
Flags updatedFlags = message.createFlags();
if (value) {
Modified:
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
URL:
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
---
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
(original)
+++
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
Sun Mar 8 12:09:42 2015
@@ -38,7 +38,8 @@ public interface CassandraMessageTable {
String TEXTUAL_LINE_COUNT = "textualLineCount";
String BODY_CONTENT = "bodyContent";
String HEADER_CONTENT = "headerContent";
- String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ,
BODY_START_OCTET, MEDIA_TYPE, SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS,
Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN,
Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT };
+ String FLAG_VERSION = "flagVersion";
+ String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ,
BODY_START_OCTET, MEDIA_TYPE, SUB_TYPE, FULL_CONTENT_OCTETS, BODY_OCTECTS,
Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN,
Flag.USER, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, FLAG_VERSION };
interface Flag {
String ANSWERED = "flagAnswered";
Modified:
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
URL:
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
---
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
(original)
+++
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
Sun Mar 8 12:09:42 2015
@@ -91,7 +91,7 @@ public final class CassandraClusterSingl
} else if (tableName.equals("message")) {
session.execute("CREATE TABLE IF NOT EXISTS " +
session.getLoggedKeyspace() + ".message (" + "mailboxId UUID," + "uid bigint,"
+ "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq
bigint," + "mediaType text," + "subType text," + "fullContentOctets int,"
+ "bodyOctets int," + "textualLineCount bigint," +
"bodyContent blob," + "headerContent blob," + "flagAnswered boolean," +
"flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," +
"flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean,"
- + "PRIMARY KEY (mailboxId, uid)" + ");");
+ + "flagVersion bigint,"+ "PRIMARY KEY (mailboxId, uid)" +
");");
} else if (tableName.equals("subscription")) {
session.execute("CREATE TABLE IF NOT EXISTS " +
session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox
text," + "PRIMARY KEY (mailbox, user)" + ");");
} else {
Modified:
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
URL:
http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java?rev=1664983&r1=1664982&r2=1664983&view=diff
==============================================================================
---
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
(original)
+++
james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
Sun Mar 8 12:09:42 2015
@@ -19,9 +19,12 @@
package org.apache.james.mailbox.cassandra.mail;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -32,8 +35,11 @@ import javax.mail.util.SharedByteArrayIn
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.mock.MockMailboxSession;
import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.Message;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
@@ -136,6 +142,69 @@ public class CassandraMessageMapperTest
testAdd();
testGetLastUid();
testGetHighestModSeq();
+ testMessageUpdateReplace();
+ testMessageUpdateAddition();
+ }
+
+ /**
+ * Test message flag replacement
+ */
+ private void testMessageUpdateReplace() throws MailboxException {
+ LOG.info("message update : replace flags");
+ Flags flags = new Flags();
+ flags.add(Flags.Flag.ANSWERED);
+ flags.add(Flags.Flag.DRAFT);
+ messageMapper.updateFlags(MBOXES.get(1), flags, true, true,
MessageRange.all());
+ Iterator<Message<UUID>> messageIterator =
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(),
MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertTrue(message.isAnswered());
+ assertTrue(message.isDraft());
+ assertFalse(message.isDeleted());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertFalse(message.isFlagged());
+ }
+ }
+
+ /**
+ * Test message flag set to true
+ */
+ private void testMessageUpdateAddition() throws MailboxException {
+ LOG.info("message update : flag addition");
+ Flags flags = new Flags();
+ flags.add(Flags.Flag.FLAGGED);
+ messageMapper.updateFlags(MBOXES.get(1), flags, true, false,
MessageRange.all());
+ Iterator<Message<UUID>> messageIterator =
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(),
MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertTrue(message.isAnswered());
+ assertTrue(message.isDraft());
+ assertFalse(message.isDeleted());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertTrue(message.isFlagged());
+ }
+ }
+
+ /**
+ * Test message flag removal
+ */
+ private void testMessageUpdateRemove() throws MailboxException {
+ LOG.info("message update : flag removal");
+ Flags flags = new Flags();
+ flags.add(Flags.Flag.ANSWERED);
+ messageMapper.updateFlags(MBOXES.get(1), flags, false, false,
MessageRange.all());
+ Iterator<Message<UUID>> messageIterator =
messageMapper.findInMailbox(MBOXES.get(1), MessageRange.all(),
MessageMapper.FetchType.Full, 100);
+ while(messageIterator.hasNext()) {
+ Message<UUID> message = messageIterator.next();
+ assertFalse(message.isAnswered());
+ assertTrue(message.isDraft());
+ assertFalse(message.isDeleted());
+ assertFalse(message.isRecent());
+ assertFalse(message.isSeen());
+ assertTrue(message.isFlagged());
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]