Hi All,
I am receiving events from Flume using the following line:

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use transformation functions
such as map and reduce on these records. Do I need to convert the
JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these
functions to it directly?
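It should not be necessary to build a new JavaRDD by hand: `JavaDStream.map` can turn the `JavaDStream<SparkFlumeEvent>` into a `JavaDStream<String>`, after which `map`, `reduce` and the other transformations apply as usual. The only per-event work is decoding the Avro body (a `ByteBuffer`, obtained with `event().getBody()` as in the loop further down) into a `String`. Below is a minimal JDK-only sketch of that step, assuming the payload is UTF-8 text; `PayloadDecoder` and `decode` are hypothetical names. With Java 8 it could be used as `flumeStream.map(e -> PayloadDecoder.decode(e.event().getBody()))`; on Java 7 an anonymous `Function<SparkFlumeEvent, String>` does the same.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class PayloadDecoder {
    // Hypothetical helper: decode a Flume event body into a String.
    // ASSUMPTION: the body is UTF-8 text.
    // Unlike body.array(), this reads only the bytes between position and limit,
    // and it works on a duplicate so the caller's buffer position is not moved.
    static String decode(ByteBuffer body) {
        ByteBuffer view = body.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.wrap("XXXX,AA".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(body)); // prints XXXX,AA
    }
}
```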

I need to do the following kind of operations:

XXXX                     AA
YYYYY                    Delta
TTTTT                    AA
CCCC                     Southwest
XXXX                     AA
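Each row above pairs a ticket with an airline. A minimal sketch of splitting one record, assuming the fields are whitespace-separated with the ticket first (the actual delimiter is not stated, so the regex may need adjusting); `TicketRecordParser` and `parse` are hypothetical names. On a `JavaDStream<String>` this would typically be called from `map` or `mapToPair`, returning a `scala.Tuple2` there instead of a `Map.Entry`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class TicketRecordParser {
    // Hypothetical parser for one record such as "XXXX   AA".
    // ASSUMPTION: whitespace-separated fields, ticket first, airline second.
    static Map.Entry<String, String> parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 2) {
            throw new IllegalArgumentException("Malformed record: " + line);
        }
        return new SimpleEntry<>(fields[0], fields[1]);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> e = parse("CCCC                     Southwest");
        System.out.println(e.getKey() + " -> " + e.getValue()); // prints CCCC -> Southwest
    }
}
```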

The unique tickets are XXXX, YYYYY, TTTTT and CCCC.
The counts are XXXX 2, YYYYY 1, TTTTT 1, and so on.
Per airline: AA - 2 tickets (the duplicate should not be counted), Delta - 1
ticket, Southwest - 1 ticket.
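The two results described above (occurrences per ticket, and distinct tickets per airline) can be sketched on an in-memory list with plain Java 8 collections. `TicketCounts` and its methods are hypothetical, and each row is assumed to be `{ticket, airline}`. In Spark Streaming the rough equivalents would be `countByValue()` on a stream of tickets, and `distinct()` on (airline, ticket) pairs (for example inside `transform`) followed by a count per airline. Note that DStream operations work per batch unless windowed or stateful operations are used.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class TicketCounts {
    // Occurrences of each ticket, duplicates included (XXXX -> 2).
    static Map<String, Long> countPerTicket(List<String[]> rows) {
        Map<String, Long> counts = new TreeMap<>();
        for (String[] r : rows) {
            counts.merge(r[0], 1L, Long::sum);
        }
        return counts;
    }

    // Distinct tickets per airline: the repeated XXXX is counted once for AA.
    static Map<String, Long> distinctTicketsPerAirline(List<String[]> rows) {
        Map<String, Set<String>> ticketsByAirline = new TreeMap<>();
        for (String[] r : rows) {
            ticketsByAirline.computeIfAbsent(r[1], k -> new HashSet<>()).add(r[0]);
        }
        Map<String, Long> counts = new TreeMap<>();
        ticketsByAirline.forEach((airline, tickets) -> counts.put(airline, (long) tickets.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"XXXX", "AA"},
                new String[] {"YYYYY", "Delta"},
                new String[] {"TTTTT", "AA"},
                new String[] {"CCCC", "Southwest"},
                new String[] {"XXXX", "AA"});
        System.out.println(countPerTicket(rows));            // {CCCC=1, TTTTT=1, XXXX=2, YYYYY=1}
        System.out.println(distinctTicketsPerAirline(rows)); // {AA=2, Delta=1, Southwest=1}
    }
}
```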

I have to do transformations like these. Right now I am able to receive the
records, but I am struggling to transform them with Spark transformation
functions because they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? If so, how?

I loop through the events like this:

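import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        // collect() pulls the whole batch back to the driver
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user-level data is carried as the payload of the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            logRecord = new String(bytePayload.array());

            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
        }
        return null; // a Function<..., Void> must return null
    }
});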

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I
create it?
Inside the loop I am able to get every record and print it.

I appreciate any help here.

Thanks,
Muthu

