> drwxr-xr-x   - tomcat7 supergroup          0 2016-03-09 23:16 /tmp/swg
If I read the above line correctly, the size of the file was 0.

On Wed, Mar 9, 2016 at 10:00 AM, srimugunthan dhandapani <[email protected]> wrote:

> Hi all,
>
> I am working in Cloudera CDH 5.6, and the version of Spark is 1.5.0-cdh5.6.0.
>
> I have a strange problem: Spark Streaming works on a directory in the
> local filesystem but doesn't work for HDFS.
>
> My Spark Streaming program:
>
> package com.oreilly.learningsparkexamples.java;
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class StreamingLogInput {
>
>     public static void main(String[] args) throws Exception {
>         String master = args[0];
>         String target = args[1];
>         JavaSparkContext sc = new JavaSparkContext(master, "StreamingLogInput");
>         // Create a StreamingContext with a 1 second batch size
>         JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));
>
>         if (target.equals("localfs")) {
>             // Monitor a directory on the local filesystem
>             JavaDStream<String> lines = jssc.textFileStream("file:///tmp/swg/");
>             lines.print();
>
>             lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<String> rdd) throws Exception {
>                     if (rdd.count() > 0) {
>                         rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
>                                 + System.nanoTime() + ".out");
>                     }
>                     return null;
>                 }
>             });
>         } else if (target.equals("hdfs")) {
>             // Monitor a directory on HDFS
>             JavaDStream<String> lines = jssc.textFileStream("hdfs://hadoop-host-1:8020/tmp/swg");
>             lines.print();
>
>             lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>                 @Override
>                 public Void call(JavaRDD<String> rdd) throws Exception {
>                     if (rdd.count() > 0) {
>                         rdd.saveAsTextFile("hdfs://hadoop-host-1:8020/tmp/inputswg/file-"
>                                 + System.nanoTime() + ".out");
>                     }
>                     return null;
>                 }
>             });
>         }
>
>         // Start the streaming context and wait for it to "finish"
>         jssc.start();
>         // Wait five minutes, then exit; to run forever, call awaitTermination() without a timeout
>         jssc.awaitTermination(5 * 60 * 1000);
>         // Stop the streaming context
>         jssc.stop();
>     }
> }
>
> Testing the above program against the local filesystem with the following command:
>
> spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput \
>     target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] localfs
>
> creates the new files in HDFS (hdfs://hadoop-host-1:8020/tmp/inputswg),
> but when tried with HDFS:
>
> spark-submit --class com.oreilly.learningsparkexamples.java.StreamingLogInput \
>     target/StreamingLogInput-1.0-SNAPSHOT.jar local[4] hdfs
>
> it doesn't seem to detect the new file.
>
> I am creating the new file by a manual copy:
>
> cp file.txt /tmp/swg                             (for localfs)
> hadoop fs -copyFromLocal file.txt /tmp/swg       (for hdfs)
>
> I checked the permissions for /tmp/swg, and it has read permission:
>
> drwxr-xr-x   - tomcat7 supergroup          0 2016-03-09 23:16 /tmp/swg
>
> What's the problem?
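One other thing worth checking, beyond the file size: the Spark Streaming programming guide says that file sources like textFileStream only pick up files that are moved or renamed into the watched directory atomically, and selection keys on each file's modification time relative to when the context started. A pattern that usually avoids the edge cases is to copy the file into a staging directory on the same HDFS first and then rename it into /tmp/swg. Below is a minimal sketch of that with the Hadoop FileSystem API; the staging directory (/tmp/staging) and the local source path are placeholders I made up, not anything from your setup:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AtomicDrop {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(
                    URI.create("hdfs://hadoop-host-1:8020"), new Configuration());

            // Stage the file somewhere the stream is NOT watching
            // ("/tmp/staging" is a placeholder directory).
            Path staged = new Path("/tmp/staging/file.txt");
            fs.copyFromLocalFile(new Path("file:///path/to/file.txt"), staged);

            // A rename within the same HDFS filesystem is atomic, so the
            // stream sees either the complete file or nothing at all.
            fs.rename(staged, new Path("/tmp/swg/file.txt"));

            fs.close();
        }
    }

From the shell, hadoop fs -put file.txt /tmp/staging/ followed by hadoop fs -mv /tmp/staging/file.txt /tmp/swg/ achieves the same effect. Since detection is timestamp-based, it may also be worth confirming that the clock on the machine writing the file agrees with the one running the driver.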
