My intention is not to write data directly from Flume to HDFS. I have to collect messages from a queue using Flume and send them to Spark Streaming for additional processing. I will try what you have suggested.
Thanks,
Muthu

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Monday, July 14, 2014 2:23 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing Flume data to HDFS

Stepping back a bit: if you just want to write Flume data to HDFS, you can use Flume's HDFS sink for that. Trying to do this using Spark Streaming and SparkFlumeEvent is unnecessarily complex, and I guess it is tricky to write the raw bytes from the SparkFlumeEvent into a file. If you want to do it this way, I suggest trying the following (not tested, pure guesswork):

RDD[SparkFlumeEvent] ---> map to get the RDD of payload bytes ---> RDD.mapPartitions() to write each partition's bytes into an HDFS file (using HDFS's file output stream interface)

You will have to take care of making the file name of each partition unique, dealing with failures while writing, etc. (a sketch of this approach appears at the end of the thread).

TD

On Mon, Jul 14, 2014 at 9:29 AM, Sundaram, Muthu X. <muthu.x.sundaram....@sabre.com> wrote:

I am not sure how to write it… I tried writing to the local file system using FileWriter and PrintWriter, inside the while loop. I am able to get the text and print it, but it fails when I use regular Java classes. Shouldn't I use regular Java classes here? Can I write only to HDFS? Do I have to create the file in HDFS using the HDFS classes? I thought of using Spark's saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent>, not a JavaRDD<AvroFlumeEvent>, so I am not sure whether saveAsTextFile() will work. I appreciate any guidance here. How do I find more code examples? Books, URLs?

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            // All the user-level data is carried as the payload of the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());
                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
                // ??I was trying to write the data to hdfs..but it fails…

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 1:43 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: writing Flume data to HDFS

What is the error you are getting when you say "??I was trying to write the data to hdfs..but it fails…"?

TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <muthu.x.sundaram....@sabre.com> wrote:

I am new to Spark. I am trying to do the following:

Netcat --> Flume --> Spark Streaming (process the Flume data) --> HDFS

My Flume config file has the following setup: source = netcat, sink = avroSink.

Spark Streaming code: I am able to print data from Flume to the monitor, but I am struggling to create a file. To get at the real data I need to convert each SparkFlumeEvent to an AvroFlumeEvent. JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent>? Please share any code examples…

Thanks.
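One way to make saveAsTextFile() work on these events — a minimal, untested sketch against the Spark 1.x Java API, with a placeholder HDFS path — is to map each SparkFlumeEvent down to its payload string first, so the saved records are the log lines rather than SparkFlumeEvent.toString():

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> events) throws Exception {
            // Extract the Avro payload of each event as a string.
            JavaRDD<String> payloads = events.map(new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent e) {
                    ByteBuffer body = e.event().getBody();
                    byte[] bytes = new byte[body.remaining()];
                    body.get(bytes);
                    return new String(bytes, StandardCharsets.UTF_8);
                }
            });
            // saveAsTextFile() writes a directory of part files, so give each
            // batch its own directory ("hdfs:///flume/out" is a placeholder).
            payloads.saveAsTextFile("hdfs:///flume/out/batch-" + System.currentTimeMillis());
            return null;
        }
    });

Note that new String(bytePayload.array()) in the snippet above can pick up stale bytes past the buffer's limit, since array() returns the whole backing array; that is why this sketch copies only the remaining() bytes.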
Code:

    Duration batchInterval = new Duration(2000);
    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
    flumeStream.count();

    // foreachRDD takes a Function over a single JavaRDD per batch (the original
    // Function2 over two JavaRDDs does not match any foreachRDD signature).
    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> events) throws Exception {
            // Note: saveAsTextFile (capital A) takes a directory path, not a
            // file name, and calls toString() on each SparkFlumeEvent.
            events.saveAsTextFile("output.txt");
            return null;
        }
    });

    /* flumeStream.count().map(new Function<Long, String>() {
        @Override
        public String call(Long in) {
            return "Received " + in + " flume events.";
        }
    }).print(); */

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
            String logRecord = null;
            List<SparkFlumeEvent> events = eventsData.collect();
            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
            long t1 = System.currentTimeMillis();
            AvroFlumeEvent avroEvent = null;
            ByteBuffer bytePayload = null;
            // All the user-level data is carried as the payload of the Flume event
            while (batchedEvents.hasNext()) {
                SparkFlumeEvent flumeEvent = batchedEvents.next();
                avroEvent = flumeEvent.event();
                bytePayload = avroEvent.getBody();
                logRecord = new String(bytePayload.array());
                System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
                // ??I was trying to write the data to hdfs..but it fails…
            }
            System.out.println("Processed this batch in: "
                + (System.currentTimeMillis() - t1) / 1000 + " seconds");
            return null;
        }
    });

    ssc.start();            // the streaming job never runs without start()
    ssc.awaitTermination();
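For completeness, here is what the map + mapPartitions approach suggested earlier in the thread might look like in the Java API — again an untested sketch (pure guesswork, as TD says). It uses mapPartitionsWithIndex so the partition number can make each file name unique; the default Hadoop Configuration, the hdfs:///flume/out path, and the newline-per-event framing are all assumptions:

    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    flumeStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
        @Override
        public Void call(JavaRDD<SparkFlumeEvent> events) throws Exception {
            final long batchTime = System.currentTimeMillis();
            events.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<SparkFlumeEvent>, Iterator<Void>>() {
                    @Override
                    public Iterator<Void> call(Integer partition,
                                               Iterator<SparkFlumeEvent> it) throws Exception {
                        // Batch timestamp + partition index keeps file names unique.
                        Path file = new Path("hdfs:///flume/out/" + batchTime + "-part-" + partition);
                        FileSystem fs = FileSystem.get(new Configuration());
                        OutputStream out = fs.create(file);
                        try {
                            // Write each event's payload bytes as one line.
                            while (it.hasNext()) {
                                ByteBuffer body = it.next().event().getBody();
                                byte[] bytes = new byte[body.remaining()];
                                body.get(bytes);
                                out.write(bytes);
                                out.write('\n');
                            }
                        } finally {
                            out.close();
                        }
                        return Collections.<Void>emptyList().iterator();
                    }
                }, false).count(); // count() forces the lazy transformation to run
            return null;
        }
    });

A more robust version would write each partition to a temporary file and rename it into place once the partition completes, so a failed task could be retried without leaving a partial file behind — that is the "dealing with failures in writing" part TD mentions.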