You need to put some files into that location (/user/huser/user/huser/flume) after the job starts running; only then will anything be printed. Also note that I missed the leading / in the code above.
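For example, with the job already running, you could stage a file in HDFS and then move it into the watched directory; textFileStream only picks up files that appear in the directory atomically, so moving a file in is safer than writing it there in place. (sample.log and the staging directory below are placeholders.)

hadoop fs -put sample.log /user/huser/staging/sample.log
hadoop fs -mv /user/huser/staging/sample.log /user/huser/user/huser/flume/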
Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:42 PM, Jeniba Johnson <jeniba.john...@lntinfotech.com> wrote:

Hi Akhil,

I had used the flatMap method so that the lines from a file would be printed as soon as I tailed it from Flume to HDFS. With the code mentioned below, the lines from the file are not being printed.

Output:

Welcome TO Flume Streaming
15/01/07 22:32:46 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.MappedDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.FileInputDStream: metadataCleanupDelay = -1
15/01/07 22:32:46 INFO dstream.FileInputDStream: Slide time = 20000 ms
15/01/07 22:32:46 INFO dstream.FileInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.FileInputDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.FileInputDStream: Remember duration = 20000 ms
15/01/07 22:32:46 INFO dstream.FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@6c8185d3
15/01/07 22:32:46 INFO dstream.MappedDStream: Slide time = 20000 ms
15/01/07 22:32:46 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.MappedDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.MappedDStream: Remember duration = 20000 ms
15/01/07 22:32:46 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2b79174c
15/01/07 22:32:46 INFO dstream.ForEachDStream: Slide time = 20000 ms
15/01/07 22:32:46 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/07 22:32:46 INFO dstream.ForEachDStream: Checkpoint interval = null
15/01/07 22:32:46 INFO dstream.ForEachDStream: Remember duration = 20000 ms
15/01/07 22:32:46 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ae894e0
15/01/07 22:32:46 INFO util.RecurringTimer: Started timer for JobGenerator at time 1420650180000
15/01/07 22:32:46 INFO scheduler.JobGenerator: Started JobGenerator at 1420650180000 ms
15/01/07 22:32:46 INFO scheduler.JobScheduler: Started JobScheduler
15/01/07 22:33:00 INFO dstream.FileInputDStream: Finding new files took 347 ms
15/01/07 22:33:00 INFO dstream.FileInputDStream: New files at time 1420650180000 ms:

15/01/07 22:33:00 INFO scheduler.JobScheduler: Added jobs for time 1420650180000 ms
15/01/07 22:33:00 INFO scheduler.JobScheduler: Starting job streaming job 1420650180000 ms.0 from job set of time 1420650180000 ms
-------------------------------------------
Time: 1420650180000 ms
-------------------------------------------

15/01/07 22:33:00 INFO scheduler.JobScheduler: Finished job streaming job 1420650180000 ms.0 from job set of time 1420650180000 ms
15/01/07 22:33:00 INFO scheduler.JobScheduler: Total delay: 0.424 s for time 1420650180000 ms (execution: 0.017 s)
15/01/07 22:33:00 INFO dstream.FileInputDStream: Cleared 0 old files that were older than 1420650160000 ms:
15/01/07 22:33:20 INFO dstream.FileInputDStream: Finding new files took 9 ms
15/01/07 22:33:20 INFO dstream.FileInputDStream: New files at time 1420650200000 ms:

-------------------------------------------
15/01/07 22:33:20 INFO scheduler.JobScheduler: Starting job streaming job 1420650200000 ms.0 from job set of time 1420650200000 ms
15/01/07 22:33:20 INFO scheduler.JobScheduler: Added jobs for time 1420650200000 ms
Time: 1420650200000 ms

Regards,
Jeniba Johnson

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, January 07, 2015 4:17 PM
To: Jeniba Johnson; user@spark.apache.org
Subject: Re: Reading Data Using TextFileStream

How about the following code? I'm not quite sure what you were doing inside the flatMap and the foreach.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class Test1 {
    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));

        // Data directory path in HDFS
        JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");

        textStream.print();

        System.out.println("Welcome TO Flume Streaming");
        ssc.start();
        ssc.awaitTermination();
    }
}
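Judging by the stack trace quoted below, the failing job was calling first() on each batch, and first() throws on an empty batch (an interval in which no new files arrived). A minimal guard, assuming the Spark 1.x Java API (it also needs the JavaRDD, Function, and java.util.List imports), could look like this:

textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        // take(1) returns an empty list for an empty batch, unlike first(),
        // which throws java.lang.UnsupportedOperationException.
        List<String> head = rdd.take(1);
        if (!head.isEmpty()) {
            System.out.println("First line of this batch: " + head.get(0));
        }
        return null;
    }
});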
Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson <jeniba.john...@lntinfotech.com> wrote:

Hi Akhil,

I had missed the forward slash in the directory path. After correcting the directory path, I am now facing the error mentioned below. Can anyone help me with this issue?

15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took 360 ms
15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time 1420647920000 ms:

15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time 1420647920000 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
-------------------------------------------
Time: 1420647920000 ms
-------------------------------------------

15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.1 from job set of time 1420647920000 ms
15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1420647920000 ms.1
java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD.first(RDD.scala:1094)
        at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)
        at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
        at xyz.Test1$2.call(Test1.java:67)
        at xyz.Test1$2.call(Test1.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Regards,
Jeniba Johnson

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Wednesday, January 07, 2015 12:11 PM
To: Jeniba Johnson
Cc: Hari Shreedharan (hshreedha...@cloudera.com); d...@spark.apache.org
Subject: Re: Reading Data Using TextFileStream

I think you need to start your streaming job first and then put the files there to get them read; textFileStream doesn't read pre-existing files, I believe.

Also, are you sure the path is not the following (i.e., no missing / at the beginning)?

JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");
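If it still isn't picked up, it may be worth spelling out the fully qualified HDFS URI so the path cannot be resolved against the wrong filesystem (the namenode host and port below are placeholders for your cluster's values):

JavaDStream<String> textStream = ssc.textFileStream("hdfs://namenode:8020/user/huser/user/huser/flume");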
Thanks
Best Regards

On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <jeniba.john...@lntinfotech.com> wrote:

Hi Hari,

I am trying to read data from a file stored in HDFS. Using Flume, the data is tailed and stored in HDFS. Now I want to read this data using textFileStream. With the code mentioned below, I am not able to fetch the data from the file stored in HDFS. Can anyone help me with this issue?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

import java.util.List;

public final class Test1 {
    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));

        // Data directory path in HDFS
        JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");

        JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                return Lists.newArrayList(line);
            }
        });

        suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from Flume " + output);
                return null;
            }
        });

        suspectedStream.print();

        System.out.println("Welcome TO Flume Streaming");
        ssc.start();
        ssc.awaitTermination();
    }
}

The command I use is:

./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar

Regards,
Jeniba Johnson
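For reference, an untested sketch that folds together the fixes suggested in this thread (leading slash restored in the path, print() kept for a quick sanity check, and an empty-batch guard with take() instead of collect()/first()); class and path names are carried over from the messages above:

import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class Test1 {
    public static void main(String[] args) throws Exception {

        JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));

        // Leading slash restored: the HDFS directory being watched.
        JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");

        // Prints the first few lines of every non-empty batch.
        textStream.print();

        // Per-batch handling that tolerates empty batches: take(10) samples
        // up to ten lines instead of pulling the whole batch with collect().
        textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> sample = rdd.take(10);
                if (!sample.isEmpty()) {
                    System.out.println("Sentences collected from Flume: " + sample);
                }
                return null;
            }
        });

        System.out.println("Welcome TO Flume Streaming");
        ssc.start();
        ssc.awaitTermination();
    }
}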