Hi,

Here is a first patch implementing quota management on top of Cassandra. It works like the quota management already implemented in the store, but the values are stored in Cassandra.
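To illustrate the bookkeeping involved, here is a minimal in-memory sketch (it is not code from the patch). It assumes usage is tracked per user as a message count and a total size, both adjusted whenever messages are added or expunged, and then compared against a maximum. The class name QuotaUsageSketch, its methods and the UNLIMITED sentinel value are hypothetical; the patch itself keeps these values in Cassandra tables.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the per-user usage row (message count and total size). */
final class QuotaUsageSketch {
    /** Assumed sentinel meaning "no limit configured"; chosen for this sketch only. */
    static final long UNLIMITED = -1L;

    /** user -> {message count, total size in bytes} */
    private final Map<String, long[]> usage = new HashMap<>();

    /** Apply positive deltas when messages are added, negative deltas when they are expunged. */
    void apply(String user, long countDelta, long sizeDelta) {
        long[] row = usage.computeIfAbsent(user, u -> new long[2]);
        row[0] += countDelta;
        row[1] += sizeDelta;
    }

    long count(String user) {
        return usage.getOrDefault(user, new long[2])[0];
    }

    long size(String user) {
        return usage.getOrDefault(user, new long[2])[1];
    }

    /** True when a limit is set and the current usage exceeds it. */
    static boolean isOver(long used, long max) {
        return max != UNLIMITED && used > max;
    }
}
```

For example, adding two messages of 300 bytes in total and then expunging one of 100 bytes leaves a count of 1 and a size of 200.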
Regards,
Benoit

On 11/23/14 08:52, Josip Almasi wrote:
> Hi,
>
> can you do that and make a patch?
>
> Regards,
> Josip
>
> On 11/18/2014 09:26 AM, Benoit Tellier wrote:
>> Hi,
>>
>> I am an intern at Linagora, and I was asked to continue Phillipe Benoit's
>> work on James. He contributed a James mailbox using
>> Cassandra as a backend. I have already implemented quotas and a group
>> membership resolver using Cassandra.
>>
>> I also need to implement ACLs. I noticed the use of locks when
>> recalculating ACLs for a given mailbox (in the message manager). But, as
>> the DataStax driver we are using does not support path locking or ACID
>> transactions, I had to find another way to do it.
>>
>> I finally heard about lightweight transactions. What I decided to do was
>> to add two fields to the mailbox table: one field of binary data (our
>> serialized ACLs for this mailbox) and a version number. So when we call
>> setRights (message manager), it should retrieve the ACLs
>> and the associated version number. It then recalculates the ACLs,
>> serializes them, and writes them back with an incremented version number,
>> but only if the version number has not changed in the meantime.
>>
>> As I really don't like copy and paste, I want to reuse as much code as
>> I can:
>>
>> - It would be nice to reuse the StoreMessageManager, just
>> overriding the setRights method to retry the lightweight transaction if
>> it fails (e.g. someone else changed the ACLs). But to implement the
>> setRights method I need access to the mailbox. I need the mailbox field
>> of the StoreMessageManager to have protected access.
>>
>> - Reusing SimpleMailboxACL seems to be a good idea, but I need to
>> serialize it. I just need it (and the classes it contains) to
>> implement the Serializable interface.
>>
>> These are really simple changes that will save me from copying and
>> pasting thousands of lines of code.
>>
>> Thank you very much,
>>
>> Benoit Tellier
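
The versioned update described in the quoted message could be sketched roughly as below. This is only an in-memory illustration: VersionedAclStore, AclUpdater, the string-encoded ACL and the CQL statement in the comment are assumptions made for the example, not part of the patch or of the James API. writeIfVersion stands in for a Cassandra lightweight transaction, i.e. a conditional UPDATE with an IF clause.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for the mailbox table: one serialized ACL plus a version per mailbox. */
final class VersionedAclStore {
    /** Immutable snapshot returned by a read. */
    static final class Versioned {
        final String acl;
        final long version;

        Versioned(String acl, long version) {
            this.acl = acl;
            this.version = version;
        }
    }

    private final Map<String, Versioned> rows = new HashMap<>();

    synchronized Versioned read(String mailboxId) {
        Versioned v = rows.get(mailboxId);
        return v != null ? v : new Versioned("", 0L);
    }

    /**
     * Plays the role of a conditional write such as (column names assumed):
     *   UPDATE mailbox SET acl = ?, version = ? WHERE id = ? IF version = ?
     * Returns true only when the stored version still equals expectedVersion.
     */
    synchronized boolean writeIfVersion(String mailboxId, String newAcl, long expectedVersion) {
        if (read(mailboxId).version != expectedVersion) {
            return false;
        }
        rows.put(mailboxId, new Versioned(newAcl, expectedVersion + 1));
        return true;
    }
}

final class AclUpdater {
    /** Read, recompute, conditionally write; start over if another writer got there first. */
    static long setRights(VersionedAclStore store, String mailboxId,
                          UnaryOperator<String> recompute, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            VersionedAclStore.Versioned current = store.read(mailboxId);
            String updated = recompute.apply(current.acl);
            if (store.writeIfVersion(mailboxId, updated, current.version)) {
                return current.version + 1; // version now stored for this mailbox
            }
        }
        throw new IllegalStateException("ACL update still conflicting after " + maxRetries + " retries");
    }
}
```

The retry bound is a design choice of the sketch: without it, a mailbox under constant concurrent modification could keep one caller looping indefinitely.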
From 5b911c0fc18cd70dca2618da311e3c1345a8df23 Mon Sep 17 00:00:00 2001
From: benwa <[email protected]>
Date: Fri, 14 Nov 2014 22:03:37 +0100
Subject: [PATCH 1/1] Implementing Quota stored in Cassandra ( https://ci.open-paas.org/jira/browse/JWC-20 )

Increasing sleep time ( because the server does not always have the time to be instantiated )
---
 .../james/mailbox/cassandra/CassandraSession.java  |   3 +
 .../quota/CassandraFixedQuotaManager.java          |  96 ++++++++
 .../quota/CassandraListeningQuotaManager.java      | 232 ++++++++++++++++++
 .../quota/CassandraPerUserQuotaManager.java        |  95 ++++++++
 .../cassandra/quota/CassandraQuotaStorage.java     | 264 +++++++++++++++++++++
 .../table/CassandraDefaultMaxQuotaTable.java       |  26 ++
 .../cassandra/table/CassandraMaxQuotaTable.java    |  26 ++
 .../cassandra/table/CassandraQuotaTable.java       |  26 ++
 .../cassandra/CassandraClusterSingleton.java       |  27 ++-
 .../cassandra/quota/CassandraQuotaStorageTest.java | 163 +++++++++++++
 10 files changed, 957 insertions(+), 1 deletion(-)
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraFixedQuotaManager.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraListeningQuotaManager.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserQuotaManager.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorage.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraDefaultMaxQuotaTable.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMaxQuotaTable.java
 create mode 100644 cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraQuotaTable.java
 create mode 100644 cassandra/src/test/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorageTest.java

diff --git 
a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java index b1426cb..f2c3003 100644 --- a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java +++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java @@ -59,6 +59,9 @@ public class CassandraSession implements Session { session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int," + "bodyOctets int," + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," + "PRIMARY KEY (mailboxId, uid)" + ");"); session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".quota (" + "user text PRIMARY KEY," + "size_quota counter," + "count_quota counter" + ");"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".quota_max (" + "user text PRIMARY KEY," + "size_quota_max bigint," + "count_quota_max bigint" + ");"); + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".default_max_quota_table (" + "id int PRIMARY KEY," + "default_max_size_quota bigint," + "default_max_count_quota bigint" + ");"); session.close(); } diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraFixedQuotaManager.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraFixedQuotaManager.java new file mode 100644 index 0000000..d4018b9 --- /dev/null +++ 
b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraFixedQuotaManager.java @@ -0,0 +1,96 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.mailbox.cassandra.quota; + +import com.datastax.driver.core.Session; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.store.StoreMailboxManager; + +/** + * Allow you to create a FixedQuotaManager where all users have the same limit. 
+ */ +public class CassandraFixedQuotaManager extends CassandraListeningQuotaManager { + + + public CassandraFixedQuotaManager(StoreMailboxManager<?> manager, Session session) throws MailboxException { + super(manager, session); + } + + /** + * Return the maximum storage which is allowed for the given {@link org.apache.james.mailbox.MailboxSession} (in fact the user which the session is bound to) + * + * The returned valued must be in <strong>bytes</strong> + * + * @param session + * @return maxBytes + * @throws MailboxException + */ + protected long getMaxStorage(MailboxSession session) throws MailboxException { + return quotaStorage.getDefaultMaxStorage(); + } + + + /** + * Return the maximum message count which is allowed for the given {@link org.apache.james.mailbox.MailboxSession} (in fact the user which the session is bound to) + * + * @param session + * @return maximum of allowed message count + * @throws MailboxException + */ + protected long getMaxMessage(MailboxSession session) throws MailboxException { + return quotaStorage.getDefaultMaxMessageCount(); + } + + /** + * Proxy method allowing you to set the maximum storage quota for a given user + * + * @param user This user + * @param maxStorageQuota The new storage quota ( in bytes ) for this user + */ + public void setUserMaxStorage(String user, long maxStorageQuota) { + + } + + /** + * Proxy method allowing you to set the maximum message count allowed for this user + * @param user This user + * @param maxMessageCount The new message count allowed for this user. + */ + public void setUserMaxMessage(String user, long maxMessageCount) { + + } + + /** + * Proxy method allowing you to set the default maximum storage in bytes. 
+ * @param defaultMaxStorage new default maximum storage + */ + public void setDefaultMaxStorage(long defaultMaxStorage) { + quotaStorage.setDefaultMaxStorage(defaultMaxStorage); + } + + /** + * Proxy method allowing you to set the default maximum message count allowed + * @param defaultMaxMessageCount new default message count + */ + public void setDefaultMaxMessage(long defaultMaxMessageCount) { + quotaStorage.setDefaultMaxMessageCount(defaultMaxMessageCount); + } + +} diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraListeningQuotaManager.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraListeningQuotaManager.java new file mode 100644 index 0000000..8982015 --- /dev/null +++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraListeningQuotaManager.java @@ -0,0 +1,232 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.mailbox.cassandra.quota; + +import com.datastax.driver.core.Session; +import org.apache.james.mailbox.MailboxListener; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.QuotaManager; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.Quota; +import org.apache.james.mailbox.store.MailboxSessionMapperFactory; +import org.apache.james.mailbox.store.StoreMailboxManager; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.mailbox.store.mail.model.Message; +import org.apache.james.mailbox.store.quota.QuotaImpl; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link QuotaManager} which will keep track of quota by listing for {@link org.apache.james.mailbox.MailboxListener.Event}'s. 
+ * + * The whole quota is kept in Cassandra after it was lazy-fetched on the first access + * * + */ +public abstract class CassandraListeningQuotaManager implements QuotaManager, MailboxListener { + private MailboxSessionMapperFactory factory; + private boolean calculateWhenUnlimited = false; + protected CassandraQuotaStorage quotaStorage; + + public CassandraListeningQuotaManager(StoreMailboxManager<?> manager, Session session) throws MailboxException { + this.factory = manager.getMapperFactory(); + manager.addGlobalListener(this, null); + this.quotaStorage = new CassandraQuotaStorage(session); + } + + protected MailboxSessionMapperFactory<?> getFactory() { + return factory; + } + + /** + * Allow you to tune default behaviour when facing unlimited quota + * + * @param calculateWhenUnlimited If true, quota are re - calculated when unlimited + */ + public void setCalculateUsedWhenUnlimited(boolean calculateWhenUnlimited) { + this.calculateWhenUnlimited = calculateWhenUnlimited; + } + + /** + * Calculate the current message count for the user owning this MailboxSession + * + * This method is slow ( linear in the number of mailboxes ). It is only called once for each user maximum. + * + * @param session MailboxSession of the user + * @return The number of messages of all mailboxes belonging to this user. + * @throws MailboxException + */ + private long constructMessageCount(MailboxSession session) throws MailboxException { + long mc = 0; + List<Mailbox> mailboxes = factory.getMailboxMapper(session).findMailboxWithPathLike(new MailboxPath(session.getPersonalSpace(), session.getUser().getUserName(), "%")); + for (int i = 0; i < mailboxes.size(); i++) { + mc += factory.getMessageMapper(session).countMessagesInMailbox(mailboxes.get(i)); + } + return mc; + } + + /** + * Calculate the current size of all the mailboxes of the user ( in bytes ). + * + * This method is slow ( linear in the number of mailboxes and in messages contained in these mailboxes). 
It is only called once for each user maximum. + * + * + * @param session + * @return + * @throws MailboxException + */ + private long calculateMailboxesSize(MailboxSession session) throws MailboxException { + MessageMapper mapper = factory.getMessageMapper(session); + long mSizes = 0; + List<Mailbox> mailboxes = factory.getMailboxMapper(session).findMailboxWithPathLike(new MailboxPath(session.getPersonalSpace(), session.getUser().getUserName(), "%")); + for (int i = 0; i < mailboxes.size(); i++) { + Iterator<Message> messages = mapper.findInMailbox(mailboxes.get(i), MessageRange.all(), MessageMapper.FetchType.Metadata, -1); + + while(messages.hasNext()) { + mSizes += messages.next().getFullContentOctets(); + } + } + return mSizes; + } + + /** + * Return the message quota for the user owning this MailboxSession + * + * @param session MailboxSession of this user + * @return Quota for message count for this user + * @throws MailboxException + */ + @Override + public Quota getMessageQuota(MailboxSession session) throws MailboxException { + long max = getMaxMessage(session); + if (max != Quota.UNLIMITED || calculateWhenUnlimited) { + + String id = session.getUser().getUserName(); + AtomicLong count = quotaStorage.getCount(id); + if (count == null) { + long messageCount = constructMessageCount(session); + count = new AtomicLong(messageCount); + long mailboxesSize = calculateMailboxesSize(session); + quotaStorage.setUserQuotas(session.getUser().getUserName(), messageCount, mailboxesSize); + } + return QuotaImpl.quota(max, count.get()); + } else { + return QuotaImpl.unlimited(); + } + } + + /** + * Return the storage quota for the user owning this MailboxSession + * + * @param session This mailbox session + * @return The storage quota for the user user owning this mailboxSession + * @throws MailboxException + */ + @Override + public Quota getStorageQuota(MailboxSession session) throws MailboxException { + long max = getMaxStorage(session); + if (max != Quota.UNLIMITED || 
calculateWhenUnlimited) {
+            MessageMapper mapper = factory.getMessageMapper(session);
+            String id = session.getUser().getUserName();
+            AtomicLong size = quotaStorage.getSize(id);
+
+            if (size == null) {
+                long messageCount = constructMessageCount(session);
+                long mailboxesSize = calculateMailboxesSize(session);
+                size = new AtomicLong(mailboxesSize);
+                quotaStorage.setUserQuotas(session.getUser().getUserName(), messageCount, mailboxesSize);
+            }
+            return QuotaImpl.quota(max, size.get());
+        } else {
+            return QuotaImpl.unlimited();
+        }
+    }
+
+    /**
+     * Return the maximum storage which is allowed for the given {@link MailboxSession} (in fact the user which the session is bound to)
+     *
+     * The returned value must be in <strong>bytes</strong>
+     *
+     * @param session
+     * @return maxBytes
+     * @throws MailboxException
+     */
+    protected abstract long getMaxStorage(MailboxSession session) throws MailboxException;
+
+
+    /**
+     * Return the maximum message count which is allowed for the given {@link MailboxSession} (in fact the user which the session is bound to)
+     *
+     * @param session
+     * @return maximum of allowed message count
+     * @throws MailboxException
+     */
+    protected abstract long getMaxMessage(MailboxSession session) throws MailboxException;
+
+
+    @Override
+    public void event(MailboxListener.Event event) {
+        String id = event.getSession().getUser().getUserName();
+        if (event instanceof MailboxListener.Added) {
+            MailboxListener.Added added = (MailboxListener.Added) event;
+
+            long s = 0;
+            long c = 0;
+            Iterator<Long> uids = added.getUids().iterator();
+            while(uids.hasNext()) {
+                long uid = uids.next();
+                s += added.getMetaData(uid).getSize();
+                c++;
+            }
+
+            quotaStorage.addCount(id, c);
+            quotaStorage.addSize(id, s);
+        } else if (event instanceof MailboxListener.Expunged) {
+            MailboxListener.Expunged expunged = (MailboxListener.Expunged) event;
+            long s = 0;
+            long c = 0;
+            Iterator<Long> uids = expunged.getUids().iterator();
+            while(uids.hasNext()) {
+                long uid = 
uids.next();
+                s += expunged.getMetaData(uid).getSize();
+                c++;
+            }
+
+            quotaStorage.addCount(id, -c);
+            quotaStorage.addSize(id, -s);
+        } else if (event instanceof MailboxAdded) {
+            // Trick : creates the row initialized with zero values
+            quotaStorage.addCount(id, 0);
+        }
+    }
+
+    /**
+     * This listener is never closed
+     *
+     * @return false
+     */
+    public boolean isClosed() {
+        return false;
+    }
+
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserQuotaManager.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserQuotaManager.java
new file mode 100644
index 0000000..d26ea17
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserQuotaManager.java
@@ -0,0 +1,95 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License. 
* + ****************************************************************/ +package org.apache.james.mailbox.cassandra.quota; + +import com.datastax.driver.core.Session; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.store.StoreMailboxManager; + +/** + * Allow you to set a per user quota stored in Cassandra + */ +public class CassandraPerUserQuotaManager extends CassandraListeningQuotaManager { + + public CassandraPerUserQuotaManager(StoreMailboxManager<?> manager, Session session) throws MailboxException { + super(manager, session); + } + + /** + * Return the maximum storage which is allowed for the given {@link MailboxSession} (in fact the user which the session is bound to) + * + * The returned valued must be in <strong>bytes</strong> + * + * @param session + * @return maxBytes + * @throws MailboxException + */ + protected long getMaxStorage(MailboxSession session) throws MailboxException { + return quotaStorage.getMaxMailboxSize(session.getUser().getUserName()); + } + + + /** + * Return the maximum message count which is allowed for the given {@link MailboxSession} (in fact the user which the session is bound to) + * + * @param session + * @return maximum of allowed message count + * @throws MailboxException + */ + protected long getMaxMessage(MailboxSession session) throws MailboxException { + return quotaStorage.getMaxMessageCont(session.getUser().getUserName()); + } + + /** + * Proxy method allowing you to set the maximum storage quota for a given user + * + * @param user This user + * @param maxStorageQuota The new storage quota ( in bytes ) for this user + */ + public void setUserMaxStorage(String user, long maxStorageQuota) { + quotaStorage.setMaxMailboxSize(user, maxStorageQuota); + } + + /** + * Proxy method allowing you to set the maximum message count allowed for this user + * @param user This user + * @param maxMessageCount The new message count allowed for 
this user. + */ + public void setUserMaxMessage(String user, long maxMessageCount) { + quotaStorage.setMaxMessageCount(user, maxMessageCount); + } + + /** + * Proxy method allowing you to set the default maximum storage in bytes. + * @param defaultMaxStorage new default maximum storage + */ + public void setDefaultMaxStorage(long defaultMaxStorage) { + quotaStorage.setDefaultMaxStorage(defaultMaxStorage); + } + + /** + * Proxy method allowing you to set the default maximum message count allowed + * @param defaultMaxMessageCount new default message count + */ + public void setDefaultMaxMessage(long defaultMaxMessageCount) { + quotaStorage.setDefaultMaxMessageCount(defaultMaxMessageCount); + } + +} diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorage.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorage.java new file mode 100644 index 0000000..23e1e9b --- /dev/null +++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorage.java @@ -0,0 +1,264 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.quota;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.james.mailbox.cassandra.table.CassandraDefaultMaxQuotaTable;
+import org.apache.james.mailbox.cassandra.table.CassandraMaxQuotaTable;
+import org.apache.james.mailbox.model.Quota;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+import static org.apache.james.mailbox.cassandra.table.CassandraQuotaTable.COUNT_QUOTA;
+import static org.apache.james.mailbox.cassandra.table.CassandraQuotaTable.SIZE_QUOTA;
+import static org.apache.james.mailbox.cassandra.table.CassandraQuotaTable.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraQuotaTable.USER;
+
+/**
+ * This class is responsible for managing quota storage into Cassandra
+ */
+public class CassandraQuotaStorage {
+
+    private Session session;
+
+    CassandraQuotaStorage(Session session) {
+        this.session = session;
+    }
+
+    AtomicLong getCount(String user) {
+        ResultSet resultSet = session.execute(
+            select(COUNT_QUOTA)
+                .from(TABLE_NAME)
+                .where(
+                    eq(USER, user)
+                )
+        );
+        if(resultSet.isExhausted() ) {
+            return null;
+        }
+        return new AtomicLong( resultSet.one().getLong(COUNT_QUOTA) );
+    }
+
+    AtomicLong getSize(String user) {
+        ResultSet resultSet = session.execute(
+            select(SIZE_QUOTA)
+                .from(TABLE_NAME)
+                .where(
+                    eq(USER, user)
+                )
+        );
+        if(resultSet.isExhausted() ) {
+            return null;
+        }
+        return new AtomicLong( resultSet.one().getLong(SIZE_QUOTA) );
+    }
+
+    AtomicLong setUserQuotas(String user, long messageCount, long mailboxesSize) {
+        // Query builder keeps the user name out of the CQL string
+        session.execute(update(TABLE_NAME)
+                .with(incr(COUNT_QUOTA, messageCount))
+                .and(incr(SIZE_QUOTA, mailboxesSize))
+                .where(eq(USER, user)));
+        return new AtomicLong( messageCount );
+    }
+
+    void 
addCount(String user, long addedCount) {
+        // Query builder keeps the user name out of the CQL string
+        session.execute(update(TABLE_NAME)
+                .with(incr(COUNT_QUOTA, addedCount))
+                .where(eq(USER, user)));
+    }
+
+    void addSize(String user, long addedSize) {
+        // Query builder keeps the user name out of the CQL string
+        session.execute(update(TABLE_NAME)
+                .with(incr(SIZE_QUOTA, addedSize))
+                .where(eq(USER, user)));
+    }
+
+    long getMaxMailboxSize(String user) {
+        ResultSet resultSet = session.execute(
+            select(CassandraMaxQuotaTable.SIZE_QUOTA_MAX)
+                .from(CassandraMaxQuotaTable.TABLE_NAME)
+                .where(eq(CassandraMaxQuotaTable.USER,user))
+        );
+        if(resultSet.isExhausted()) {
+            // No specified max value for this user. Return the default max value instead.
+            return getDefaultMaxStorage();
+        }
+        long quota_value = resultSet.one().getLong(CassandraMaxQuotaTable.SIZE_QUOTA_MAX);
+        if( quota_value == Quota.UNKNOWN) {
+            return getDefaultMaxStorage();
+        }
+        return quota_value;
+    }
+
+    long getMaxMessageCont(String user) {
+        ResultSet resultSet = session.execute(
+            select(CassandraMaxQuotaTable.COUNT_QUOTA_MAX)
+                .from(CassandraMaxQuotaTable.TABLE_NAME)
+                .where(eq(CassandraMaxQuotaTable.USER,user))
+        );
+        if(resultSet.isExhausted()) {
+            // No specified max value for this user. Return the default max value instead.
+ return getDefaultMaxMessageCount(); + } + long quota_value = resultSet.one().getLong(CassandraMaxQuotaTable.COUNT_QUOTA_MAX); + if( quota_value == Quota.UNKNOWN) { + return getDefaultMaxMessageCount(); + } + return quota_value; + } + + void setMaxMailboxSize(String user, long maxMailboxSize) { + createFieldsForUser(user); + session.execute( + update(CassandraMaxQuotaTable.TABLE_NAME) + .with( + set(CassandraMaxQuotaTable.SIZE_QUOTA_MAX, maxMailboxSize) + ) + .where( + eq(CassandraMaxQuotaTable.USER, user) + ) + ); + } + + void setMaxMessageCount(String user, long maxMessageCount) { + createFieldsForUser(user); + session.execute( + update(CassandraMaxQuotaTable.TABLE_NAME) + .with( + set(CassandraMaxQuotaTable.COUNT_QUOTA_MAX, maxMessageCount) + ) + .where( + eq(CassandraMaxQuotaTable.USER, user) + ) + ); + } + + long getDefaultMaxStorage() { + ResultSet resultSet = session.execute( + select(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA) + .from(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE) + .where( + eq(CassandraDefaultMaxQuotaTable.ID, 1) + ) + ); + if(resultSet.isExhausted()) { + // No value specified. Set default max quotas to Quota.unlimited + session.execute( + insertInto(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE) + .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA, Quota.UNLIMITED) + .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA, Quota.UNLIMITED) + .value(CassandraDefaultMaxQuotaTable.ID, 1) + ); + return Quota.UNLIMITED; + } + return resultSet.one().getLong(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA); + + } + + long getDefaultMaxMessageCount() { + ResultSet resultSet = session.execute( + select(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA) + .from(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE) + .where( + eq(CassandraDefaultMaxQuotaTable.ID, 1) + ) + ); + if(resultSet.isExhausted()) { + // No value specified. 
Set default max quotas to Quota.unlimited
+            session.execute(
+                insertInto(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA, Quota.UNLIMITED)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA, Quota.UNLIMITED)
+                    .value(CassandraDefaultMaxQuotaTable.ID, 1)
+            );
+            return Quota.UNLIMITED;
+        }
+        return resultSet.one().getLong(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA);
+    }
+
+    void setDefaultMaxMessageCount(long defaultMaxMessageCount) {
+        if(defaultRowExist()) {
+            session.execute(
+                update(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                    .with(
+                        set(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA, defaultMaxMessageCount)
+                    )
+                    .where(
+                        eq(CassandraDefaultMaxQuotaTable.ID,1)
+                    )
+            );
+        } else {
+            //We need to insert the configuration row. Set Quota.UNLIMITED as storage max default value
+            session.execute(
+                insertInto(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA, Quota.UNLIMITED)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA, defaultMaxMessageCount)
+                    .value(CassandraDefaultMaxQuotaTable.ID, 1)
+            );
+        }
+    }
+
+    void setDefaultMaxStorage(long defaultMaxStorage) {
+        if(defaultRowExist()) {
+            session.execute(
+                update(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                    .with(
+                        set(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA, defaultMaxStorage)
+                    )
+                    .where(
+                        eq(CassandraDefaultMaxQuotaTable.ID,1)
+                    )
+            );
+        } else {
+            //We need to insert the configuration row. Set Quota.UNLIMITED as message count max default value
+            session.execute(
+                insertInto(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_SIZE_QUOTA, defaultMaxStorage)
+                    .value(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_COUNT_QUOTA, Quota.UNLIMITED)
+                    .value(CassandraDefaultMaxQuotaTable.ID, 1)
+            );
+        }
+    }
+
+    private boolean defaultRowExist() {
+        // Check if ID 1 exists
+        ResultSet resultSet = session.execute(
+            select(CassandraDefaultMaxQuotaTable.ID)
+                .from(CassandraDefaultMaxQuotaTable.DEFAULT_MAX_QUOTA_TABLE)
+                .where(
+                    eq(CassandraDefaultMaxQuotaTable.ID,1)
+                )
+        );
+        return !resultSet.isExhausted();
+    }
+
+    private void createFieldsForUser(String user) {
+        session.execute(insertInto(CassandraMaxQuotaTable.TABLE_NAME)
+            .value(CassandraMaxQuotaTable.USER, user)
+            .value(CassandraMaxQuotaTable.SIZE_QUOTA_MAX, Quota.UNKNOWN )
+            .value(CassandraMaxQuotaTable.COUNT_QUOTA_MAX, Quota.UNKNOWN )
+            .ifNotExists()
+        );
+    }
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraDefaultMaxQuotaTable.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraDefaultMaxQuotaTable.java
new file mode 100644
index 0000000..05fe13f
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraDefaultMaxQuotaTable.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraDefaultMaxQuotaTable {
+    String DEFAULT_MAX_QUOTA_TABLE = "default_max_quota_table";
+    String ID = "id";
+    String DEFAULT_MAX_SIZE_QUOTA = "default_max_size_quota";
+    String DEFAULT_MAX_COUNT_QUOTA = "default_max_count_quota";
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMaxQuotaTable.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMaxQuotaTable.java
new file mode 100644
index 0000000..c6b7d28
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMaxQuotaTable.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraMaxQuotaTable {
+    String TABLE_NAME = "quota_max";
+    String USER = "user";
+    String SIZE_QUOTA_MAX = "size_quota_max";
+    String COUNT_QUOTA_MAX = "count_quota_max";
+}
diff --git a/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraQuotaTable.java b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraQuotaTable.java
new file mode 100644
index 0000000..0546af0
--- /dev/null
+++ b/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraQuotaTable.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraQuotaTable {
+    String TABLE_NAME = "quota";
+    String USER = "user";
+    String SIZE_QUOTA = "size_quota";
+    String COUNT_QUOTA = "count_quota";
+}
diff --git a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
index b97315c..83cdf97 100644
--- a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
+++ b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
@@ -57,9 +57,10 @@ public final class CassandraClusterSingleton {
             EmbeddedCassandraServerHelper.startEmbeddedCassandra();
             // Let Cassandra initialization before creating
             // the session. Solve very fast computer tests run.
-            Thread.sleep(2000);
+            Thread.sleep(4000);
             this.session = new CassandraSession(CLUSTER_IP, CLUSTER_PORT_TEST, KEYSPACE_NAME, DEFAULT_REPLICATION_FACTOR);
         } catch (Exception e) {
+            e.printStackTrace();
             throw new RuntimeException(e);
         }
     }
@@ -94,6 +95,24 @@ public final class CassandraClusterSingleton {
                     + "PRIMARY KEY (mailboxId, uid)" + ");");
         } else if (tableName.equals("subscription")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
+        } else if (tableName.equals("quota")) {
+            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".quota ("
+                    + "user text PRIMARY KEY,"
+                    + "size_quota counter,"
+                    + "count_quota counter"
+                    + ");");
+        } else if (tableName.equals("quota_max")) {
+            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".quota_max ("
+                    + "user text PRIMARY KEY,"
+                    + "size_quota_max bigint,"
+                    + "count_quota_max bigint"
+                    + ");");
+        } else if(tableName.equals("default_max_quota_table")) {
+            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".default_max_quota_table ("
+                    + "id int PRIMARY KEY,"
+                    + "default_max_size_quota bigint,"
+                    + "default_max_count_quota bigint"
+                    + ");");
         } else {
             throw new NotImplementedException("We don't support the class " + tableName);
         }
@@ -107,6 +126,9 @@ public final class CassandraClusterSingleton {
         ensureTable("mailboxCounters");
         ensureTable("message");
         ensureTable("subscription");
+        ensureTable("quota");
+        ensureTable("quota_max");
+        ensureTable("default_max_quota_table");
     }
 
     /**
@@ -126,6 +148,9 @@ public final class CassandraClusterSingleton {
         clearTable("mailboxCounters");
         clearTable("message");
         clearTable("subscription");
+        clearTable("quota");
+        clearTable("quota_max");
+        clearTable("default_max_quota_table");
     }
 
 }
diff --git a/cassandra/src/test/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorageTest.java b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorageTest.java
new file mode 100644
index 0000000..0d5267f
--- /dev/null
+++ b/cassandra/src/test/java/org/apache/james/mailbox/cassandra/quota/CassandraQuotaStorageTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.quota;
+
+import org.apache.james.mailbox.cassandra.CassandraClusterSingleton;
+import org.apache.james.mailbox.model.Quota;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Run tests for CassandraQuotaStorage.
+ */
+public class CassandraQuotaStorageTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraQuotaStorageTest.class);
+    public static final CassandraClusterSingleton CLUSTER = CassandraClusterSingleton.build();
+    private static CassandraQuotaStorage quotaStorage = new CassandraQuotaStorage(CLUSTER.getConf());
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Set up for tests for Quotas over Cassandra");
+        CLUSTER.ensureAllTables();
+        CLUSTER.clearAllTables();
+    }
+
+    @After
+    public void cleanUp() {
+        CLUSTER.clearAllTables();
+    }
+
+    @Test
+    public void testSetUserQuota() {
+        quotaStorage.setUserQuotas("benwa", 42, 524);
+        quotaStorage.setUserQuotas("toto", 24, 444);
+        quotaStorage.setUserQuotas("tata", 56, 965);
+
+        assertEquals(42, quotaStorage.getCount("benwa").get() );
+        assertEquals(24, quotaStorage.getCount("toto").get() );
+        assertEquals(56, quotaStorage.getCount("tata").get() );
+
+        assertEquals(524, quotaStorage.getSize("benwa").get() );
+        assertEquals(444, quotaStorage.getSize("toto").get() );
+        assertEquals(965, quotaStorage.getSize("tata").get() );
+    }
+
+    @Test
+    public void testAddCountQuotaStorage() {
+        quotaStorage.addCount("toubib",0);
+        assertEquals(0, quotaStorage.getCount("toubib").get() );
+
+        quotaStorage.setUserQuotas("benwa", 42, 524);
+        quotaStorage.setUserQuotas("toto", 24, 444);
+        quotaStorage.setUserQuotas("tata", 56, 965);
+
+        quotaStorage.addCount("benwa", 36);
+        quotaStorage.addCount("toto", -12);
+        quotaStorage.addCount("tata", 0);
+
+        assertEquals(78, quotaStorage.getCount("benwa").get() );
+        assertEquals(12, quotaStorage.getCount("toto").get() );
+        assertEquals(56, quotaStorage.getCount("tata").get() );
+    }
+
+    @Test
+    public void testAddSizeQuotaStorage() {
+        quotaStorage.addCount("toubib",0);
+        assertEquals(0, quotaStorage.getCount("toubib").get() );
+
+        quotaStorage.setUserQuotas("benwa", 42, 524);
+        quotaStorage.setUserQuotas("toto", 24, 444);
+        quotaStorage.setUserQuotas("tata", 56, 965);
+
+        quotaStorage.addSize("benwa", 102);
+        quotaStorage.addSize("toto", -56);
+        quotaStorage.addSize("tata", 0);
+
+        assertEquals(626, quotaStorage.getSize("benwa").get() );
+        assertEquals(388, quotaStorage.getSize("toto").get() );
+        assertEquals(965, quotaStorage.getSize("tata").get() );
+    }
+
+    @Test
+    public void testGetDefaultMaxMessageCount() {
+        assertEquals(Quota.UNLIMITED, quotaStorage.getDefaultMaxMessageCount() );
+    }
+
+    @Test
+    public void testGetDefaultMaxStorage() {
+        assertEquals(Quota.UNLIMITED, quotaStorage.getDefaultMaxStorage() );
+    }
+
+    @Test
+    public void testSetDefaultMaxMessageCount() {
+        quotaStorage.setDefaultMaxMessageCount(42);
+        assertEquals(42, quotaStorage.getDefaultMaxMessageCount());
+    }
+
+    @Test
+    public void testSetDefaultMaxStorage() {
+        quotaStorage.setDefaultMaxStorage(421);
+        assertEquals(421, quotaStorage.getDefaultMaxStorage());
+    }
+
+    @Test
+    public void testGetMaxMessageCont() {
+        quotaStorage.setDefaultMaxMessageCount(42);
+        assertEquals(42,quotaStorage.getMaxMessageCont("benwa"));
+    }
+
+    @Test
+    public void testSetMaxMessageCont() {
+        quotaStorage.setMaxMessageCount("benwa",84);
+        assertEquals(84, quotaStorage.getMaxMessageCont("benwa"));
+    }
+
+    @Test
+    public void testGetMaxMailboxSize() {
+        quotaStorage.setMaxMailboxSize("benwa", 421);
+        assertEquals(421, quotaStorage.getMaxMailboxSize("benwa"));
+    }
+
+    @Test
+    public void testSetMaxMailboxSize() {
+        quotaStorage.setMaxMailboxSize("benwa", 842);
+        assertEquals(842,quotaStorage.getMaxMailboxSize("benwa"));
+    }
+
+    @Test
+    public void testSetMailboxSizeInteractionOnGetMailboxSizeDefaultBehavior() {
+        quotaStorage.setMaxMailboxSize("benwa", 1024);
+        quotaStorage.setDefaultMaxMessageCount(102);
+        assertEquals(102, quotaStorage.getMaxMessageCont("benwa"));
+    }
+
+    @Test
+    public void testSetMessageCountInteractionOnGetMailboxSizeDefaultBehavior() {
+        quotaStorage.setMaxMessageCount("benwa", 102);
+        quotaStorage.setDefaultMaxStorage(2048);
+        assertEquals(2048, quotaStorage.getMaxMailboxSize("benwa"));
+    }
+}
--
2.1.3
