Author: btellier Date: Fri Jul 3 14:38:22 2015 New Revision: 1689023 URL: http://svn.apache.org/r1689023 Log: MAILBOX-208 Reorder methods in Cassandra Message Mapper
Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1689023&r1=1689022&r2=1689023&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java Fri Jul 3 14:38:22 2015 @@ -162,26 +162,6 @@ public class CassandraMessageMapper impl } } - private void decrementCount(Mailbox<CassandraId> mailbox) { - updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT)); - } - - private void incrementCount(Mailbox<CassandraId> mailbox) { - updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT)); - } - - private void decrementUnseen(Mailbox<CassandraId> mailbox) { - updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN)); - } - - private void incrementUnseen(Mailbox<CassandraId> mailbox) { - updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN)); - } - - private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) { - session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); - } - @Override public Iterator<Message<CassandraId>> findInMailbox(Mailbox<CassandraId> mailbox, MessageRange set, FetchType ftype, int max) throws MailboxException { return convertToStream(session.execute(buildQuery(mailbox, set))) @@ -189,94 +169,6 @@ public class CassandraMessageMapper impl .iterator(); } - private byte[] getFullContent(Row row) { - byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()]; - byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()]; - row.getBytes(HEADER_CONTENT).get(headerContent); - row.getBytes(BODY_CONTENT).get(bodyContent); - return Bytes.concat(headerContent, bodyContent); - } - - private Flags getFlags(Row row) { - return Arrays.stream(CassandraMessageTable.Flag.ALL) - .filter(row::getBool) - .map(JAVAX_MAIL_FLAG::get) - .reduce(new Flags(), (flags, flag) -> { - flags.add(flag); - return flags; - }, (flags1, flags2) -> { - flags1.add(flags2); - return flags1; - }); - } - - private PropertyBuilder getPropertyBuilder(Row row) { - PropertyBuilder property = new PropertyBuilder( - row.getList(PROPERTIES, UDTValue.class).stream() - .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE))) - .collect(Collectors.toList())); - property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT)); - return property; - } - - private Message<CassandraId> message(Row row) { - SimpleMessage<CassandraId> message = - new SimpleMessage<>( - row.getDate(INTERNAL_DATE), - row.getInt(FULL_CONTENT_OCTETS), - row.getInt(BODY_START_OCTET), - new SharedByteArrayInputStream(getFullContent(row)), - getFlags(row), - getPropertyBuilder(row), - CassandraId.of(row.getUUID(MAILBOX_ID))); - message.setUid(row.getLong(IMAP_UID)); - message.setModSeq(row.getLong(MOD_SEQ)); - return message; - } - - private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) { - final MessageRange.Type type = set.getType(); - switch (type) { - case ALL: - return selectAll(mailbox); - case FROM: - return selectFrom(mailbox, set.getUidFrom()); - case RANGE: - return selectRange(mailbox, set.getUidFrom(), set.getUidTo()); - case ONE: - return selectMessage(mailbox, set.getUidFrom()); - } - throw new UnsupportedOperationException(); - } - - private Where selectAll(Mailbox<CassandraId> mailbox) { - return select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())); - } - - private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) { - return select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) - .and(gte(IMAP_UID, uid)); - } - - private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) { - return select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) - .and(gte(IMAP_UID, from)) - .and(lte(IMAP_UID, to)); - } - - private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) { - return select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) - .and(eq(IMAP_UID, uid)); - } - @Override public List<Long> findRecentMessageUidsInMailbox(Mailbox<CassandraId> mailbox) throws MailboxException { return convertToStream(session.execute(selectAll(mailbox).and((eq(RECENT, true))))) @@ -318,11 +210,6 @@ public class CassandraMessageMapper impl } @Override - public <T> T execute(Transaction<T> transaction) throws MailboxException { - return transaction.run(); - } - - @Override public MessageMetaData add(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException { message.setUid(uidProvider.nextUid(mailboxSession, mailbox)); message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox)); @@ -334,6 +221,107 @@ public class CassandraMessageMapper impl return messageMetaData; } + @Override + public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException { + return convertToStream(session.execute(buildQuery(mailbox, set))) + .map((row) -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, row)) + .filter(Optional::isPresent) + .map(Optional::get) + .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags())) + .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result + .iterator(); + } + + @Override + public <T> T execute(Transaction<T> transaction) throws MailboxException { + return transaction.run(); + } + + @Override + public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException { + + original.setUid(uidProvider.nextUid(mailboxSession, mailbox)); + original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox)); + incrementCount(mailbox); + if(!original.isSeen()) { + incrementUnseen(mailbox); + } + original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build()); + return save(mailbox, original); + } + + @Override + public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException { + return uidProvider.lastUid(mailboxSession, mailbox); + } + + private void decrementCount(Mailbox<CassandraId> mailbox) { + updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT)); + } + + private void incrementCount(Mailbox<CassandraId> mailbox) { + updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT)); + } + + private void decrementUnseen(Mailbox<CassandraId> mailbox) { + updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN)); + } + + private void incrementUnseen(Mailbox<CassandraId> mailbox) { + updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN)); + } + + private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) { + session.execute(update(CassandraMailboxCountersTable.TABLE_NAME) + .with(operation) + .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid()))); + } + + private Message<CassandraId> message(Row row) { + SimpleMessage<CassandraId> message = + new SimpleMessage<>( + row.getDate(INTERNAL_DATE), + row.getInt(FULL_CONTENT_OCTETS), + row.getInt(BODY_START_OCTET), + new SharedByteArrayInputStream(getFullContent(row)), + getFlags(row), + getPropertyBuilder(row), + CassandraId.of(row.getUUID(MAILBOX_ID))); + message.setUid(row.getLong(IMAP_UID)); + message.setModSeq(row.getLong(MOD_SEQ)); + return message; + } + + private byte[] getFullContent(Row row) { + byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()]; + byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()]; + row.getBytes(HEADER_CONTENT).get(headerContent); + row.getBytes(BODY_CONTENT).get(bodyContent); + return Bytes.concat(headerContent, bodyContent); + } + + private Flags getFlags(Row row) { + return Arrays.stream(CassandraMessageTable.Flag.ALL) + .filter(row::getBool) + .map(JAVAX_MAIL_FLAG::get) + .reduce(new Flags(), (flags, flag) -> { + flags.add(flag); + return flags; + }, (flags1, flags2) -> { + flags1.add(flags2); + return flags1; + }); + } + + private PropertyBuilder getPropertyBuilder(Row row) { + PropertyBuilder property = new PropertyBuilder( + row.getList(PROPERTIES, UDTValue.class).stream() + .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE))) + .collect(Collectors.toList())); + property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT)); + return property; + } + private MessageMetaData save(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException { try { Insert query = insertInto(TABLE_NAME) @@ -372,38 +360,6 @@ public class CassandraMessageMapper impl } } - private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) { - ResultSet resultSet = session.execute( - update(TABLE_NAME) - .with(set(ANSWERED, message.isAnswered())) - .and(set(DELETED, message.isDeleted())) - .and(set(DRAFT, message.isDraft())) - .and(set(FLAGGED, message.isFlagged())) - .and(set(RECENT, message.isRecent())) - .and(set(SEEN, message.isSeen())) - .and(set(USER, message.createFlags().contains(Flag.USER))) - .and(set(MOD_SEQ, message.getModSeq())) - .where(eq(IMAP_UID, message.getUid())) - .and(eq(MAILBOX_ID, message.getMailboxId().asUuid())) - .onlyIf(eq(MOD_SEQ, oldModSeq))); - return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); - } - - private ByteBuffer toByteBuffer(InputStream stream) throws IOException { - return ByteBuffer.wrap(ByteStreams.toByteArray(stream)); - } - - @Override - public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) throws MailboxException { - return convertToStream(session.execute(buildQuery(mailbox, set))) - .map((row) -> updateFlagsOnMessage(mailbox, flagsUpdateCalculator, row)) - .filter(Optional::isPresent) - .map(Optional::get) - .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags())) - .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result - .iterator(); - } - private void manageUnseenMessageCounts(Mailbox<CassandraId> mailbox, Flags oldFlags, Flags newFlags) { if (oldFlags.contains(Flag.SEEN) && !newFlags.contains(Flag.SEEN)) { incrementUnseen(mailbox); @@ -455,27 +411,72 @@ public class CassandraMessageMapper impl .orElseThrow(() -> new MessageDeletedDuringFlagsUpdate(mailbox.getMailboxId(), uid)); } - @Override - public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException { + private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) { + ResultSet resultSet = session.execute( + update(TABLE_NAME) + .with(set(ANSWERED, message.isAnswered())) + .and(set(DELETED, message.isDeleted())) + .and(set(DRAFT, message.isDraft())) + .and(set(FLAGGED, message.isFlagged())) + .and(set(RECENT, message.isRecent())) + .and(set(SEEN, message.isSeen())) + .and(set(USER, message.createFlags().contains(Flag.USER))) + .and(set(MOD_SEQ, message.getModSeq())) + .where(eq(IMAP_UID, message.getUid())) + .and(eq(MAILBOX_ID, message.getMailboxId().asUuid())) + .onlyIf(eq(MOD_SEQ, oldModSeq))); + return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); + } - original.setUid(uidProvider.nextUid(mailboxSession, mailbox)); - original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox)); - incrementCount(mailbox); - if(!original.isSeen()) { - incrementUnseen(mailbox); + private ByteBuffer toByteBuffer(InputStream stream) throws IOException { + return ByteBuffer.wrap(ByteStreams.toByteArray(stream)); + } + + private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) { + final MessageRange.Type type = set.getType(); + switch (type) { + case ALL: + return selectAll(mailbox); + case FROM: + return selectFrom(mailbox, set.getUidFrom()); + case RANGE: + return selectRange(mailbox, set.getUidFrom(), set.getUidTo()); + case ONE: + return selectMessage(mailbox, set.getUidFrom()); } - original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build()); - return save(mailbox, original); + throw new UnsupportedOperationException(); } - @Override - public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException { - return uidProvider.lastUid(mailboxSession, mailbox); + private Where selectAll(Mailbox<CassandraId> mailbox) { + return select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())); + } + + private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) { + return select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) + .and(gte(IMAP_UID, uid)); + } + + private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) { + return select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) + .and(gte(IMAP_UID, from)) + .and(lte(IMAP_UID, to)); + } + + private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) { + return select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) + .and(eq(IMAP_UID, uid)); } private Stream<Row> convertToStream(ResultSet resultSet) { return StreamSupport.stream(resultSet.spliterator(), true); } - } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org