Hi all,

I am working on Cloudera CDH 5.6 and the Spark version is 1.5.0-cdh5.6.0.

I have a strange problem: Spark Streaming picks up new files from a directory
on the local filesystem, but not from a directory on HDFS.

My Spark Streaming program:


package com.oreilly.learningsparkexamples.java;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingLogInput {

    public static void main(String[] args) throws Exception {
        String master = args[0];
        String target = args[1];
        JavaSparkContext sc = new JavaSparkContext(master, "StreamingLogInput");
        // Create a StreamingContext with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

        if (target.equals("localfs")) {
            // Monitor a directory on the local filesystem
            JavaDStream<String> lines = jssc.textFileStream("file:///tmp/swg/");
            lines.print();

            lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
                @Override
                public Void call(JavaRDD<String> rdd) throws Exception {
                    if (rdd.count() > 0) {
                        // Write each non-empty batch out to HDFS
                        rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
                                + System.nanoTime() + ".out");
                    }
                    return null;
                }
            });
        } else if (target.equals("hdfs")) {
            // Monitor a directory on HDFS
            JavaDStream<String> lines =
                    jssc.textFileStream("hdfs://hadoop-host-1:8020/tmp/swg");
            // JavaDStream<String> lines = jssc.textFileStream("tempstream");
            lines.print();

            lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
                @Override
                public Void call(JavaRDD<String> rdd) throws Exception {
                    if (rdd.count() > 0) {
                        // Write each non-empty batch out to HDFS
                        rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
                                + System.nanoTime() + ".out");
                    }
                    return null;
                }
            });
        }

        // Start the streaming context
        jssc.start();
        // Wait for up to 5 minutes, then exit; to run indefinitely, call awaitTermination() without a timeout
        jssc.awaitTermination(5 * 60 * 1000);
        // Stop the streaming context
        jssc.stop();
    }
}

Testing the above program against the local filesystem with

spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] localfs

creates the new files in HDFS (hdfs://hadoop-host-1:8020/tmp/inputswg).

But when I try it against HDFS with

spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] hdfs

it doesn't seem to detect the new file.

I am creating the new file by copying it in manually:
cp file.txt /tmp/swg                          (for localfs)
hadoop fs -copyFromLocal file.txt /tmp/swg    (for hdfs)
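
The Spark Streaming guide says files should be moved into the monitored directory atomically (e.g. by a rename within the same filesystem), so one variant I could still try for the HDFS case is to copy to a staging directory first and then move the file in (/tmp/swg-staging below is just a made-up path):

hadoop fs -copyFromLocal file.txt /tmp/swg-staging/
hadoop fs -mv /tmp/swg-staging/file.txt /tmp/swg/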

I checked the permissions on /tmp/swg and it is readable:
drwxr-xr-x   - tomcat7  supergroup          0 2016-03-09 23:16 /tmp/swg
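
From what I understand, textFileStream also only picks up files whose modification time is newer than the time the streaming context started. I'm not sure whether that is what's biting me here, but for reference, a sketch of the lower-level fileStream variant with newFilesOnly set to false (same monitored directory, TextInputFormat; untested) would look like this:

// additional imports needed for this sketch:
// import org.apache.hadoop.fs.Path;
// import org.apache.hadoop.io.LongWritable;
// import org.apache.hadoop.io.Text;
// import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
// import org.apache.spark.streaming.api.java.JavaPairInputDStream;
// import scala.Tuple2;

JavaPairInputDStream<LongWritable, Text> raw = jssc.fileStream(
        "hdfs://hadoop-host-1:8020/tmp/swg",
        LongWritable.class, Text.class, TextInputFormat.class,
        new Function<Path, Boolean>() {
            @Override
            public Boolean call(Path path) {
                return true;   // accept every file in the directory
            }
        },
        false);                // newFilesOnly = false: also consider files with older timestamps

JavaDStream<String> lines = raw.map(new Function<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String call(Tuple2<LongWritable, Text> record) {
        return record._2().toString();
    }
});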

What could the problem be?
