--- Serge Knystautas <[EMAIL PROTECTED]> wrote:
[snip]
>
> Actually, make a copy of AvalonMailRepository and
> AvalonSpoolRepository,
> calling them MemoryMailRepository and
> MemorySpoolRepository (or
> whatever).
Ok, reworked my code to fit your spec. Attached are
the changes. Note the inner class:
MemorySpoolRepository$MemoryStreamRepository.
As far as my first change is concerned
(MimeMessageByteArrayInputStreamSource vs.
MimeMessageFileInputStreamSource), can that also be
selected from the configuration?
> At the end of your config file, you can
> see how to define
> memory: as a mail and spool repository that points
> to that new class.
> Then for various configurations you could switch
> from file://foobar to
> memory://foobar.
Almost. memory://fubar blows ugly chunks. I had to
change references to Avalon* to Memory* and the
File_Persistent_Stream_Repository to
MemorySpoolRepository$MemoryStreamRepository.
> This would allow us to bundle your in-memory version
> and let people
> switch as they want.
>
> --
> Serge Knystautas
> Lokitech >> software . strategy . design >>
> http://www.lokitech.com
> p. 301.656.5501
> e. [EMAIL PROTECTED]
>
=====
Gabor Kincses
Running Mandrake Linux 10.0
__________________________________
Do you Yahoo!?
Yahoo! Mail - Easier than ever with enhanced search. Learn more.
http://info.mail.yahoo.com/mail_250/***********************************************************************
* Copyright (c) 1999-2004 The Apache Software Foundation. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package org.apache.james.mailrepository;
import org.apache.avalon.cornerstone.services.store.ObjectRepository;
import org.apache.avalon.cornerstone.services.store.Repository;
import org.apache.avalon.cornerstone.services.store.Store;
import org.apache.avalon.cornerstone.services.store.StreamRepository;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.component.ComponentException;
import org.apache.avalon.framework.component.ComponentManager;
import org.apache.avalon.framework.component.Composable;
import org.apache.avalon.framework.configuration.Configurable;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.configuration.DefaultConfiguration;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageWrapper;
import org.apache.james.services.MailRepository;
import org.apache.james.util.Lock;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.*;
import javax.mail.MessagingException;
/**
* Implementation of a MailRepository on a FileSystem.
*
* Requires a configuration element in the .conf.xml file of the form:
* <repository destinationURL="file://path-to-root-dir-for-repository"
* type="MAIL"
* model="SYNCHRONOUS"/>
* Requires a logger called MailRepository.
*
* @version 1.0.0, 24/04/1999
*/
public class MemoryMailRepository
extends AbstractLogEnabled
implements MailRepository, Component, Configurable, Composable, Initializable {
/**
* Whether 'deep debugging' is turned on.
*/
protected final static boolean DEEP_DEBUG = false;
private static final String TYPE = "MAIL";
private Lock lock;
private Store store;
private StreamRepository sr;
private ObjectRepository or;
private String destination;
private Set keys;
private boolean fifo;
private Map objectMap = Collections.synchronizedMap(new HashMap());
private static StreamRepository msr = null;
/**
* @see org.apache.avalon.framework.component.Composable#compose(ComponentManager)
*/
public void compose( final ComponentManager componentManager )
throws ComponentException {
store = (Store)componentManager.
lookup( "org.apache.avalon.cornerstone.services.store.Store" );
}
/**
* @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
*/
public void configure(Configuration conf) throws ConfigurationException {
destination = conf.getAttribute("destinationURL");
if (getLogger().isDebugEnabled()) {
getLogger().debug("AvalonMailRepository.destinationURL: " + destination);
}
String checkType = conf.getAttribute("type");
if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
String exceptionString = "Attempt to configure AvalonMailRepository as " +
checkType;
if (getLogger().isWarnEnabled()) {
getLogger().warn(exceptionString);
}
throw new ConfigurationException(exceptionString);
}
fifo = conf.getAttributeAsBoolean("FIFO", false);
// ignore model
}
/**
* @see org.apache.avalon.framework.activity.Initializable#initialize()
*/
public void initialize()
throws Exception {
try {
//prepare Configurations for object and stream repositories
DefaultConfiguration objectConfiguration
= new DefaultConfiguration( "repository",
"generated:MemoryFileRepository.compose()" );
objectConfiguration.setAttribute("destination", destination);
objectConfiguration.setAttribute("type", "OBJECT");
objectConfiguration.setAttribute("model", "SYNCHRONOUS");
DefaultConfiguration streamConfiguration
= new DefaultConfiguration( "repository",
"generated:MemoryFileRepository.compose()" );
streamConfiguration.setAttribute( "destination", destination);
streamConfiguration.setAttribute( "type", "STREAM" );
streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
synchronized (MemorySpoolRepository.class) {
if (msr == null) msr = new MemoryStreamRepository();
}
sr = msr;
lock = new Lock();
keys = Collections.synchronizedSet(new HashSet());
} catch (Exception e) {
final String message = "Failed to retrieve Store component:" + e.getMessage();
getLogger().error( message, e );
throw e;
}
}
/**
* Releases a lock on a message identified by a key
*
* @param key the key of the message to be unlocked
*
* @return true if successfully released the lock, false otherwise
*/
public boolean unlock(String key) {
if (lock.unlock(key)) {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
StringBuffer debugBuffer =
new StringBuffer(256)
.append("Unlocked ")
.append(key)
.append(" for ")
.append(Thread.currentThread().getName())
.append(" @ ")
.append(new java.util.Date(System.currentTimeMillis()));
getLogger().debug(debugBuffer.toString());
}
// synchronized (this) {
// notifyAll();
// }
return true;
} else {
return false;
}
}
/**
* Obtains a lock on a message identified by a key
*
* @param key the key of the message to be locked
*
* @return true if successfully obtained the lock, false otherwise
*/
public boolean lock(String key) {
if (lock.lock(key)) {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
StringBuffer debugBuffer =
new StringBuffer(256)
.append("Locked ")
.append(key)
.append(" for ")
.append(Thread.currentThread().getName())
.append(" @ ")
.append(new java.util.Date(System.currentTimeMillis()));
getLogger().debug(debugBuffer.toString());
}
// synchronized (this) {
// notifyAll();
// }
return true;
} else {
return false;
}
}
/**
* Stores a message in this repository. Shouldn't this return the key
* under which it is stored?
*
* @param mc the mail message to store
*/
public void store(MailImpl mc) throws MessagingException {
try {
String key = mc.getName();
//Remember whether this key was locked
boolean wasLocked = lock.isLocked(key);
if (!wasLocked) {
//If it wasn't locked, we want a lock during the store
lock.lock(key);
}
try {
if (!keys.contains(key)) {
keys.add(key);
}
boolean saveStream = true;
if (mc.getMessage() instanceof MimeMessageWrapper) {
MimeMessageWrapper wrapper = (MimeMessageWrapper) mc.getMessage();
if (DEEP_DEBUG) {
System.out.println("Retrieving from: " + wrapper.getSourceId());
StringBuffer debugBuffer =
new StringBuffer(64)
.append("Saving to: ")
.append(destination)
.append("/")
.append(mc.getName());
System.out.println(debugBuffer.toString());
System.out.println("Modified: " + wrapper.isModified());
}
StringBuffer destinationBuffer =
new StringBuffer(128)
.append(destination)
.append("/")
.append(mc.getName());
if (wrapper.getSourceId().equals(destinationBuffer.toString()) && !wrapper.isModified()) {
//We're trying to save to the same place, and it's not modified... we shouldn't save.
//More importantly, if we try to save, we will create a 0-byte file since we're
//retrying to retrieve from a file we'll be overwriting.
saveStream = false;
}
}
if (saveStream) {
OutputStream out = null;
try {
out = sr.put(key);
mc.writeMessageTo(out);
} finally {
if (out != null) out.close();
}
}
//Always save the header information
objectMap.put(key, mc);
} finally {
if (!wasLocked) {
//If it wasn't locked, we need to now unlock
lock.unlock(key);
}
}
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
StringBuffer logBuffer =
new StringBuffer(64)
.append("Mail ")
.append(key)
.append(" stored.");
getLogger().debug(logBuffer.toString());
}
synchronized (this) {
// notifyAll();
notify();
Thread.yield();
}
} catch (Exception e) {
getLogger().error("Exception storing mail: " + e);
throw new MessagingException("Exception caught while storing Message Container: " + e);
}
}
/**
* Retrieves a message given a key. At the moment, keys can be obtained
* from list() in superinterface Store.Repository
*
* @param key the key of the message to retrieve
* @return the mail corresponding to this key, null if none exists
*/
public MailImpl retrieve(String key) throws MessagingException {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Retrieving mail: " + key);
}
try {
MailImpl mc = null;
try {
mc = (MailImpl) objectMap.get(key);
} catch (RuntimeException re) {
StringBuffer exceptionBuffer =
new StringBuffer(128)
.append("Exception retrieving mail: ")
.append(re.toString())
.append(", so we're deleting it... good riddance!");
getLogger().debug(exceptionBuffer.toString());
remove(key);
return null;
}
MimeMessageAvalonSource source = new MimeMessageAvalonSource(sr, "fubar", key);
mc.setMessage(new MimeMessageWrapper(source));
return mc;
} catch (Exception me) {
getLogger().error("Exception retrieving mail: " + me);
throw new MessagingException("Exception while retrieving mail: " + me.getMessage());
}
}
/**
* Removes a specified message
*
* @param mail the message to be removed from the repository
*/
public void remove(MailImpl mail) throws MessagingException {
remove(mail.getName());
}
/**
* Removes a Collection of mails from the repository
* @param mails The Collection of <code>MailImpl</code>'s to delete
* @throws MessagingException
* @since 2.2.0
*/
public void remove(Collection mails) throws MessagingException {
Iterator delList = mails.iterator();
while (delList.hasNext()) {
remove((MailImpl)delList.next());
}
}
/**
* Removes a message identified by key.
*
* @param key the key of the message to be removed from the repository
*/
public void remove(String key) throws MessagingException {
if (lock(key)) {
try {
keys.remove(key);
sr.remove(key);
objectMap.remove(key);
} finally {
unlock(key);
}
} else {
StringBuffer exceptionBuffer =
new StringBuffer(64)
.append("Cannot lock ")
.append(key)
.append(" to remove it");
throw new MessagingException(exceptionBuffer.toString());
}
}
/**
* List string keys of messages in repository.
*
* @return an <code>Iterator</code> over the list of keys in the repository
*
*/
public Iterator list() {
// Fix ConcurrentModificationException by cloning
// the keyset before getting an iterator
final ArrayList clone;
synchronized(keys) {
clone = new ArrayList(keys);
}
if (fifo) Collections.sort(clone); // Keys is a HashSet; impose FIFO for apps that need it
return clone.iterator();
}
public class MemoryStreamRepository implements StreamRepository {
protected final Map outs = new HashMap();
/* (non-Javadoc)
* @see org.apache.avalon.cornerstone.services.store.StreamRepository#put(java.lang.String)
*/
public synchronized OutputStream put(String key) {
OutputStream out = new ByteArrayOutputStream();
outs.put(key, out);
return out;
}
/* (non-Javadoc)
* @see org.apache.avalon.cornerstone.services.store.StreamRepository#get(java.lang.String)
*/
public synchronized InputStream get(String key) {
try {
ByteArrayOutputStream bout = (ByteArrayOutputStream) outs.get(key);
bout.flush();
byte[] ba = bout.toByteArray();
return new ByteArrayInputStream(ba);
} catch (IOException ioe) {
ioe.printStackTrace();
return null;
}
}
/* (non-Javadoc)
* @see org.apache.avalon.cornerstone.services.store.StreamRepository#remove(java.lang.String)
*/
public synchronized void remove(String key) {
outs.remove(key);
}
/* (non-Javadoc)
* @see org.apache.avalon.cornerstone.services.store.Repository#getChildRepository(java.lang.String)
*/
public Repository getChildRepository(String childName) {
System.err.println("NOT IMPLEMENTED");
new Exception().printStackTrace();
return null;
}
/* (non-Javadoc)
* @see org.apache.avalon.cornerstone.services.store.StreamRepository#list()
*/
public Iterator list() {
return outs.keySet().iterator();
}
}
}
/***********************************************************************
* Copyright (c) 1999-2004 The Apache Software Foundation. *
* All rights reserved. *
* ------------------------------------------------------------------- *
* Licensed under the Apache License, Version 2.0 (the "License"); you *
* may not use this file except in compliance with the License. You *
* may obtain a copy of the License at: *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, *
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *
* implied. See the License for the specific language governing *
* permissions and limitations under the License. *
***********************************************************************/
package org.apache.james.mailrepository;
import org.apache.avalon.cornerstone.services.store.ObjectRepository;
import org.apache.avalon.cornerstone.services.store.Repository;
import org.apache.avalon.cornerstone.services.store.StreamRepository;
import org.apache.avalon.framework.configuration.DefaultConfiguration;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageWrapper;
import org.apache.james.services.SpoolRepository;
import org.apache.james.util.Lock;
import org.apache.mailet.Mail;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import javax.mail.MessagingException;
/**
* Implementation of a MailRepository on a FileSystem.
*
* Requires a configuration element in the .conf.xml file of the form:
* <repository destinationURL="memory://"
* type="MAIL"
* model="SYNCHRONOUS"/>
* Requires a logger called MailRepository.
*
* @version 1.0.0, 24/04/1999
*/
public class MemorySpoolRepository
extends MemoryMailRepository
implements SpoolRepository {
/**
* <p>Returns an arbitrarily selected mail deposited in this Repository.
* Usage: SpoolManager calls accept() to see if there are any unprocessed
* mails in the spool repository.</p>
*
* <p>Synchronized to ensure thread safe access to the underlying spool.</p>
*
* @return the mail
*/
public synchronized Mail accept() throws InterruptedException {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Method accept() called");
}
return accept(new SpoolRepository.AcceptFilter () {
public boolean accept (String _, String __, long ___, String ____) {
return true;
}
public long getWaitTime () {
return 100;
}
});
}
/**
* <p>Returns an arbitrarily selected mail deposited in this Repository that
* is either ready immediately for delivery, or is younger than it's last_updated plus
* the number of failed attempts times the delay time.
* Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
* unprocessed mail is available.</p>
*
* <p>Synchronized to ensure thread safe access to the underlying spool.</p>
*
* @return the mail
*/
public synchronized Mail accept(final long delay) throws InterruptedException
{
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Method accept(delay) called");
}
return accept(new SpoolRepository.AcceptFilter () {
long youngest = 0;
public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
if (state.equals(Mail.ERROR)) {
//Test the time...
long timeToProcess = delay + lastUpdated;
if (System.currentTimeMillis() > timeToProcess) {
//We're ready to process this again
return true;
} else {
//We're not ready to process this.
if (youngest == 0 || youngest > timeToProcess) {
//Mark this as the next most likely possible mail to process
youngest = timeToProcess;
}
return false;
}
} else {
//This mail is good to go... return the key
return true;
}
}
public long getWaitTime () {
if (youngest == 0) {
return 0;
} else {
long duration = youngest - System.currentTimeMillis();
youngest = 0; //get ready for next round
return duration <= 0 ? 1 : duration;
}
}
});
}
/**
* Returns an arbitrarily select mail deposited in this Repository for
* which the supplied filter's accept method returns true.
* Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
* based on number of retries if the mail is ready for processing.
* If no message is ready the method will block until one is, the amount of time to block is
* determined by calling the filters getWaitTime method.
*
* <p>Synchronized to ensure thread safe access to the underlying spool.</p>
*
* @return the mail
*/
public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Method accept(Filter) called");
}
while (!Thread.currentThread().isInterrupted()) try {
for (Iterator it = list(); it.hasNext(); ) {
String s = it.next().toString();
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
StringBuffer logBuffer =
new StringBuffer(64)
.append("Found item ")
.append(s)
.append(" in spool.");
getLogger().debug(logBuffer.toString());
}
if (lock(s)) {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("accept(Filter) has locked: " + s);
}
try {
MailImpl mail = retrieve(s);
// Retrieve can return null if the mail is no longer on the spool
// (i.e. another thread has gotten to it first).
// In this case we simply continue to the next key
if (mail == null || !filter.accept (mail.getName(),
mail.getState(),
mail.getLastUpdated().getTime(),
mail.getErrorMessage())) {
unlock(s);
continue;
}
return mail;
} catch (javax.mail.MessagingException e) {
unlock(s);
getLogger().error("Exception during retrieve -- skipping item " + s, e);
}
}
}
//We did not find any... let's wait for a certain amount of time
wait (filter.getWaitTime());
Thread.yield();
} catch (InterruptedException ex) {
throw ex;
} catch (ConcurrentModificationException cme) {
// Should never get here now that list methods clones keyset for iterator
getLogger().error("CME in spooler - please report to http://james.apache.org", cme);
}
throw new InterruptedException();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]