Hi all,
I am working on Cloudera CDH 5.6, and the Spark version is 1.5.0-cdh5.6.0.
I have a strange problem: Spark Streaming picks up new files from a directory
on the local filesystem, but does not pick them up from a directory on HDFS.
My Spark Streaming program:
package com.oreilly.learningsparkexamples.java;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Duration;

public class StreamingLogInput {
  public static void main(String[] args) throws Exception {
    String master = args[0];
    String target = args[1];
    JavaSparkContext sc = new JavaSparkContext(master, "StreamingLogInput");
    // Create a StreamingContext with a 1 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

    if (target.equals("localfs")) {
      // Monitor a directory on the local filesystem
      JavaDStream<String> lines = jssc.textFileStream("file:///tmp/swg/");
      lines.print();
      lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
          if (rdd.count() > 0) {
            // errorLines.dstream().saveAsTextFiles("errlines", "txt");
            rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
                + System.nanoTime() + ".out");
          }
          return null;
        }
      });
    } else if (target.equals("hdfs")) {
      // Monitor a directory on HDFS
      JavaDStream<String> lines =
          jssc.textFileStream("hdfs://hadoop-host-1:8020/tmp/swg");
      // JavaDStream<String> lines = jssc.textFileStream("tempstream");
      lines.print();
      lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
          if (rdd.count() > 0) {
            // errorLines.dstream().saveAsTextFiles("errlines", "txt");
            rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
                + System.nanoTime() + ".out");
          }
          return null;
        }
      });
    }

    // Start our streaming context and wait for it to "finish"
    jssc.start();
    // Wait for 5 minutes and then exit; to run forever, call awaitTermination() without a timeout
    jssc.awaitTermination(5 * 60 * 1000);
    // Stop the streaming context
    jssc.stop();
  }
}
Testing the above program against the local filesystem with the following command
spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput \
  target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] localfs
creates the new output files in HDFS (hdfs://hadoop-host-1:8020/tmp/inputswg),
but when run against HDFS with
spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput \
  target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] hdfs
it does not seem to detect the new file.
I am creating the new file by copying it into the monitored directory manually:
cp file.txt /tmp/swg (for localfs)
hadoop fs -copyFromLocal file.txt /tmp/swg (for hdfs)
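In case it is relevant: my understanding is that textFileStream only notices files that appear in the monitored directory atomically and with a modification time inside the current batch window, so an alternative I could try is writing the file to another HDFS path first and then moving it into /tmp/swg (the /tmp/swg-staging path below is just a placeholder I made up for this example):
hadoop fs -copyFromLocal file.txt /tmp/swg-staging/file.txt
hadoop fs -mv /tmp/swg-staging/file.txt /tmp/swg/file.txt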
I checked the permissions on /tmp/swg and it is readable:
drwxr-xr-x - tomcat7 supergroup 0 2016-03-09 23:16 /tmp/swg
What could the problem be?