Author: norman
Date: Tue Nov  9 17:11:47 2010
New Revision: 1033103

URL: http://svn.apache.org/viewvc?rev=1033103&view=rev
Log:
Make sure all InputStreams are closed before deleting the Blob file, so Windows 
will not fail on delete (JAMES-1122)

Modified:
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 Tue Nov  9 17:11:47 2010
@@ -36,6 +36,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
+import javax.mail.internet.SharedInputStream;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
@@ -45,7 +46,6 @@ import org.apache.james.core.MimeMessage
 import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.core.MimeMessageSource;
 import org.apache.james.core.MimeMessageWrapper;
-import org.apache.james.core.NonClosingSharedInputStream;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
@@ -170,13 +170,12 @@ public class ActiveMQMailQueue extends J
      * org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax
      * .jms.Message, org.apache.mailet.Mail)
      */
-    @SuppressWarnings("unchecked")
     protected void populateMailMimeMessage(Message message, Mail mail) throws 
MessagingException, JMSException {
         if (message instanceof BlobMessage) {
             try {
                 BlobMessage blobMessage = (BlobMessage) message;
                 try {
-                    // store URL and queuenamefor later usage
+                    // store URL and queuename for later usage
                     mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
                     mail.setAttribute(JAMES_QUEUE_NAME, queuename);
                 } catch (MalformedURLException e) {
@@ -185,12 +184,16 @@ public class ActiveMQMailQueue extends J
                 }
                 InputStream in = blobMessage.getInputStream();
                 MimeMessageSource source;
-  
-                if (in instanceof NonClosingSharedInputStream) {
-                    source = new MimeMessageBlobMessageSource(blobMessage);
+ 
+                // if its a SharedInputStream we can make use of some more 
performant implementation which don't need to copy the message to a temporary 
file
+                if (in instanceof SharedInputStream) {
+                    String sourceId = message.getJMSMessageID();
+                    long size = 
message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+                    source = new 
MimeMessageBlobMessageSource((SharedInputStream) in, size, sourceId);
                 } else {
-                    source = new MimeMessageInputStreamSource(mail.getName(), 
blobMessage.getInputStream());
+                    source = new MimeMessageInputStreamSource(mail.getName(), 
in);
                 }
+        
                 mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
             } catch (IOException e) {
                 throw new MailQueueException("Unable to populate MimeMessage 
for mail " + mail.getName(), e);

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
 Tue Nov  9 17:11:47 2010
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 import javax.mail.util.SharedFileInputStream;
@@ -45,6 +46,7 @@ public class FileSystemBlobStrategy impl
    
     private final FileSystem fs;
     private final BlobTransferPolicy policy;
+    private final ConcurrentHashMap<String, SharedFileInputStream> map = new 
ConcurrentHashMap<String, SharedFileInputStream>();
 
     public FileSystemBlobStrategy(final BlobTransferPolicy policy, final 
FileSystem fs) {
         this.fs = fs;
@@ -64,17 +66,35 @@ public class FileSystemBlobStrategy impl
      * @see 
org.apache.activemq.blob.BlobUploadStrategy#uploadStream(org.apache.activemq.command.ActiveMQBlobMessage,
 java.io.InputStream)
      */
     public URL uploadStream(ActiveMQBlobMessage message, InputStream in) 
throws JMSException, IOException {
-        File f = getFile(message);
-        FileOutputStream out = new FileOutputStream(f);
-        byte[] buffer = new byte[policy.getBufferSize()];
-        for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
-            out.write(buffer, 0, c);
+        FileOutputStream out = null;
+        try {
+            File f = getFile(message);
+            out = new FileOutputStream(f);
+            byte[] buffer = new byte[policy.getBufferSize()];
+            for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+                out.write(buffer, 0, c);
+                out.flush();
+            }
             out.flush();
+            // File.toURL() is deprecated
+            return f.toURI().toURL();
+        } finally {
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    // ignore on close
+                }
+            }
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    // ignore on close
+                }
+            }
         }
-        out.flush();
-        out.close();
-        // File.toURL() is deprecated
-        return f.toURI().toURL();
+
     }
 
     /*
@@ -83,6 +103,12 @@ public class FileSystemBlobStrategy impl
      */
     public void deleteFile(ActiveMQBlobMessage message) throws IOException, 
JMSException {
         File f = getFile(message);
+        SharedFileInputStream in = map.remove(f.getCanonicalPath());
+        try {
+            if (in != null) in .close();
+        } catch (IOException e) {
+            // ignore here
+        }
         if (f.exists()) {
             if (f.delete() == false) {
                 throw new IOException("Unable to delete file " + f);
@@ -94,7 +120,15 @@ public class FileSystemBlobStrategy impl
      * Returns a {@link SharedFileInputStream} for the give {@link BlobMessage}
      */
     public InputStream getInputStream(ActiveMQBlobMessage message) throws 
IOException, JMSException {
-        return new SharedFileInputStream(getFile(message));
+        File f = getFile(message);
+        String key = f.getCanonicalPath();
+        // use exactly one SharedFileInputStream per file so we can keep track 
of filehandles
+        // See JAMES-1122
+        SharedFileInputStream in = map.putIfAbsent(key, new 
SharedFileInputStream(f));
+        if (in == null) {
+            in = map.get(key);
+        }
+        return in;
     }
 
     

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
 Tue Nov  9 17:11:47 2010
@@ -20,29 +20,28 @@ package org.apache.james.queue.activemq;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
-import javax.jms.JMSException;
-import javax.mail.util.SharedFileInputStream;
+import javax.mail.internet.SharedInputStream;
 
-import org.apache.activemq.BlobMessage;
 import org.apache.activemq.Disposable;
 import org.apache.james.core.MimeMessageSource;
 
 /**
- * {@link MimeMessageSource} which use a {@link BlobMessage} as input. Be aware that {@link BlobMessage} must contain
- * a {@link SharedFileInputStream} for this implementation!
  *
  */
 public class MimeMessageBlobMessageSource extends MimeMessageSource implements 
ActiveMQSupport, Disposable{
 
-    private SharedFileInputStream in;
+    private SharedInputStream in;
     private String sourceId;
     private long size;
+    private List<InputStream> streams = new ArrayList<InputStream>();
 
-    public MimeMessageBlobMessageSource(BlobMessage message) throws 
JMSException, IOException {
-        this.sourceId = message.getJMSMessageID();
-        this.size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
-        this.in = (SharedFileInputStream) message.getInputStream();
+    public MimeMessageBlobMessageSource(SharedInputStream in, long size, 
String sourceId)  {
+        this.in = in;
+        this.size = size;
+        this.sourceId = sourceId;
     }
     
 
@@ -50,8 +49,10 @@ public class MimeMessageBlobMessageSourc
      * (non-Javadoc)
      * @see org.apache.james.core.MimeMessageSource#getInputStream()
      */
-    public InputStream getInputStream() throws IOException {
-        return in.newStream(0, -1);
+    public synchronized InputStream getInputStream() throws IOException {
+        InputStream sin = in.newStream(0, -1);
+        streams.add(sin);
+        return sin;
     }
 
     /*
@@ -74,11 +75,23 @@ public class MimeMessageBlobMessageSourc
     /**
      * Call dispose on the {@link InputStream}
      */
-    public void dispose() {
+    public synchronized void dispose() {
+
         try {
-            in.close();
+            ((InputStream) in).close();
         } catch (IOException e) {
-            // ingore on dispose
+            // ignore on dispose
+        }
+        in = null;
+        for (int i = 0; i < streams.size(); i++) {
+            InputStream s = streams.get(i);
+            try {
+                s.close();
+            } catch (IOException e) {
+                // ignore on dispose
+            }
+            s = null;
         }
+        streams.clear();
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to