This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0c3f2f28ede94f8e7a47934d523b29d1eacc7720 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Feb 14 06:57:24 2018 -0500 CAMEL-12244: Intercept send to endpoint moved inner processor to external class. --- .../apache/camel/impl/InterceptSendToEndpoint.java | 107 +-------------- .../impl/InterceptSendToEndpointProcessor.java | 143 +++++++++++++++++++++ 2 files changed, 148 insertions(+), 102 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java index b94b3eb..629803c 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java @@ -18,8 +18,6 @@ package org.apache.camel.impl; import java.util.Map; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; @@ -34,8 +32,6 @@ import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.camel.processor.PipelineHelper.continueProcessing; - /** * This is an endpoint when sending to it, is intercepted and is routed in a detour * @@ -46,7 +42,6 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService { private static final Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpoint.class); private final Endpoint delegate; - private Producer producer; private Processor detour; private boolean skip; @@ -65,6 +60,10 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService { this.detour = detour; } + public Processor getDetour() { + return detour; + } + public Endpoint getDelegate() { return delegate; } @@ -99,103 +98,7 @@ public class InterceptSendToEndpoint implements Endpoint, ShutdownableService { } public Producer createProducer() throws 
Exception { - producer = delegate.createProducer(); - return new DefaultAsyncProducer(delegate) { - - public Endpoint getEndpoint() { - return producer.getEndpoint(); - } - - public Exchange createExchange() { - return producer.createExchange(); - } - - public Exchange createExchange(ExchangePattern pattern) { - return producer.createExchange(pattern); - } - - @Deprecated - public Exchange createExchange(Exchange exchange) { - return producer.createExchange(exchange); - } - - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - // process the detour so we do the detour routing - if (LOG.isDebugEnabled()) { - LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), detour, exchange}); - } - // add header with the real endpoint uri - exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); - - // detour the exchange using synchronous processing - try { - detour.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - - // Decide whether to continue or not; similar logic to the Pipeline - // check for error if so we should break out - if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) { - callback.done(true); - return true; - } - - // determine if we should skip or not - boolean shouldSkip = skip; - - // if then interceptor had a when predicate, then we should only skip if it matched - Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); - if (whenMatches != null) { - shouldSkip = skip && whenMatches; - } - - if (!shouldSkip) { - if (exchange.hasOut()) { - // replace OUT with IN as detour changed something - exchange.setIn(exchange.getOut()); - exchange.setOut(null); - } - - // route to original destination leveraging the asynchronous routing engine if possible - if (producer instanceof AsyncProcessor) { - AsyncProcessor 
async = (AsyncProcessor) producer; - return async.process(exchange, callback); - } else { - try { - producer.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - callback.done(true); - return true; - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange); - } - callback.done(true); - return true; - } - } - - public boolean isSingleton() { - return producer.isSingleton(); - } - - public void start() throws Exception { - ServiceHelper.startService(detour); - // here we also need to start the producer - ServiceHelper.startService(producer); - } - - public void stop() throws Exception { - // do not stop detour as it should only be stopped when the interceptor stops - // we should stop the producer here - ServiceHelper.stopService(producer); - } - }; + return new InterceptSendToEndpointProcessor(this, delegate, skip); } public Consumer createConsumer(Processor processor) throws Exception { diff --git a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java new file mode 100644 index 0000000..10c1052 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.camel.processor.PipelineHelper.continueProcessing; + +public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { + + private static final Logger LOG = LoggerFactory.getLogger(InterceptSendToEndpointProcessor.class); + private final InterceptSendToEndpoint endpoint; + private final Endpoint delegate; + private final Producer producer; + private final boolean skip; + + public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, boolean skip) throws Exception { + super(delegate); + this.endpoint = endpoint; + this.delegate = delegate; + this.producer = delegate.createProducer(); + this.skip = skip; + } + + public Endpoint getEndpoint() { + return producer.getEndpoint(); + } + + public Exchange createExchange() { + return producer.createExchange(); + } + + public Exchange createExchange(ExchangePattern pattern) { + return producer.createExchange(pattern); + } + + @Deprecated + public Exchange createExchange(Exchange exchange) { + return producer.createExchange(exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + // process the detour so we do the detour routing + if 
(LOG.isDebugEnabled()) { + LOG.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", new Object[]{getEndpoint(), endpoint.getDetour(), exchange}); + } + // add header with the real endpoint uri + exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); + + if (endpoint.getDetour() != null) { + // detour the exchange using synchronous processing + try { + endpoint.getDetour().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + } + + // Decide whether to continue or not; similar logic to the Pipeline + // check for error if so we should break out + if (!continueProcessing(exchange, "skip sending to original intended destination: " + getEndpoint(), LOG)) { + callback.done(true); + return true; + } + + // determine if we should skip or not + boolean shouldSkip = skip; + + // if then interceptor had a when predicate, then we should only skip if it matched + Boolean whenMatches = (Boolean) exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED); + if (whenMatches != null) { + shouldSkip = skip && whenMatches; + } + + if (!shouldSkip) { + if (exchange.hasOut()) { + // replace OUT with IN as detour changed something + exchange.setIn(exchange.getOut()); + exchange.setOut(null); + } + + // route to original destination leveraging the asynchronous routing engine if possible + if (producer instanceof AsyncProcessor) { + AsyncProcessor async = (AsyncProcessor) producer; + return async.process(exchange, callback); + } else { + try { + producer.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Stop() means skip sending exchange to original intended destination: {} for exchange: {}", getEndpoint(), exchange); + } + callback.done(true); + return true; + } + } + + public boolean isSingleton() { + return producer.isSingleton(); + } + + public void start() 
throws Exception { + ServiceHelper.startService(endpoint.getDetour()); + // here we also need to start the producer + ServiceHelper.startService(producer); + } + + public void stop() throws Exception { + // do not stop detour as it should only be stopped when the interceptor stops + // we should stop the producer here + ServiceHelper.stopService(producer); + } + +} -- To stop receiving notification emails like this one, please contact davscl...@apache.org.