How about the following code? I'm not quite sure what you were doing inside
the flatMap and foreach.


import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class Test1 {
  public static void main(String[] args) throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));

    // Data directory path in HDFS
    JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");

    textStream.print();

    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }

}
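
If you do still want to process each batch yourself (as in your original foreach), note that the "empty collection" error in your trace comes from calling first() on a batch RDD that has no records. A minimal sketch of a guard, assuming the Spark 1.x Java API (it would need the org.apache.spark.api.java.JavaRDD and org.apache.spark.api.java.function.Function imports back):

    textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        // Guard against empty batches before calling first()/collect()
        if (rdd.count() > 0) {
          System.out.println("First line of this batch: " + rdd.first());
        } else {
          System.out.println("No new data in this batch interval");
        }
        return null;
      }
    });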


Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson <
jeniba.john...@lntinfotech.com> wrote:

> Hi Akhil,
>
>
>
> I had missed the forward slash in the directory path. After correcting the
> directory path, I am now facing the error mentioned below.
>
> Can anyone help me with this issue?
>
>
>
> 15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took 360 ms
> 15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time 1420647920000 ms:
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time 1420647920000 ms
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
>
> -------------------------------------------
> Time: 1420647920000 ms
> -------------------------------------------
>
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.1 from job set of time 1420647920000 ms
> 15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1420647920000 ms.1
> java.lang.UnsupportedOperationException: empty collection
>         at org.apache.spark.rdd.RDD.first(RDD.scala:1094)
>         at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)
>         at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
>         at xyz.Test1$2.call(Test1.java:67)
>         at xyz.Test1$2.call(Test1.java:1)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:722)
>
>
>
>
> Regards,
>
> Jeniba Johnson
>
>
>
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
> Sent: Wednesday, January 07, 2015 12:11 PM
> To: Jeniba Johnson
> Cc: Hari Shreedharan (hshreedha...@cloudera.com); d...@spark.apache.org
> Subject: Re: Reading Data Using TextFileStream
>
>
>
> I think you need to start your streaming job first and then put the files there
> for them to be read. I believe textFileStream doesn't pick up files that already exist.
>
>
>
> Also, are you sure the path shouldn't be the following (is there a missing / at the
> beginning)?
>
>
>
> JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");
>
>
> Thanks
>
> Best Regards
>
>
>
> On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <
> jeniba.john...@lntinfotech.com> wrote:
>
>
> Hi Hari,
>
> I am trying to read data from a file stored in HDFS. The data is tailed by Flume
> and stored in HDFS.
> Now I want to read this data using textFileStream. With the code mentioned below,
> I am not able to fetch the data from the file stored in HDFS. Can anyone help me
> with this issue?
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.regex.Pattern;
>
> public final class Test1 {
>   public static void main(String[] args) throws Exception {
>
>     SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
>
>     // Data directory path in HDFS
>     JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");
>
>     JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
>       public Iterable<String> call(String line) throws Exception {
>         // return Arrays.asList(line.toString().toString());
>         return Lists.newArrayList(line.toString().toString());
>       }
>     });
>
>     suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
>       public Void call(JavaRDD<String> rdd) throws Exception {
>         List<String> output = rdd.collect();
>         System.out.println("Sentences Collected from Flume " + output);
>         return null;
>       }
>     });
>
>     suspectedStream.print();
>
>     System.out.println("Welcome TO Flume Streaming");
>     ssc.start();
>     ssc.awaitTermination();
>   }
>
> }
>
> The command I use is:
> ./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar
>
>
>
>
>
> Regards,
> Jeniba Johnson
