Thanks Tathagata. Here's the code snippet:

// insert the records read in this batch interval into DB
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    String logRecord = null;
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
    long t1 = System.currentTimeMillis();
    AvroFlumeEvent avroEvent = null;
    ByteBuffer bytePayload = null;
    // All the user-level data is carried as the payload in the Flume event
    while (batchedEvents.hasNext()) {
      SparkFlumeEvent flumeEvent = batchedEvents.next();
      avroEvent = flumeEvent.event();
      bytePayload = avroEvent.getBody();
      logRecord = new String(bytePayload.array());
      System.out.println(">>>>>>>> LOG RECORD = " + logRecord);
      // String.split() takes a regex, so the pipe delimiter must be escaped
      String[] fields = logRecord.split("\\|");
      //TODO: Insert a record of format [msisdn|ip_addr|start_time|end_time] into DB
    }
    System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1) / 1000 + " seconds");
    return null;
  }
});

Regards,
Vikram

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, April 29, 2014 2:41 AM
To: user@spark.apache.org
Subject: Re: Java Spark Streaming - SparkFlumeEvent

You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using SparkFlumeEvent.event. That should probably give you all the original text data.

On Mon, Apr 28, 2014 at 5:46 AM, Kulkarni, Vikram <vikram.kulka...@hp.com> wrote:

Hi Spark-users,

Within my Spark Streaming program, I am able to ingest data sent by my Flume Avro client. I configured a 'spooling directory source' to write data to a Flume Avro sink (the Spark Streaming driver program in this case). The default deserializer, i.e. LINE, is used to parse the file into events, so I am expecting one event (SparkFlumeEvent) per line in the log file.

My Spark Streaming code snippet:

System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);
//JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, avroServer, avroPort);
flumeStream.count();

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
    while (batchedEvents.hasNext()) {
      SparkFlumeEvent flumeEvent = batchedEvents.next();
      //System.out.println("SparkFlumeEvent = " + flumeEvent);
      //System.out.println(">>>>>>>>" + flumeEvent.toString());
      //TODO: How to build each line in the file using this SparkFlumeEvent object?
    }
    return null;
  }
});

Within this while loop, how do I extract each line that was streamed, using the SparkFlumeEvent object? I intend to then parse this line, extract the various fields, and persist them in memory.

Regards,
Vikram
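>>>>> Received">
Below is a minimal sketch of one way to fill in the DB-insert TODO from the first message above, kept in the same collect()-on-the-driver shape as that snippet. It is only an illustration: the JDBC URL, credentials, table name (session_records) and column names are placeholders, and the field order is assumed to be [msisdn|ip_addr|start_time|end_time] as in the TODO comment. For large batches the writes would normally be pushed down to the executors rather than collected to the driver.

// imports needed in the driver class (in addition to the Spark/Flume ones already used above)
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    List<SparkFlumeEvent> events = eventsData.collect();
    if (events.isEmpty()) {
      return null;
    }
    // Placeholder JDBC URL and credentials -- substitute your own database and driver.
    Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://dbhost:5432/mydb", "user", "password");
    PreparedStatement stmt = conn.prepareStatement(
        "INSERT INTO session_records (msisdn, ip_addr, start_time, end_time) VALUES (?, ?, ?, ?)");
    try {
      for (SparkFlumeEvent flumeEvent : events) {
        AvroFlumeEvent avroEvent = flumeEvent.event();
        ByteBuffer body = avroEvent.getBody();
        // Copy only the valid bytes of the payload and decode them explicitly as UTF-8.
        byte[] bytes = new byte[body.remaining()];
        body.duplicate().get(bytes);
        String logRecord = new String(bytes, StandardCharsets.UTF_8);
        // split() takes a regex, so the pipe delimiter must be escaped.
        String[] fields = logRecord.split("\\|");
        if (fields.length < 4) {
          continue; // skip malformed lines
        }
        stmt.setString(1, fields[0]); // msisdn
        stmt.setString(2, fields[1]); // ip_addr
        stmt.setString(3, fields[2]); // start_time
        stmt.setString(4, fields[3]); // end_time
        stmt.addBatch();
      }
      // One batched execute per streaming batch interval keeps the per-record overhead low.
      stmt.executeBatch();
    } finally {
      stmt.close();
      conn.close();
    }
    return null;
  }
});

Note that decoding via getBody().remaining() copies only the payload bytes that are actually valid, whereas getBody().array() can include unused buffer capacity; the explicit UTF-8 charset also avoids depending on the JVM default encoding.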