CAMEL-8386: WireTap should copy the body if the body is a stream cache.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4661cbb9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4661cbb9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4661cbb9

Branch: refs/heads/master
Commit: 4661cbb94513d6047e58581b23dcd4a6fad166f7
Parents: 686bb35
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sun Feb 22 16:11:41 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Feb 22 16:11:41 2015 +0100

----------------------------------------------------------------------
 .../camel/processor/WireTapProcessor.java       | 27 +++++-
 .../processor/MulticastStreamCachingTest.java   |  2 +-
 .../processor/WireTapStreamCachingTest.java     | 91 ++++++++++++++++++++
 3 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4661cbb9/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index ab9ca15..3f71226 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -28,7 +29,9 @@ import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.support.ServiceSupport;
@@ -93,7 +96,16 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         }
 
         // must configure the wire tap beforehand
-        final Exchange wireTapExchange = configureExchange(exchange, exchangePattern);
+        Exchange target;
+        try {
+            target = configureExchange(exchange, exchangePattern);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        final Exchange wireTapExchange = target;
 
         // send the exchange to the destination using an executor service
         executorService.submit(new Callable<Exchange>() {
@@ -114,7 +126,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
     }
 
 
-    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
+    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) throws IOException {
         Exchange answer;
         if (copy) {
             // use a copy of the original exchange
@@ -145,6 +157,17 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
             }
         }
 
+        // if the body is a stream cache we must use a copy of the stream in the wire tapped exchange
+        Message msg = answer.hasOut() ? answer.getOut() : answer.getIn();
+        if (msg.getBody() instanceof StreamCache) {
+            // in parallel processing case, the stream must be copied, therefore get the stream
+            StreamCache cache = (StreamCache) msg.getBody();
+            StreamCache copied = cache.copy();
+            if (copied != null) {
+                msg.setBody(copied);
+            }
+        }
+
         // invoke on prepare on the exchange if specified
         if (onPrepare != null) {
             try {

http://git-wip-us.apache.org/repos/asf/camel/blob/4661cbb9/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
index 281b87c..8efa5a1 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
@@ -80,7 +80,7 @@ public class MulticastStreamCachingTest extends ContextTestSupport {
 
                 errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3));
 
-                //stream caching should fix re-readability issues when multicasting messags
+                //stream caching should fix re-readability issues when multicasting messages
                 from("direct:a").multicast().to("direct:x", "direct:y", "direct:z");
 
                 from("direct:x").process(processor).to("mock:x");

http://git-wip-us.apache.org/repos/asf/camel/blob/4661cbb9/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
new file mode 100644
index 0000000..0a87c13
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapStreamCachingTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.StringReader;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class WireTapStreamCachingTest extends ContextTestSupport {
+    protected Endpoint startEndpoint;
+    protected MockEndpoint x;
+    protected MockEndpoint y;
+    protected MockEndpoint z;
+
+    public void testSendingAMessageUsingWiretapConvertsToReReadable() throws Exception {
+        x.expectedBodiesReceived("<input/>+output");
+        y.expectedBodiesReceived("<input/>+output");
+        z.expectedBodiesReceived("<input/>+output");
+
+        template.send("direct:a", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(new StreamSource(new StringReader("<input/>")));
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        z = getMockEndpoint("mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        final Processor processor = new Processor() {
+            public void process(Exchange exchange) {
+                // lets transform the IN message
+                Message in = exchange.getIn();
+                String body = in.getBody(String.class);
+                in.setBody(body + "+output");
+            }
+        };
+
+        return new RouteBuilder() {
+            public void configure() {
+                // enable stream caching
+                context.setStreamCaching(true);
+
+                errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(3));
+
+                //stream caching should fix re-readability issues when wire tapping messages
+                from("direct:a").wireTap("direct:x").wireTap("direct:y").wireTap("direct:z");
+
+                from("direct:x").process(processor).to("mock:x");
+                from("direct:y").process(processor).to("mock:y");
+                from("direct:z").process(processor).to("mock:z");
+            }
+        };
+    }
+}

Reply via email to