Author: noel
Date: Tue Sep  5 21:38:23 2006
New Revision: 440612

URL: http://svn.apache.org/viewvc?view=rev&rev=440612
Log:
JAMES-603.  The salient change is that we push the filter all the way down to 
the code that processes the ResultSet, and we don't load messages into the 
cache that aren't accepted by the filter.  Unfortunately, we can no longer 
naively call setMaxRows, since we don't know how many rows we might have to 
process in order to get to even ANY valid messages, so we'll have to trust the 
JDBC driver to use cursors properly, rather than buffer a potentially huge 
ResultSet in memory.

Added:
    
james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
      - copied, changed from r440610, 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
Modified:
    
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java

Copied: 
james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
 (from r440610, 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java)
URL: 
http://svn.apache.org/viewvc/james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java?view=diff&rev=440612&p1=james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java&r1=440610&p2=james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java&r2=440612
==============================================================================
--- 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
 (original)
+++ 
james/server/branches/v2.3/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
 Tue Sep  5 21:38:23 2006
@@ -193,12 +193,12 @@
             //Loop through until we are either out of pending messages or have 
a message
             // that we can lock
             PendingMessage next = null;
-            while ((next = getNextPendingMessage()) != null && 
!Thread.currentThread().isInterrupted()) {
+            while ((next = getNextPendingMessage(filter)) != null && 
!Thread.currentThread().isInterrupted()) {
                 //Check whether this is time to expire
                 
-                boolean shouldProcess = filter.accept (next.key, next.state, 
next.lastUpdated, next.errorMessage);
+                // boolean shouldProcess = filter.accept (next.key, 
next.state, next.lastUpdated, next.errorMessage);
                 
-                if (shouldProcess && lock(next.key)) {
+                if (/*shouldProcess && */ lock(next.key)) {
                     try {
                         Mail mail = retrieve(next.key);
                         // Retrieve can return null if the mail is no longer 
on the spool
@@ -245,11 +245,12 @@
      * checks the last time pending messages was loaded and load if
      * it's been more than 1 second (should be configurable).
      */
-    private PendingMessage getNextPendingMessage() {
+    private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter 
filter) {
         synchronized (pendingMessages) {
             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < 
System.currentTimeMillis()) {
-                pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
-                loadPendingMessages();
+                // pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
+                loadPendingMessages(filter);
+                pendingMessagesLoadTime = Math.max(filter.getWaitTime(), 
LOAD_TIME_MININUM) + System.currentTimeMillis();
             }
 
             if (pendingMessages.size() == 0) {
@@ -263,7 +264,7 @@
     /**
      * Retrieves the pending messages that are in the database
      */
-    private void loadPendingMessages() {
+    private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
         //Loads a vector with PendingMessage objects
         synchronized (pendingMessages) {
             pendingMessages.clear();
@@ -276,7 +277,11 @@
                 listMessages =
                     
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
                 listMessages.setString(1, repositoryName);
-                listMessages.setMaxRows(maxPendingMessages);
+                // Too simplistic.  When filtering, we may need to see
+                // more than just maxPendingMessages to load the
+                // cache, so just hope that the driver and server use
+                // cursors properly.
+                // --> listMessages.setMaxRows(maxPendingMessages);
                 rsListMessages = listMessages.executeQuery();
                 // Continue to have it loop through the list of messages until 
we hit
                 // a possible message, or we retrieve maxPendingMessages 
messages.
@@ -287,7 +292,9 @@
                     String state = rsListMessages.getString(2);
                     long lastUpdated = 
rsListMessages.getTimestamp(3).getTime();
                     String errorMessage = rsListMessages.getString(4);
-                    pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    if (filter.accept(key, state, lastUpdated, errorMessage)) {
+                        pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    }
                 }
             } catch (SQLException sqle) {
                 //Log it and avoid reloading for a bit

Modified: 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java?view=diff&rev=440612&r1=440611&r2=440612
==============================================================================
--- 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
 (original)
+++ 
james/server/trunk/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
 Tue Sep  5 21:38:23 2006
@@ -193,12 +193,12 @@
             //Loop through until we are either out of pending messages or have 
a message
             // that we can lock
             PendingMessage next = null;
-            while ((next = getNextPendingMessage()) != null && 
!Thread.currentThread().isInterrupted()) {
+            while ((next = getNextPendingMessage(filter)) != null && 
!Thread.currentThread().isInterrupted()) {
                 //Check whether this is time to expire
                 
-                boolean shouldProcess = filter.accept (next.key, next.state, 
next.lastUpdated, next.errorMessage);
+                // boolean shouldProcess = filter.accept (next.key, 
next.state, next.lastUpdated, next.errorMessage);
                 
-                if (shouldProcess && lock(next.key)) {
+                if (/*shouldProcess && */ lock(next.key)) {
                     try {
                         Mail mail = retrieve(next.key);
                         // Retrieve can return null if the mail is no longer 
on the spool
@@ -245,11 +245,12 @@
      * checks the last time pending messages was loaded and load if
      * it's been more than 1 second (should be configurable).
      */
-    private PendingMessage getNextPendingMessage() {
+    private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter 
filter) {
         synchronized (pendingMessages) {
             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < 
System.currentTimeMillis()) {
-                pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
-                loadPendingMessages();
+                // pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
+                loadPendingMessages(filter);
+                pendingMessagesLoadTime = Math.max(filter.getWaitTime(), 
LOAD_TIME_MININUM) + System.currentTimeMillis();
             }
 
             if (pendingMessages.size() == 0) {
@@ -263,7 +264,7 @@
     /**
      * Retrieves the pending messages that are in the database
      */
-    private void loadPendingMessages() {
+    private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
         //Loads a vector with PendingMessage objects
         synchronized (pendingMessages) {
             pendingMessages.clear();
@@ -276,7 +277,11 @@
                 listMessages =
                     
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
                 listMessages.setString(1, repositoryName);
-                listMessages.setMaxRows(maxPendingMessages);
+                // Too simplistic.  When filtering, we may need to see
+                // more than just maxPendingMessages to load the
+                // cache, so just hope that the driver and server use
+                // cursors properly.
+                // --> listMessages.setMaxRows(maxPendingMessages);
                 rsListMessages = listMessages.executeQuery();
                 // Continue to have it loop through the list of messages until 
we hit
                 // a possible message, or we retrieve maxPendingMessages 
messages.
@@ -287,7 +292,9 @@
                     String state = rsListMessages.getString(2);
                     long lastUpdated = 
rsListMessages.getTimestamp(3).getTime();
                     String errorMessage = rsListMessages.getString(4);
-                    pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    if (filter.accept(key, state, lastUpdated, errorMessage)) {
+                        pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    }
                 }
             } catch (SQLException sqle) {
                 //Log it and avoid reloading for a bit



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to