Author: gertv Date: Tue Feb 3 09:16:53 2009 New Revision: 740251 URL: http://svn.apache.org/viewvc?rev=740251&view=rev Log: CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb 3 09:16:53 2009 @@ -25,6 +25,7 @@ import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; +import 
org.apache.camel.processor.interceptor.StreamCaching; import org.apache.camel.spi.RouteContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +67,7 @@ public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { Processor deadLetter = getDeadLetterFactory().createProcessor(); DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy()); + StreamCaching.enable(routeContext); configure(answer); return answer; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb 3 09:16:53 2009 @@ -31,6 +31,7 @@ import org.apache.camel.model.LoggingLevel; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -175,6 +176,9 @@ exchange.setException(null); } + // reset cached streams so they can be read again + MessageHelper.resetStreamCache(exchange.getIn()); + // wait until we should redeliver data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?rev=740251&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb 3 09:16:53 2009 @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.interceptor; + +import org.apache.camel.Processor; +import org.apache.camel.model.ProcessorType; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.RouteContext; + +/** + * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext + */ +public class StreamCaching implements InterceptStrategy { + + /* + * Hide constructor -- instances will be created through static enable() methods + */ + private StreamCaching() { + super(); + } + + /** + * {@inheritdoc} + */ + @SuppressWarnings("unchecked") + public Processor wrapProcessorInInterceptors(ProcessorType processorType, Processor target) throws Exception { + return new StreamCachingInterceptor(target); + } + + /** + * Enable stream caching for a RouteContext + * + * @param context the route context + */ + public static void enable(RouteContext context) { + for (InterceptStrategy strategy : context.getInterceptStrategies()) { + if (strategy instanceof StreamCaching) { + return; + } + } + context.addInterceptStrategy(new StreamCaching()); + } +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb 3 09:16:53 2009 @@ -25,6 +25,7 @@ import 
org.apache.camel.model.InterceptorRef; import org.apache.camel.model.InterceptorType; import org.apache.camel.processor.Interceptor; +import org.apache.camel.util.MessageHelper; /** * {@link Interceptor} that converts a message into a re-readable format @@ -38,9 +39,9 @@ try { StreamCache newBody = exchange.getIn().getBody(StreamCache.class); if (newBody != null) { - newBody.reset(); exchange.getIn().setBody(newBody); } + MessageHelper.resetStreamCache(exchange.getIn()); } catch (NoTypeConversionAvailableException ex) { // ignore if in is not of StreamCache type } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb 3 09:16:53 2009 @@ -90,4 +90,19 @@ } return answer; } + + /** + * If the message body contains a {@link StreamCache} instance, reset the cache to + * enable reading from it again. 
+ * + * @param message the message for which to reset the body + */ + public static void resetStreamCache(Message message) { + if (message == null) { + return; + } + if (message.getBody() instanceof StreamCache) { + ((StreamCache) message.getBody()).reset(); + } + } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb 3 09:16:53 2009 @@ -46,6 +46,9 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { + //TODO: remove this once the delegate processor supports async + errorHandler(noErrorHandler()); + from("dataset:foo").to("seda:queue:test?size=100"); from("seda:queue:test?size=100").to("dataset:foo"); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb 3 09:16:53 2009 @@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.io.Reader; import java.io.StringReader; import 
javax.xml.transform.stream.StreamSource; @@ -68,23 +69,22 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - streamCaching(); errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3)); from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { count++; // Read the in stream from cache String result = exchange.getIn().getBody(String.class); - assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>"); + assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result); throw new Exception("Forced exception by unit test"); } }); //Need to set the streamCaching for the deadLetterChannel - from("direct:errorHandler").streamCaching().process(new Processor() { + from("direct:errorHandler").process(new Processor() { public void process(Exchange exchange) throws Exception { String result = exchange.getIn().getBody(String.class); - assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>"); + assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result); } }).to("mock:error"); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb 3 09:16:53 2009 @@ -104,8 +104,12 @@ outputProcessor = interceptor.getProcessor(); } + // we are not interested in any other delegate processors in the route (e.g. 
stream caching) + while (outputProcessor instanceof DelegateProcessor) { + outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor(); + } + assertIsInstanceOf(StreamResequencer.class, outputProcessor); } - } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb 3 09:16:53 2009 @@ -37,7 +37,7 @@ public void testSendStreamSource() throws Exception { x.expectedMessageCount(1); y.expectedMessageCount(1); - + sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>"))); sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>"))); @@ -65,7 +65,7 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("direct:start").convertBodyTo(String.class).choice() + from("direct:start").choice() .when().xpath("/message/text() = 'xx'").to("mock:x") .when().xpath("/message/text() = 'yy'").to("mock:y"); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740251&r1=740250&r2=740251&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb 3 09:16:53 2009 @@ 
-20,12 +20,16 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.ProcessorType; +import org.apache.camel.processor.interceptor.Tracer; +import org.apache.camel.spi.InterceptStrategy; /** * @version $Revision$ @@ -141,7 +145,8 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - inheritErrorHandler(false); + //TODO: revert this once we get DelegateProcessor to support async + setErrorHandlerBuilder(noErrorHandler()); // START SNIPPET: example from("direct:a").thread(1).process(new Processor() { Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java?rev=740251&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java Tue Feb 3 09:16:53 2009 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.interceptor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.impl.DefaultRouteContext; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.spi.RouteContext; + +/** + * Test cases for {@link StreamCaching} + */ +public class StreamCachingTest extends ContextTestSupport { + + /** + * Tests enabling stream caching on a {@link RouteContext} + */ + public void testEnableOnRouteContext() throws Exception { + RouteContext rc = new DefaultRouteContext(super.context); + StreamCaching.enable(rc); + assertStrategyEnabled("Enabling StreamCaching should add it to the intercept strategies", rc); + StreamCaching.enable(rc); + assertStrategyEnabled("Enabling it again should not add a second instance", rc); + } + + /* + * Assert that the strategy is enabled exactly one time + */ + private void assertStrategyEnabled(String message, RouteContext rc) { + int count = 0; + for (InterceptStrategy strategy : rc.getInterceptStrategies()) { + if (strategy instanceof StreamCaching) { + count++; + } + } + assertEquals(message, 1, count); + } + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java?rev=740251&view=auto 
============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java Tue Feb 3 09:16:53 2009 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.util; + +import junit.framework.TestCase; + +import org.apache.camel.Message; +import org.apache.camel.converter.stream.StreamCache; +import org.apache.camel.impl.DefaultMessage; + +/** + * Test cases for {@link MessageHelper} + */ +public class MessageHelperTest extends TestCase { + + private Message message; + + @Override + protected void setUp() throws Exception { + message = new DefaultMessage(); + } + + /* + * Tests the {@link MessageHelper#resetStreamCache(Message)} method + */ + public void testResetStreamCache() throws Exception { + // should not throw exceptions when Message or message body is null + MessageHelper.resetStreamCache((Message) null); + MessageHelper.resetStreamCache(message); + + // handle StreamCache + final ValueHolder<Boolean> reset = new ValueHolder<Boolean>(Boolean.FALSE); + message.setBody(new StreamCache() { + public void reset() { + reset.set(Boolean.TRUE); + } + }); + MessageHelper.resetStreamCache(message); + assertTrue("Should have reset the stream cache", reset.get()); + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java ------------------------------------------------------------------------------ svn:eol-style = native