Author: davsclaus
Date: Tue Jan 20 22:44:45 2009
New Revision: 736242

URL: http://svn.apache.org/viewvc?rev=736242&view=rev
Log:
CAMEL-1237: reading from file and url now supported. Added option to 
continiusly read from stream using scanStream. Thanks to patches from Stephen 
Joyner.

Added:
    
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
   (with props)
    
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
   (contents, props changed)
      - copied, changed from r736216, 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
Modified:
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
    
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=736242&r1=736241&r2=736242&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 Tue Jan 20 22:44:45 2009
@@ -33,13 +33,14 @@
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Consumer that can read from streams
  */
-public class StreamConsumer extends DefaultConsumer {
+public class StreamConsumer extends DefaultConsumer implements Runnable {
 
     private static final transient Log LOG = 
LogFactory.getLog(StreamConsumer.class);
     private static final String TYPES = "in,file,url";
@@ -49,6 +50,7 @@
     private StreamEndpoint endpoint;
     private String uri;
     private boolean initialPromptDone;
+    private Thread scanThread;
 
     public StreamConsumer(StreamEndpoint endpoint, Processor processor, String 
uri) throws Exception {
         super(endpoint, processor);
@@ -69,36 +71,92 @@
             inputStream = resolveStreamFromUrl();
         }
 
-        readFromStream();
+        scanThread = new Thread(this, 
getThreadName(endpoint.getEndpointUri()));
+        scanThread.setDaemon(true);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Starting thread: " + scanThread.getName());
+        }
+        scanThread.start();
     }
 
     @Override
     public void doStop() throws Exception {
         // important: do not close the stream as it will close the standard 
system.in etc.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Stopping thread: " + scanThread.getName());
+        }
+        // must use timeout to let this thread die
+        scanThread.join(1000);
+        scanThread = null;
         super.doStop();
     }
 
+    public void run() {
+        try {
+            readFromStream();
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        }
+    }
+
     private void readFromStream() throws Exception {
         Charset charset = endpoint.getCharset();
         BufferedReader br = new BufferedReader(new 
InputStreamReader(inputStream, charset));
         String line;
 
-        boolean eos = false;
-        while (!eos) {
-            if (endpoint.getPromptMessage() != null) {
-                doPromptMessage();
+        if (endpoint.isScanStream()) {
+            // repeat scanning from stream
+            while (isRunAllowed()) {
+                line = br.readLine();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Read line: " + line);
+                }
+                boolean eos = line == null;
+                if (!eos && isRunAllowed()) {
+                    processLine(line);
+                }
+                try {
+                    Thread.sleep(endpoint.getScanStreamDelay());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
             }
-
-            line = br.readLine();
-            eos = line == null;
-            if (!eos) {
-                consumeLine(line);
+        } else {
+            // regular read stream once until end of stream
+            boolean eos = false;
+            while (!eos && isRunAllowed()) {
+                if (endpoint.getPromptMessage() != null) {
+                    doPromptMessage();
+                }
+
+                line = br.readLine();
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Read line: " + line);
+                }
+                eos = line == null;
+                if (!eos && isRunAllowed()) {
+                    processLine(line);
+                }
             }
         }
         // important: do not close the reader as it will close the standard 
system.in etc.
     }
 
     /**
+     * Strategy method for processing the line
+     */
+    protected void processLine(Object line) throws Exception {
+        Exchange exchange = endpoint.createExchange();
+
+        Message msg = new DefaultMessage();
+        msg.setBody(line);
+        exchange.setIn(msg);
+
+        getProcessor().process(exchange);
+    }
+
+    /**
      * Strategy method for prompting the prompt message
      */
     protected void doPromptMessage() {
@@ -122,31 +180,27 @@
         System.out.print(endpoint.getPromptMessage());
     }
 
-    private void consumeLine(Object line) throws Exception {
-        Exchange exchange = endpoint.createExchange();
-
-        Message msg = new DefaultMessage();
-        msg.setBody(line);
-        exchange.setIn(msg);
-
-        getProcessor().process(exchange);
-    }
-
     private InputStream resolveStreamFromUrl() throws IOException {
         String u = endpoint.getUrl();
+        ObjectHelper.notEmpty(u, "url");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("About to read from url: " + u);
+        }
+
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getInputStream();
     }
 
     private InputStream resolveStreamFromFile() throws IOException {
-        String fileName = endpoint.getFile() != null ? 
endpoint.getFile().trim() : "_file";
-        File f = new File(fileName);
+        String fileName = endpoint.getFileName();
+        ObjectHelper.notEmpty(fileName, "fileName");
         if (LOG.isDebugEnabled()) {
-            LOG.debug("About to read from file: " + f);
+            LOG.debug("About to read from file: " + fileName);
         }
-        f.createNewFile();
-        return new FileInputStream(f);
+
+        File file = new File(fileName);
+        return new FileInputStream(file);
     }
 
     private void validateUri(String uri) throws IllegalArgumentException {

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java?rev=736242&r1=736241&r2=736242&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 Tue Jan 20 22:44:45 2009
@@ -29,8 +29,9 @@
 public class StreamEndpoint extends DefaultEndpoint {
     private static final transient Log LOG = 
LogFactory.getLog(StreamEndpoint.class);
 
-    private String uri;
-    private String file;
+    private String fileName;
+    private boolean scanStream;
+    private long scanStreamDelay;
     private String url;
     private long delay;
     private String encoding;
@@ -40,20 +41,18 @@
 
     public StreamEndpoint(String endpointUri, Component component) throws 
Exception {
         super(endpointUri, component);
-        this.uri = endpointUri;
     }
 
     public StreamEndpoint(String endpointUri) {
         super(endpointUri);
-        this.uri = endpointUri;
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new StreamConsumer(this, processor, uri);
+        return new StreamConsumer(this, processor, getEndpointUri());
     }
 
     public Producer createProducer() throws Exception {
-        return new StreamProducer(this, uri);
+        return new StreamProducer(this, getEndpointUri());
     }
 
     public boolean isSingleton() {
@@ -63,14 +62,22 @@
     // Properties
     //-------------------------------------------------------------------------
 
-    public String getFile() {
-        return file;
-    }   
-    
+    public String getFileName() {
+        return fileName;
+    }
+
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
     public String getUrl() {
         return url;
     }
 
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
     public long getDelay() {
         return delay;
     }
@@ -111,6 +118,22 @@
         this.initialPromptDelay = initialPromptDelay;
     }
 
+    public boolean isScanStream() {
+        return scanStream;
+    }
+
+    public void setScanStream(boolean scanStream) {
+        this.scanStream = scanStream;
+    }
+
+    public long getScanStreamDelay() {
+        return scanStreamDelay;
+    }
+
+    public void setScanStreamDelay(long scanStreamDelay) {
+        this.scanStreamDelay = scanStreamDelay;
+    }
+
     // Implementations
     //-------------------------------------------------------------------------
 

Modified: 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java?rev=736242&r1=736241&r2=736242&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 Tue Jan 20 22:44:45 2009
@@ -32,6 +32,7 @@
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -75,41 +76,44 @@
         } else if ("url".equals(uri)) {
             outputStream = resolveStreamFromUrl();
         }
+
         writeToStream(exchange);
     }
 
     private OutputStream resolveStreamFromUrl() throws IOException {
         String u = endpoint.getUrl();
+        ObjectHelper.notEmpty(u, "url");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("About to write to url: " + u);
+        }
+
         URL url = new URL(u);
         URLConnection c = url.openConnection();
         return c.getOutputStream();
     }
 
     private OutputStream resolveStreamFromFile() throws IOException {
-        String fileName = endpoint.getFile() != null ? 
endpoint.getFile().trim() : "_file";
-        File f = new File(fileName);
+        String fileName = endpoint.getFileName();
+        ObjectHelper.notEmpty(fileName, "fileName");
         if (LOG.isDebugEnabled()) {
-            LOG.debug("About to write to file: " + f);
+            LOG.debug("About to write to file: " + fileName);
         }
+        File f = new File(fileName);
+        // will create a new file if missing or append to existing
         f.createNewFile();
         return new FileOutputStream(f);
     }
 
     private OutputStream resolveStreamFromHeader(Object o, Exchange exchange) 
throws CamelExchangeException {
-        if (o != null && o instanceof OutputStream) {
-            return (OutputStream)o;
-        } else {
-            throw new CamelExchangeException("Expected OutputStream in 
header('stream'), found: " + o,
-                exchange);
-        }
+        return 
exchange.getContext().getTypeConverter().convertTo(OutputStream.class, o);
     }
 
     private void delay(long ms) throws InterruptedException {
         if (ms == 0) {
             return;
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Delaying " + ms + " millis");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Delaying " + ms + " millis");
         }
         Thread.sleep(ms);
     }
@@ -129,7 +133,7 @@
             // important: do not close the writer as it will close the 
standard system.out etc.
         } else if (body instanceof byte[]) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Writing as text: " + body + " to " + outputStream);
+                LOG.debug("Writing as byte[]: " + body + " to " + 
outputStream);
             }
             outputStream.write((byte[])body);
         } else {

Added: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=736242&view=auto
==============================================================================
--- 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
 (added)
+++ 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
 Tue Jan 20 22:44:45 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for scan stream file
+ */
+public class ScanStreamFileTest extends ContextTestSupport {
+
+    private File file;
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("./target/stream");
+        createDirectory("./target/stream");
+
+        file = new File("./target/stream/streamfile.txt");
+        file = file.getAbsoluteFile();
+        file.createNewFile();
+
+        super.setUp();
+    }
+
+    public void testScanFile() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello", "World");
+
+        FileOutputStream fos = new FileOutputStream(file);
+        fos.write("Hello\n".getBytes());
+        Thread.sleep(150);
+        fos.write("World\n".getBytes());
+
+        assertMockEndpointsSatisfied();
+
+        fos.close();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                
from("stream:file?fileName=./target/stream/streamfile.txt&scanStream=true&scanStreamDelay=100").to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
 (from r736216, 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java?p2=camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java&p1=camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java&r1=736216&r2=736242&rev=736242&view=diff
==============================================================================
--- 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamSystemOutTest.java
 (original)
+++ 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
 Tue Jan 20 22:44:45 2009
@@ -16,30 +16,51 @@
  */
 package org.apache.camel.component.stream;
 
+import java.io.File;
+import java.io.FileOutputStream;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for System.out
+ * Unit test for stream file
  */
-public class StreamSystemOutTest extends ContextTestSupport {
+public class StreamFileTest extends ContextTestSupport {
+
+    private File file;
+    private FileOutputStream fos;
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("./target/stream");
+        createDirectory("./target/stream");
+
+        file = new File("./target/stream/streamfile.txt");
+        file = file.getAbsoluteFile();
+        file.createNewFile();
 
-    // START SNIPPET: e1
-    public void testStringContent() throws Exception {
-        template.sendBody("direct:in", "Hello Text World\n");
+        fos = new FileOutputStream(file);
+        fos.write("Hello\n".getBytes());
+
+        super.setUp();
     }
 
-    public void testBinaryContent() {
-        template.sendBody("direct:in", "Hello Bytes World\n".getBytes());
+    public void testFile() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello");
+
+        assertMockEndpointsSatisfied();
+
+        fos.close();
     }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:in").to("stream:out");
+                
from("stream:file?fileName=./target/stream/streamfile.txt").to("mock:result");
             }
         };
     }
-    // END SNIPPET: e1
 
-}
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: 
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamFileTest.java
------------------------------------------------------------------------------
    svn:mergeinfo = 


Reply via email to