My intention is not to write data directly from Flume to HDFS. I have to 
collect messages from a queue using Flume and send them to Spark Streaming for 
additional processing. I will try what you have suggested.

Thanks,
Muthu

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Monday, July 14, 2014 2:23 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing Flume data to HDFS

Stepping back a bit: if you just want to write Flume data to HDFS, you can use 
Flume's HDFS sink for that.
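
For reference, a minimal HDFS sink stanza looks roughly like this (the agent 
and channel names and the path below are placeholders, not from your setup):

    # placeholder agent/channel names and path
    agent1.sinks = hdfsSink
    agent1.sinks.hdfsSink.type = hdfs
    agent1.sinks.hdfsSink.hdfs.path = hdfs://namenode:8020/flume/events
    # DataStream + Text writes the raw event bodies as plain text
    agent1.sinks.hdfsSink.hdfs.fileType = DataStream
    agent1.sinks.hdfsSink.hdfs.writeFormat = Text
    agent1.sinks.hdfsSink.channel = memCh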

Trying to do this using Spark Streaming and SparkFlumeEvent is unnecessarily 
complex, and I guess it is tricky to write the raw bytes from a 
SparkFlumeEvent into a file. If you want to do it this way, I suggest trying 
this (not tested, pure guesswork):

RDD[SparkFlumeEvent] ---> map to get the RDD of payload bytes ---> use 
RDD.mapPartitions() to write each partition's bytes into an HDFS file 
(using HDFS's file output stream interface)

You will have to take care of making the file names of each partition unique, 
dealing with failures while writing, etc.

TD


On Mon, Jul 14, 2014 at 9:29 AM, Sundaram, Muthu X. 
<muthu.x.sundaram....@sabre.com> wrote:
I am not sure how to write it… I tried writing to the local file system using 
FileWriter and PrintWriter, inside the while loop. I am able to get the text 
and print it, but it fails when I use regular Java classes. Shouldn't I be 
able to use regular Java classes here? Can I write only to HDFS? Do I have to 
create the file in HDFS using the HDFS classes? I thought of using Spark's 
saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent>, not a 
JavaRDD<AvroFlumeEvent>, so I am not sure whether saveAsTextFile() will work. 
I appreciate any guidance here. Where do I find more code examples? Books, URLs?
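
Something like the sketch below is what I had in mind (untested; the output 
path is just a placeholder): map each event to its String payload first, so 
the RDD holds plain strings that saveAsTextFile() can write.

    import java.nio.ByteBuffer;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    // Convert SparkFlumeEvent payloads to Strings before saving.
    JavaRDD<String> records = eventsData.map(
        new Function<SparkFlumeEvent, String>() {
          @Override
          public String call(SparkFlumeEvent event) {
            ByteBuffer body = event.event().getBody();
            byte[] bytes = new byte[body.remaining()];
            body.get(bytes);
            return new String(bytes);
          }
        });
    records.saveAsTextFile("hdfs:///user/muthu/output"); // placeholder path

Here is the relevant part of my code again: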


  flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
          String logRecord = null;
          List<SparkFlumeEvent> events = eventsData.collect();
          Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
          long t1 = System.currentTimeMillis();
          AvroFlumeEvent avroEvent = null;
          ByteBuffer bytePayload = null;
          // All the user-level data is carried as the payload of the Flume event.
          while (batchedEvents.hasNext()) {
              SparkFlumeEvent flumeEvent = batchedEvents.next();
              avroEvent = flumeEvent.event();
              bytePayload = avroEvent.getBody();
              logRecord = new String(bytePayload.array());

              System.out.println(">>>>>>>>LOG RECORD = " + logRecord);

              // ??I was trying to write the data to hdfs..but it fails…


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 1:43 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing Flume data to HDFS

What is the error you are getting when you say "??I was trying to write the 
data to hdfs..but it fails…"?

TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. 
<muthu.x.sundaram....@sabre.com> wrote:
I am new to Spark. I am trying to do the following:
Netcat --> Flume --> Spark Streaming (process Flume data) --> HDFS.

My Flume config file has the following setup:

Source = netcat
Sink = avro sink
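
Roughly like this (the agent and channel names, hosts, and ports below are 
placeholders, not my real values):

    # placeholder names, hosts, and ports
    agent1.sources = netcatSrc
    agent1.sinks = avroSink
    agent1.channels = memCh

    agent1.sources.netcatSrc.type = netcat
    agent1.sources.netcatSrc.bind = localhost
    agent1.sources.netcatSrc.port = 44444
    agent1.sources.netcatSrc.channels = memCh

    # the avro sink points at the host/port where the Spark Flume receiver listens
    agent1.sinks.avroSink.type = avro
    agent1.sinks.avroSink.hostname = localhost
    agent1.sinks.avroSink.port = 41414
    agent1.sinks.avroSink.channel = memCh

    agent1.channels.memCh.type = memory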

Spark Streaming code:
I am able to print data from Flume to the monitor, but I am struggling to 
create a file. In order to get the real data I need to convert each 
SparkFlumeEvent to an AvroFlumeEvent.
JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection 
of SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent>?
Please share any code examples… Thanks.

Code:

    Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

    // count() alone is a transformation; nothing runs without an output operation.
    flumeStream.count();

    // foreachRDD's two-argument form takes the RDD and the batch Time, not two RDDs.
    flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, Time, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> events, Time time) throws Exception {
        // saveAsTextFile writes a directory of part files, not a single file.
        events.saveAsTextFile("output.txt");
        return null;
      }
    });

    /*flumeStream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " flume events.";
      }
    }).print();*/

    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
      @Override
      public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;

        // All the user-level data is carried as the payload of the Flume event.
        while (batchedEvents.hasNext()) {
          SparkFlumeEvent flumeEvent = batchedEvents.next();
          avroEvent = flumeEvent.event();
          bytePayload = avroEvent.getBody();
          logRecord = new String(bytePayload.array());

          System.out.println(">>>>>>>>LOG RECORD = " + logRecord);

          // ??I was trying to write the data to hdfs..but it fails…
        }
        System.out.println("Processed this batch in: "
            + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
      }
    });


