Author: davsclaus Date: Tue Jan 20 22:44:45 2009 New Revision: 736242 URL: http://svn.apache.org/viewvc?rev=736242&view=rev Log: CAMEL-1237: reading from file and url now supported. Added option to continuously read from stream using scanStream. Thanks to patches from Stephen Joyner.
Added: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (with props) camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java (contents, props changed) - copied, changed from r736216, camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=736242&r1=736241&r2=736242&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Tue Jan 20 22:44:45 2009 @@ -33,13 +33,14 @@ import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * Consumer that can read from streams */ -public class StreamConsumer extends DefaultConsumer { +public class StreamConsumer extends DefaultConsumer implements Runnable { private static final transient Log LOG = LogFactory.getLog(StreamConsumer.class); private static final String TYPES = "in,file,url"; @@ -49,6 +50,7 @@ private StreamEndpoint endpoint; private String uri; 
private boolean initialPromptDone; + private Thread scanThread; public StreamConsumer(StreamEndpoint endpoint, Processor processor, String uri) throws Exception { super(endpoint, processor); @@ -69,36 +71,92 @@ inputStream = resolveStreamFromUrl(); } - readFromStream(); + scanThread = new Thread(this, getThreadName(endpoint.getEndpointUri())); + scanThread.setDaemon(true); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting thread: " + scanThread.getName()); + } + scanThread.start(); } @Override public void doStop() throws Exception { // important: do not close the stream as it will close the standard system.in etc. + if (LOG.isDebugEnabled()) { + LOG.debug("Stopping thread: " + scanThread.getName()); + } + // must use timeout to let this thread die + scanThread.join(1000); + scanThread = null; super.doStop(); } + public void run() { + try { + readFromStream(); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } + private void readFromStream() throws Exception { Charset charset = endpoint.getCharset(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset)); String line; - boolean eos = false; - while (!eos) { - if (endpoint.getPromptMessage() != null) { - doPromptMessage(); + if (endpoint.isScanStream()) { + // repeat scanning from stream + while (isRunAllowed()) { + line = br.readLine(); + if (LOG.isTraceEnabled()) { + LOG.trace("Read line: " + line); + } + boolean eos = line == null; + if (!eos && isRunAllowed()) { + processLine(line); + } + try { + Thread.sleep(endpoint.getScanStreamDelay()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } } - - line = br.readLine(); - eos = line == null; - if (!eos) { - consumeLine(line); + } else { + // regular read stream once until end of stream + boolean eos = false; + while (!eos && isRunAllowed()) { + if (endpoint.getPromptMessage() != null) { + doPromptMessage(); + } + + line = br.readLine(); + if (LOG.isTraceEnabled()) { + 
LOG.trace("Read line: " + line); + } + eos = line == null; + if (!eos && isRunAllowed()) { + processLine(line); + } } } // important: do not close the reader as it will close the standard system.in etc. } /** + * Strategy method for processing the line + */ + protected void processLine(Object line) throws Exception { + Exchange exchange = endpoint.createExchange(); + + Message msg = new DefaultMessage(); + msg.setBody(line); + exchange.setIn(msg); + + getProcessor().process(exchange); + } + + /** * Strategy method for prompting the prompt message */ protected void doPromptMessage() { @@ -122,31 +180,27 @@ System.out.print(endpoint.getPromptMessage()); } - private void consumeLine(Object line) throws Exception { - Exchange exchange = endpoint.createExchange(); - - Message msg = new DefaultMessage(); - msg.setBody(line); - exchange.setIn(msg); - - getProcessor().process(exchange); - } - private InputStream resolveStreamFromUrl() throws IOException { String u = endpoint.getUrl(); + ObjectHelper.notEmpty(u, "url"); + if (LOG.isDebugEnabled()) { + LOG.debug("About to read from url: " + u); + } + URL url = new URL(u); URLConnection c = url.openConnection(); return c.getInputStream(); } private InputStream resolveStreamFromFile() throws IOException { - String fileName = endpoint.getFile() != null ? 
endpoint.getFile().trim() : "_file"; - File f = new File(fileName); + String fileName = endpoint.getFileName(); + ObjectHelper.notEmpty(fileName, "fileName"); if (LOG.isDebugEnabled()) { - LOG.debug("About to read from file: " + f); + LOG.debug("About to read from file: " + fileName); } - f.createNewFile(); - return new FileInputStream(f); + + File file = new File(fileName); + return new FileInputStream(file); } private void validateUri(String uri) throws IllegalArgumentException { Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=736242&r1=736241&r2=736242&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java Tue Jan 20 22:44:45 2009 @@ -29,8 +29,9 @@ public class StreamEndpoint extends DefaultEndpoint { private static final transient Log LOG = LogFactory.getLog(StreamEndpoint.class); - private String uri; - private String file; + private String fileName; + private boolean scanStream; + private long scanStreamDelay; private String url; private long delay; private String encoding; @@ -40,20 +41,18 @@ public StreamEndpoint(String endpointUri, Component component) throws Exception { super(endpointUri, component); - this.uri = endpointUri; } public StreamEndpoint(String endpointUri) { super(endpointUri); - this.uri = endpointUri; } public Consumer createConsumer(Processor processor) throws Exception { - return new StreamConsumer(this, processor, uri); + return new StreamConsumer(this, processor, getEndpointUri()); } public Producer createProducer() throws Exception { - return new StreamProducer(this, uri); + 
return new StreamProducer(this, getEndpointUri()); } public boolean isSingleton() { @@ -63,14 +62,22 @@ // Properties //------------------------------------------------------------------------- - public String getFile() { - return file; - } - + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + public String getUrl() { return url; } + public void setUrl(String url) { + this.url = url; + } + public long getDelay() { return delay; } @@ -111,6 +118,22 @@ this.initialPromptDelay = initialPromptDelay; } + public boolean isScanStream() { + return scanStream; + } + + public void setScanStream(boolean scanStream) { + this.scanStream = scanStream; + } + + public long getScanStreamDelay() { + return scanStreamDelay; + } + + public void setScanStreamDelay(long scanStreamDelay) { + this.scanStreamDelay = scanStreamDelay; + } + // Implementations //------------------------------------------------------------------------- Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=736242&r1=736241&r2=736242&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java Tue Jan 20 22:44:45 2009 @@ -32,6 +32,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -75,41 +76,44 @@ } else if ("url".equals(uri)) { outputStream = resolveStreamFromUrl(); } + 
writeToStream(exchange); } private OutputStream resolveStreamFromUrl() throws IOException { String u = endpoint.getUrl(); + ObjectHelper.notEmpty(u, "url"); + if (LOG.isDebugEnabled()) { + LOG.debug("About to write to url: " + u); + } + URL url = new URL(u); URLConnection c = url.openConnection(); return c.getOutputStream(); } private OutputStream resolveStreamFromFile() throws IOException { - String fileName = endpoint.getFile() != null ? endpoint.getFile().trim() : "_file"; - File f = new File(fileName); + String fileName = endpoint.getFileName(); + ObjectHelper.notEmpty(fileName, "fileName"); if (LOG.isDebugEnabled()) { - LOG.debug("About to write to file: " + f); + LOG.debug("About to write to file: " + fileName); } + File f = new File(fileName); + // will create a new file if missing or append to existing f.createNewFile(); return new FileOutputStream(f); } private OutputStream resolveStreamFromHeader(Object o, Exchange exchange) throws CamelExchangeException { - if (o != null && o instanceof OutputStream) { - return (OutputStream)o; - } else { - throw new CamelExchangeException("Expected OutputStream in header('stream'), found: " + o, - exchange); - } + return exchange.getContext().getTypeConverter().convertTo(OutputStream.class, o); } private void delay(long ms) throws InterruptedException { if (ms == 0) { return; } - if (LOG.isDebugEnabled()) { - LOG.debug("Delaying " + ms + " millis"); + if (LOG.isTraceEnabled()) { + LOG.trace("Delaying " + ms + " millis"); } Thread.sleep(ms); } @@ -129,7 +133,7 @@ // important: do not close the writer as it will close the standard system.out etc. 
} else if (body instanceof byte[]) { if (LOG.isDebugEnabled()) { - LOG.debug("Writing as text: " + body + " to " + outputStream); + LOG.debug("Writing as byte[]: " + body + " to " + outputStream); } outputStream.write((byte[])body); } else { Added: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=736242&view=auto ============================================================================== --- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (added) +++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Tue Jan 20 22:44:45 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.stream; + +import java.io.File; +import java.io.FileOutputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for scan stream file + */ +public class ScanStreamFileTest extends ContextTestSupport { + + private File file; + + @Override + protected void setUp() throws Exception { + deleteDirectory("./target/stream"); + createDirectory("./target/stream"); + + file = new File("./target/stream/streamfile.txt"); + file = file.getAbsoluteFile(); + file.createNewFile(); + + super.setUp(); + } + + public void testScanFile() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello", "World"); + + FileOutputStream fos = new FileOutputStream(file); + fos.write("Hello\n".getBytes()); + Thread.sleep(150); + fos.write("World\n".getBytes()); + + assertMockEndpointsSatisfied(); + + fos.close(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("stream:file?fileName=./target/stream/streamfile.txt&scanStream=true&scanStreamDelay=100").to("mock:result"); + } + }; + } + +} \ No newline at end of file Propchange: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java (from r736216, camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java?p2=camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java&p1=camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java&r1=736216&r2=736242&rev=736242&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java (original) +++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java Tue Jan 20 22:44:45 2009 @@ -16,30 +16,51 @@ */ package org.apache.camel.component.stream; +import java.io.File; +import java.io.FileOutputStream; + import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; /** - * Unit test for System.out + * Unit test for stream file */ -public class StreamSystemOutTest extends ContextTestSupport { +public class StreamFileTest extends ContextTestSupport { + + private File file; + private FileOutputStream fos; + + @Override + protected void setUp() throws Exception { + deleteDirectory("./target/stream"); + createDirectory("./target/stream"); + + file = new File("./target/stream/streamfile.txt"); + file = file.getAbsoluteFile(); + file.createNewFile(); - // START SNIPPET: e1 - public void testStringContent() throws Exception { - template.sendBody("direct:in", "Hello Text World\n"); + fos = new FileOutputStream(file); + fos.write("Hello\n".getBytes()); + + super.setUp(); } - public void testBinaryContent() { - template.sendBody("direct:in", "Hello Bytes World\n".getBytes()); + public void testFile() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello"); + + assertMockEndpointsSatisfied(); + + 
fos.close(); } protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("direct:in").to("stream:out"); + from("stream:file?fileName=./target/stream/streamfile.txt").to("mock:result"); } }; } - // END SNIPPET: e1 -} +} \ No newline at end of file Propchange: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java ------------------------------------------------------------------------------ svn:mergeinfo =