Hi Spark-users,
  Within my Spark Streaming program, I am able to ingest data sent by my Flume 
Avro client. I configured a 'spooling directory source' to write data to a 
Flume Avro sink (the Spark Streaming driver program in this case). The default 
deserializer, i.e. LINE, is used to parse the file into events, so I am 
expecting one event (SparkFlumeEvent) per line in the log file.
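
For reference, my Flume agent configuration looks roughly like the sketch 
below (the agent name, spool directory and port are placeholders rather than 
my actual values):

       agent.sources = spoolSrc
       agent.channels = memChannel
       agent.sinks = avroSink

       # Spooling directory source, using the default LINE deserializer
       agent.sources.spoolSrc.type = spooldir
       agent.sources.spoolSrc.spoolDir = /path/to/spool/dir
       agent.sources.spoolSrc.deserializer = LINE
       agent.sources.spoolSrc.channels = memChannel

       agent.channels.memChannel.type = memory

       # Avro sink pointing at the host/port the Spark Streaming receiver listens on
       agent.sinks.avroSink.type = avro
       agent.sinks.avroSink.hostname = XXX.YYY.XXX.YYY
       agent.sinks.avroSink.port = 41414
       agent.sinks.avroSink.channel = memChannel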

Here is my Spark Streaming code snippet:

       System.out.println("Setting up Flume Stream using Avro Sink at: " + 
avroServer + ":" + avroPort);

       //JavaDStream<SparkFlumeEvent> flumeStream = 
sc.flumeStream("XXX.YYY.XXX.YYY", port);
       JavaDStream<SparkFlumeEvent> flumeStream =
                     FlumeUtils.createStream(ssc, avroServer, avroPort);

       flumeStream.count();

       flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
                     @Override
                     public Void call(JavaRDD<SparkFlumeEvent> eventsData) 
throws Exception {
                            List<SparkFlumeEvent> events = eventsData.collect();
                            Iterator<SparkFlumeEvent> batchedEvents = 
events.iterator();

                            System.out.println(">>>>>> Received Spark Flume 
Events: " + events.size());
                            while(batchedEvents.hasNext()) {
                                   SparkFlumeEvent flumeEvent = 
batchedEvents.next();
                                   //System.out.println("SparkFlumeEvent = " + 
flumeEvent);
                                   //System.out.println(">>>>>>>>" + 
flumeEvent.toString());

//TODO: How to build each line in the file using this SparkFlumeEvent object?
                            }
                           return null;
                     }
             });

Within this while loop, how do I extract each line that was streamed, using 
the SparkFlumeEvent object? I then intend to parse each line, extract various 
fields, and persist them in memory.
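
I assume something along these lines might work inside the while loop 
(untested sketch; it relies on SparkFlumeEvent.event() returning the 
underlying org.apache.flume.source.avro.AvroFlumeEvent, and the variable and 
print names are just illustrative), but please correct me if this is not the 
intended approach:

       // Untested sketch, in place of the TODO above. Assumed imports:
       //   import java.nio.ByteBuffer;
       //   import java.nio.charset.StandardCharsets;
       //   import org.apache.flume.source.avro.AvroFlumeEvent;
       AvroFlumeEvent avroEvent = flumeEvent.event();            // underlying Avro event
       ByteBuffer body = avroEvent.getBody();                    // raw event body
       byte[] bytes = new byte[body.remaining()];
       body.duplicate().get(bytes);                              // copy bytes without moving the buffer's position
       String line = new String(bytes, StandardCharsets.UTF_8);  // one log line per event (LINE deserializer)
       System.out.println("Line = " + line);

Is decoding the ByteBuffer from getBody() like this the right way to recover 
the original line, or is there a higher-level accessor I am missing?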

Regards,
Vikram
