Author: norman Date: Wed Sep 15 10:42:58 2010 New Revision: 997272 URL: http://svn.apache.org/viewvc?rev=997272&view=rev Log: Use one JMS queue for the incoming spool (previously we used one queue per processor in the spool). This improves performance and will help to simplify spool/queue management a lot (JAMES-1046)
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java Removed: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQRecipientList.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSRecipientList.java Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java Wed Sep 15 10:42:58 2010 @@ -41,7 +41,6 @@ import javax.mail.internet.ParseExceptio import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; -import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; @@ -61,6 +60,7 @@ import 
org.apache.james.lifecycle.Lifecy import org.apache.james.lifecycle.LogEnabled; import org.apache.james.services.MailServer; import org.apache.james.transport.camel.DisposeProcessor; +import org.apache.james.transport.camel.JamesCamelConstants; import org.apache.mailet.Mail; import org.apache.mailet.MailAddress; @@ -309,7 +309,7 @@ public abstract class AbstractMailServer */ public void sendMail(Mail mail) throws MessagingException { try { - producerTemplate.sendBody("direct:mailserver", ExchangePattern.InOnly, mail); + producerTemplate.sendBodyAndHeader("direct:mailserver", ExchangePattern.InOnly, mail, JamesCamelConstants.JAMES_MAIL_STATE, mail.getState()); } catch (Exception e) { logger.error("Error storing message: " + e.getMessage(),e); @@ -437,13 +437,12 @@ public abstract class AbstractMailServer } /** - * Return the camel endpoint uri which should get used for the given mail + * Return the camel endpoint uri which should get used * - * @param mail * @return toUri * */ - protected abstract String getToUri(Mail mail); + protected abstract String getToUri(); @@ -465,7 +464,6 @@ public abstract class AbstractMailServer private final class InjectionRouteBuilder extends RouteBuilder { - private final static String SLIP ="JAMES_TO_SLIP"; @Override public void configure() throws Exception { Processor disposeProcessor = new DisposeProcessor(); @@ -479,13 +477,7 @@ public abstract class AbstractMailServer // dispose the mail object if route processing was complete .onCompletion().process(disposeProcessor).end() - .transacted().pipeline().beanRef("mailClaimCheck").process(new Processor() { - - public void process(Exchange ex) throws Exception { - - ex.getIn().setHeader(SLIP, getToUri(ex.getIn().getBody(Mail.class))); - } - }).routingSlip(SLIP); + .transacted().pipeline().beanRef("mailClaimCheck").to(getToUri()); } } Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java Wed Sep 15 10:42:58 2010 @@ -21,8 +21,6 @@ package org.apache.james; -import org.apache.mailet.Mail; - /** * MailServer implementation which use ActiveMQ to spool mails * @@ -31,7 +29,7 @@ import org.apache.mailet.Mail; public class ActiveMQMailServer extends AbstractMailServer{ @Override - protected String getToUri(Mail mail) { - return "activemq:queue:processor."+ mail.getState(); + protected String getToUri() { + return "activemq:queue:spool"; } } Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java Wed Sep 15 10:42:58 2010 @@ -21,10 +21,8 @@ package org.apache.james; -import org.apache.mailet.Mail; - /** - * MailServer implementation which use JMS to spool mails# + * MailServer implementation which use JMS to spool mails * * If you use ActiveMQ for JMS you should use {...@link ActiveMQMailServer} * @@ -33,7 +31,7 @@ import org.apache.mailet.Mail; public class JMSMailServer extends AbstractMailServer{ @Override - protected String getToUri(Mail mail) { - return "jms:queue:processor."+ mail.getState(); + protected String getToUri() { + return "jms:queue:spool"; } } Modified: 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010 @@ -90,6 +90,10 @@ public abstract class AbstractProcessorR Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger); Processor disposeProcessor = new DisposeProcessor(); + + from(getFromUri()).recipientList().method(ProcessorRecipientList.class); + + List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor"); for (int i = 0; i < processorConfs.size(); i++) { final HierarchicalConfiguration processorConf = processorConfs.get(i); @@ -101,7 +105,7 @@ public abstract class AbstractProcessorR matchers.put(processorName, new ArrayList<Matcher>()); // Check which route we need to go - ChoiceDefinition processorDef = fromF(getFromUri(processorName)) + ChoiceDefinition processorDef = from("direct:processor." + processorName) // exchange mode is inOnly .inOnly() @@ -224,8 +228,8 @@ public abstract class AbstractProcessorR .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop() // check if the state of the mail is the same as the - // current processor. If not just route it to the right endpoint via recipientList and stop processing. - .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(getRecipientList()).stop() + // current processor. 
If not just route it to the spool again + .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class).stop() // end first choice .end() @@ -258,8 +262,8 @@ public abstract class AbstractProcessorR // end the choice .end() - // route it to the right processor - .beanRef("mailClaimCheck").recipientList().method(getRecipientList()); + // route it to the spool again + .beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class); } } @@ -404,17 +408,11 @@ public abstract class AbstractProcessorR } /** - * Return the uri for the processor to use for consuming mails + * Return the uri for consuming mail * - * @param processor * @return consumerUri */ - protected abstract String getFromUri(String processor); + protected abstract String getFromUri(); - /** - * Return the class which get used for dynamic lookup the ToUris for the mails (producers) - * - * @return recipientListClass - */ - protected abstract Class<?> getRecipientList(); + } Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010 @@ -33,13 +33,12 @@ public class ActiveMQProcessorRouteBuild } @Override - protected String getFromUri(String processorName) { - return "activemq:queue:processor." 
+ processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers; - } - - @Override - protected Class<?> getRecipientList() { - return ActiveMQRecipientList.class; + protected String getFromUri() { + return "activemq:queue:spool?" + getOptions(); } + + protected String getOptions() { + return "maxConcurrentConsumers=" + maxConcurrentConsumers; + } } Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010 @@ -23,22 +23,11 @@ package org.apache.james.transport.camel * * If you want to use ActiveMQ as JMS implementation you should use {...@link ActiveMQProcessorRouteBuilder} */ -public class JMSProcessorRouteBuilder extends AbstractProcessorRouteBuilder{ +public class JMSProcessorRouteBuilder extends ActiveMQProcessorRouteBuilder{ - private int maxConcurrentConsumers = 20; - public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { - this.maxConcurrentConsumers = maxConcurrentConsumers; - } - - @Override - protected String getFromUri(String processorName) { - return "jms:queue:processor." + processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers; + @Override + protected String getFromUri() { + return "jms:queue:spool ?" 
+ getOptions(); } - - @Override - protected Class<?> getRecipientList() { - return JMSRecipientList.class; - } - } Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java Wed Sep 15 10:42:58 2010 @@ -36,4 +36,5 @@ public interface JamesCamelConstants { public final static String JAMES_RETRY_DELIVERY = "JAMES_RETRY_DELIVERY"; + public final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE"; } Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java?rev=997272&view=auto ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java (added) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java Wed Sep 15 10:42:58 2010 @@ -0,0 +1,29 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.transport.camel; + +import org.apache.camel.Body; +import org.apache.mailet.Mail; + +public final class ProcessorRecipientList { + + public String to(@Body Mail mail) { + return "direct:processor." + mail.getState(); + } +} Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=997272&r1=997271&r2=997272&view=diff ============================================================================== --- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original) +++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Wed Sep 15 10:42:58 2010 @@ -124,11 +124,12 @@ <camel:routeBuilder ref="processorRoute" /> </camel:camelContext> + <bean id="pollingjms" class="org.apache.james.transport.camel.JMSSelectorPollingComponent"/> <!-- Build the camelroute from the spoolmanager.xml using ActiveMQ as producer and consumer--> <bean id="spoolmanager" name="processorRoute" class="org.apache.james.transport.camel.ActiveMQProcessorRouteBuilder"> - <property name="maxConcurrentConsumers" value="200"/> + <property name="maxConcurrentConsumers" value="500"/> </bean> 
--------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org For additional commands, e-mail: server-dev-help@james.apache.org