Author: norman
Date: Tue Nov 9 17:11:47 2010
New Revision: 1033103
URL: http://svn.apache.org/viewvc?rev=1033103&view=rev
Log:
Make sure all InputStreams get closed before deleting the Blob file, so Windows
will not fail on delete (JAMES-1122)
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
Tue Nov 9 17:11:47 2010
@@ -36,6 +36,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
+import javax.mail.internet.SharedInputStream;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
@@ -45,7 +46,6 @@ import org.apache.james.core.MimeMessage
import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.core.MimeMessageSource;
import org.apache.james.core.MimeMessageWrapper;
-import org.apache.james.core.NonClosingSharedInputStream;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
@@ -170,13 +170,12 @@ public class ActiveMQMailQueue extends J
* org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax
* .jms.Message, org.apache.mailet.Mail)
*/
- @SuppressWarnings("unchecked")
protected void populateMailMimeMessage(Message message, Mail mail) throws
MessagingException, JMSException {
if (message instanceof BlobMessage) {
try {
BlobMessage blobMessage = (BlobMessage) message;
try {
- // store URL and queuenamefor later usage
+ // store URL and queuename for later usage
mail.setAttribute(JAMES_BLOB_URL, blobMessage.getURL());
mail.setAttribute(JAMES_QUEUE_NAME, queuename);
} catch (MalformedURLException e) {
@@ -185,12 +184,16 @@ public class ActiveMQMailQueue extends J
}
InputStream in = blobMessage.getInputStream();
MimeMessageSource source;
-
- if (in instanceof NonClosingSharedInputStream) {
- source = new MimeMessageBlobMessageSource(blobMessage);
+
+ // if its a SharedInputStream we can make use of some more
performant implementation which don't need to copy the message to a temporary
file
+ if (in instanceof SharedInputStream) {
+ String sourceId = message.getJMSMessageID();
+ long size =
message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+ source = new
MimeMessageBlobMessageSource((SharedInputStream) in, size, sourceId);
} else {
- source = new MimeMessageInputStreamSource(mail.getName(),
blobMessage.getInputStream());
+ source = new MimeMessageInputStreamSource(mail.getName(),
in);
}
+
mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
} catch (IOException e) {
throw new MailQueueException("Unable to populate MimeMessage
for mail " + mail.getName(), e);
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
Tue Nov 9 17:11:47 2010
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.mail.util.SharedFileInputStream;
@@ -45,6 +46,7 @@ public class FileSystemBlobStrategy impl
private final FileSystem fs;
private final BlobTransferPolicy policy;
+ private final ConcurrentHashMap<String, SharedFileInputStream> map = new
ConcurrentHashMap<String, SharedFileInputStream>();
public FileSystemBlobStrategy(final BlobTransferPolicy policy, final
FileSystem fs) {
this.fs = fs;
@@ -64,17 +66,35 @@ public class FileSystemBlobStrategy impl
* @see
org.apache.activemq.blob.BlobUploadStrategy#uploadStream(org.apache.activemq.command.ActiveMQBlobMessage,
java.io.InputStream)
*/
public URL uploadStream(ActiveMQBlobMessage message, InputStream in)
throws JMSException, IOException {
- File f = getFile(message);
- FileOutputStream out = new FileOutputStream(f);
- byte[] buffer = new byte[policy.getBufferSize()];
- for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
- out.write(buffer, 0, c);
+ FileOutputStream out = null;
+ try {
+ File f = getFile(message);
+ out = new FileOutputStream(f);
+ byte[] buffer = new byte[policy.getBufferSize()];
+ for (int c = in.read(buffer); c != -1; c = in.read(buffer)) {
+ out.write(buffer, 0, c);
+ out.flush();
+ }
out.flush();
+ // File.toURL() is deprecated
+ return f.toURI().toURL();
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ // ignore on close
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ // ignore on close
+ }
+ }
}
- out.flush();
- out.close();
- // File.toURL() is deprecated
- return f.toURI().toURL();
+
}
/*
@@ -83,6 +103,12 @@ public class FileSystemBlobStrategy impl
*/
public void deleteFile(ActiveMQBlobMessage message) throws IOException,
JMSException {
File f = getFile(message);
+ SharedFileInputStream in = map.remove(f.getCanonicalPath());
+ try {
+ if (in != null) in .close();
+ } catch (IOException e) {
+ // ignore here
+ }
if (f.exists()) {
if (f.delete() == false) {
throw new IOException("Unable to delete file " + f);
@@ -94,7 +120,15 @@ public class FileSystemBlobStrategy impl
* Returns a {@link SharedFileInputStream} for the give {@link
BlobMessage}
*/
public InputStream getInputStream(ActiveMQBlobMessage message) throws
IOException, JMSException {
- return new SharedFileInputStream(getFile(message));
+ File f = getFile(message);
+ String key = f.getCanonicalPath();
+ // use exactly one SharedFileInputStream per file so we can keep track
of filehandles
+ // See JAMES-1122
+ SharedFileInputStream in = map.putIfAbsent(key, new
SharedFileInputStream(f));
+ if (in == null) {
+ in = map.get(key);
+ }
+ return in;
}
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1033103&r1=1033102&r2=1033103&view=diff
==============================================================================
---
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
(original)
+++
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
Tue Nov 9 17:11:47 2010
@@ -20,29 +20,28 @@ package org.apache.james.queue.activemq;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
-import javax.jms.JMSException;
-import javax.mail.util.SharedFileInputStream;
+import javax.mail.internet.SharedInputStream;
-import org.apache.activemq.BlobMessage;
import org.apache.activemq.Disposable;
import org.apache.james.core.MimeMessageSource;
/**
- * {@link MimeMessageSource} which use a {@link BlobMessage} as input.
Be aware that {@link BlobMessage} must contain
- * a {@link SharedFileInputStream} for this implementation!
*
*/
public class MimeMessageBlobMessageSource extends MimeMessageSource implements
ActiveMQSupport, Disposable{
- private SharedFileInputStream in;
+ private SharedInputStream in;
private String sourceId;
private long size;
+ private List<InputStream> streams = new ArrayList<InputStream>();
- public MimeMessageBlobMessageSource(BlobMessage message) throws
JMSException, IOException {
- this.sourceId = message.getJMSMessageID();
- this.size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
- this.in = (SharedFileInputStream) message.getInputStream();
+ public MimeMessageBlobMessageSource(SharedInputStream in, long size,
String sourceId) {
+ this.in = in;
+ this.size = size;
+ this.sourceId = sourceId;
}
@@ -50,8 +49,10 @@ public class MimeMessageBlobMessageSourc
* (non-Javadoc)
* @see org.apache.james.core.MimeMessageSource#getInputStream()
*/
- public InputStream getInputStream() throws IOException {
- return in.newStream(0, -1);
+ public synchronized InputStream getInputStream() throws IOException {
+ InputStream sin = in.newStream(0, -1);
+ streams.add(sin);
+ return sin;
}
/*
@@ -74,11 +75,23 @@ public class MimeMessageBlobMessageSourc
/**
* Call dispose on the {@link InputStream}
*/
- public void dispose() {
+ public synchronized void dispose() {
+
try {
- in.close();
+ ((InputStream) in).close();
} catch (IOException e) {
- // ingore on dispose
+ // ignore on dispose
+ }
+ in = null;
+ for (int i = 0; i < streams.size(); i++) {
+ InputStream s = streams.get(i);
+ try {
+ s.close();
+ } catch (IOException e) {
+ // ignore on dispose
+ }
+ s = null;
}
+ streams.clear();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]