bq. drwxr-xr-x   - tomcat7  supergroup          0 2016-03-09 23:16 /tmp/swg

If I read the above line correctly, the size of the file was 0.
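
For reference, here is a minimal sketch of reading that listing line column by column. It assumes the usual `hadoop fs -ls` column order (permissions, replication, owner, group, size, date, time, path); the class name `LsLine` is hypothetical and not part of Hadoop. One caveat: the leading `d` marks the entry as a directory, and the size column of a directory is printed as 0, so this line by itself does not show whether any file has arrived under /tmp/swg.

```java
/** Hypothetical helper that splits one line of `hadoop fs -ls` output into fields. */
public class LsLine {
    final boolean isDirectory;
    final String owner;
    final String group;
    final long sizeBytes;
    final String path;

    LsLine(boolean isDirectory, String owner, String group, long sizeBytes, String path) {
        this.isDirectory = isDirectory;
        this.owner = owner;
        this.group = group;
        this.sizeBytes = sizeBytes;
        this.path = path;
    }

    static LsLine parse(String line) {
        // Assumed columns: permissions, replication, owner, group, size, date, time, path
        String[] f = line.trim().split("\\s+", 8);
        if (f.length < 8) {
            throw new IllegalArgumentException("unexpected ls line: " + line);
        }
        // A permission string starting with 'd' denotes a directory
        return new LsLine(f[0].startsWith("d"), f[2], f[3], Long.parseLong(f[4]), f[7]);
    }

    public static void main(String[] args) {
        LsLine e = parse("drwxr-xr-x   - tomcat7  supergroup          0 2016-03-09 23:16 /tmp/swg");
        System.out.println(e.path + " directory=" + e.isDirectory + " size=" + e.sizeBytes);
    }
}
```

Running `hadoop fs -ls /tmp/swg` (listing the contents rather than the directory entry) would show the sizes of the files themselves.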

On Wed, Mar 9, 2016 at 10:00 AM, srimugunthan dhandapani <
[email protected]> wrote:

> Hi all
> I am working on Cloudera CDH 5.6 and the Spark version is 1.5.0-cdh5.6.0.
>
> I have a strange problem: Spark Streaming works on a directory in the
> local filesystem but doesn't work for HDFS.
>
> My spark streaming program:
>
>
> package com.oreilly.learningsparkexamples.java;
>
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.Duration;
>
> public class StreamingLogInput {
>
>
>
>     public static void main(String[] args) throws Exception {
>         String master = args[0];
>         String target = args[1];
>         JavaSparkContext sc = new JavaSparkContext(master, "StreamingLogInput");
>         // Create a StreamingContext with a 1 second batch size
>         JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));
>
>         if (target.equals("localfs")) {
>             JavaDStream<String> lines = jssc.textFileStream("file:///tmp/swg/");
>             lines.print();
>
>             lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<String> rdd) throws Exception {
>
>                     if (rdd.count() > 0) {
>                         // errorLines.dstream().saveAsTextFiles("errlines", "txt");
>                         rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
>                                 + System.nanoTime() + ".out");
>
>
>                     }
>
>                     return null;
>                 }
>             });
>         } else if (target.equals("hdfs")) {
>             JavaDStream<String> lines = jssc.textFileStream("hdfs://hadoop-host-1:8020/tmp/swg");
>             // JavaDStream<String> lines = jssc.textFileStream("tempstream");
>             lines.print();
>
>             lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<String> rdd) throws Exception {
>                     if (rdd.count() > 0) {
>                         // errorLines.dstream().saveAsTextFiles("errlines", "txt");
>                         rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
>                                 + System.nanoTime() + ".out");
>                     }
>
>                     return null;
>                 }
>             });
>         }
>         // Filter our DStream for lines with "error"
>
>         // Print out the lines with errors, which causes this DStream to be evaluated
>         // start our streaming context and wait for it to "finish"
>         jssc.start();
>         // Wait for 5 minutes then exit. To run forever call without a timeout
>         jssc.awaitTermination(5 * 60 * 1000);
>         // Stop the streaming context
>         jssc.stop();
>     }
> }
>
> Testing the above program for the local filesystem with the following command
> spark-submit  --class
> com.oreilly.learningsparkexamples.java.StreamingLogInput
> target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] localfs
> creates the new files in HDFS (hdfs://hadoop-host-1:8020/tmp/inputswg)
>
> But when I try it with HDFS,
> spark-submit  --class
> com.oreilly.learningsparkexamples.java.StreamingLogInput
> target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] hdfs
> it doesn't seem to detect the new file.
>
> I am creating the new file by manual copy:
> cp file.txt /tmp/swg (for localfs)
> hadoop fs -copyFromLocal file.txt /tmp/swg (for hdfs)
>
> I checked the permissions for /tmp/swg and it has read permission:
> drwxr-xr-x   - tomcat7  supergroup          0 2016-03-09 23:16 /tmp/swg
>
> What's the problem?
>
>
>

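Regarding the manual copy step: the Spark Streaming programming guide says files should appear in the monitored directory through an atomic move or rename, rather than being written there in place. Whether that is the cause here is not clear from the message, but it may be worth ruling out. A minimal sketch for the local-filesystem case, using only `java.nio` (the class name `AtomicDrop` and the staging directory layout are assumptions for illustration):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicDrop {
    /** Writes content under stagingDir, then renames it into watchedDir in a single step. */
    static Path drop(Path stagingDir, Path watchedDir, String name, String content)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(watchedDir);
        // Write the complete file outside the watched directory first
        Path tmp = stagingDir.resolve(name + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE requires both directories to be on the same filesystem
        return Files.move(tmp, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("swg-demo");
        Path out = drop(base.resolve("staging"), base.resolve("swg"), "file.txt", "hello\n");
        System.out.println("moved to " + out);
    }
}
```

On HDFS the comparable approach would be to copy the file into a separate staging directory and then move it into /tmp/swg with `hadoop fs -mv`.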