Author: davsclaus Date: Thu Feb 12 07:55:07 2009 New Revision: 743644 URL: http://svn.apache.org/viewvc?rev=743644&view=rev Log: CAMEL-209: reuse producer template for enrich
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=743644&r1=743643&r2=743644&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Feb 12 07:55:07 2009 @@ -19,9 +19,10 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.ServiceSupport; import org.apache.camel.processor.aggregate.AggregationStrategy; - import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; /** @@ -31,57 +32,52 @@ * data and additional data is delegated to an {...@link AggregationStrategy} * object. */ -public class Enricher implements Processor { +public class Enricher extends ServiceSupport implements Processor { + + private ProducerTemplate producer; private String resourceUri; - private AggregationStrategy aggregationStrategy; - + /** * Creates a new {...@link Enricher}. The default aggregation strategy is to * copy the additional data obtained from the enricher's resource over the * input data. When using the copy aggregation strategy the enricher * degenerates to a normal transformer. - * - * @param resourceUri - * URI of resource endpoint for obtaining additional data. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. */ public Enricher(String resourceUri) { this(defaultAggregationStrategy(), resourceUri); } - + /** * Creates a new {...@link Enricher}. - * - * @param aggregationStrategy - * aggregation strategy to aggregate input data and additional - * data. - * @param resourceUri - * URI of resource endpoint for obtaining additional data. + * + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param resourceUri URI of resource endpoint for obtaining additional data. */ public Enricher(AggregationStrategy aggregationStrategy, String resourceUri) { this.aggregationStrategy = aggregationStrategy; this.resourceUri = resourceUri; } - + /** * Sets the aggregation strategy for this enricher. - * + * * @param aggregationStrategy the aggregationStrategy to set */ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } - + /** * Sets the default aggregation strategy for this enricher. - * - * @param aggregationStrategy the aggregationStrategy to set */ public void setDefaultAggregationStrategy() { this.aggregationStrategy = defaultAggregationStrategy(); } - + /** * Enriches the input data (<code>exchange</code>) by first obtaining * additional data from an endpoint identified by an @@ -91,16 +87,15 @@ * time. If the message exchange with the resource endpoint fails then no * aggregation will be done and the failed exchange content is copied over * to the original message exchange. - * - * @param exchange - * input data. + * + * @param exchange input data. */ public void process(Exchange exchange) throws Exception { // create in-out exchange to obtain additional data from resource Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); // send created exchange to resource endpoint - resourceExchange = exchange.getContext().createProducerTemplate().send(resourceUri, resourceExchange); - + resourceExchange = getProducerTemplate(exchange).send(resourceUri, resourceExchange); + if (resourceExchange.isFailed()) { // copy resource exchange onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, resourceExchange); @@ -112,16 +107,14 @@ copyResultsPreservePattern(exchange, aggregatedExchange); } } - + /** * Creates a new {...@link DefaultExchange} instance from the given * <code>exchange</code>. The resulting exchange's pattern is defined by * <code>pattern</code>. - * - * @param source - * exchange to copy from. - * @param pattern - * exchange pattern to set. + * + * @param source exchange to copy from. + * @param pattern exchange pattern to set. * @return created exchange. */ protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) { @@ -136,18 +129,36 @@ exchange.getOut().copyFrom(exchange.getIn()); } } - + private static AggregationStrategy defaultAggregationStrategy() { return new CopyAggregationStrategy(); } - + + private synchronized ProducerTemplate getProducerTemplate(Exchange exchange) throws Exception { + if (producer == null) { + producer = exchange.getContext().createProducerTemplate(); + producer.start(); + } + return producer; + } + + protected void doStart() throws Exception { + } + + protected void doStop() throws Exception { + if (producer != null) { + producer.stop(); + producer = null; + } + } + private static class CopyAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { copyResultsPreservePattern(oldExchange, newExchange); return oldExchange; } - + } - + }