Thanks Tathagata. Here’s the code snippet:

       // insert the records read in this batch interval into DB
       flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
              @Override
              public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
                     String logRecord = null;
                     List<SparkFlumeEvent> events = eventsData.collect();
                     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

                     System.out.println(">>>>>> Received Spark Flume Events: " + events.size());

                     long t1 = System.currentTimeMillis();
                     AvroFlumeEvent avroEvent = null;
                     ByteBuffer bytePayload = null;

                     // All the user level data is carried as payload in Flume Event
                     while(batchedEvents.hasNext()) {
                            SparkFlumeEvent flumeEvent = batchedEvents.next();

                            avroEvent = flumeEvent.event();
                            bytePayload = avroEvent.getBody();
                            // Copy only the readable bytes; array() would return the whole backing array
                            byte[] body = new byte[bytePayload.remaining()];
                            bytePayload.duplicate().get(body);
                            logRecord = new String(body, "UTF-8"); // assumes UTF-8 log lines

                            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
                            // split() takes a regex, so the pipe must be escaped
                            String[] fields = logRecord.split("\\|");

                            //TODO: Insert a record of format: [msisdn|ip_addr|start_time|end_time] into DB
                     }
                     System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds");

                     return null;
              }
         });
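
The payload decoding and field splitting can be tried outside Spark. Below is a minimal standalone sketch, not the actual streaming job: the class name FlumePayloadParser, its two helper methods, and the sample record are hypothetical, and UTF-8 is an assumed encoding. It illustrates two points: String.split takes a regular expression, so a bare "|" matches the empty string between every character and has to be quoted or escaped, and ByteBuffer.array() returns the whole backing array regardless of position and limit, so copying only the remaining bytes is safer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

public class FlumePayloadParser {

    // Hypothetical helper: decode the readable bytes of an event body.
    // Assumption: the log lines are UTF-8 encoded.
    static String decodeBody(ByteBuffer body) {
        // duplicate() shares the content but has its own position/limit,
        // so the caller's buffer is left untouched.
        ByteBuffer view = body.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hypothetical helper: split a record on a literal pipe character.
    static String[] splitFields(String logRecord) {
        // Pattern.quote makes "|" a literal; limit -1 keeps trailing empty fields.
        return logRecord.split(Pattern.quote("|"), -1);
    }

    public static void main(String[] args) {
        // Made-up sample in the msisdn|ip_addr|start_time|end_time layout.
        String sample = "919800000000|10.0.0.1|1398650000|1398650060";
        ByteBuffer body = ByteBuffer.wrap(sample.getBytes(StandardCharsets.UTF_8));

        String[] fields = splitFields(decodeBody(body));
        System.out.println(fields.length + " fields, msisdn=" + fields[0]);
    }
}
```

Inside the streaming callback, the same two helpers could be applied to the ByteBuffer returned by avroEvent.getBody().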


Regards,
Vikram

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, April 29, 2014 2:41 AM
To: user@spark.apache.org
Subject: Re: Java Spark Streaming - SparkFlumeEvent

You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using 
SparkFlumeEvent.event. That should probably give you all the original text data.


On Mon, Apr 28, 2014 at 5:46 AM, Kulkarni, Vikram 
<vikram.kulka...@hp.com> wrote:
Hi Spark-users,
  Within my Spark Streaming program, I am able to ingest data sent by my Flume 
Avro Client. I configured a ‘spooling directory source’ to write data to a 
Flume Avro Sink (the Spark Streaming driver program in this case). The default 
deserializer, LINE, is used to parse the file into events, so I am expecting 
one event (SparkFlumeEvent) for every line in the log file.

Here is my Spark Streaming code snippet:

       System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);

       //JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
       JavaDStream<SparkFlumeEvent> flumeStream =
                     FlumeUtils.createStream(ssc, avroServer, avroPort);

       flumeStream.count();

       flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
                     @Override
                     public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
                            List<SparkFlumeEvent> events = eventsData.collect();
                            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

                            System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
                            while(batchedEvents.hasNext()) {
                                   SparkFlumeEvent flumeEvent = batchedEvents.next();
                                   //System.out.println("SparkFlumeEvent = " + flumeEvent);
                                   //System.out.println(">>>>>>>>" + flumeEvent.toString());

                                   //TODO: How to build each line in the file using this SparkFlumeEvent object?
                            }
                            return null;
                     }
             });

Within this while loop, how do I extract each line that was streamed using the 
SparkFlumeEvent object? I intend to then parse this line, extract various 
fields and then persist it to memory.

Regards,
Vikram

