> Attached is a first pass at an attempt to fix.

[EMAIL PROTECTED] list filters!  :-(

Ok, attached to JAMES-603 issue, and included for reference below (although
the mailing list will corrupt the formatting).

        --- Noel

Index: src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
===================================================================
--- src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
(revision 439942)
+++ src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
(working copy)
@@ -189,12 +189,12 @@
             //Loop through until we are either out of pending messages or
have a message
             // that we can lock
             PendingMessage next = null;
-            while ((next = getNextPendingMessage()) != null &&
!Thread.currentThread().isInterrupted()) {
+            while ((next = getNextPendingMessage(filter)) != null &&
!Thread.currentThread().isInterrupted()) {
                 //Check whether this is time to expire

-                boolean shouldProcess = filter.accept (next.key,
next.state, next.lastUpdated, next.errorMessage);
+                // boolean shouldProcess = filter.accept (next.key,
next.state, next.lastUpdated, next.errorMessage);

-                if (shouldProcess && lock(next.key)) {
+                if (/*shouldProcess && */ lock(next.key)) {
                     try {
                         Mail mail = retrieve(next.key);
                         // Retrieve can return null if the mail is no
longer on the spool
@@ -241,11 +241,12 @@
      * checks the last time pending messages was loaded and load if
      * it's been more than 1 second (should be configurable).
      */
-    private PendingMessage getNextPendingMessage() {
+    private PendingMessage
getNextPendingMessage(SpoolRepository.AcceptFilter filter) {
         synchronized (pendingMessages) {
             if (pendingMessages.size() == 0 && pendingMessagesLoadTime <
System.currentTimeMillis()) {
-                pendingMessagesLoadTime = LOAD_TIME_MININUM +
System.currentTimeMillis();
-                loadPendingMessages();
+                // pendingMessagesLoadTime = LOAD_TIME_MININUM +
System.currentTimeMillis();
+                loadPendingMessages(filter);
+                pendingMessagesLoadTime = Math.max(filter.getWaitTime(),
LOAD_TIME_MININUM) + System.currentTimeMillis();
             }

             if (pendingMessages.size() == 0) {
@@ -259,7 +260,7 @@
     /**
      * Retrieves the pending messages that are in the database
      */
-    private void loadPendingMessages() {
+    private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
         //Loads a vector with PendingMessage objects
         synchronized (pendingMessages) {
             pendingMessages.clear();
@@ -272,7 +273,11 @@
                 listMessages =
                     conn.prepareStatement(sqlQueries.getSqlString("listMess
agesSQL", true));
                 listMessages.setString(1, repositoryName);
-                listMessages.setMaxRows(maxPendingMessages);
+                // Too simplistic.  When filtering, we may need to see
+                // more than just maxPendingMessages to load the
+                // cache, so just hope that the driver and server use
+                // cursors properly.
+                // --> listMessages.setMaxRows(maxPendingMessages);
                 rsListMessages = listMessages.executeQuery();
                 // Continue to have it loop through the list of messages
until we hit
                 // a possible message, or we retrieve maxPendingMessages
messages.
@@ -283,7 +288,9 @@
                     String state = rsListMessages.getString(2);
                     long lastUpdated =
rsListMessages.getTimestamp(3).getTime();
                     String errorMessage = rsListMessages.getString(4);
-                    pendingMessages.add(new PendingMessage(key, state,
lastUpdated, errorMessage));
+                    if (filter.accept(key, state, lastUpdated,
errorMessage)) {
+                        pendingMessages.add(new PendingMessage(key, state,
lastUpdated, errorMessage));
+                    }
                 }
             } catch (SQLException sqle) {
                 //Log it and avoid reloading for a bit

Index: src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
===================================================================
--- src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java   
(revision 439942)
+++ src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java   
(working copy)
@@ -189,12 +189,12 @@
             //Loop through until we are either out of pending messages or have 
a message
             // that we can lock
             PendingMessage next = null;
-            while ((next = getNextPendingMessage()) != null && 
!Thread.currentThread().isInterrupted()) {
+            while ((next = getNextPendingMessage(filter)) != null && 
!Thread.currentThread().isInterrupted()) {
                 //Check whether this is time to expire
                 
-                boolean shouldProcess = filter.accept (next.key, next.state, 
next.lastUpdated, next.errorMessage);
+                // boolean shouldProcess = filter.accept (next.key, 
next.state, next.lastUpdated, next.errorMessage);
                 
-                if (shouldProcess && lock(next.key)) {
+                if (/*shouldProcess && */ lock(next.key)) {
                     try {
                         Mail mail = retrieve(next.key);
                         // Retrieve can return null if the mail is no longer 
on the spool
@@ -241,11 +241,12 @@
      * checks the last time pending messages was loaded and load if
      * it's been more than 1 second (should be configurable).
      */
-    private PendingMessage getNextPendingMessage() {
+    private PendingMessage getNextPendingMessage(SpoolRepository.AcceptFilter 
filter) {
         synchronized (pendingMessages) {
             if (pendingMessages.size() == 0 && pendingMessagesLoadTime < 
System.currentTimeMillis()) {
-                pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
-                loadPendingMessages();
+                // pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
+                loadPendingMessages(filter);
+                pendingMessagesLoadTime = Math.max(filter.getWaitTime(), 
LOAD_TIME_MININUM) + System.currentTimeMillis();
             }
 
             if (pendingMessages.size() == 0) {
@@ -259,7 +260,7 @@
     /**
      * Retrieves the pending messages that are in the database
      */
-    private void loadPendingMessages() {
+    private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
         //Loads a vector with PendingMessage objects
         synchronized (pendingMessages) {
             pendingMessages.clear();
@@ -272,7 +273,11 @@
                 listMessages =
                     
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
                 listMessages.setString(1, repositoryName);
-                listMessages.setMaxRows(maxPendingMessages);
+                // Too simplistic.  When filtering, we may need to see
+                // more than just maxPendingMessages to load the
+                // cache, so just hope that the driver and server use
+                // cursors properly.
+                // --> listMessages.setMaxRows(maxPendingMessages);
                 rsListMessages = listMessages.executeQuery();
                 // Continue to have it loop through the list of messages until 
we hit
                 // a possible message, or we retrieve maxPendingMessages 
messages.
@@ -283,7 +288,9 @@
                     String state = rsListMessages.getString(2);
                     long lastUpdated = 
rsListMessages.getTimestamp(3).getTime();
                     String errorMessage = rsListMessages.getString(4);
-                    pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    if (filter.accept(key, state, lastUpdated, errorMessage)) {
+                        pendingMessages.add(new PendingMessage(key, state, 
lastUpdated, errorMessage));
+                    }
                 }
             } catch (SQLException sqle) {
                 //Log it and avoid reloading for a bit

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to