I am not sure how to write it… I tried writing to the local file system with 
FileWriter and PrintWriter, inside the while loop. I am able to get the text 
and print it, but it fails when I use the regular Java classes. Shouldn't I 
use regular Java classes here? Can I write only to HDFS? Do I have to create 
the file in HDFS using the HDFS classes? I thought of using Spark's 
saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent>, not a 
JavaRDD<AvroFlumeEvent>, so I am not sure whether saveAsTextFile() will work. 
I appreciate any guidance here. Where can I find more code examples (books, URLs)?
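
Here is roughly what I am aiming for (a sketch only, untested; the HDFS path is 
a placeholder). As I understand it, saveAsTextFile() writes a directory of part 
files and fails if that directory already exists, hence the per-batch suffix:

    // Sketch (untested): extract each event's payload as a String, then save.
    // "hdfs://namenode:8020/flume/out" is a placeholder path.
    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> rdd) throws Exception {
            JavaRDD<String> lines = rdd.map(new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent e) {
                    // The payload bytes are the actual record
                    return new String(e.event().getBody().array());
                }
            });
            // Per-batch directory so saveAsTextFile does not hit an existing path
            lines.saveAsTextFile("hdfs://namenode:8020/flume/out-" + System.currentTimeMillis());
            return null;
        }
    });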


  flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
          String logRecord = null;
          List<SparkFlumeEvent> events = eventsData.collect();
          Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
          long t1 = System.currentTimeMillis();
          AvroFlumeEvent avroEvent = null;
          ByteBuffer bytePayload = null;
          // All the user-level data is carried as the payload in the Flume event
          while (batchedEvents.hasNext()) {
              SparkFlumeEvent flumeEvent = batchedEvents.next();
              avroEvent = flumeEvent.event();
              bytePayload = avroEvent.getBody();
              logRecord = new String(bytePayload.array());

              System.out.println(">>>>>>>>LOG RECORD = " + logRecord);

              // ??I was trying to write the data to hdfs..but it fails…
          }
          return null;
      }
  });


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 1:43 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing FLume data to HDFS

What is the error you are getting when you say "??I was trying to write the 
data to hdfs..but it fails…"

TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. 
<muthu.x.sundaram....@sabre.com> wrote:
I am new to Spark. I am trying to do the following:
Netcat --> Flume --> Spark Streaming (process the Flume data) --> HDFS.

My Flume config file has the following setup:

Source = netcat
Sink = avro sink
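
Roughly like this (a sketch; the agent name, hosts, and ports are placeholders):

    # Sketch of the agent; a1, r1, k1, c1, hosts, and ports are placeholders
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Netcat source listening for test input
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Avro sink pointing at the host/port where the Spark Flume receiver runs
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = spark-receiver-host
    a1.sinks.k1.port = 41414

    # In-memory channel wiring the source to the sink
    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1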

Spark Streaming code:
I am able to print the Flume data to the console, but I am struggling to 
write it to a file. To get at the real data I need to convert each 
SparkFlumeEvent to an AvroFlumeEvent.
JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection 
of SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent> first?
Please share any code examples… Thanks.
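
One idea I have not been able to verify (a sketch; the HDFS path is a 
placeholder): map the events to their payload strings first, then save the 
mapped stream.

    // Sketch (untested): convert SparkFlumeEvent payloads to Strings at the DStream level
    JavaDStream<String> lines = flumeStream.map(new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent event) {
            return new String(event.event().getBody().array());
        }
    });
    // The underlying Scala DStream exposes saveAsTextFiles(prefix, suffix),
    // which writes one directory per batch interval
    lines.dstream().saveAsTextFiles("hdfs://namenode:8020/flume/lines", "txt");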

Code:

 Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    flumeStream.count();
flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
 @Override
 public Void call(JavaRDD<SparkFlumeEvent> events) throws Exception {
   // saveAsTextFile expects a directory path and fails if it already exists,
   // so a fixed name like "output.txt" only works for the first batch
   events.saveAsTextFile("output.txt");
   return null;
 }
 });

    /*flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();*/

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;

            // All the user-level data is carried as the payload in the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());

                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);

                // ??I was trying to write the data to hdfs..but it fails…
            }
            System.out.println("Processed this batch in: " +
                (System.currentTimeMillis() - t1)/1000 + " seconds");
            return null;
        }
    });
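
If writing from inside the loop is the right way, I assume I need the Hadoop 
FileSystem API rather than FileWriter (a sketch; the URI and path are 
placeholders, and since the events were already collect()ed, this would run 
on the driver):

    // Sketch (untested): write the collected records with the Hadoop FileSystem API.
    // Needs org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.*, java.net.URI.
    Configuration hconf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), hconf);
    Path out = new Path("/flume/batch-" + System.currentTimeMillis() + ".txt");
    FSDataOutputStream os = fs.create(out);
    try {
        for (SparkFlumeEvent e : events) {
            os.write(e.event().getBody().array());
            os.write('\n');
        }
    } finally {
        os.close();
    }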

