Author: noel
Date: Tue Sep 5 21:38:23 2006
New Revision: 440612

URL: http://svn.apache.org/viewvc?view=rev&rev=440612
Log:
JAMES-603. The salient change is that we push the filter all the way down to the code that processes the ResultSet, and we don't load messages into the cache that aren't accepted by the filter. Unfortunately, we can no longer naively call setMaxRows, since we don't know how many rows we might have to process in order to get to even ANY valid messages, so we'll have to trust the JDBC driver to use cursors properly, rather than buffer a potentially huge ResultSet in memory.
Added: james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java - copied, changed from r440610, james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java Modified: james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java Copied: james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java (from r440610, james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java) URL: http://svn.apache.org/viewvc/james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java?view=diff&rev=440612&p1=james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java&r1=440610&p2=james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java&r2=440612 ============================================================================== --- james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java (original) +++ james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java Tue Sep 5 21:38:23 2006 @@ -193,12 +193,12 @@ //Loop through until we are either out of pending messages or have a message // that we can lock PendingMessage next = null; - while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) { + while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) { //Check whether this is time to expire - boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage); + // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage); - if (shouldProcess && lock(next.key)) { + if (/*shouldProcess && */ lock(next.key)) { try { Mail mail = retrieve(next.key); // Retrieve can return null if the mail is no longer on the spool @@ -245,11 +245,12 @@ * checks the last time pending 
messages was loaded and load if * it's been more than 1 second (should be configurable). */ - private PendingMessage getNextPendingMessage() { + private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) { synchronized (pendingMessages) { if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) { - pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis(); - loadPendingMessages(); + // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis(); + loadPendingMessages(filter); + pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis(); } if (pendingMessages.size() == 0) { @@ -263,7 +264,7 @@ /** * Retrieves the pending messages that are in the database */ - private void loadPendingMessages() { + private void loadPendingMessages(SpoolRepository.AcceptFilter filter) { //Loads a vector with PendingMessage objects synchronized (pendingMessages) { pendingMessages.clear(); @@ -276,7 +277,11 @@ listMessages = conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true)); listMessages.setString(1, repositoryName); - listMessages.setMaxRows(maxPendingMessages); + // Too simplistic. When filtering, we may need to see + // more than just maxPendingMessages to load the + // cache, so just hope that the driver and server use + // cursors properly. + // --> listMessages.setMaxRows(maxPendingMessages); rsListMessages = listMessages.executeQuery(); // Continue to have it loop through the list of messages until we hit // a possible message, or we retrieve maxPendingMessages messages. 
@@ -287,7 +292,9 @@ String state = rsListMessages.getString(2); long lastUpdated = rsListMessages.getTimestamp(3).getTime(); String errorMessage = rsListMessages.getString(4); - pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage)); + if (filter.accept(key, state, lastUpdated, errorMessage)) { + pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage)); + } } } catch (SQLException sqle) { //Log it and avoid reloading for a bit Modified: james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java URL: http://svn.apache.org/viewvc/james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java?view=diff&rev=440612&r1=440611&r2=440612 ============================================================================== --- james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java (original) +++ james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java Tue Sep 5 21:38:23 2006 @@ -193,12 +193,12 @@ //Loop through until we are either out of pending messages or have a message // that we can lock PendingMessage next = null; - while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) { + while ((next = getNextPendingMessage(filter)) != null && !Thread.currentThread().isInterrupted()) { //Check whether this is time to expire - boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage); + // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage); - if (shouldProcess && lock(next.key)) { + if (/*shouldProcess && */ lock(next.key)) { try { Mail mail = retrieve(next.key); // Retrieve can return null if the mail is no longer on the spool @@ -245,11 +245,12 @@ * checks the last time pending messages was loaded and load if * it's been more than 1 second (should be configurable). 
*/ - private PendingMessage getNextPendingMessage() { + private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter filter) { synchronized (pendingMessages) { if (pendingMessages.size() == 0 && pendingMessagesLoadTime < System.currentTimeMillis()) { - pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis(); - loadPendingMessages(); + // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis(); + loadPendingMessages(filter); + pendingMessagesLoadTime = Math.max(filter.getWaitTime(), LOAD_TIME_MININUM) + System.currentTimeMillis(); } if (pendingMessages.size() == 0) { @@ -263,7 +264,7 @@ /** * Retrieves the pending messages that are in the database */ - private void loadPendingMessages() { + private void loadPendingMessages(SpoolRepository.AcceptFilter filter) { //Loads a vector with PendingMessage objects synchronized (pendingMessages) { pendingMessages.clear(); @@ -276,7 +277,11 @@ listMessages = conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true)); listMessages.setString(1, repositoryName); - listMessages.setMaxRows(maxPendingMessages); + // Too simplistic. When filtering, we may need to see + // more than just maxPendingMessages to load the + // cache, so just hope that the driver and server use + // cursors properly. + // --> listMessages.setMaxRows(maxPendingMessages); rsListMessages = listMessages.executeQuery(); // Continue to have it loop through the list of messages until we hit // a possible message, or we retrieve maxPendingMessages messages. 
@@ -287,7 +292,9 @@ String state = rsListMessages.getString(2); long lastUpdated = rsListMessages.getTimestamp(3).getTime(); String errorMessage = rsListMessages.getString(4); - pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage)); + if (filter.accept(key, state, lastUpdated, errorMessage)) { + pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage)); + } } } catch (SQLException sqle) { //Log it and avoid reloading for a bit --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]