This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0c3f2f28ede94f8e7a47934d523b29d1eacc7720
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 14 06:57:24 2018 -0500

    CAMEL-12244: Intercept send to endpoint moved inner processor to external class.
---
 .../apache/camel/impl/InterceptSendToEndpoint.java | 107 +--------------
 .../impl/InterceptSendToEndpointProcessor.java     | 143 +++++++++++++++++++++
 2 files changed, 148 insertions(+), 102 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
index b94b3eb..629803c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
@@ -18,8 +18,6 @@ package org.apache.camel.impl;
 
 import java.util.Map;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -34,8 +32,6 @@ import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.processor.PipelineHelper.continueProcessing;
-
 /**
  * This is an endpoint when sending to it, is intercepted and is routed in a 
detour
  *
@@ -46,7 +42,6 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService {
     private static final Logger LOG = 
LoggerFactory.getLogger(InterceptSendToEndpoint.class);
 
     private final Endpoint delegate;
-    private Producer producer;
     private Processor detour;
     private boolean skip;
 
@@ -65,6 +60,10 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService {
         this.detour = detour;
     }
 
+    public Processor getDetour() {
+        return detour;
+    }
+
     public Endpoint getDelegate() {
         return delegate;
     }
@@ -99,103 +98,7 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService {
     }
 
     public Producer createProducer() throws Exception {
-        producer = delegate.createProducer();
-        return new DefaultAsyncProducer(delegate) {
-
-            public Endpoint getEndpoint() {
-                return producer.getEndpoint();
-            }
-
-            public Exchange createExchange() {
-                return producer.createExchange();
-            }
-
-            public Exchange createExchange(ExchangePattern pattern) {
-                return producer.createExchange(pattern);
-            }
-
-            @Deprecated
-            public Exchange createExchange(Exchange exchange) {
-                return producer.createExchange(exchange);
-            }
-
-            @Override
-            public boolean process(Exchange exchange, AsyncCallback callback) {
-                // process the detour so we do the detour routing
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sending to endpoint: {} is intercepted and 
detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, 
exchange});
-                }
-                // add header with the real endpoint uri
-                exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
-
-                // detour the exchange using synchronous processing
-                try {
-                    detour.process(exchange);
-                } catch (Exception e) {
-                    exchange.setException(e);
-                }
-
-                // Decide whether to continue or not; similar logic to the 
Pipeline
-                // check for error if so we should break out
-                if (!continueProcessing(exchange, "skip sending to original 
intended destination: " + getEndpoint(), LOG)) {
-                    callback.done(true);
-                    return true;
-                }
-
-                // determine if we should skip or not
-                boolean shouldSkip = skip;
-
-                // if then interceptor had a when predicate, then we should 
only skip if it matched
-                Boolean whenMatches = (Boolean) 
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
-                if (whenMatches != null) {
-                    shouldSkip = skip && whenMatches;
-                }
-
-                if (!shouldSkip) {
-                    if (exchange.hasOut()) {
-                        // replace OUT with IN as detour changed something
-                        exchange.setIn(exchange.getOut());
-                        exchange.setOut(null);
-                    }
-
-                    // route to original destination leveraging the 
asynchronous routing engine if possible
-                    if (producer instanceof AsyncProcessor) {
-                        AsyncProcessor async = (AsyncProcessor) producer;
-                        return async.process(exchange, callback);
-                    } else {
-                        try {
-                            producer.process(exchange);
-                        } catch (Exception e) {
-                            exchange.setException(e);
-                        }
-                        callback.done(true);
-                        return true;
-                    }
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Stop() means skip sending exchange to 
original intended destination: {} for exchange: {}", getEndpoint(), exchange);
-                    }
-                    callback.done(true);
-                    return true;
-                }
-            }
-
-            public boolean isSingleton() {
-                return producer.isSingleton();
-            }
-
-            public void start() throws Exception {
-                ServiceHelper.startService(detour);
-                // here we also need to start the producer
-                ServiceHelper.startService(producer);
-            }
-
-            public void stop() throws Exception {
-                // do not stop detour as it should only be stopped when the 
interceptor stops
-                // we should stop the producer here
-                ServiceHelper.stopService(producer);
-            }
-        };
+        return new InterceptSendToEndpointProcessor(this, delegate, skip);
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
new file mode 100644
index 0000000..10c1052
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.processor.PipelineHelper.continueProcessing;
+
+public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(InterceptSendToEndpointProcessor.class);
+    private final InterceptSendToEndpoint endpoint;
+    private final Endpoint delegate;
+    private final Producer producer;
+    private final boolean skip;
+
+    public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, boolean skip) throws Exception {
+        super(delegate);
+        this.endpoint = endpoint;
+        this.delegate = delegate;
+        this.producer = delegate.createProducer();
+        this.skip = skip;
+    }
+
+    public Endpoint getEndpoint() {
+        return producer.getEndpoint();
+    }
+
+    public Exchange createExchange() {
+        return producer.createExchange();
+    }
+
+    public Exchange createExchange(ExchangePattern pattern) {
+        return producer.createExchange(pattern);
+    }
+
+    @Deprecated
+    public Exchange createExchange(Exchange exchange) {
+        return producer.createExchange(exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // process the detour so we do the detour routing
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending to endpoint: {} is intercepted and detoured to: 
{} for exchange: {}", new Object[]{getEndpoint(), endpoint.getDetour(), 
exchange});
+        }
+        // add header with the real endpoint uri
+        exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
+
+        if (endpoint.getDetour() != null) {
+            // detour the exchange using synchronous processing
+            try {
+                endpoint.getDetour().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+
+        // Decide whether to continue or not; similar logic to the Pipeline
+        // check for error if so we should break out
+        if (!continueProcessing(exchange, "skip sending to original intended 
destination: " + getEndpoint(), LOG)) {
+            callback.done(true);
+            return true;
+        }
+
+        // determine if we should skip or not
+        boolean shouldSkip = skip;
+
+        // if then interceptor had a when predicate, then we should only skip 
if it matched
+        Boolean whenMatches = (Boolean) 
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+        if (whenMatches != null) {
+            shouldSkip = skip && whenMatches;
+        }
+
+        if (!shouldSkip) {
+            if (exchange.hasOut()) {
+                // replace OUT with IN as detour changed something
+                exchange.setIn(exchange.getOut());
+                exchange.setOut(null);
+            }
+
+            // route to original destination leveraging the asynchronous 
routing engine if possible
+            if (producer instanceof AsyncProcessor) {
+                AsyncProcessor async = (AsyncProcessor) producer;
+                return async.process(exchange, callback);
+            } else {
+                try {
+                    producer.process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+                callback.done(true);
+                return true;
+            }
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stop() means skip sending exchange to original 
intended destination: {} for exchange: {}", getEndpoint(), exchange);
+            }
+            callback.done(true);
+            return true;
+        }
+    }
+
+    public boolean isSingleton() {
+        return producer.isSingleton();
+    }
+
+    public void start() throws Exception {
+        ServiceHelper.startService(endpoint.getDetour());
+        // here we also need to start the producer
+        ServiceHelper.startService(producer);
+    }
+
+    public void stop() throws Exception {
+        // do not stop detour as it should only be stopped when the 
interceptor stops
+        // we should stop the producer here
+        ServiceHelper.stopService(producer);
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to