I tried to map the SparkFlumeEvents to an RDD of Strings as shown below, but that map and 
its call() are never executed. I might be doing this the wrong way. Any help 
would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("<<<<<<Inside for each...call>>>>");

        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    String logRecord = null;
                    AvroFlumeEvent avroEvent = null;
                    ByteBuffer bytePayload = null;

                    System.out.println("<<<<<<Inside Map..call>>>>");
                    /* List<SparkFlumeEvent> events = flume.collect();
                       Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
                       SparkFlumeEvent flumeEvent = batchedEvents.next(); */
                    avroEvent = flume.event();
                    bytePayload = avroEvent.getBody();
                    logRecord = new String(bytePayload.array());

                    System.out.println("<<<<Record is" + logRecord);
                    return logRecord;
                }
            });
        return null;
    }
});
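For reference, a minimal sketch of the same foreach body with an action added, assuming the
Spark 1.0.x Java API used above. Transformations such as map() are lazy, so the inner call()
only runs once an action (count(), collect(), saveAsTextFile(), etc.) is invoked on the
mapped RDD; count() below is just one illustrative choice.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    // Flume carries the user data as the Avro event body
                    return new String(flume.event().getBody().array());
                }
            });
        // Without an action the map above never executes; count() forces it.
        System.out.println("Records in this batch: " + records.count());
        return null;
    }
});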

-----Original Message-----
From: Sundaram, Muthu X. [mailto:muthu.x.sundaram....@sabre.com] 
Sent: Tuesday, July 22, 2014 10:24 AM
To: user@spark.apache.org; d...@spark.incubator.apache.org
Subject: Transforming flume events using Spark transformation functions

Hi All,
  I am getting events from Flume using the following line.

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use some of the transformation 
functions, like map and reduce, on this. Do I need to convert the 
JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these 
functions directly on it?
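For reference, a minimal sketch of that conversion, assuming the Spark 1.0.x Java streaming
API: mapping the DStream itself yields a JavaDStream<String> that the usual transformations
can then be applied to.

// Sketch only; needs org.apache.spark.api.java.function.Function and java.nio.ByteBuffer.
JavaDStream<String> lines = flumeStream.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent flumeEvent) throws Exception {
            ByteBuffer body = flumeEvent.event().getBody();
            return new String(body.array());
        }
    });
lines.print();   // prints the first few records of each batch, just to verify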

I need to do the following kind of operations:

XXXX                     AA
YYYYY                    Delta
TTTTT                    AA
CCCC                     Southwest
XXXX                     AA

The unique tickets are XXXX, YYYYY, TTTTT, CCCC (XXXX appears twice).
The counts are XXXX 2, YYYYY 1, TTTTT 1, and so on.
By carrier: AA - 2 tickets (the duplicate should not be counted), Delta - 1 ticket, 
Southwest - 1 ticket.
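A hedged sketch of one way to get those per-carrier counts with standard RDD operations. It
assumes the events have already been converted to a JavaRDD<String> named records and that
the ticket and carrier fields are separated by whitespace; both are assumptions, not
something stated above.

// Sketch only; needs JavaPairRDD, PairFunction, Function2 and scala.Tuple2.
JavaPairRDD<String, Integer> ticketsPerCarrier = records
    .distinct()                               // drop duplicate ticket/carrier records
    .mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String record) throws Exception {
            String[] fields = record.trim().split("\\s+");    // [ticket, carrier]
            return new Tuple2<String, Integer>(fields[1], 1); // carrier -> 1
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
        }
    });
System.out.println(ticketsPerCarrier.collect());  // e.g. [(AA,2), (Delta,1), (Southwest,1)]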

I have to do transformations like this. Right now I am able to receive the 
records, but I am struggling to transform them using Spark transformation 
functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events like below:

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user-level data is carried as the payload of the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
        }
        return null;
    }
});

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I 
create this JavaRDD<String>?
Inside the loop I am able to get every record and print it.
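A hedged sketch of one possible approach, assuming a JavaSparkContext named sc is available
in the driver (for example via ssc.sparkContext()): the strings gathered in the loop can be
turned into a JavaRDD<String> with parallelize(). Note that mapping eventsData directly, as
in the reply at the top of this thread, keeps the work distributed, whereas collect() plus
parallelize() pulls every record to the driver first.

// Sketch only; assumes `sc` is a JavaSparkContext and `batchedEvents` comes from
// the collect() above. parallelize() builds a new JavaRDD<String> on the driver.
List<String> logRecords = new ArrayList<String>();
while (batchedEvents.hasNext()) {
    SparkFlumeEvent flumeEvent = batchedEvents.next();
    logRecords.add(new String(flumeEvent.event().getBody().array()));
}
JavaRDD<String> recordsRDD = sc.parallelize(logRecords);
System.out.println("Created an RDD with " + recordsRDD.count() + " records");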

I appreciate any help here.

Thanks,
Muthu

