Author: davsclaus
Date: Thu Feb 12 07:55:07 2009
New Revision: 743644

URL: http://svn.apache.org/viewvc?rev=743644&view=rev
Log:
CAMEL-209: reuse producer template for enrich

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=743644&r1=743643&r2=743644&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Thu Feb 12 07:55:07 2009
@@ -19,9 +19,10 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
@@ -31,57 +32,52 @@
  * data and additional data is delegated to an {@link AggregationStrategy}
  * object.
  */
-public class Enricher implements Processor {
+public class Enricher extends ServiceSupport implements Processor {
+
+    private ProducerTemplate producer;
 
     private String resourceUri;
-    
     private AggregationStrategy aggregationStrategy;
-    
+
     /**
      * Creates a new {@link Enricher}. The default aggregation strategy is to
      * copy the additional data obtained from the enricher's resource over the
      * input data. When using the copy aggregation strategy the enricher
      * degenerates to a normal transformer.
-     * 
-     * @param resourceUri
-     *            URI of resource endpoint for obtaining additional data.
+     *
+     * @param resourceUri URI of resource endpoint for obtaining additional data.
      */
     public Enricher(String resourceUri) {
         this(defaultAggregationStrategy(), resourceUri);
     }
-    
+
     /**
      * Creates a new {@link Enricher}.
-     * 
-     * @param aggregationStrategy
-     *            aggregation strategy to aggregate input data and additional
-     *            data.
-     * @param resourceUri
-     *            URI of resource endpoint for obtaining additional data.
+     *
+     * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+     * @param resourceUri         URI of resource endpoint for obtaining additional data.
      */
     public Enricher(AggregationStrategy aggregationStrategy, String resourceUri) {
         this.aggregationStrategy = aggregationStrategy;
         this.resourceUri = resourceUri;
     }
-    
+
     /**
      * Sets the aggregation strategy for this enricher.
-     * 
+     *
      * @param aggregationStrategy the aggregationStrategy to set
      */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
-    
+
     /**
      * Sets the default aggregation strategy for this enricher.
-     * 
-     * @param aggregationStrategy the aggregationStrategy to set
      */
     public void setDefaultAggregationStrategy() {
         this.aggregationStrategy = defaultAggregationStrategy();
     }
-    
+
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
      * additional data from an endpoint identified by an
@@ -91,16 +87,15 @@
      * time. If the message exchange with the resource endpoint fails then no
      * aggregation will be done and the failed exchange content is copied over
      * to the original message exchange.
-     * 
-     * @param exchange
-     *            input data.
+     *
+     * @param exchange input data.
      */
     public void process(Exchange exchange) throws Exception {
         // create in-out exchange to obtain additional data from resource
         Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
         // send created exchange to resource endpoint
-        resourceExchange = exchange.getContext().createProducerTemplate().send(resourceUri, resourceExchange);
-        
+        resourceExchange = getProducerTemplate(exchange).send(resourceUri, resourceExchange);
+
         if (resourceExchange.isFailed()) {
             // copy resource exchange onto original exchange (preserving pattern)
             copyResultsPreservePattern(exchange, resourceExchange);
@@ -112,16 +107,14 @@
             copyResultsPreservePattern(exchange, aggregatedExchange);
         }
     }
-    
+
     /**
      * Creates a new {@link DefaultExchange} instance from the given
      * <code>exchange</code>. The resulting exchange's pattern is defined by
      * <code>pattern</code>.
-     * 
-     * @param source
-     *            exchange to copy from.
-     * @param pattern
-     *            exchange pattern to set.
+     *
+     * @param source  exchange to copy from.
+     * @param pattern exchange pattern to set.
      * @return created exchange.
      */
     protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
@@ -136,18 +129,36 @@
             exchange.getOut().copyFrom(exchange.getIn());
         }
     }
-    
+
     private static AggregationStrategy defaultAggregationStrategy() {
         return new CopyAggregationStrategy();
     }
-    
+
+    private synchronized ProducerTemplate getProducerTemplate(Exchange exchange) throws Exception {
+        if (producer == null) {
+            producer = exchange.getContext().createProducerTemplate();
+            producer.start();
+        }
+        return producer;
+    }
+
+    protected void doStart() throws Exception {
+    }
+
+    protected void doStop() throws Exception {
+        if (producer != null) {
+            producer.stop();
+            producer = null;
+        }
+    }
+
     private static class CopyAggregationStrategy implements AggregationStrategy {
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
             copyResultsPreservePattern(oldExchange, newExchange);
             return oldExchange;
         }
-        
+
     }
-    
+
 }


Reply via email to