Re: Spark Application Stages and DAG

2015-04-07 Thread Vijay Innamuri
My Spark streaming application processes the data received in each interval.

In the Spark Stages UI, all the stages point to a single line of code,
windowDStream.foreachRDD, only (not to the individual actions inside the DStream).


   - Following is the information from Spark Stages UI page:


Stage Id | Description                             | Submitted        | Duration | Tasks: Succeeded/Total | Input    | Output  | Shuffle Read | Shuffle Write
2        | foreachRDD at Parser.scala:58 +details  | 06-04-2015 16:21 | 19 min   | 3125/3125 (43 failed)  | 154.4 MB | 23.9 MB |              |
1        | foreachRDD at Parser.scala:58 +details  | 06-04-2015 16:19 | 2.3 min  | 3125/3125              | 149.7 MB |         |              |
0        | foreachRDD at Parser.scala:58 +details  | 06-04-2015 16:16 | 3.0 min  | 3125/3125              | 149.7 MB |         |              |


   - Following is the code snippet at Parser.scala:58:

val windowDStream = ssc.fileStream[LongWritable, Text,
  CustomInputFormat](args(0), (x: Path) => true, false)

windowDStream.foreachRDD { IncomingFiles =>

  println("Interval data processing " + Calendar.getInstance().getTime())
  if (IncomingFiles.count() == 0) {
    println("No files received in this interval")
  } else {
    println(IncomingFiles.count() + " files received in this interval")
    // convert each xml text to RDD[Elem]
    val inputRDD = IncomingFiles.map(eachXML => {
      MyXML.loadString(eachXML._2.toString().trim().replaceFirst("^([\\W]+)", ""))
    })
    // Create a schema RDD for querying the data
    val MySchemaRDD = inputRDD.map(x => {
      // Bd is a case class - case class Bd(oeuo: String, mi: List[String])
      Bd((x \\ "Oied" \\ "oeuo").text, List("placeholder1", "placeholder2", "placeholder3"))
    })
    // Save the file for debugging
    MySchemaRDD.saveAsTextFile("/home/spark/output/result.txt")
    // Spark SQL processing starts from here
    MySchemaRDD.registerTempTable("MySchemaTable")
    // TODO: processing with Spark SQL
    MySchemaRDD.printSchema()

    println("end of processing")
  }
}

Spark UI Details for Stage 2
http://pastebin.com/c2QYeSJj

I have tested this with 150 MB of input data.
All Spark memory options were left at their defaults, with executor memory of 512.0 MB.
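
For reference, a minimal sketch (values, app name, and batch interval are illustrative
assumptions, not from this thread) of how executor memory could be raised above the
512 MB used in this test, set on the SparkConf before the StreamingContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values only
val conf = new SparkConf()
  .setAppName("XmlStreamParser")
  .set("spark.executor.memory", "2g")   // raise from the 512 MB used in the test above

val ssc = new StreamingContext(conf, Seconds(60))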


   - Is it possible to see stage information for the individual actions inside the
   windowDStream operation (i.e., which action inside the DStream processing each
   stage belongs to)? (A sketch follows below.)


   - During Stage 2 the executor restarted many times due to OutOfMemoryError.
   Is this expected behavior? (Please see the Stage 2 details linked above.)
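
On the first question, one option (a sketch, not from the original thread; setJobDescription
and setName are standard SparkContext/RDD methods) is to label the work done inside
foreachRDD, so that each job triggered there carries its own description in the Jobs/Stages
pages instead of only the foreachRDD call site:

windowDStream.foreachRDD { IncomingFiles =>
  // Jobs started after this call carry the given description in the web UI
  ssc.sparkContext.setJobDescription("count incoming files")
  val fileCount = IncomingFiles.count()

  val inputRDD = IncomingFiles
    .map(eachXML => MyXML.loadString(eachXML._2.toString().trim()))
    .setName("parsed xml elements")   // the RDD name appears in the stage details

  ssc.sparkContext.setJobDescription("save schema RDD to text file")
  // ... saveAsTextFile, printSchema, etc., each under its own description
}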


Regards
Vijay

On 3 April 2015 at 13:21, Tathagata Das t...@databricks.com wrote:

 What he meant is to look it up in the Spark UI, specifically in the
 Stages tab, to see what is taking so long. And yes, a code snippet helps us
 debug.

 TD

 On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You need to open the page of the Stage that is taking time and see how long
 it is spending on GC etc. It would also be good to post the code snippet for that
 Stage and its previous transformations, to help us understand it better.

 Thanks
 Best Regards

 On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri vijay.innam...@gmail.com
 wrote:


 When I run the Spark application (streaming) in local mode, I can see
 the execution progress as below:

 [Stage 0:==============================>                       (1817 + 1) / 3125]
 [Stage 2:============>                                          (740 + 1) / 3125]

 One of the stages is taking a long time to execute.

 How do I find the transformations/actions associated with a particular stage?
 Is there any way to find the execution DAG of a Spark application?
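
 For what it's worth, one way to inspect an RDD's lineage programmatically (a minimal
 sketch; toDebugString is a standard RDD method, and someRdd is a placeholder for any
 RDD in the pipeline) is to print it before triggering an action:

 // toDebugString prints the chain of transformations (the lineage) that Spark
 // turns into stages, which is one way to see the DAG outside the web UI.
 println(someRdd.toDebugString)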

 Regards
 Vijay






Re: Spark Streaming with compressed xml files

2015-03-16 Thread Vijay Innamuri
textFileStream and the default fileStream recognize the compressed
xml (.xml.gz) files.

Each line in the xml file becomes an element in an RDD[String].

Then the whole RDD is converted to properly formatted xml data and stored in a
Scala variable.
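
A rough sketch of the pattern just described (directory path and names are placeholders):
each interval's lines are pulled back to the driver and concatenated into a single Scala
variable, which is what makes this approach memory-hungry:

val lines = ssc.textFileStream("/input/xml-gz")   // .xml.gz files are recognized, per the note above
lines.foreachRDD { rdd =>
  // collect() brings every line of the interval to the driver; the whole document
  // then lives in one driver-side String
  val wholeDocument: String = rdd.collect().mkString("\n")
  val xml = scala.xml.XML.loadString(wholeDocument)
  // ... further parsing happens on the driver
}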

   - I believe storing huge data in a Scala variable is inefficient. Is
   there any alternative way to process the xml files?
   - How can a Spark SQL table be created from the above xml data?

Regards
Vijay Innamuri


On 16 March 2015 at 12:12, Akhil Das ak...@sigmoidanalytics.com wrote:

 One approach would be: if you are using fileStream, you can access the
 individual filenames from the partitions, and with that filename you can
 apply your uncompression/parsing logic and get it done.


 Like:

 UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i];
 NewHadoopPartition npp = (NewHadoopPartition) upp.split();
 String fPath = npp.serializableHadoopSplit().value().toString();


 Another approach would be to create a custom record reader and InputFormat,
 then pass it along with your fileStream, and within the reader do your
 uncompression/parsing etc. You can also look into Mahout's XMLInputFormat:
 https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java
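
 A rough sketch of this second approach (XmlInputFormat here is an assumption, e.g. a class
 adapted from the Mahout reader linked above, which takes its record delimiters from the
 xmlinput.start / xmlinput.end Hadoop configuration keys; uncompression/parsing would be
 handled inside the custom reader as described, and this also assumes the fileStream
 overload that accepts a Hadoop Configuration is available in the Spark version in use):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}

 // Tag names and directory are illustrative placeholders
 val hadoopConf = new Configuration()
 hadoopConf.set("xmlinput.start", "<Record>")
 hadoopConf.set("xmlinput.end", "</Record>")

 // fileStream with the custom InputFormat; each value is one <Record>...</Record> block
 val xmlStream = ssc.fileStream[LongWritable, Text, XmlInputFormat](
   "/input/xml-gz", (p: Path) => true, false, hadoopConf)

 xmlStream.foreachRDD { rdd =>
   val docs = rdd.map { case (_, chunk) => scala.xml.XML.loadString(chunk.toString) }
   println(docs.count() + " xml records in this interval")
 }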




 Thanks
 Best Regards

 On Mon, Mar 16, 2015 at 11:28 AM, Vijay Innamuri vijay.innam...@gmail.com
  wrote:

 Hi All,

 Processing streaming JSON files with Spark features (Spark Streaming and
 Spark SQL) is very efficient and works like a charm.

 Below is the code snippet to process JSON files.

 windowDStream.foreachRDD(IncomingFiles => {
   val IncomingFilesTable = sqlContext.jsonRDD(IncomingFiles)
   IncomingFilesTable.registerAsTable("IncomingFilesTable")
   val result = sqlContext.sql("select text from IncomingFilesTable").collect
   sc.parallelize(result).saveAsTextFile(filepath)
 })


 But I feel it's difficult to use Spark features efficiently with
 streaming xml files (each compressed file would be 4 MB).

 What is the best approach for processing compressed xml files?

 Regards
 Vijay




