Hi Spark-users,

Within my Spark Streaming program, I am able to ingest data sent by my Flume Avro client. I configured a spooling directory source to write data to a Flume Avro sink (the Spark Streaming driver program in this case). The default deserializer, LINE, is used to parse each file into events, so I am expecting one event (SparkFlumeEvent) per line of the log file.
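For reference, the relevant part of my Flume agent configuration looks roughly like this (agent name, spool directory, host and port below are placeholders, not my real values):

    agent1.sources = spoolSrc
    agent1.channels = memCh
    agent1.sinks = avroSink

    # Spooling directory source; the default LINE deserializer turns each line into one event
    agent1.sources.spoolSrc.type = spooldir
    agent1.sources.spoolSrc.spoolDir = /path/to/spool/dir
    agent1.sources.spoolSrc.deserializer = LINE
    agent1.sources.spoolSrc.channels = memCh

    # Avro sink pointing at the host/port where the Spark Streaming receiver listens
    agent1.sinks.avroSink.type = avro
    agent1.sinks.avroSink.hostname = XXX.YYY.XXX.YYY
    agent1.sinks.avroSink.port = 41414
    agent1.sinks.avroSink.channel = memCh

    agent1.channels.memCh.type = memory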
My Spark Streaming code snippet:

    System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);

    //JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, avroServer, avroPort);
    flumeStream.count();

    flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                //System.out.println("SparkFlumeEvent = " + flumeEvent);
                //System.out.println(">>>>>>>>" + flumeEvent.toString());
                //TODO: How to build each line of the file from this SparkFlumeEvent object?
            }
            return null;
        }
    });

Within this while loop, how do I extract each line that was streamed from the SparkFlumeEvent object? I then intend to parse the line, extract various fields, and persist them to memory.

Regards,
Vikram
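P.S. For what it's worth, this is what I have been guessing at inside the while loop. It is untested, and I am assuming that SparkFlumeEvent.event() returns the underlying AvroFlumeEvent and that the event body is the UTF-8 text of one line:

    // Extra imports this would need:
    // import java.nio.ByteBuffer;
    // import java.nio.charset.StandardCharsets;
    // import org.apache.flume.source.avro.AvroFlumeEvent;

    SparkFlumeEvent flumeEvent = batchedEvents.next();
    AvroFlumeEvent avroEvent = flumeEvent.event();

    // Copy the body out of the ByteBuffer and decode it as UTF-8 text.
    ByteBuffer body = avroEvent.getBody();
    byte[] bytes = new byte[body.remaining()];
    body.get(bytes);
    String line = new String(bytes, StandardCharsets.UTF_8);

    System.out.println("Line from Flume event: " + line);
    // TODO: split 'line' into fields and persist them.

If there is a cleaner or recommended way to get at the event body, please point me to it.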