Re: Tuning Resource Allocation during runtime

2018-04-27 Thread jogesh anand
Hi Donni,

Please check Spark dynamic allocation and the external shuffle service.
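A minimal configuration sketch (assuming YARN with the external shuffle service running on each NodeManager; the executor bounds and timeout are placeholder values, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")            // let Spark grow/shrink the executor count
  .set("spark.shuffle.service.enabled", "true")              // needed so shuffle files survive executor removal
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")

With this, the executor-hungry first stage can scale out, and the shuffle-heavy second stage can let idle executors be released instead of pinning a fixed count.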

On Fri, 27 Apr 2018 at 2:52 AM, Donni Khan 
wrote:

> Hi All,
>
> Is there any way to change the number of executors/cores while a Spark job
> is running?
> I have a Spark job containing two tasks: the first task needs many executors to
> run quickly. The second task has many input and output operations and
> shuffling, so it needs few executors, otherwise it takes a long time to
> finish.
> Does anyone knows if that possible in YARN?
>
>
> Thank you.
> Donni
>
-- 

*Regards,*

*Jogesh Anand*


Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy

2017-10-13 Thread Anand Chandrashekar
Greetings!

I would like to implement a custom Kafka checkpoint strategy (instead of
HDFS, I would like to use Redis). Is there a way I can change this behavior?
Any advice will help. Thanks!

Regards,
Anand.
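One common way to approach this (a sketch only, not an established recipe from this thread): skip Spark's HDFS checkpointing for offset tracking and store the Kafka offsets yourself in Redis with Jedis. The topic name, Redis keys and connection details below are made up for illustration, and this only replaces offset recovery, not full DStream state checkpointing.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

val ssc = new StreamingContext(new SparkConf().setAppName("redis-offsets"), Seconds(5))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "redis-offsets-demo",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val jedis = new Jedis("localhost", 6379)
val topic = "mytopic"

// Restore offsets saved earlier (Redis hash: partition -> offset); empty on the first run.
val fromOffsets = jedis.hgetAll(s"offsets:$topic").asScala.map {
  case (p, o) => new TopicPartition(topic, p.toInt) -> o.toLong
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq(topic), kafkaParams, fromOffsets))

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // then record how far each partition got (this block runs on the driver)
  ranges.foreach(r => jedis.hset(s"offsets:${r.topic}", r.partition.toString, r.untilOffset.toString))
}

ssc.start()
ssc.awaitTermination()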


How to read large size files from a directory ?

2017-05-09 Thread ashwini anand
I had posted this question yesterday but the formatting of my question was
very bad, so I am posting the same question again. Below is my question:

I am reading a directory of files using wholeTextFiles. After that I am
calling a function on each element of the rdd using map. The whole program
uses just 50 lines of each file. Please find the code at the link below:

https://gist.github.com/ashwini-anand/0e468da9b4ab7863dff14833d34de79e

The size of each file in the directory can be very large in my case, and
because of this the wholeTextFiles API will be inefficient here: right now
wholeTextFiles loads the full file content into memory. Can we make
wholeTextFiles load only the first 50 lines of each file? Apart from
wholeTextFiles, the other solution I can think of is iterating over each
file of the directory one by one, but that also seems inefficient. I am
new to Spark. Please let me know if there is any efficient way to do this.
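One way to avoid materialising whole files (a sketch, assuming uncompressed text files, an existing SparkContext sc, and placeholder paths): use sc.binaryFiles, which returns a lazy PortableDataStream per file, and read only the first 50 lines from each stream.

import scala.io.Source

// binaryFiles yields (path, PortableDataStream); the stream is only opened
// when read, so only the bytes behind the first 50 lines are pulled in.
val first50 = sc.binaryFiles("/path/to/dir").map { case (path, pds) =>
  val in = pds.open()
  try {
    (path, Source.fromInputStream(in).getLines().take(50).toList)
  } finally {
    in.close()
  }
}
first50.saveAsTextFile("/path/to/output")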



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-large-size-files-from-a-directory-tp28673.html




How to read large size files from a directory ?

2017-05-08 Thread ashwini anand
I am reading each file of a directory using wholeTextFiles. After that I am
calling a function on each element of the rdd using map. The whole program
uses just 50 lines of each file. The code is as below:

def processFiles(fileNameContentsPair):
  fileName = fileNameContentsPair[0]
  result = "\n\n" + fileName
  resultEr = "\n\n" + fileName
  input = StringIO.StringIO(fileNameContentsPair[1])
  reader = csv.reader(input, strict=True)
  try:
    i = 0
    for row in reader:
      if i == 50:
        break
      # do some processing and get result string
      i = i + 1
  except csv.Error as e:
    resultEr = resultEr + "error occured\n\n"
    return resultEr
  return result

if __name__ == "__main__":
  inputFile = sys.argv[1]
  outputFile = sys.argv[2]
  sc = SparkContext(appName="SomeApp")
  resultRDD = sc.wholeTextFiles(inputFile).map(processFiles)
  resultRDD.saveAsTextFile(outputFile)

The size of each file in the directory can be very large in my case, and
because of this the wholeTextFiles API will be inefficient here: right now
wholeTextFiles loads the full file content into memory. Can we make
wholeTextFiles load only the first 50 lines of each file? Apart from
wholeTextFiles, the other solution I can think of is iterating over each
file of the directory one by one, but that also seems inefficient. I am new
to Spark. Please let me know if there is any efficient way to do this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-large-size-files-from-a-directory-tp28669.html

How does partitioning happen for binary files in spark ?

2017-04-06 Thread ashwini anand
By looking into the source code, I found that for textFile(), the
partitioning is computed by the computeSplitSize() function in
FileInputFormat class. This function takes into consideration the
minPartitions value passed by user. As per my understanding , the same thing
for binaryFiles() is computed by the setMinPartitions() function of
PortableDataStream class. This setMinPartitions() function completely
ignores the minPartitions value passed by the user. However, I find that in my
application the number of partitions somehow varies based on the minPartitions
value in the case of binaryFiles() too. I have no idea how this is happening.
Please help me understand how the partitioning happens in the case of binaryFiles().

The source code for setMinPartitions() is as below:

def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
  val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
  val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
  val defaultParallelism = sc.defaultParallelism
  val files = listStatus(context).asScala
  val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  super.setMaxSplitSize(maxSplitSize)
}
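A quick way to observe the behaviour described above (a sketch, assuming an existing SparkContext sc and a placeholder path): compare the resulting partition counts for different minPartitions hints.

// same directory, different minPartitions hints
println(sc.binaryFiles("/data/binfiles").getNumPartitions)
println(sc.binaryFiles("/data/binfiles", minPartitions = 32).getNumPartitions)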



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-partitioning-happen-for-binary-files-in-spark-tp28575.html

Insert a JavaPairDStream into multiple cassandra table on the basis of key.

2016-11-02 Thread Abhishek Anand
Hi All,

I have a JavaPairDStream. I want to insert this dstream into multiple
Cassandra tables on the basis of key. One approach is to filter each key and
then insert it into the corresponding Cassandra table, but this would call the
filter operation "n" times depending on the number of keys.

Is there any better approach to achieve this more quickly?

Thanks
Abhi
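One possible approach (a Scala sketch, assuming the DataStax Spark Cassandra connector and a small, known set of keys; dstream stands in for the pair DStream, tableForKey is a hypothetical key-to-table mapping, and the values are assumed to be case classes mappable to the tables): cache each batch once so the per-key filter passes do not recompute the whole RDD.

import com.datastax.spark.connector._

val tableForKey = Map("keyA" -> "table_a", "keyB" -> "table_b")  // hypothetical mapping

dstream.foreachRDD { rdd =>
  rdd.cache()                                   // one pass over the batch feeds all filters
  tableForKey.foreach { case (key, table) =>
    rdd.filter(_._1 == key)                     // rows for this key only
       .map(_._2)
       .saveToCassandra("my_keyspace", table)
  }
  rdd.unpersist()
}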


Re: spark infers date to be timestamp type

2016-10-26 Thread Anand Viswanathan
Hi,

You can use a custom schema (with DateType) and specify dateFormat in .option(),
or, on the Spark DataFrame side, you can cast the timestamp column to a date.
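A sketch of those two options against Spark 2.0.x (assuming a SparkSession named spark; the column name and file path are taken from the quoted example below):

import org.apache.spark.sql.types.{DateType, StructField, StructType}

// Option 1: explicit schema plus dateFormat instead of inferSchema
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .schema(StructType(Seq(StructField("date", DateType, nullable = true))))
  .load("test.csv")

// Option 2: keep the inferred timestamp and cast it afterwards
// df.withColumn("date", df("date").cast("date"))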

Thanks and regards,
Anand Viswanathan

> On Oct 26, 2016, at 8:07 PM, Koert Kuipers  wrote:
> 
> hey,
> i create a file called test.csv with contents:
> date
> 2015-01-01
> 2016-03-05
> 
> next i run this code in spark 2.0.1:
> spark.read
>   .format("csv")
>   .option("header", true)
>   .option("inferSchema", true)
>   .load("test.csv")
>   .printSchema
> 
> the result is:
> root
>  |-- date: timestamp (nullable = true)
> 
> 
> On Wed, Oct 26, 2016 at 7:35 PM, Hyukjin Kwon  <mailto:gurwls...@gmail.com>> wrote:
> There are now timestampFormat for TimestampType and dateFormat for DateType.
> 
> Do you mind if I ask to share your codes?
> 
> 
> On 27 Oct 2016 2:16 a.m., "Koert Kuipers"  <mailto:ko...@tresata.com>> wrote:
> is there a reason a column with dates in yyyy-MM-dd format in a csv file is 
> inferred to be TimestampType and not DateType?
> 
> thanks! koert
> 



Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Thank you so much Mich,

I am using yarn as my master.

I found a statement in the Spark docs mentioning that the amount of memory
depends on the individual application.
http://spark.apache.org/docs/1.5.2/hardware-provisioning.html#memory 
<http://spark.apache.org/docs/1.5.2/hardware-provisioning.html#memory>

I guess my assumption that "default resources (memory and cores) can handle any 
application" is wrong.

Thanks and regards,
Anand Viswanathan

> On Sep 19, 2016, at 6:56 PM, Mich Talebzadeh  
> wrote:
> 
> If you make your driver memory too low it is likely you are going to hit OOM 
> error.
> 
> You have not mentioned which Spark mode you are using (Local, Standalone, Yarn 
> etc)
> 
> HTH
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 19 September 2016 at 23:48, Anand Viswanathan 
> mailto:anand_v...@ymail.com.invalid>> wrote:
> Thank you so much, Kevin.
> 
> My data size is around 4GB.
> I am not using collect(), take() or takeSample()
> At the final job, number of tasks grows up to 200,000
> 
> Still the driver crashes with OOM with default —driver-memory 1g but Job 
> succeeds if i specify 2g.
> 
> Thanks and regards,
> Anand Viswanathan
> 
>> On Sep 19, 2016, at 4:00 PM, Kevin Mellott > <mailto:kevin.r.mell...@gmail.com>> wrote:
>> 
>> Hi Anand,
>> 
>> Unfortunately, there is not really a "one size fits all" answer to this 
>> question; however, here are some things that you may want to consider when 
>> trying different sizes.
>> What is the size of the data you are processing?
>> Whenever you invoke an action that requires ALL of the data to be sent to 
>> the driver (such as collect), you'll need to ensure that your memory setting 
>> can handle it.
>> What level of parallelization does your code support? The more processing 
>> you can do on the worker nodes, the less your driver will need to do.
>> Related to these comments, keep in mind that the --executor-memory, 
>> --num-executors, and --executor-cores configurations can be useful when 
>> tuning the worker nodes. There is some great information in the Spark Tuning 
>> Guide (linked below) that you may find useful as well.
>> 
>> http://spark.apache.org/docs/latest/tuning.html 
>> <http://spark.apache.org/docs/latest/tuning.html>
>> 
>> Hope that helps!
>> Kevin
>> 
>> On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan 
>> mailto:anand_v...@ymail.com.invalid>> wrote:
>> Hi,
>> 
>> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark. 
>> 
>> I am running a machine learning program, which runs perfectly by specifying 
>> 2G for —driver-memory.
>> However the program cannot be run with default 1G, driver crashes with OOM 
>> error.
>> 
>> What is the recommended configuration for —driver-memory…? Please suggest.
>> 
>> Thanks and regards,
>> Anand.
>> 
>> 
> 
> 



Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Thank you so much, Kevin.

My data size is around 4GB.
I am not using collect(), take() or takeSample()
At the final job, number of tasks grows up to 200,000

Still the driver crashes with OOM with the default --driver-memory 1g, but the job 
succeeds if I specify 2g.

Thanks and regards,
Anand Viswanathan

> On Sep 19, 2016, at 4:00 PM, Kevin Mellott  wrote:
> 
> Hi Anand,
> 
> Unfortunately, there is not really a "one size fits all" answer to this 
> question; however, here are some things that you may want to consider when 
> trying different sizes.
> What is the size of the data you are processing?
> Whenever you invoke an action that requires ALL of the data to be sent to the 
> driver (such as collect), you'll need to ensure that your memory setting can 
> handle it.
> What level of parallelization does your code support? The more processing you 
> can do on the worker nodes, the less your driver will need to do.
> Related to these comments, keep in mind that the --executor-memory, 
> --num-executors, and --executor-cores configurations can be useful when 
> tuning the worker nodes. There is some great information in the Spark Tuning 
> Guide (linked below) that you may find useful as well.
> 
> http://spark.apache.org/docs/latest/tuning.html 
> <http://spark.apache.org/docs/latest/tuning.html>
> 
> Hope that helps!
> Kevin
> 
> On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan 
> mailto:anand_v...@ymail.com.invalid>> wrote:
> Hi,
> 
> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark. 
> 
> I am running a machine learning program, which runs perfectly by specifying 
> 2G for —driver-memory.
> However the program cannot be run with default 1G, driver crashes with OOM 
> error.
> 
> What is the recommended configuration for —driver-memory…? Please suggest.
> 
> Thanks and regards,
> Anand.
> 
> 



driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Hi,

Spark version: spark-1.5.2-bin-hadoop2.6, using pyspark. 

I am running a machine learning program, which runs perfectly when I specify 2G 
for --driver-memory.
However, the program cannot be run with the default 1G; the driver crashes with an OOM 
error.

What is the recommended configuration for --driver-memory? Please suggest.

Thanks and regards,
Anand.



Re: Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
Hi Ayan,

How will I get column-wise distinct items using this approach?
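A sketch of the column-wise variant (assuming rdd: RDD[Array[String]] with 14 columns and a placeholder output path): tag every value with its column index before calling distinct, so duplicates are removed per column rather than per row.

val distinctPerColumn = rdd
  .flatMap(row => row.zipWithIndex.map { case (value, colIdx) => (colIdx, value) })
  .distinct()                                   // (columnIndex, value) pairs, de-duplicated
distinctPerColumn.saveAsTextFile("hdfs:///path/to/output")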

On Mon, Sep 19, 2016 at 3:31 PM, ayan guha  wrote:

> Create an array out of columns, convert to a DataFrame,
> explode, distinct, write.
> On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:
>
>> You can use distinct over you data frame or rdd
>>
>> rdd.distinct
>>
>> It will give you distinct across your row.
>>
>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand 
>> wrote:
>>
>>> I have an rdd which contains 14 different columns. I need to find the
>>> distinct across all the columns of rdd and write it to hdfs.
>>>
>>> How can I acheive this ?
>>>
>>> Is there any distributed data structure that I can use and keep on
>>> updating it as I traverse the new rows ?
>>>
>>> Regards,
>>> Abhi
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>


Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
I have an rdd which contains 14 different columns. I need to find the
distinct across all the columns of rdd and write it to hdfs.

How can I achieve this?

Is there any distributed data structure that I can use and keep on updating
it as I traverse the new rows ?

Regards,
Abhi


Re: Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Thanks Nihed.

I was able to do this in exactly the same way.


Cheers!!
Abhi

On Mon, Jul 18, 2016 at 5:56 PM, nihed mbarek  wrote:

> and if we have this static method
> df.show();
> Column c = concatFunction(df, "l1", "firstname,lastname");
> df.select(c).show();
>
> with this code :
> Column concatFunction(DataFrame df, String fieldName, String columns) {
> String[] array = columns.split(",");
> Column[] concatColumns = new Column[array.length];
> for (int i = 0; i < concatColumns.length; i++) {
> concatColumns[i]=df.col(array[i]);
> }
>
> return functions.concat(concatColumns).alias(fieldName);
> }
>
>
>
> On Mon, Jul 18, 2016 at 2:14 PM, Abhishek Anand 
> wrote:
>
>> Hi Nihed,
>>
>> Thanks for the reply.
>>
>> I am looking for something like this :
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1")));
>>
>>
>> Here I have to give C0 and C1 columns, I am looking to write a generic
>> function that concatenates the columns depending on input columns.
>>
>> like if I have something
>> String str = "C0,C1,C2"
>>
>> Then it should work as
>>
>> DataFrame training = orgdf.withColumn("I1",
>> functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));
>>
>>
>>
>> Thanks,
>> Abhi
>>
>> On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:
>>
>>> Hi,
>>>
>>>
>>> I just wrote this code to help you. Is it what you need ??
>>>
>>>
>>> SparkConf conf = new
>>> SparkConf().setAppName("hello").setMaster("local");
>>> JavaSparkContext sc = new JavaSparkContext(conf);
>>> SQLContext sqlContext = new SQLContext(sc);
>>> List<Person> persons = new ArrayList<>();
>>> persons.add(new Person("nihed", "mbarek", "nihed.com"));
>>> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>>>
>>> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>>>
>>> df.show();
>>> final String[] columns = df.columns();
>>>     Column[] selectColumns = new Column[columns.length + 1];
>>> for (int i = 0; i < columns.length; i++) {
>>> selectColumns[i]=df.col(columns[i]);
>>> }
>>>
>>>
>>> selectColumns[columns.length]=functions.concat(df.col("firstname"),
>>> df.col("lastname"));
>>>
>>> df.select(selectColumns).show();
>>>   ---
>>> public static class Person {
>>>
>>> private String firstname;
>>> private String lastname;
>>> private String address;
>>> }
>>>
>>>
>>>
>>> Regards,
>>>
>>> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>>>
>>>> I need to create interaction variables to be taken as input for my
>>>> program.
>>>>
>>>> For eg -
>>>>
>>>> I need to create I1 as concatenation of C0,C3,C5
>>>>
>>>> Similarly, I2  = concat(C4,C5)
>>>>
>>>> and so on ..
>>>>
>>>>
>>>> How can I achieve this in my Java code for concatenation of any columns
>>>> given input by the user.
>>>>
>>>> Thanks,
>>>> Abhi
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> M'BAREK Med Nihed,
>>> Fedora Ambassador, TUNISIA, Northern Africa
>>> http://www.nihed.com
>>>
>>> <http://tn.linkedin.com/in/nihed>
>>>
>>>
>>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Re: Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Hi Nihed,

Thanks for the reply.

I am looking for something like this :

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1")));


Here I have to give C0 and C1 columns, I am looking to write a generic
function that concatenates the columns depending on input columns.

like if I have something
String str = "C0,C1,C2"

Then it should work as

DataFrame training = orgdf.withColumn("I1",
functions.concat(orgdf.col("C0"),orgdf.col("C1"),orgdf.col("C2")));



Thanks,
Abhi

On Mon, Jul 18, 2016 at 4:39 PM, nihed mbarek  wrote:

> Hi,
>
>
> I just wrote this code to help you. Is it what you need ??
>
>
> SparkConf conf = new
> SparkConf().setAppName("hello").setMaster("local");
> JavaSparkContext sc = new JavaSparkContext(conf);
> SQLContext sqlContext = new SQLContext(sc);
> List<Person> persons = new ArrayList<>();
> persons.add(new Person("nihed", "mbarek", "nihed.com"));
> persons.add(new Person("mark", "zuckerberg", "facebook.com"));
>
> DataFrame df = sqlContext.createDataFrame(persons, Person.class);
>
> df.show();
> final String[] columns = df.columns();
> Column[] selectColumns = new Column[columns.length + 1];
> for (int i = 0; i < columns.length; i++) {
> selectColumns[i]=df.col(columns[i]);
> }
>
>
> selectColumns[columns.length]=functions.concat(df.col("firstname"),
> df.col("lastname"));
>
>     df.select(selectColumns).show();
>   ---
> public static class Person {
>
> private String firstname;
> private String lastname;
> private String address;
> }
>
>
>
> Regards,
>
> On Mon, Jul 18, 2016 at 12:45 PM, Abhishek Anand 
> wrote:
>
>> Hi,
>>
>> I have a dataframe say having C0,C1,C2 and so on as columns.
>>
>> I need to create interaction variables to be taken as input for my
>> program.
>>
>> For eg -
>>
>> I need to create I1 as concatenation of C0,C3,C5
>>
>> Similarly, I2  = concat(C4,C5)
>>
>> and so on ..
>>
>>
>> How can I achieve this in my Java code for concatenation of any columns
>> given input by the user.
>>
>> Thanks,
>> Abhi
>>
>
>
>
> --
>
> M'BAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com
>
> <http://tn.linkedin.com/in/nihed>
>
>


Concatenate the columns in dataframe to create new columns using Java

2016-07-18 Thread Abhishek Anand
Hi,

I have a dataframe say having C0,C1,C2 and so on as columns.

I need to create interaction variables to be taken as input for my program.

For eg -

I need to create I1 as concatenation of C0,C3,C5

Similarly, I2  = concat(C4,C5)

and so on ..


How can I achieve this in my Java code for concatenation of any columns
given input by the user.

Thanks,
Abhi


Change spark dataframe to LabeledPoint in Java

2016-06-30 Thread Abhishek Anand
Hi ,

I have a dataframe which I want to convert to a labeled point.

DataFrame labeleddf = model.transform(newdf).select("label","features");

How can I convert this to a LabeledPoint to use in my Logistic Regression
model.

I could do this in scala using
val trainData = labeleddf.map(row =>
LabeledPoint(row.getDouble(0), row(1).asInstanceOf[Vector])).cache()


How can I achieve the same in Java?

Thanks,
Abhi


Re: spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
I also tried


jsc.sparkContext().sc().hadoopConfiguration().set("dfs.replication", "2")

But it's still not working.

Any ideas why it's not working?


Abhi

On Tue, May 31, 2016 at 4:03 PM, Abhishek Anand 
wrote:

> My spark streaming checkpoint directory is being written to HDFS with
> default replication factor of 3.
>
> In my streaming application where I am listening from kafka and setting
> the dfs.replication = 2 as below the files are still being written with
> replication factor=3
>
> SparkConf sparkConfig = new
> SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");
>
> Is there anything else that I need to do ??
>
>
> Thanks !!!
> Abhi
>


spark.hadoop.dfs.replication parameter not working for kafka-spark streaming

2016-05-31 Thread Abhishek Anand
My spark streaming checkpoint directory is being written to HDFS with
default replication factor of 3.

In my streaming application where I am listening from kafka and setting the
dfs.replication = 2 as below the files are still being written with
replication factor=3

SparkConf sparkConfig = new
SparkConf().setMaster("mymaster").set("spark.hadoop.dfs.replication", "2");

Is there anything else that I need to do ??


Thanks !!!
Abhi


Re: Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Thanks Yanbo.

So, you mean that if I have a variable which is of type double but I want
to treat it like a String in my model, I just have to cast those columns to
string and simply run the glm model, and the string columns will be directly
one-hot encoded by the glm provided by SparkR?

Just wanted to clarify as in R we need to apply as.factor for categorical
variables.

val dfNew = df.withColumn("C0",df.col("C0").cast("String"))


Abhi !!

On Mon, May 30, 2016 at 2:58 PM, Yanbo Liang  wrote:

> Hi Abhi,
>
> In SparkR glm, category features (columns of type string) will be one-hot
> encoded automatically.
> So pre-processing like `as.factor` is not necessary, you can directly feed
> your data to the model training.
>
> Thanks
> Yanbo
>
> 2016-05-30 2:06 GMT-07:00 Abhishek Anand :
>
>> Hi ,
>>
>> I want to run glm variant of sparkR for my data that is there in a csv
>> file.
>>
>> I see that the glm function in sparkR takes a spark dataframe as input.
>>
>> Now, when I read a file from csv and create a spark dataframe, how could
>> I take care of the factor variables/columns in my data ?
>>
>> Do I need to convert it to a R dataframe, convert to factor using
>> as.factor and create spark dataframe and run glm over it ?
>>
>> But, running as.factor over big dataset is not possible.
>>
>> Please suggest what is the best way to acheive this ?
>>
>> What pre-processing should be done, and what is the best way to achieve
>> it  ?
>>
>>
>> Thanks,
>> Abhi
>>
>
>


Running glm in sparkR (data pre-processing step)

2016-05-30 Thread Abhishek Anand
Hi ,

I want to run glm variant of sparkR for my data that is there in a csv file.

I see that the glm function in sparkR takes a spark dataframe as input.

Now, when I read a file from csv and create a spark dataframe, how could I
take care of the factor variables/columns in my data ?

Do I need to convert it to a R dataframe, convert to factor using as.factor
and create spark dataframe and run glm over it ?

But, running as.factor over a big dataset is not possible.

Please suggest what is the best way to achieve this.

What pre-processing should be done, and what is the best way to achieve it?


Thanks,
Abhi


Unable to write stream record to cassandra table with multiple columns

2016-05-10 Thread Anand N Ilkal
I am trying to write incoming stream data to a database. Following is an example 
program; this code creates a thread to listen to an incoming stream of CSV data. 
This data needs to be split on the delimiter, and the resulting array of fields 
needs to be pushed to the database as separate columns in the TABLE.

object dbwrite {
  case class record(id: Long, time: java.sql.Timestamp, rx: Int, tx: Int,
      total: Int, multi: Double)

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: CustomReceiver <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf()
      .set("spark.cassandra.connection.host", "localhost")
      .setAppName("dbwrite")
      .set("spark.driver.allowMultipleContexts", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val sc = ssc.sparkContext

    // Create an input stream with the custom receiver on target ip:port and count
    // the words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
    val splitRdd = lines.map(line => line.split(","))
    // val wordCounts = splitRdd.map(x => (x, 1)).reduceByKey(_ + _)
    // RDD[Array[String]]

    val yourRdd = splitRdd.flatMap(arr => {
      val id = arr(0).toLong
      val rx = arr(2).toInt
      val tx = arr(3).toInt
      val total = arr(4).toInt
      val mul = arr(5).toInt
      val parsedDate = new java.util.Date()
      val timestamp = new java.sql.Timestamp(parsedDate.getTime())
      val reco = records(id, timestamp, rx, tx, total, mul)
      Seq(reco)
    })

    yourRdd.foreachRDD { rdd =>
      for (item <- rdd.collect().toArray)
        print(item)
    }
    val rec = sc.parallelize(Seq(yourRdd))
    rec.saveToCassandra("records", "record", SomeColumns("id", "time", "rx",
      "tx", "total", "multi"))

    ssc.start()
    ssc.awaitTermination()
  }
}
But Spark gives the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: Columns not found in 
org.apache.spark.streaming.dstream.DStream[dbwrite.records]: [mdn, time, rx, 
tx, total, multi]
at scala.Predef$.require(Predef.scala:233)
at 
com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
at 
com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.(MappedToGettableDataConverter.scala:29)
at 
com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:20)
at 
com.datastax.spark.connector.writer.DefaultRowWriter.(DefaultRowWriter.scala:17)
at 
com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at 
com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at 
com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:272)
at 
com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at dbwrite$.main(dbwrite.scala:63)
at dbwrite.main(dbwrite.scala)
I am using spark-1.6.1 and cassandra 3.5.
The TABLE already created in Cassandra has the same column names. The columns 
display in alphabetical order, but all columns are available.
Please help me with the error.

thanks.

Calculating log-loss for the trained model in Spark ML

2016-05-03 Thread Abhishek Anand
I am building a ML pipeline for logistic regression.

val lr = new LogisticRegression()

lr.setMaxIter(100).setRegParam(0.001)

val pipeline = new
Pipeline().setStages(Array(geoDimEncoder,clientTypeEncoder,
   devTypeDimIdEncoder,pubClientIdEncoder,tmpltIdEncoder,
   hourEncoder,assembler,lr))

val model = pipeline.fit(trainingDF)

Now, when the model is trained, I want to see the probabilities for the
training set and compute certain validation metrics like log-loss. But, I am
unable to find this using "model".

The only thing I could find is

model.transform(testDF).select()

Can I not get the metrics on the training set, for training-set validation?
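One way to compute it by hand (a sketch, assuming a Spark 1.x spark.ml pipeline whose output has "label" and "probability" columns, with "probability" an mllib Vector): transform the training set with the fitted model and average the per-row log-loss.

import org.apache.spark.mllib.linalg.Vector

val logLoss = model.transform(trainingDF)
  .select("label", "probability")
  .map { row =>
    val y = row.getDouble(0)
    val p = row.getAs[Vector](1)(1)             // P(label = 1)
    val eps = 1e-15
    val pc = math.min(1 - eps, math.max(eps, p))  // clip to avoid log(0)
    -(y * math.log(pc) + (1 - y) * math.log(1 - pc))
  }
  .mean()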

Thanks !!


Re: removing header from csv file

2016-05-03 Thread Abhishek Anand
You can use this function to remove the header from your dataset (applicable
to an RDD):

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    // drop the first line of the first partition only (where the header lives)
    if (idx == 0) lines.drop(1) else lines
  })
}


Abhi

On Wed, Apr 27, 2016 at 12:55 PM, Marco Mistroni 
wrote:

> If you are using the Scala API you can do
> myRdd.zipWithIndex.filter(_._2 > 0).map(_._1)
>
> Maybe a little bit complicated, but it will do the trick.
> As for spark-csv, you will get back a data frame which you can convert
> back to an rdd.
> Hth
> Marco
> On 27 Apr 2016 6:59 am, "nihed mbarek"  wrote:
>
>> You can add a filter with string that you are sure available only in the
>> header
>>
>> Le mercredi 27 avril 2016, Divya Gehlot  a
>> écrit :
>>
>>> yes you can remove the headers by removing the first row
>>>
>>> can first() or head() to do that
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>> On 27 April 2016 at 13:24, Ashutosh Kumar 
>>> wrote:
>>>
 I see there is a library spark-csv which can be used for removing
 header and processing of csv files. But it seems it works with sqlcontext
 only. Is there a way to remove header from csv files without sqlcontext ?

 Thanks
 Ashutosh

>>>
>>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>


Clear Threshold in Logistic Regression ML Pipeline

2016-05-03 Thread Abhishek Anand
Hi All,

I am trying to build a logistic regression pipeline in ML.

How can I clear the threshold, which by default is 0.5? In MLlib I am able
to clear the threshold to get the raw predictions using the
model.clearThreshold() function.


Regards,
Abhi


Fwd: Facing Unusual Behavior with the executors in spark streaming

2016-04-05 Thread Abhishek Anand
Hi ,

I need inputs on a couple of issues that I am facing in my production
environment.

I am using spark version 1.4.0 spark streaming.

1) It so happens that the worker is lost on a machine and the executor
still shows up in the executor's tab in the UI.

Even when I kill a worker using kill -9 command the worker and executor
both dies on that machine but executor still shows up in the executors tab
on the UI. The number of active tasks sometimes shows negative on that
executor and my job keeps failing with following exception.

This usually happens when a job is running. When no computation is taking
place on the cluster (i.e., suppose a 1 min batch gets completed in 20 secs)
and I kill the worker, the executor entry is also gone from the UI; but when I
kill the worker while a job is still running, I always run into this issue.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more



 When I relaunch the worker new executors are added but the dead one's
entry is still there until the application is killed.

 2) Another issue is that when the disk becomes full on one of the workers, the
executor becomes unresponsive and the job gets stuck at a particular stage. The
exception that I can see in the executor logs is


 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)


As a workaround I have to kill the executor and clear the space on disk; a
new executor is relaunched by the worker and the failed stages are
recomputed. But is it really the case that when the disk is full on one
machine my application gets stuck?




This is really becoming a bottleneck and leads to instability of my
production stack.

Please share your insights on this.


Thanks,
Abhi


Timeout in mapWithState

2016-04-04 Thread Abhishek Anand
What exactly is the timeout in mapWithState?

I want the keys to get removed from memory if there is no data
received for a key for 10 minutes.

How can I achieve this in mapWithState?
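StateSpec has a timeout for exactly this (a sketch in the Scala API; mappingFunc and the keyed DStream are assumed to already exist, and the Java StateSpec exposes the same method). A key that has been idle for the timeout is dropped, and the mapping function is invoked one last time for it with state.isTimingOut() returning true and no new value.

import org.apache.spark.streaming.{Minutes, StateSpec}

val spec = StateSpec.function(mappingFunc).timeout(Minutes(10))  // drop keys idle for 10 minutes
val stateDStream = keyedDStream.mapWithState(spec)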

Regards,
Abhi


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-04-01 Thread Abhishek Anand
Hi Ted,

Any thoughts on this ???

I am getting the same kind of error when I kill a worker on one of the
machines.
Even after killing the worker using kill -9 command, the executor shows up
on the spark UI with negative active tasks.

All the tasks on that worker start to fail with the following exception.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more




Cheers !!
Abhi

On Fri, Apr 1, 2016 at 9:04 AM, Abhishek Anand 
wrote:

> This is what I am getting in the executor logs
>
> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
> reverting partial writes to file
> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
> java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:315)
> at
> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)
>
>
>
> It happens every time the disk is full.
>
> On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu  wrote:
>
>> Can you show the stack trace ?
>>
>> The log message came from
>> DiskBlockObjectWriter#revertPartialWritesAndClose().
>> Unfortunately, the method doesn't throw exception, making it a bit hard
>> for caller to know of the disk full condition.
>>
>> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand > > wrote:
>>
>>>
>>> Hi,
>>>
>>> Why is it so that when my disk space is full on one of the workers then
>>> the executor on that worker becomes unresponsive and the jobs on that
>>> worker fails with the exception
>>>
>>>
>>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>>> reverting partial writes to file
>>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>>> java.io.IOException: No space left on device
>>>
>>>
>>> This is leading to my job getting stuck.
>>>
>>> As a workaround I have to kill the executor, clear the space on disk and
>>> new executor  relaunched by the worker and the failed stages are recomputed.
>>>
>>>
>>> How can I get rid of this problem i.e why my job get stuck on disk full
>>> issue on one of the workers ?
>>>
>>>
>>> Cheers !!!
>>> Abhi
>>>
>>>
>>
>


Re: Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
This is what I am getting in the executor logs

16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)



It happens every time the disk is full.

On Fri, Apr 1, 2016 at 2:18 AM, Ted Yu  wrote:

> Can you show the stack trace ?
>
> The log message came from
> DiskBlockObjectWriter#revertPartialWritesAndClose().
> Unfortunately, the method doesn't throw exception, making it a bit hard
> for caller to know of the disk full condition.
>
> On Thu, Mar 31, 2016 at 11:32 AM, Abhishek Anand 
> wrote:
>
>>
>> Hi,
>>
>> Why is it so that when my disk space is full on one of the workers then
>> the executor on that worker becomes unresponsive and the jobs on that
>> worker fails with the exception
>>
>>
>> 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
>> reverting partial writes to file
>> /data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
>> java.io.IOException: No space left on device
>>
>>
>> This is leading to my job getting stuck.
>>
>> As a workaround I have to kill the executor, clear the space on disk and
>> new executor  relaunched by the worker and the failed stages are recomputed.
>>
>>
>> How can I get rid of this problem i.e why my job get stuck on disk full
>> issue on one of the workers ?
>>
>>
>> Cheers !!!
>> Abhi
>>
>>
>


Disk Full on one Worker is leading to Job Stuck and Executor Unresponsive

2016-03-31 Thread Abhishek Anand
Hi,

Why is it that when the disk space is full on one of the workers, the
executor on that worker becomes unresponsive and the jobs on that worker
fail with the exception


16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device


This is leading to my job getting stuck.

As a workaround I have to kill the executor and clear the space on disk; a new
executor is relaunched by the worker and the failed stages are recomputed.


How can I get rid of this problem, i.e., why does my job get stuck on a
disk-full issue on one of the workers?


Cheers !!!
Abhi


Output the data to external database at particular time in spark streaming

2016-03-08 Thread Abhishek Anand
I have a spark streaming job where I am aggregating the data by doing
reduceByKeyAndWindow with an inverse function.

I am keeping the data in memory for up to 2 hours, and in order to output the
reduced data to external storage I conditionally need to push the data to the
DB, say at every 15th minute of each hour.

How can this be achieved?
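One way to gate the write (a sketch; reducedStream stands for the DStream produced by reduceByKeyAndWindow, and the connection handling is left as a comment): foreachRDD also hands you the batch Time, so the output action can fire only on the 15th minute of each hour.

reducedStream.foreachRDD { (rdd, time) =>
  val minuteOfHour = (time.milliseconds / 60000) % 60   // batch time as a wall-clock minute
  if (minuteOfHour == 15) {
    rdd.foreachPartition { partition =>
      // open a connection to the external store and write this partition
    }
  }
}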


Regards,
Abhi


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-29 Thread Abhishek Anand
Hi Ryan,

It's not working even after removing the reduceByKey.

So, basically I am doing the following
- reading from kafka
- flatmap inside transform
- mapWithState
- rdd.count on output of mapWithState

But to my surprise I still don't see checkpointing taking place.

Is there any restriction to the type of operation that we can perform
inside mapWithState ?

I really need to resolve this one, as currently if my application is restarted
from checkpoint it has to repartition 120 previous stages, which takes a very
long time.

Thanks !!
Abhi

On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu  wrote:

> Sorry that I forgot to tell you that you should also call `rdd.count()`
> for "reduceByKey" as well. Could you try it and see if it works?
>
> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand 
> wrote:
>
>> Hi Ryan,
>>
>> I am using mapWithState after doing reduceByKey.
>>
>> I am right now using mapWithState as you suggested and triggering the
>> count manually.
>>
>> But, still unable to see any checkpointing taking place. In the DAG I can
>> see that the reduceByKey operation for the previous batches are also being
>> computed.
>>
>>
>> Thanks
>> Abhi
>>
>>
>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Using reducebykeyandwindow and mapWithState will trigger the bug
>>> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>>>
>>> JavaMapWithStateDStream<...> stateDStream =
>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>> stateDStream.foreachRDD(new Function1<...>() {
>>>   @Override
>>>   public Void call(JavaRDD<...> rdd) throws Exception {
>>> rdd.count();
>>>   }
>>> });
>>> return stateDStream.stateSnapshots();
>>>
>>>
>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> Reposting the code.
>>>>
>>>> Basically my use case is something like - I am receiving the web
>>>> impression logs and may get the notify (listening from kafka) for those
>>>> impressions in the same interval (for me its 1 min) or any next interval
>>>> (upto 2 hours). Now, when I receive notify for a particular impression I
>>>> need to swap the date field in impression with the date field in notify
>>>> logs. The notify for an impression has the same key as impression.
>>>>
>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>> Tuple2<String, MyClass>> mappingFunc =
>>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>>> Tuple2<String, MyClass>>() {
>>>> @Override
>>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>> State<MyClass> state) {
>>>> MyClass nullObj = new MyClass();
>>>> nullObj.setImprLog(null);
>>>> nullObj.setNotifyLog(null);
>>>> MyClass current = one.or(nullObj);
>>>>
>>>> if(current!= null && current.getImprLog() != null &&
>>>> current.getMyClassType() == 1 /*this is impression*/){
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>>> 3 /*notify for the impression received*/){
>>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>  //swappping the dates
>>>> return new Tuple2<>(key, oldState);
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>> }
>>>> else{
>>>> return new Tuple2<>(key, null);
>>>> }
>>>>
>>>> }
>>>> };
>>>>
>>>>
>>>> return
>>>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>>>
>>>>
>>>> Currently I am using reducebykeyandwindow without the inverse function
>>>> and I am able to get the correct data. But, issue the might arise is when I
>>>> have to restart my application from checkpoint and it repartitions and
>>>> computes the previous 120 partitions, which delays the incoming batches.
>>>>
>>>>
>>>> Thanks !!
>>>> Abh

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-29 Thread Abhishek Anand
Hi Ryan,

I was able to resolve this issue. The /tmp location was mounted with the
"noexec" option; removing noexec in the fstab resolved the issue. The
snappy shared object file is created at the /tmp location, so either
removing noexec from the mount or changing the default temp location solves
this issue:

export _JAVA_OPTIONS=-Djava.io.tmpdir=/mycustometemplocation
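The same can also be done per application instead of via the environment (a sketch; the config keys are standard Spark settings, the path is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions", "-Djava.io.tmpdir=/mycustometemplocation")
  .set("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/mycustometemplocation")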



Thanks !!
Abhi


On Mon, Feb 29, 2016 at 3:46 AM, Shixiong(Ryan) Zhu  wrote:

> This is because the Snappy library cannot load the native library. Did you
> forget to install the snappy native library in your new machines?
>
> On Fri, Feb 26, 2016 at 11:05 PM, Abhishek Anand 
> wrote:
>
>> Any insights on this ?
>>
>> On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
>> wrote:
>>
>>> On changing the default compression codec which is snappy to lzf the
>>> errors are gone !!
>>>
>>> How can I fix this using snappy as the codec ?
>>>
>>> Is there any downside of using lzf as snappy is the default codec that
>>> ships with spark.
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand >> > wrote:
>>>
>>>> Hi ,
>>>>
>>>> I am getting the following exception on running my spark streaming job.
>>>>
>>>> The same job has been running fine since long and when I added two new
>>>> machines to my cluster I see the job failing with the following exception.
>>>>
>>>>
>>>>
>>>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>>>> (TID 22594)
>>>> java.io.IOException: java.lang.reflect.InvocationTargetException
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>> at
>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>>>> at
>>>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>>>> at org.apache.spark.broadcast.TorrentBroadcast.org
>>>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>>>> ... 11 more
>>>> Caused by: java.lang.IllegalArgumentException
>>>> at
>>>> org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
>>>> ... 20 more
>>>>
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>
>>>
>>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-27 Thread Abhishek Anand
Hi Ryan,

I am using mapWithState after doing reduceByKey.

I am right now using mapWithState as you suggested and triggering the count
manually.

But, still unable to see any checkpointing taking place. In the DAG I can
see that the reduceByKey operation for the previous batches are also being
computed.


Thanks
Abhi


On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu  wrote:

> Hey Abhi,
>
> Using reducebykeyandwindow and mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>
> JavaMapWithStateDStream<...> stateDStream =
> myPairDstream.mapWithState(StateSpec.function(mappingFunc));
> stateDStream.foreachRDD(new Function1<...>() {
>   @Override
>   public Void call(JavaRDD<...> rdd) throws Exception {
> rdd.count();
>   }
> });
> return stateDStream.stateSnapshots();
>
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand 
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> Basically my use case is something like - I am receiving the web
>> impression logs and may get the notify (listening from kafka) for those
>> impressions in the same interval (for me its 1 min) or any next interval
>> (upto 2 hours). Now, when I receive notify for a particular impression I
>> need to swap the date field in impression with the date field in notify
>> logs. The notify for an impression has the same key as impression.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>,
>> Tuple2<String, MyClass>> mappingFunc =
>> new Function3<String, Optional<MyClass>, State<MyClass>,
>> Tuple2<String, MyClass>>() {
>> @Override
>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>> State<MyClass> state) {
>> MyClass nullObj = new MyClass();
>> nullObj.setImprLog(null);
>> nullObj.setNotifyLog(null);
>> MyClass current = one.or(nullObj);
>>
>> if(current!= null && current.getImprLog() != null &&
>> current.getMyClassType() == 1 /*this is impression*/){
>> return new Tuple2<>(key, null);
>> }
>> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
>> /*notify for the impression received*/){
>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>> if(oldState!= null && oldState.getNotifyLog() != null){
>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>  //swappping the dates
>> return new Tuple2<>(key, oldState);
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>> }
>> else{
>> return new Tuple2<>(key, null);
>> }
>>
>> }
>> };
>>
>>
>> return
>> myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>>
>> Currently I am using reducebykeyandwindow without the inverse function
>> and I am able to get the correct data. But, issue the might arise is when I
>> have to restart my application from checkpoint and it repartitions and
>> computes the previous 120 partitions, which delays the incoming batches.
>>
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand >> > wrote:
>>>
>>>> Any Insights on this one ?
>>>>
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way using some
>>>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>>>> the state and when restarting the application from checkpoint, it
>>>>> re-partitions all the previous batches data from kafka.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>> mappingFunc =
>>>>> new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>> Tuple2<String, MyClass>>() {
>>>>> @Override
>>>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>> State<MyClass> state) {
>>>>> MyClass nullObj = new MyClass();
>>>>> nullObj.setImprLog(null);
>>>>> nullObj.setNotif

Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-26 Thread Abhishek Anand
Any insights on this ?

On Fri, Feb 26, 2016 at 1:21 PM, Abhishek Anand 
wrote:

> On changing the default compression codec which is snappy to lzf the
> errors are gone !!
>
> How can I fix this using snappy as the codec ?
>
> Is there any downside of using lzf as snappy is the default codec that
> ships with spark.
>
>
> Thanks !!!
> Abhi
>
> On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I am getting the following exception on running my spark streaming job.
>>
>> The same job has been running fine since long and when I added two new
>> machines to my cluster I see the job failing with the following exception.
>>
>>
>>
>> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
>> (TID 22594)
>> java.io.IOException: java.lang.reflect.InvocationTargetException
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
>> at
>> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
>> at org.apache.spark.broadcast.TorrentBroadcast.org
>> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
>> at
>> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
>> ... 11 more
>> Caused by: java.lang.IllegalArgumentException
>> at
>> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
>> ... 20 more
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>
>


Re: java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-25 Thread Abhishek Anand
On changing the default compression codec which is snappy to lzf the errors
are gone !!

How can I fix this using snappy as the codec ?

Is there any downside of using lzf as snappy is the default codec that
ships with spark.


Thanks !!!
Abhi

On Mon, Feb 22, 2016 at 7:42 PM, Abhishek Anand 
wrote:

> Hi ,
>
> I am getting the following exception on running my spark streaming job.
>
> The same job has been running fine since long and when I added two new
> machines to my cluster I see the job failing with the following exception.
>
>
>
> 16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
> (TID 22594)
> java.io.IOException: java.lang.reflect.InvocationTargetException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
> at
> org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
> ... 11 more
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
> ... 20 more
>
>
>
> Thanks !!!
> Abhi
>


Query Kafka Partitions from Spark SQL

2016-02-23 Thread Abhishek Anand
Is there a way to query the JSON (or any other format) data stored in Kafka
using Spark SQL by providing the offset range on each of the brokers?

I just want to be able to query all the partitions in a SQL manner.

Thanks !
Abhi
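
For reference, a minimal sketch of one way to do this with the direct Kafka API: build a batch RDD over explicit offset ranges and register it as a temp table. Everything below (topic name, offsets, broker list) is a placeholder; it assumes an existing SparkContext sc, the spark-streaming-kafka artifact for Kafka 0.8 / Spark 1.x, and that the message values are JSON strings.

import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// Explicit offset ranges per topic/partition (placeholders)
val offsetRanges = Array(
  OffsetRange("mytopic", 0, 1000L, 2000L),
  OffsetRange("mytopic", 1, 1500L, 2500L))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Batch RDD covering exactly the requested offsets
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// Treat the message values as JSON and query them with Spark SQL
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json(rdd.map(_._2))
df.registerTempTable("kafka_slice")
sqlContext.sql("SELECT count(*) FROM kafka_slice").show()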


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Hi Ryan,

Reposting the code.

Basically my use case is something like this - I am receiving the web impression
logs and may get the notify (listening from Kafka) for those impressions in
the same interval (for me it's 1 min) or any later interval (up to 2 hours).
Now, when I receive the notify for a particular impression I need to swap the
date field in the impression with the date field in the notify logs. The notify
for an impression has the same key as the impression.

static Function3<String, Optional<MyClass>, State<MyClass>,
Tuple2<String, MyClass>> mappingFunc =
new Function3<String, Optional<MyClass>, State<MyClass>,
Tuple2<String, MyClass>>() {
@Override
public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
State<MyClass> state) {
MyClass nullObj = new MyClass();
nullObj.setImprLog(null);
nullObj.setNotifyLog(null);
MyClass current = one.or(nullObj);

if(current!= null && current.getImprLog() != null &&
current.getMyClassType() == 1 /*this is impression*/){
return new Tuple2<>(key, null);
}
else if (current.getNotifyLog() != null  && current.getMyClassType() == 3
/*notify for the impression received*/){
MyClass oldState = (state.exists() ? state.get() : nullObj);
if(oldState!= null && oldState.getNotifyLog() != null){
oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
 //swappping the dates
return new Tuple2<>(key, oldState);
}
else{
return new Tuple2<>(key, null);
}
}
else{
return new Tuple2<>(key, null);
}

}
};


return
myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();


Currently I am using reducebykeyandwindow without the inverse function and
I am able to get the correct data. But, the issue that might arise is when I
have to restart my application from checkpoint and it repartitions and
computes the previous 120 partitions, which delays the incoming batches.


Thanks !!
Abhi

On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu  wrote:

> Hey Abhi,
>
> Could you post how you use mapWithState? By default, it should do
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from
> checkpointing in some special cases:
> https://issues.apache.org/jira/browse/SPARK-6847
>
> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand 
> wrote:
>
>> Any Insights on this one ?
>>
>>
>> Thanks !!!
>> Abhi
>>
>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand > > wrote:
>>
>>> I am now trying to use mapWithState in the following way using some
>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>> the state and when restarting the application from checkpoint, it
>>> re-partitions all the previous batches data from kafka.
>>>
>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>> Tuple2<String, MyClass>> mappingFunc =
>>> new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
>>> MyClass>>() {
>>> @Override
>>> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>> State<MyClass> state) {
>>> MyClass nullObj = new MyClass();
>>> nullObj.setImprLog(null);
>>> nullObj.setNotifyLog(null);
>>> MyClass current = one.or(nullObj);
>>>
>>> if(current!= null && current.getImprLog() != null &&
>>> current.getMyClassType() == 1){
>>> return new Tuple2<>(key, null);
>>> }
>>> else if (current.getNotifyLog() != null  && current.getMyClassType() ==
>>> 3){
>>> MyClass oldState = (state.exists() ? state.get() : nullObj);
>>> if(oldState!= null && oldState.getNotifyLog() != null){
>>> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>> return new Tuple2<>(key, oldState);
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>> }
>>> else{
>>> return new Tuple2<>(key, null);
>>> }
>>>
>>> }
>>> };
>>>
>>>
>>> Please suggest if this is the proper way or am I doing something wrong.
>>>
>>>
>>> Thanks !!
>>> Abhi
>>>
>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
>>> wrote:
>>>
>>>> If you don't want to update your only option will be updateStateByKey
>>>> then
>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>>>>
>>>>> mapWithState supports checkpoint.
>>>>>
>>>>> There has been some bug fix since release of 1.6.0
>>>>> e.g.
>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>> with KryoSerializer
>>>>>
>>>>> which is in the upcoming 1.6.1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek An

java.io.IOException: java.lang.reflect.InvocationTargetException on new spark machines

2016-02-22 Thread Abhishek Anand
Hi ,

I am getting the following exception on running my spark streaming job.

The same job has been running fine since long and when I added two new
machines to my cluster I see the job failing with the following exception.



16/02/22 19:23:01 ERROR Executor: Exception in task 2.0 in stage 4229.0
(TID 22594)
java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:59)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1254)
... 11 more
Caused by: java.lang.IllegalArgumentException
at
org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:152)
... 20 more



Thanks !!!
Abhi


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-22 Thread Abhishek Anand
Any Insights on this one ?


Thanks !!!
Abhi

On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand 
wrote:

> I am now trying to use mapWithState in the following way using some
> example codes. But, by looking at the DAG it does not seem to checkpoint
> the state and when restarting the application from checkpoint, it
> re-partitions all the previous batches data from kafka.
>
> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
> MyClass>> mappingFunc =
> new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String,
> MyClass>>() {
> @Override
> public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
> State<MyClass> state) {
> MyClass nullObj = new MyClass();
> nullObj.setImprLog(null);
> nullObj.setNotifyLog(null);
> MyClass current = one.or(nullObj);
>
> if(current!= null && current.getImprLog() != null &&
> current.getMyClassType() == 1){
> return new Tuple2<>(key, null);
> }
> else if (current.getNotifyLog() != null  && current.getMyClassType() == 3){
> MyClass oldState = (state.exists() ? state.get() : nullObj);
> if(oldState!= null && oldState.getNotifyLog() != null){
> oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
> return new Tuple2<>(key, oldState);
> }
> else{
> return new Tuple2<>(key, null);
> }
> }
> else{
> return new Tuple2<>(key, null);
> }
>
> }
> };
>
>
> Please suggest if this is the proper way or am I doing something wrong.
>
>
> Thanks !!
> Abhi
>
> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
> wrote:
>
>> If you don't want to update your only option will be updateStateByKey then
>> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>>
>>> mapWithState supports checkpoint.
>>>
>>> There has been some bug fix since release of 1.6.0
>>> e.g.
>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>>> KryoSerializer
>>>
>>> which is in the upcoming 1.6.1
>>>
>>> Cheers
>>>
>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>> abhis.anan...@gmail.com> wrote:
>>>
>>>> Does mapWithState checkpoints the data ?
>>>>
>>>> When my application goes down and is restarted from checkpoint, will
>>>> mapWithState need to recompute the previous batches data ?
>>>>
>>>> Also, to use mapWithState I will need to upgrade my application as I am
>>>> using version 1.4.0 and mapWithState isnt supported there. Is there any
>>>> other work around ?
>>>>
>>>> Cheers!!
>>>> Abhi
>>>>
>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu >>> > wrote:
>>>>
>>>>> Looks like mapWithState could help you?
>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have an use case like follows in my production environment where I
>>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>>> hours.
>>>>>>
>>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>>> but with different value,which might appear in the same batch or some 
>>>>>> next
>>>>>> batch.
>>>>>>
>>>>>> When the key appears second time I need to update a field in value of
>>>>>> previous key with a field in the later key. The keys for which the
>>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>>
>>>>>> At the end of each second I need to output the result to external
>>>>>> database.
>>>>>>
>>>>>> For example :
>>>>>>
>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>> At t=1sec I am getting
>>>>>> key0,value0(0,"prev0")
>>>>>> key1,value1 (1, "prev1")
>>>>>> key2,value2 (2,"prev2")
>>>>>> key2,value3 (3, "next2")
>>>>>>
>>>>>> Output to database after 1 sec
>>>>>> key2, newValue (2,"next2")
>>>>>>
>>>>>> At t=2 sec getting
>>>>>> key3,value4(4,"prev3")
>>>>>> key1,value5(5,"next1")
>>>>>>
>>>>>> Output to database after 2 sec
>>>>>> key1,newValue(1,"next1")
>>>>>>
>>>>>> At t=3 sec
>>>>>> key4,value6(6,"prev4")
>>>>>> key3,value7(7,"next3")
>>>>>> key5,value5(8,"prev5")
>>>>>> key5,value5(9,"next5")
>>>>>> key0,value0(10,"next0")
>>>>>>
>>>>>> Output to database after 3 sec
>>>>>> key0,newValue(0,"next0")
>>>>>> key3,newValue(4,"next3")
>>>>>> key5,newValue(8,"next5")
>>>>>>
>>>>>>
>>>>>> Please suggest how this can be achieved.
>>>>>>
>>>>>>
>>>>>> Thanks a lot 
>>>>>> Abhi
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>


Spark Streaming with Kafka Use Case

2016-02-17 Thread Abhishek Anand
I have a spark streaming application running in production. I am trying to
find a solution for a particular use case when my application has a
downtime of say 5 hours and is restarted. Now, when I start my streaming
application after 5 hours there would be a considerable amount of data in
Kafka by then, and my cluster would be unable to repartition and process it.

Is there any workaround so that when my streaming application starts, it
first takes data for 1-2 hours and processes it, then takes the data for the
next 1 hour and processes it? Once it is done processing the previous 5 hours
of data which it missed, normal streaming should start with the given slide
interval.

Please suggest any ideas and feasibility of this.


Thanks !!
Abhi
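
One mitigation worth considering (a sketch, not a definitive answer) is to cap how fast the direct Kafka stream reads after a restart, so the backlog is drained over many bounded batches instead of one huge batch. The values below are placeholders.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-catchup")
  // Limit each Kafka partition to N records per second (direct stream only),
  // so the post-downtime backlog is spread across many normal-sized batches.
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
  // From Spark 1.5 onwards, backpressure can adjust the ingestion rate
  // automatically based on scheduling delay and processing time.
  .set("spark.streaming.backpressure.enabled", "true")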


Re: Worker's BlockManager Folder not getting cleared

2016-02-17 Thread Abhishek Anand
Looking for answer to this.

Is it safe to delete the older files using

find . -type f -cmin +200 -name "shuffle*" -exec rm -rf {} \;

For a window duration of 2 hours, how old do the files need to be before we can safely delete them?

Thanks.

On Sun, Feb 14, 2016 at 12:34 PM, Abhishek Anand 
wrote:

> Hi All,
>
> Any ideas on this one ?
>
> The size of this directory keeps on growing.
>
> I can see there are many files from a day earlier too.
>
> Cheers !!
> Abhi
>
> On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand 
> wrote:
>
>> Hi Adrian,
>>
>> I am running spark in standalone mode.
>>
>> The spark version that I am using is 1.4.0
>>
>> Thanks,
>> Abhi
>>
>> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
>> wrote:
>>
>>> Hi Abhi - are you running on Mesos perchance?
>>>
>>> If so then with spark <1.6 you will be hitting
>>> https://issues.apache.org/jira/browse/SPARK-10975
>>> With spark >= 1.6:
>>> https://issues.apache.org/jira/browse/SPARK-12430
>>> and also be aware of:
>>> https://issues.apache.org/jira/browse/SPARK-12583
>>>
>>>
>>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>>
>>> Hi All,
>>>
>>> How long the shuffle files and data files are stored on the block
>>> manager folder of the workers.
>>>
>>> I have a spark streaming job with window duration of 2 hours and slide
>>> interval of 15 minutes.
>>>
>>> When I execute the following command in my block manager path
>>>
>>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>>
>>> I see a lot of files which means that they are not getting cleared which
>>> I was expecting that they should get cleared.
>>>
>>> Subsequently, this size keeps on increasing and takes space on the disk.
>>>
>>> Please suggest how to get rid of this and help on understanding this
>>> behaviour.
>>>
>>>
>>>
>>> Thanks !!!
>>> Abhi
>>>
>>>
>>> --
>>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>>> <http://www.opensignal.com>
>>> _
>>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>>> Phone #: +44 777-377-8251
>>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>>> _
>>>
>>
>>
>


Re: Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-16 Thread Abhishek Anand
Hi Cody,

I am able to do it using this piece of code:

kafkaStreamRdd.foreachRDD((rdd,batchMilliSec) -> {
Date currentBatchTime = new Date();
currentBatchTime.setTime(batchMilliSec.milliseconds());
List r = new ArrayList();
OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
for(int partition = 0; partition < offsetRanges.length; partition++){
//Add offsets to the list
}
JavaSparkContext ctx = new JavaSparkContext(rdd.context());
JavaRDD currrentBatchOffsets = ctx.parallelize(r);
//write currrentBatchOffsets rdd to cassandra
return null;
});


Is this the correct way of doing this ?


Thanks !!
Abhi

On Tue, Feb 16, 2016 at 9:31 PM, Cody Koeninger  wrote:

> You could use sc.parallelize... but the offsets are already available at
> the driver, and they're a (hopefully) small enough amount of data that's
> it's probably more straightforward to just use the normal cassandra client
> to save them from the driver.
>
> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand 
> wrote:
>
>> I have a kafka rdd and I need to save the offsets to cassandra table at
>> the begining of each batch.
>>
>> Basically I need to write the offsets (of the type shown below) that I am
>> getting inside foreachRDD to Cassandra. The javaFunctions api to write to
>> Cassandra needs an RDD. How can I create an RDD from the offsets and write
>> it to a Cassandra table?
>>
>>
>> public static void writeOffsets(JavaPairDStream<String, String> kafkastream){
>> kafkastream.foreachRDD((rdd,batchMilliSec) -> {
>> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>> return null;
>> });
>>
>>
>> Thanks !!
>> Abhi
>>
>>
>>
>
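
For reference, a minimal sketch of writing the offsets from the driver with the Spark Cassandra Connector, along the lines suggested above; the keyspace, table, and column names are made up, and kafkaStream/ssc refer to the job's direct stream and StreamingContext.

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.streaming.kafka.HasOffsetRanges

val connector = CassandraConnector(ssc.sparkContext.getConf)

kafkaStream.foreachRDD { rdd =>
  // offsetRanges is a small array available on the driver; no RDD is needed
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  connector.withSessionDo { session =>
    offsetRanges.foreach { o =>
      session.execute(
        "INSERT INTO mykeyspace.kafka_offsets (topic, kafka_partition, until_offset) VALUES (?, ?, ?)",
        o.topic, Int.box(o.partition), Long.box(o.untilOffset))
    }
  }
}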


Saving Kafka Offsets to Cassandra at begining of each batch in Spark Streaming

2016-02-15 Thread Abhishek Anand
I have a kafka rdd and I need to save the offsets to cassandra table at the
begining of each batch.

Basically I need to write the offsets (of the type shown below) that I am
getting inside foreachRDD to Cassandra. The javaFunctions api to write to
Cassandra needs an RDD. How can I create an RDD from the offsets and write it
to a Cassandra table?


public static void writeOffsets(JavaPairDStream<String, String> kafkastream){
kafkastream.foreachRDD((rdd,batchMilliSec) -> {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
return null;
});


Thanks !!
Abhi


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-15 Thread Abhishek Anand
I am now trying to use mapWithState in the following way, using some example
code. But, looking at the DAG, it does not seem to checkpoint the state,
and when restarting the application from checkpoint, it re-partitions all
the previous batches' data from Kafka.

static Function3<String, Optional<MyClass>, State<MyClass>,
Tuple2<String, MyClass>> mappingFunc =
new Function3<String, Optional<MyClass>, State<MyClass>,
Tuple2<String, MyClass>>() {
@Override
public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
State<MyClass> state) {
MyClass nullObj = new MyClass();
nullObj.setImprLog(null);
nullObj.setNotifyLog(null);
MyClass current = one.or(nullObj);

if(current!= null && current.getImprLog() != null &&
current.getMyClassType() == 1){
return new Tuple2<>(key, null);
}
else if (current.getNotifyLog() != null  && current.getMyClassType() == 3){
MyClass oldState = (state.exists() ? state.get() : nullObj);
if(oldState!= null && oldState.getNotifyLog() != null){
oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
return new Tuple2<>(key, oldState);
}
else{
return new Tuple2<>(key, null);
}
}
else{
return new Tuple2<>(key, null);
}

}
};


Please suggest if this is the proper way or am I doing something wrong.


Thanks !!
Abhi

On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu 
wrote:

> If you don't want to update your only option will be updateStateByKey then
> On 13 Feb 2016 8:48 p.m., "Ted Yu"  wrote:
>
>> mapWithState supports checkpoint.
>>
>> There has been some bug fix since release of 1.6.0
>> e.g.
>>   SPARK-12591 NullPointerException using checkpointed mapWithState with
>> KryoSerializer
>>
>> which is in the upcoming 1.6.1
>>
>> Cheers
>>
>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand > > wrote:
>>
>>> Does mapWithState checkpoints the data ?
>>>
>>> When my application goes down and is restarted from checkpoint, will
>>> mapWithState need to recompute the previous batches data ?
>>>
>>> Also, to use mapWithState I will need to upgrade my application as I am
>>> using version 1.4.0 and mapWithState isnt supported there. Is there any
>>> other work around ?
>>>
>>> Cheers!!
>>> Abhi
>>>
>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu 
>>> wrote:
>>>
>>>> Looks like mapWithState could help you?
>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have an use case like follows in my production environment where I
>>>>> am listening from kafka with slideInterval of 1 min and windowLength of 2
>>>>> hours.
>>>>>
>>>>> I have a JavaPairDStream where for each key I am getting the same key
>>>>> but with different value,which might appear in the same batch or some next
>>>>> batch.
>>>>>
>>>>> When the key appears second time I need to update a field in value of
>>>>> previous key with a field in the later key. The keys for which the
>>>>> combination keys do not come should be rejected after 2 hours.
>>>>>
>>>>> At the end of each second I need to output the result to external
>>>>> database.
>>>>>
>>>>> For example :
>>>>>
>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>> At t=1sec I am getting
>>>>> key0,value0(0,"prev0")
>>>>> key1,value1 (1, "prev1")
>>>>> key2,value2 (2,"prev2")
>>>>> key2,value3 (3, "next2")
>>>>>
>>>>> Output to database after 1 sec
>>>>> key2, newValue (2,"next2")
>>>>>
>>>>> At t=2 sec getting
>>>>> key3,value4(4,"prev3")
>>>>> key1,value5(5,"next1")
>>>>>
>>>>> Output to database after 2 sec
>>>>> key1,newValue(1,"next1")
>>>>>
>>>>> At t=3 sec
>>>>> key4,value6(6,"prev4")
>>>>> key3,value7(7,"next3")
>>>>> key5,value5(8,"prev5")
>>>>> key5,value5(9,"next5")
>>>>> key0,value0(10,"next0")
>>>>>
>>>>> Output to database after 3 sec
>>>>> key0,newValue(0,"next0")
>>>>> key3,newValue(4,"next3")
>>>>> key5,newValue(8,"next5")
>>>>>
>>>>>
>>>>> Please suggest how this can be achieved.
>>>>>
>>>>>
>>>>> Thanks a lot 
>>>>> Abhi
>>>>>
>>>>>
>>>>>
>>>
>>
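
For comparison, a minimal Scala sketch of mapWithState with an explicit checkpoint interval and a timeout. The mapping function below is a simple stand-in that keeps a running count, not the impression/notify swap logic, and myPairDstream is assumed to be a DStream[(String, Long)] built from the Kafka direct stream; the intervals are example values only.

import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec}

// Stand-in mapping function: keeps a running sum per key
val mappingFunc = (key: String, value: Option[Long], state: State[Long]) => {
  if (state.isTimingOut()) {
    // final call for a key that saw no update within the timeout
    (key, state.getOption.getOrElse(0L))
  } else {
    val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
    state.update(sum)
    (key, sum)
  }
}

val spec = StateSpec.function(mappingFunc)
  .timeout(Minutes(120))            // drop keys idle for the 2-hour window

val stateStream = myPairDstream.mapWithState(spec)

// Cut the lineage explicitly (the default is 10x the batch interval)
stateStream.checkpoint(Seconds(600))

val snapshots = stateStream.stateSnapshots()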


Re: Worker's BlockManager Folder not getting cleared

2016-02-13 Thread Abhishek Anand
Hi All,

Any ideas on this one ?

The size of this directory keeps on growing.

I can see there are many files from a day earlier too.

Cheers !!
Abhi

On Tue, Jan 26, 2016 at 7:13 PM, Abhishek Anand 
wrote:

> Hi Adrian,
>
> I am running spark in standalone mode.
>
> The spark version that I am using is 1.4.0
>
> Thanks,
> Abhi
>
> On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
> wrote:
>
>> Hi Abhi - are you running on Mesos perchance?
>>
>> If so then with spark <1.6 you will be hitting
>> https://issues.apache.org/jira/browse/SPARK-10975
>> With spark >= 1.6:
>> https://issues.apache.org/jira/browse/SPARK-12430
>> and also be aware of:
>> https://issues.apache.org/jira/browse/SPARK-12583
>>
>>
>> On 25/01/2016 07:14, Abhishek Anand wrote:
>>
>> Hi All,
>>
>> How long the shuffle files and data files are stored on the block manager
>> folder of the workers.
>>
>> I have a spark streaming job with window duration of 2 hours and slide
>> interval of 15 minutes.
>>
>> When I execute the following command in my block manager path
>>
>> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>>
>> I see a lot of files which means that they are not getting cleared which
>> I was expecting that they should get cleared.
>>
>> Subsequently, this size keeps on increasing and takes space on the disk.
>>
>> Please suggest how to get rid of this and help on understanding this
>> behaviour.
>>
>>
>>
>> Thanks !!!
>> Abhi
>>
>>
>> --
>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>> <http://www.opensignal.com>
>> _
>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>> Phone #: +44 777-377-8251
>> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>
>>   |  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
>> _
>>
>
>


Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-13 Thread Abhishek Anand
Does mapWithState checkpoint the data?

When my application goes down and is restarted from checkpoint, will
mapWithState need to recompute the previous batches' data?

Also, to use mapWithState I will need to upgrade my application, as I am
using version 1.4.0 and mapWithState isn't supported there. Is there any
other workaround?

Cheers!!
Abhi

On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu 
wrote:

> Looks like mapWithState could help you?
> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" 
> wrote:
>
>> Hi All,
>>
>> I have an use case like follows in my production environment where I am
>> listening from kafka with slideInterval of 1 min and windowLength of 2
>> hours.
>>
>> I have a JavaPairDStream where for each key I am getting the same key but
>> with different value,which might appear in the same batch or some next
>> batch.
>>
>> When the key appears second time I need to update a field in value of
>> previous key with a field in the later key. The keys for which the
>> combination keys do not come should be rejected after 2 hours.
>>
>> At the end of each second I need to output the result to external
>> database.
>>
>> For example :
>>
>> Suppose valueX is object of MyClass with fields int a, String b
>> At t=1sec I am getting
>> key0,value0(0,"prev0")
>> key1,value1 (1, "prev1")
>> key2,value2 (2,"prev2")
>> key2,value3 (3, "next2")
>>
>> Output to database after 1 sec
>> key2, newValue (2,"next2")
>>
>> At t=2 sec getting
>> key3,value4(4,"prev3")
>> key1,value5(5,"next1")
>>
>> Output to database after 2 sec
>> key1,newValue(1,"next1")
>>
>> At t=3 sec
>> key4,value6(6,"prev4")
>> key3,value7(7,"next3")
>> key5,value5(8,"prev5")
>> key5,value5(9,"next5")
>> key0,value0(10,"next0")
>>
>> Output to database after 3 sec
>> key0,newValue(0,"next0")
>> key3,newValue(4,"next3")
>> key5,newValue(8,"next5")
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>> Thanks a lot 
>> Abhi
>>
>>
>>


Stateful Operation on JavaPairDStream Help Needed !!

2016-02-11 Thread Abhishek Anand
Hi All,

I have a use case like the following in my production environment, where I am
listening from Kafka with a slideInterval of 1 min and a windowLength of 2
hours.

I have a JavaPairDStream where for each key I am getting the same key but
with a different value, which might appear in the same batch or some later
batch.

When the key appears a second time I need to update a field in the value of
the previous key with a field in the later key. The keys for which the
matching key does not arrive should be rejected after 2 hours.

At the end of each second I need to output the result to an external database.

For example :

Suppose valueX is object of MyClass with fields int a, String b
At t=1sec I am getting
key0,value0(0,"prev0")
key1,value1 (1, "prev1")
key2,value2 (2,"prev2")
key2,value3 (3, "next2")

Output to database after 1 sec
key2, newValue (2,"next2")

At t=2 sec getting
key3,value4(4,"prev3")
key1,value5(5,"next1")

Output to database after 2 sec
key1,newValue(1,"next1")

At t=3 sec
key4,value6(6,"prev4")
key3,value7(7,"next3")
key5,value5(8,"prev5")
key5,value5(9,"next5")
key0,value0(10,"next0")

Output to database after 3 sec
key0,newValue(0,"next0")
key3,newValue(4,"next3")
key5,newValue(8,"next5")


Please suggest how this can be achieved.


Thanks a lot 
Abhi


Re: Repartition taking place for all previous windows even after checkpointing

2016-02-01 Thread Abhishek Anand
Any insights on this ?


On Fri, Jan 29, 2016 at 1:08 PM, Abhishek Anand 
wrote:

> Hi All,
>
> Can someone help me with the following doubts regarding checkpointing :
>
> My code flow is something like follows ->
>
> 1) create direct stream from kafka
> 2) repartition kafka stream
> 3)  mapToPair followed by reduceByKey
> 4)  filter
> 5)  reduceByKeyAndWindow without the inverse function
> 6)  write to cassandra
>
> Now when I restart my application from checkpoint, I see repartition and
> other steps being called for the previous windows which takes longer and
> delays my aggregations.
>
> My understanding was that once data checkpointing is done it should not
> re-read from Kafka but instead use the saved RDDs, but I guess I am wrong.
>
> Is there a way to avoid the repartition or any workaround for this.
>
> Spark Version is 1.4.0
>
> Cheers !!
> Abhi
>


Repartition taking place for all previous windows even after checkpointing

2016-01-28 Thread Abhishek Anand
Hi All,

Can someone help me with the following doubts regarding checkpointing :

My code flow is something like follows ->

1) create direct stream from kafka
2) repartition kafka stream
3)  mapToPair followed by reduceByKey
4)  filter
5)  reduceByKeyAndWindow without the inverse function
6)  write to cassandra

Now when I restart my application from checkpoint, I see repartition and
other steps being called for the previous windows which takes longer and
delays my aggregations.

My understanding was that once data checkpointing is done it should not
re-read from Kafka but instead use the saved RDDs, but I guess I am wrong.

Is there a way to avoid the repartition or any workaround for this.

Spark Version is 1.4.0

Cheers !!
Abhi


Re: Worker's BlockManager Folder not getting cleared

2016-01-26 Thread Abhishek Anand
Hi Adrian,

I am running spark in standalone mode.

The spark version that I am using is 1.4.0

Thanks,
Abhi

On Tue, Jan 26, 2016 at 4:10 PM, Adrian Bridgett 
wrote:

> Hi Abhi - are you running on Mesos perchance?
>
> If so then with spark <1.6 you will be hitting
> https://issues.apache.org/jira/browse/SPARK-10975
> With spark >= 1.6:
> https://issues.apache.org/jira/browse/SPARK-12430
> and also be aware of:
> https://issues.apache.org/jira/browse/SPARK-12583
>
>
> On 25/01/2016 07:14, Abhishek Anand wrote:
>
> Hi All,
>
> How long the shuffle files and data files are stored on the block manager
> folder of the workers.
>
> I have a spark streaming job with window duration of 2 hours and slide
> interval of 15 minutes.
>
> When I execute the following command in my block manager path
>
> find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;
>
> I see a lot of files which means that they are not getting cleared which I
> was expecting that they should get cleared.
>
> Subsequently, this size keeps on increasing and takes space on the disk.
>
> Please suggest how to get rid of this and help on understanding this
> behaviour.
>
>
>
> Thanks !!!
> Abhi
>
>
> --
> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
> <http://www.opensignal.com>
> _
> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
> Phone #: +44 777-377-8251
> Skype: abridgett  |  @adrianbridgett <http://twitter.com/adrianbridgett>  |
>  LinkedIn link  <https://uk.linkedin.com/in/abridgett>
> _
>


Worker's BlockManager Folder not getting cleared

2016-01-24 Thread Abhishek Anand
Hi All,

How long are the shuffle files and data files stored in the block manager
folder of the workers?

I have a spark streaming job with window duration of 2 hours and slide
interval of 15 minutes.

When I execute the following command in my block manager path

find . -type f -cmin +150 -name "shuffle*" -exec ls {} \;

I see a lot of files, which means that they are not getting cleared, although
I was expecting them to be cleared.

Subsequently, this size keeps on increasing and takes space on the disk.

Please suggest how to get rid of this and help on understanding this
behaviour.



Thanks !!!
Abhi
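
For the standalone deployment there are also worker-side cleanup settings, although they only purge the work directories of applications that have already finished; for a running streaming job, shuffle files are removed only when the corresponding RDDs/shuffles are garbage collected on the driver. A hedged example of the worker settings (spark-env.sh; the values are illustrative only):

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true \
  -Dspark.worker.cleanup.interval=1800 \
  -Dspark.worker.cleanup.appDataTtl=604800"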


Getting kafka offsets at beginning of spark streaming application

2016-01-11 Thread Abhishek Anand
Hi,

Is there a way so that I can fetch the offsets from where the spark
streaming starts reading from Kafka when my application starts ?

What I am trying to do is create an initial RDD covering the range between the
offsets at a particular time (passed as input from the command line) and the
offsets from where my spark streaming starts.

Eg -

Partition 0 -> 1000 to (offset at which my spark streaming starts)

Thanks !!
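
A sketch of the idea with the direct API: the starting offsets can be supplied explicitly via fromOffsets, and that same map is then the set of offsets the streaming starts from. The topic, partitions, offsets, and broker list below are placeholders, and ssc is assumed to be an existing StreamingContext.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Offsets computed for "a particular time" (e.g. passed on the command line)
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("mytopic", 0) -> 1000L,
  TopicAndPartition("mytopic", 1) -> 1200L)

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

// The stream begins reading exactly at fromOffsets, so these values are the
// offsets from which the streaming starts.
val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)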


Error on using updateStateByKey

2015-12-18 Thread Abhishek Anand
I am trying to use updateStateByKey but receiving the following error.
(Spark Version 1.4.0)

Can someone please point out what might be the possible reason for this
error.


*The method
updateStateByKey(Function2,Optional,Optional>)
in the type JavaPairDStream is not applicable
for the arguments *
* 
(Function2,Optional,Optional>)*


This is the update function that I am using inside updateStateByKey.

I am applying updateStateByKey on a tuple of 

private static Function2,
Optional, Optional> updateFunction =
new Function2, Optional,
Optional>() {
/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public Optional call(List values,
Optional current) {
AggregationMetrics newSum = current.or(new AggregationMetrics(0L, 0L, 0L));
for(int i=0; i < values.size(); i++)
{
//set with new values
}
return Optional.of(newSum);
}
};



Thanks,
Abhi
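
The mismatch usually comes from the generic parameters: updateStateByKey on a JavaPairDStream<K, V> expects a Function2<List<V>, Optional<S>, Optional<S>>, so the List element type must be exactly the value type V of the pair stream and S must be the state type. For reference, a minimal Scala sketch of the same contract; the Long value type and the AggregationMetrics fields below are illustrative stand-ins, not the actual classes from this job.

case class AggregationMetrics(m1: Long, m2: Long, m3: Long)  // stand-in state class

// pairs: DStream[(String, Long)] -- Long is the assumed value type of the pair stream
def updateFunction(values: Seq[Long],
                   state: Option[AggregationMetrics]): Option[AggregationMetrics] = {
  val current = state.getOrElse(AggregationMetrics(0L, 0L, 0L))
  // fold the new values into the running metrics
  Some(current.copy(m1 = current.m1 + values.sum))
}

// val stateStream = pairs.updateStateByKey(updateFunction _)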


Re: Unable to use "Batch Start Time" on worker nodes.

2015-11-30 Thread Abhishek Anand
Thanks TD !!

I think this should solve my purpose.




On Sun, Nov 29, 2015 at 6:17 PM, Tathagata Das  wrote:

> You can get the batch start (the expected, not the exact time when the
> jobs are submitted) from DStream operation "transform". There is a version
> of transform that allows you specify a function with two params - the
> parent RDD and the batch time at which the RDD was generated.
>
> TD
>
> On Thu, Nov 26, 2015 at 1:33 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I need to use batch start time in my spark streaming job.
>>
>> I need the value of batch start time inside one of the functions that is
>> called within a flatmap function in java.
>>
>> Please suggest me how this can be done.
>>
>> I tried to use the StreamingListener class and set the value of a
>> variable inside the onBatchSubmitted function something like this :
>>
>> public void onBatchSubmitted(StreamingListenerBatchSubmitted
>> batchSubmitted) { batchstarttime =
>> batchSubmitted.batchInfo().batchTime().milliseconds();
>>   CommandLineArguments.BATCH_START_TIME = batchstarttime;
>>  }
>>
>>
>> But, the issue is that the BATCH_START_TIME set only when the batch
>> starts. I see in the worker logs that BATCH_START_TIME takes the default
>> value and is not set.
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>>
>> BR,
>> Abhi
>>
>
>
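
A minimal Scala sketch of the transform variant described above; dstream is assumed to be the input DStream[String], and what is done with the batch time downstream is purely illustrative.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val withBatchTime = dstream.transform { (rdd: RDD[String], batchTime: Time) =>
  // the scheduled batch time, not the wall-clock time when the job actually runs
  val batchStartMillis = batchTime.milliseconds
  // captured in the closure, so it is available inside functions run on the workers
  rdd.map(record => (batchStartMillis, record))
}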


Unable to use "Batch Start Time" on worker nodes.

2015-11-26 Thread Abhishek Anand
Hi ,

I need to use batch start time in my spark streaming job.

I need the value of batch start time inside one of the functions that is
called within a flatmap function in java.

Please suggest me how this can be done.

I tried to use the StreamingListener class and set the value of a variable
inside the onBatchSubmitted function something like this :

public void onBatchSubmitted(StreamingListenerBatchSubmitted
batchSubmitted) { batchstarttime =
batchSubmitted.batchInfo().batchTime().milliseconds();
  CommandLineArguments.BATCH_START_TIME = batchstarttime;
 }


But, the issue is that BATCH_START_TIME is set only when the batch starts.
I see in the worker logs that BATCH_START_TIME takes the default value and
is not set.


Please suggest how this can be achieved.



BR,
Abhi


Getting the batch time of the active batches in spark streaming

2015-11-24 Thread Abhishek Anand
Hi ,

I need to get the batch time of the active batches which appear on the UI
of the spark streaming tab.

How can this be achieved in Java ?

BR,
Abhi
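
For reference, a sketch of one way to observe this with a StreamingListener (shown here in Scala; the Java listener API is analogous, and ssc is assumed to be an existing StreamingContext).

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // the batch time of each batch as it becomes active
    val batchTime = batchStarted.batchInfo.batchTime
    println(s"Batch started, batch time = $batchTime")
  }
})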


External Table not getting updated from parquet files written by spark streaming

2015-11-19 Thread Abhishek Anand
Hi ,

I am using spark streaming to write the aggregated output as parquet files
to the hdfs using SaveMode.Append. I have an external table created like :


CREATE TABLE if not exists rolluptable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "hdfs:"
);

I had the impression that in the case of an external table the queries should
also fetch the data from the newly added parquet files. But it seems the newly
written files are not being picked up.

Dropping and recreating the table every time works fine, but that is not a
solution.

Please suggest how my table can also pick up the data from the newer files.



Thanks !!
Abhi
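
One thing worth trying (a hedged suggestion): Spark SQL caches the metadata and file listing of such data source tables, so newly written parquet files may only become visible after an explicit refresh, e.g.:

// hiveContext: the HiveContext through which the table was created (assumed)
hiveContext.refreshTable("rolluptable")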


unsubscribe

2015-11-18 Thread VJ Anand
-- 
*VJ Anand*
*Founder *
*Sankia*
vjan...@sankia.com
925-640-1340
www.sankia.com



SparkSQL on hive error

2015-10-27 Thread Anand Nalya
Hi,

I've a partitioned table in Hive (Avro) that I can query alright from hive
cli.

When using SparkSQL, I'm able to query some of the partitions, but getting
exception on some of the partitions.

The query is:

sqlContext.sql("select * from myTable where source='http' and date =
20150812").take(5).foreach(println)

The exception is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage
2.0 (TID 5, node1): java.lang.IllegalArgumentException: Error: type
expected at the position 0 of
'BIGINT:INT:INT:INT:INT:string:INT:string:string:string:string:string:string:string:string:string:string:string:string:string:string:INT:INT:string:BIGINT:string:string:BIGINT:BIGINT:string:string:string:string:string:FLOAT:FLOAT:string:string:string:string:BIGINT:BIGINT:string:string:string:string:string:string:BIGINT:string:string'
but 'BIGINT' is found.
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:348)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:392)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
at
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:762)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:105)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:191)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$4$$anonfun$9.apply(TableReader.scala:188)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any pointers, what might be wrong here?

Regards,
Anand


No suitable Constructor found while compiling

2015-10-18 Thread VJ Anand
I am trying to extend RDD in java, and when I call the parent constructor,
it gives the error: no suitable constructor found for RDD (SparkContext,
Seq, ClassTag).

 Here is the snippet of the code:

class QueryShard extends RDD {



sc (sc, (Seq)new ArrayBuffer>,
ClassTag$.MODULE$.apply(Tuple.class);

}

Compiler error: no suitable constructor found for RDD (SparkContext, Seq,
ClassTag)

Any thoughts? pointers..

Thanks
VJ


Re: Join Order Optimization

2015-10-11 Thread VJ Anand
Hi - Is there a design document for those operations that have been
implemented in 1.4.0? If so, where can I find them?
-VJ

On Sun, Oct 11, 2015 at 7:27 PM, Cheng, Hao  wrote:

> Yes, I think the SPARK-2211 should be the right place to follow the CBO
> stuff, but probably that will not happen right away.
>
>
>
> The jira issue introduce the statistic info can be found at:
>
> https://issues.apache.org/jira/browse/SPARK-2393
>
>
>
> Hao
>
>
>
> *From:* Raajay [mailto:raaja...@gmail.com]
> *Sent:* Monday, October 12, 2015 10:17 AM
> *To:* Cheng, Hao
> *Cc:* user@spark.apache.org
> *Subject:* Re: Join Order Optimization
>
>
>
> Hi Cheng,
>
> Could you point me to the JIRA that introduced this change ?
>
>
> Also, is this SPARK-2211 the right issue to follow for cost-based
> optimization?
>
> Thanks
>
> Raajay
>
>
>
>
>
> On Sun, Oct 11, 2015 at 7:57 PM, Cheng, Hao  wrote:
>
> Spark SQL supports very basic join reordering optimization, based on the
> raw table data size, this was added couple major releases back.
>
>
>
> And the “EXPLAIN EXTENDED query” command is a very informative tool to
> verify whether the optimization taking effect.
>
>
>
> *From:* Raajay [mailto:raaja...@gmail.com]
> *Sent:* Sunday, October 11, 2015 9:22 AM
> *To:* user@spark.apache.org
> *Subject:* Join Order Optimization
>
>
>
> Hello,
>
> Does Spark-SQL support join order optimization as of the 1.5.1 release ?
> From the release notes, I did not see support for this feature, but figured
> will ask the users-list to be sure.
>
> Thanks
>
> Raajay
>
>
>



-- 
*VJ Anand*
*Founder *
*Sankia*
vjan...@sankia.com
925-640-1340
www.sankia.com



Custom RDD for Proprietary MPP database

2015-10-05 Thread VJ Anand
Hi,

I need to build an RDD that supports a custom-built database (which is
sharded) across several nodes. I need to build an RDD that can support and
provide the partitions specific to this database.
I would like to do this in Java - I see there are JavaRDD and other
specific RDDs available - my question is: if I subclass or extend this RDD,
can I override getPartitions and the other methods? Or is there any other
alternative? Any help or pointers much appreciated.

Thanks
VJ
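
A bare-bones Scala sketch of the shape such an RDD takes (the Java equivalent additionally needs the ClassTag argument in the super call); the shard/partition mapping and the data returned per shard are entirely made up.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One Spark partition per database shard (hypothetical layout)
case class ShardPartition(index: Int, shardHost: String) extends Partition

class ShardedDbRDD(sc: SparkContext, shardHosts: Seq[String])
  extends RDD[String](sc, Nil) {              // Nil: no parent RDD dependencies

  override def getPartitions: Array[Partition] =
    shardHosts.zipWithIndex
      .map { case (host, i) => ShardPartition(i, host): Partition }
      .toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val shard = split.asInstanceOf[ShardPartition]
    // A real implementation would open a connection to shard.shardHost here
    // and stream that shard's rows back as an iterator.
    Iterator(s"rows-from-${shard.shardHost}")
  }

  // Optional: co-locate tasks with the shard if workers run on the same hosts
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[ShardPartition].shardHost)
}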


Re: Checkpoint file not found

2015-08-03 Thread Anand Nalya
Hi,

It's an application that maintains some state from the DStream using the
updateStateByKey() operation. It then selects some of the records from the
current batch using some criteria over the current values and the state, and
carries over the remaining values to the next batch.

Following is the pseudo code :
var pending = emptyRDD
val dstream = kafkaStream
val stateStream = dstream.updateStateByKey(myfunc, partitioner,
initialState)
val joinedStream = dstream.transformWith(sumstream, transformer(pending) _ )
val toUpdate = joinedStream.filter(myfilter).saveToES()
val toNotUpdate = joinedStream.filter(notFilter).checkpoint(interval)

toNotUpdate.foreachRDD(rdd =>
pending = rdd
)

Thanks

On 3 August 2015 at 13:09, Tathagata Das  wrote:

> Can you tell us more about streaming app? DStream operation that you are
> using?
>
> On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya  wrote:
>
>> Hi,
>>
>> I'm writing a Streaming application in Spark 1.3. After running for some
>> time, I'm getting following execption. I'm sure, that no other process is
>> modifying the hdfs file. Any idea, what might be the cause of this?
>>
>> 15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop:
>> DAGSchedulerEventProcessLoop failed; shutting down SparkContext
>> java.io.FileNotFoundException: File does not exist:
>> hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
>> at
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
>> at
>> org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
>> at scala.Option.map(Option.scala:145)
>> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spa

Checkpoint file not found

2015-08-02 Thread Anand Nalya
Hi,

I'm writing a Streaming application in Spark 1.3. After running for some
time, I'm getting following execption. I'm sure, that no other process is
modifying the hdfs file. Any idea, what might be the cause of this?

15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop:
DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist:
hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
at
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferre

Re: updateStateByKey schedule time

2015-07-21 Thread Anand Nalya
I also ran into a similar use case. Is this possible?

On 15 July 2015 at 18:12, Michel Hubert  wrote:

>  Hi,
>
>
>
>
>
> I want to implement a time-out mechanism in de updateStateByKey(…)
> routine.
>
>
>
> But is there a way the retrieve the time of the start of the batch
> corresponding to the call to my updateStateByKey routines?
>
>
>
> Suppose the streaming has build up some delay then a 
> System.currentTimeMillis()
> will not be the time of the time the batch was scheduled.
>
>
>
> I want to retrieve the job/task schedule time of the batch for which my 
> updateStateByKey(..)
> routine is called.
>
>
>
> Is this possible?
>
>
>
> With kind regards,
>
> Michel Hubert
>
>
>
>
>


Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-10 Thread Anand Nalya
Thanks for the help Dean/TD,

I was able to cut the lineage with checkpointing with following code:

dstream.countByValue().foreachRDD((rdd, time) => {
val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
val toUpdate = joined.filter(myfilter).map(mymap)
val toNotUpdate = joined.filter(mynotfilter).map(mymap)

base = base.union(toUpdate).reduceByKey(_+_, 2)
current = toNotUpdate

if(time.isMultipleOf(duration)){
  base.checkpoint()
  current.checkpoint()
}
println(toUpdate.count()) // to persistence
  })

Thanks,
Anand

On 10 July 2015 at 02:16, Tathagata Das  wrote:

> Summarizing the main problems discussed by Dean
>
> 1. If you have an infinitely growing lineage, bad things will eventually
> happen. You HAVE TO periodically (say every 10th batch), checkpoint the
> information.
>
> 2. Unpersist the previous `current` RDD ONLY AFTER running an action on
> the `newCurrent`. Otherwise you are throwing current out of the cache
> before newCurrent has been computed. Modifying Dean's example.
>
> val newCurrent = rdd.union(current).reduceByKey(_+_)
> ...
> // join with newCurrent
> // collect or count or any action that uses newCurrent
> //
>
> // Now you can unpersist because the newCurrent has been persisted and
> // won't require falling back to this cached current RDD.
> current.unpersist()
>
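
A compact sketch of the ordering described above (variable names follow the
thread; count() stands in for whatever action actually uses newCurrent):

val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()
newCurrent.count()    // the action forces newCurrent to be computed and cached
current.unpersist()   // only now is it safe to drop the previous cached RDD
current = newCurrent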
>
> On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler 
> wrote:
>
>> I think you're complicating the cache behavior by aggressively re-using
>> vars when temporary vals would be more straightforward. For example,
>> newBase = newBase.unpersist()... effectively means that newBase's data is
>> not actually cached when the subsequent .union(...) is performed, so it
>> probably goes back to the lineage... Same with the current.unpersist logic
>> before it.
>>
>> Names are cheap, so just use local vals:
>>
>> val newCurrent = rdd.union(current).reduceByKey(_+_)
>> current.unpersist()
>>
>> Also, what happens if you omit the "2" argument for the number of
>> partitions in reduceByKey?
>>
>> Other minor points:
>>
>> I would change the joined, toUpdate, toNotUpdate logic to this:
>>
>> val joined = current.leftOuterJoin(newBase).map(mymap).cache()
>>
>> val toUpdate = joined.filter(myfilter).cache()
>> val toNotUpdate = joined.filter(mynotfilter).cache()
>>
>>
>> Maybe it's just for this email example, but you don't need to call
>> collect on toUpdate before using foreach(println). If the RDD is huge, you
>> definitely don't want to do that.
>>
>> Hope this helps.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya 
>> wrote:
>>
>>> Yes, myRDD is outside of DStream. Following is the actual code where newBase
>>> and current are the rdds being updated with each batch:
>>>
>>>   val base = sc.textFile...
>>>   var newBase = base.cache()
>>>
>>>   val dstream: DStream[String] = ssc.textFileStream...
>>>   var current: RDD[(String, Long)] = sc.emptyRDD.cache()
>>>
>>>   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
>>>
>>> current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
>>>
>>> val joined = current.leftOuterJoin(newBase).cache()
>>> val toUpdate = joined.filter(myfilter).map(mymap).cache()
>>> val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
>>>
>>> toUpdate.collect().foreach(println) // this goes to some store
>>>
>>> newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
>>> 2).cache()
>>>
>>> current = toNotUpdate.cache()
>>>
>>> toUpdate.unpersist()
>>> joined.unpersist()
>>> rdd.unpersist()
>>>   })
>>>
>>>
>>> Regards,
>>>
>>> Anand
>>>
>>>
>>> On 9 July 2015 at 18:16, Dean Wampler  wrote:
>>>
>>>> Is myRDD outside a DStream? If so are you persisting on each batch
>>>> iteration? It should be checkpointed frequently too.
>>>>
>>>> Dean Wampler, Ph.D.
>>>> Author: Programming Scala, 2nd Edition
>>>> <http://shop.oreilly.com/produ

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Yes, myRDD is outside of the DStream. Following is the actual code, where newBase
and current are the RDDs being updated with each batch:

  val base = sc.textFile...
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

val joined = current.leftOuterJoin(newBase).cache()
val toUpdate = joined.filter(myfilter).map(mymap).cache()
val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

toUpdate.collect().foreach(println) // this goes to some store

newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
2).cache()

current = toNotUpdate.cache()

toUpdate.unpersist()
joined.unpersist()
rdd.unpersist()
  })


Regards,

Anand


On 9 July 2015 at 18:16, Dean Wampler  wrote:

> Is myRDD outside a DStream? If so are you persisting on each batch
> iteration? It should be checkpointed frequently too.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya  wrote:
>
>> The data coming from dstream have the same keys that are in myRDD, so the 
>> reduceByKey
>> after union keeps the overall tuple count in myRDD fixed. Or even with
>> fixed tuple count, it will keep consuming more resources?
>>
>> On 9 July 2015 at 16:19, Tathagata Das  wrote:
>>
>>> If you are continuously unioning RDDs, then you are accumulating ever
>>> increasing data, and you are processing ever increasing amount of data in
>>> every batch. Obviously this is going to not last for very long. You
>>> fundamentally cannot keep processing ever increasing amount of data with
>>> finite resources, isnt it?
>>>
>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya 
>>> wrote:
>>>
>>>> Thats from the Streaming tab for Spark 1.4 WebUI.
>>>>
>>>> On 9 July 2015 at 15:35, Michel Hubert  wrote:
>>>>
>>>>>  Hi,
>>>>>
>>>>>
>>>>>
>>>>> I was just wondering how you generated to second image with the charts.
>>>>>
>>>>> What product?
>>>>>
>>>>>
>>>>>
>>>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>>>> *Sent:* donderdag 9 juli 2015 11:48
>>>>> *To:* spark users
>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>
>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> I've an application in which an rdd is being updated with tuples
>>>>> coming from RDDs in a DStream with following pattern.
>>>>>
>>>>>
>>>>>
>>>>> dstream.foreachRDD(rdd => {
>>>>>
>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>>
>>>>> })
>>>>>
>>>>>
>>>>>
>>>>> I'm using cache() and checkpointin to cache results. Over the time,
>>>>> the lineage of myRDD keeps increasing and stages in each batch of dstream
>>>>> keeps increasing, even though all the earlier stages are skipped. When the
>>>>> number of stages grow big enough, the overall delay due to scheduling 
>>>>> delay
>>>>> starts increasing. The processing time for each batch is still fixed.
>>>>>
>>>>>
>>>>>
>>>>> Following figures illustrate the problem:
>>>>>
>>>>>
>>>>>
>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>
>>>>>
>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>
>>>>>
>>>>> Is there some pattern that I can use to avoid this?
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>>
>>>>> Anand
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
The data coming from the dstream has the same keys that are in myRDD, so the
reduceByKey after the union keeps the overall tuple count in myRDD fixed. Even
with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das  wrote:

> If you are continuously unioning RDDs, then you are accumulating ever
> increasing data, and you are processing an ever increasing amount of data in
> every batch. Obviously this is not going to last for very long. You
> fundamentally cannot keep processing an ever increasing amount of data with
> finite resources, can you?
>
> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya  wrote:
>
>> Thats from the Streaming tab for Spark 1.4 WebUI.
>>
>> On 9 July 2015 at 15:35, Michel Hubert  wrote:
>>
>>>  Hi,
>>>
>>>
>>>
>>> I was just wondering how you generated to second image with the charts.
>>>
>>> What product?
>>>
>>>
>>>
>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>> *Sent:* donderdag 9 juli 2015 11:48
>>> *To:* spark users
>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I've an application in which an rdd is being updated with tuples coming
>>> from RDDs in a DStream with following pattern.
>>>
>>>
>>>
>>> dstream.foreachRDD(rdd => {
>>>
>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>
>>> })
>>>
>>>
>>>
>>> I'm using cache() and checkpointin to cache results. Over the time, the
>>> lineage of myRDD keeps increasing and stages in each batch of dstream keeps
>>> increasing, even though all the earlier stages are skipped. When the number
>>> of stages grow big enough, the overall delay due to scheduling delay starts
>>> increasing. The processing time for each batch is still fixed.
>>>
>>>
>>>
>>> Following figures illustrate the problem:
>>>
>>>
>>>
>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>
>>>
>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>
>>>
>>> Is there some pattern that I can use to avoid this?
>>>
>>>
>>>
>>> Regards,
>>>
>>> Anand
>>>
>>
>>
>


Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert  wrote:

>  Hi,
>
>
>
> I was just wondering how you generated to second image with the charts.
>
> What product?
>
>
>
> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
> *Sent:* donderdag 9 juli 2015 11:48
> *To:* spark users
> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>
>
>
> Hi,
>
>
>
> I've an application in which an rdd is being updated with tuples coming
> from RDDs in a DStream with following pattern.
>
>
>
> dstream.foreachRDD(rdd => {
>
>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>
> })
>
>
>
> I'm using cache() and checkpointin to cache results. Over the time, the
> lineage of myRDD keeps increasing and stages in each batch of dstream keeps
> increasing, even though all the earlier stages are skipped. When the number
> of stages grow big enough, the overall delay due to scheduling delay starts
> increasing. The processing time for each batch is still fixed.
>
>
>
> Following figures illustrate the problem:
>
>
>
> Job execution: https://i.imgur.com/GVHeXH3.png?1
>
>
> Delays: https://i.imgur.com/1DZHydw.png?1
>
>
> Is there some pattern that I can use to avoid this?
>
>
>
> Regards,
>
> Anand
>


Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Hi,

I have an application in which an RDD is being updated with tuples coming
from RDDs in a DStream, using the following pattern.

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage
of myRDD keeps increasing, and the number of stages in each batch of the dstream
keeps increasing, even though all the earlier stages are skipped. When the
number of stages grows big enough, the overall scheduling delay starts
increasing. The processing time for each batch stays fixed.

Following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1


Delays: https://i.imgur.com/1DZHydw.png?1


Is there some pattern that I can use to avoid this?

Regards,
Anand


[no subject]

2015-07-07 Thread Anand Nalya
Hi,

Suppose I have an RDD that is loaded from some file and then I also have a
DStream that has data coming from some stream. I want to keep unioning some of
the tuples from the DStream into my RDD. For this I can use something like
this:

  var myRDD: RDD[(String, Long)] = sc.fromText...
  dstream.foreachRDD{ rdd =>
myRDD = myRDD.union(rdd.filter(myfilter))
  }

My question is: for how long will Spark keep the RDDs underlying the
dstream around? Is there some configuration knob that can control that?

Regards,
Anand
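
A hedged sketch of the knobs involved (an illustration, not a definitive answer
from the thread; the app name, batch interval and durations are made up):
spark.streaming.unpersist controls whether Spark aggressively forgets the RDDs
generated for each batch, and StreamingContext.remember() asks Spark to keep
them around for at least the given duration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("retain-batch-rdds")
  .set("spark.streaming.unpersist", "false")       // do not drop batch RDDs eagerly
val ssc = new StreamingContext(conf, Seconds(10))  // 10s batch interval (illustrative)
ssc.remember(Minutes(5))                           // keep generated RDDs for at least 5 minutes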


Split RDD into two in a single pass

2015-07-06 Thread Anand Nalya
Hi,

I have an RDD which I want to split into two disjoint RDDs with a boolean
function. I can do this with the following:

val rdd1 = rdd.filter(f)
val rdd2 = rdd.filter(fnot)

I'm assuming that each of the above statements will traverse the RDD once,
thus resulting in 2 passes.

Is there a way of doing this in a single pass over the RDD, so that when f
returns true the element goes to rdd1, and to rdd2 otherwise?

Regards,
Anand
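
A sketch of the usual workaround (an assumption, not an API from the thread):
Spark has no built-in single-pass predicate split, but caching the parent RDD
means the second filter reads from memory instead of recomputing the lineage,
which is often close enough in practice.

val cached = rdd.cache()
val rdd1 = cached.filter(f)
val rdd2 = cached.filter(x => !f(x))
rdd1.count(); rdd2.count()   // materialize both while the parent is still cached
cached.unpersist()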


Array fields in dataframe.write.jdbc

2015-07-02 Thread Anand Nalya
Hi,

I'm using Spark 1.4. I have an array field in my data frame, and when I try
to write this dataframe to Postgres, I get the following
exception:

Exception in thread "main" java.lang.IllegalArgumentException: Can't
translate null value for field
StructField(filter,ArrayType(StringType,false),true)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply$mcI$sp(jdbc.scala:182)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:168)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:167)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.saveTable(jdbc.scala:167)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:258)
at
analytics.spark.summarizer.SparkBatchSummarizer.start(SparkBatchSummarizer.scala:155)

The schema for the dataframe is:

val schema = StructType(Seq(
StructField("ts", LongType, false),
StructField("filter", DataTypes.createArrayType(StringType, false),
true),
StructField("sort_by", StringType, true),
StructField("user_type", StringType, true),
StructField("count", LongType, false)
))

Sample dataframe contents:

+----------+-------+-------+---------+-----+
|        ts| filter|sort_by|user_type|count|
+----------+-------+-------+---------+-----+
|1435052400|List(s)|    abc|     null|    1|
|1435065300|List(s)|    abc|     null|    1|
+----------+-------+-------+---------+-----+

org.apache.spark.sql.jdbc.JDBCWriteDetails#saveTable has the following
definition, which does not handle array types.

  def saveTable(
df: DataFrame,
url: String,
table: String,
properties: Properties = new Properties()) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
  field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case DecimalType.Unlimited => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
  s"Can't translate null value for field $field")
  })
  }

  val rddSchema = df.schema
  df.foreachPartition { iterator =>
JDBCWriteDetails.savePartition(url, table, iterator, rddSchema,
nullTypes, properties)
  }
}

Is there some way of getting arrays working for now?

Thanks,
Anand
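
A hedged workaround sketch (the dataframe variable, target table name and the
delimiter are assumptions): serialize the array column to a delimited string
before calling write.jdbc, since the 1.4 JDBC writer has no mapping for ArrayType.

import org.apache.spark.sql.functions.udf

val arrayToString = udf((a: Seq[String]) => if (a == null) null else a.mkString(","))
val writable = df
  .withColumn("filter_str", arrayToString(df("filter"))) // stringified copy of the array
  .drop("filter")                                         // drop the ArrayType column
writable.write.jdbc(url, "summary_table", new java.util.Properties())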


Re: Cassandra Connection Issue with Spark-jobserver

2015-04-27 Thread Anand
I was able to fix the issues by providing the right versions of the cassandra-all
and thrift libraries.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587p22664.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark1.3.1 using mysql error!

2015-04-25 Thread Anand Mohan
Yes, you would need to add the MySQL driver jar to the Spark driver &
executor classpath.
Either use the deprecated SPARK_CLASSPATH environment variable (which the
latest docs still recommend anyway, although it's deprecated), like so:
>export SPARK_CLASSPATH=/usr/share/java/mysql-connector.jar
>spark-shell or spark-sql

The other un-deprecated way is to set the below variables in
spark-defaults.conf
spark.driver.extraClassPath  /usr/share/java/mysql-connector.jar
spark.executor.extraClassPath  /usr/share/java/mysql-connector.jar

and then either run spark-shell or spark-sql or start-thriftserver and
beeline
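
For example, once the connector jar is on both classpaths, something like the
following (the table name and credentials are invented) should work from spark-shell:

val people = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://dbhost:3306/test?user=spark&password=secret",
  "dbtable" -> "people"))
people.printSchema()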

Things will be better once SPARK-6966 is merged into 1.4.0, when you can either:
1. use the --jars parameter for spark-shell, spark-sql, etc., or
2. call sc.addJar to add the driver after starting spark-shell.

Good Luck,
Anand Mohan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-3-1-using-mysql-error-tp22643p22658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cassandra Connection Issue with Spark-jobserver

2015-04-21 Thread Anand
RROR]   at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
job-server[ERROR]   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
job-server[ERROR]   at java.lang.Thread.run(Thread.java:745)
job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
job-server[ERROR]   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
job-server[ERROR]   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR]   at java.security.AccessController.doPrivileged(Native
Method)
job-server[ERROR]   at
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR]   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
job-server[ERROR]   ... 8 more

I have already set the $EXTRA_JAR variable to point to my
cassandra-spark-connector assembly.

Regards,
Anand




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



OutOfMemory error in Spark Core

2015-01-15 Thread Anand Mohan
We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray.
We are using Kryo serializer for the Avro objects read from Parquet and we
are using our custom Kryo registrator (along the lines of ADAM's); we just added
batched writes and flushes to Kryo's Output for each 512 MB in the stream, as
below:

val chunkSize = 512 * 1024 * 1024 // 512 MB
outstream.array.sliding(chunkSize, chunkSize).foreach { buf =>
  kryoOut.write(buf)
  kryoOut.flush()
}

Our queries run against a cached RDD (MEMORY_ONLY) that is obtained after:
1. loading bulk data from Parquet,
2. unioning it with incremental data in Avro,
3. doing timestamp-based duplicate removal (including partitioning in
reduceByKey), and
4. joining a couple of MySQL tables using JdbcRDD.

Of late, we have been seeing major instabilities where the app crashes on a lost
executor, which itself failed due to an OutOfMemory error as below. This looks
almost identical to https://issues.apache.org/jira/browse/SPARK-4885, even
though we are seeing this error in Spark 1.1.

2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR
org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught
exception in thread Thread[handle-message-executor-13,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
at
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
at
org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
at
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
at
org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
at
org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)


The driver log is as below

15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor:
eventProcesserActor failed; shutting down SparkContext
java.util.NoSuchElementException: key not found: 2539
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage

Re: Any patterns for multiplexing the streaming data

2014-11-07 Thread Anand Iyer
Hi TD,

This is a common pattern that is emerging today. Kafka --> SS --> Kafka.

Spark Streaming comes with a built-in consumer to read from Kafka. It would
be great to have an easy way for users to write back to Kafka without
having to code a custom producer using the Kafka Producer APIs.

Are there any plans to commit the code in the above github repo? If so, do
you have a rough estimate of when.

Thanks,

Anand

On Fri, Nov 7, 2014 at 1:25 PM, Tathagata Das 
wrote:

> I am not aware of any obvious existing pattern that does exactly this.
> Generally, these sorts of computations (subsetting, denormalization) sound like
> generic terms but actually have very specific requirements, so it is
> hard to point to a design pattern without more information about the requirements.
>
> If you want to feed back to kafka, you can take a look at this pull request
>
> https://github.com/apache/spark/pull/2994
>
> On Thu, Nov 6, 2014 at 4:15 PM, bdev  wrote:
>
>> We are looking at consuming the Kafka stream using Spark Streaming,
>> transforming it into various subsets (applying some transformation,
>> de-normalizing some fields, etc.), and feeding it back into Kafka as a
>> different topic for downstream consumers.
>>
>> Wanted to know if there are any existing patterns for achieving this.
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Any-patterns-for-multiplexing-the-streaming-data-tp18303.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
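
A hedged sketch of the write-back side (the broker address, topic name and
String values are assumptions; it uses the Kafka 0.8.2+ producer client): create
one producer per partition inside foreachPartition and push each record of the
micro-batch to an output topic.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new Properties()  // one producer per partition, not per record
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    records.foreach(r => producer.send(new ProducerRecord[String, String]("output-topic", r)))
    producer.close()
  }
}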


Re: AVRO specific records

2014-11-05 Thread Anand Iyer
You can also use the Kite SDK to read/write Avro records:
https://github.com/kite-sdk/kite-examples/tree/master/spark

- Anand

On Wed, Nov 5, 2014 at 2:24 PM, Laird, Benjamin <
benjamin.la...@capitalone.com> wrote:

> Something like this works and is how I create an RDD of specific records.
>
> val avroRdd = sc.newAPIHadoopFile("twitter.avro",
> classOf[AvroKeyInputFormat[twitter_schema]],
> classOf[AvroKey[twitter_schema]], classOf[NullWritable], conf) (From
> https://github.com/julianpeeters/avro-scala-macro-annotation-examples/blob/master/spark/src/main/scala/AvroSparkScala.scala)
> Keep in mind you'll need to use the kryo serializer as well.
>
> From: Frank Austin Nothaft 
> Date: Wednesday, November 5, 2014 at 5:06 PM
> To: Simone Franzini 
> Cc: "user@spark.apache.org" 
> Subject: Re: AVRO specific records
>
> Hi Simone,
>
> Matt Massie put together a good tutorial on his blog
> <http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/>. If you’re
> looking for more code using Avro, we use it pretty extensively in our
> genomics project. Our Avro schemas are here
> <https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>,
> and we have serialization code here
> <https://github.com/bigdatagenomics/adam/tree/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization>.
> We use Parquet for storing the Avro records, but there is also an Avro
> HadoopInputFormat.
>
> Regards,
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Nov 5, 2014, at 1:25 PM, Simone Franzini 
> wrote:
>
> How can I read/write AVRO specific records?
> I found several snippets using generic records, but nothing with specific
> records so far.
>
> Thanks,
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>
>
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed.  If the reader of this message is not the
> intended recipient, you are hereby notified that any review,
> retransmission, dissemination, distribution, copying or other use of, or
> taking of any action in reliance upon this information is strictly
> prohibited. If you have received this communication in error, please
> contact the sender and delete the material from your computer.
>


Re: Spark SQL Percentile UDAF

2014-10-09 Thread Anand Mohan Tumuluri
Filed https://issues.apache.org/jira/browse/SPARK-3891

Thanks,
Anand Mohan

On Thu, Oct 9, 2014 at 7:13 PM, Michael Armbrust 
wrote:

> Please file a JIRA:https://issues.apache.org/jira/browse/SPARK/
> <https://www.google.com/url?q=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FSPARK%2F&sa=D&sntz=1&usg=AFQjCNFS_GnMso2OCOITA0TSJ5U10b3JSQ>
>
> On Thu, Oct 9, 2014 at 6:48 PM, Anand Mohan  wrote:
>
>> Hi,
>>
>> I just noticed the Percentile UDAF PR being merged into trunk and decided
>> to test it.
>> So pulled in today's trunk and tested the percentile queries.
>> They work marvelously, Thanks a lot for bringing this into Spark SQL.
>>
>> However Hive percentile UDAF also supports an array mode where in you can
>> give the list of percentiles that you want and it would return an array of
>> double values one for each requested percentile.
>> This query is failing with the below error. However a query with the
>> individual percentiles like
>> percentile(turnaroundtime,0.25),percentile(turnaroundtime,0.5),percentile(turnaroundtime,0.75)
>> is working. (and so this issue is not of a high priority as there is this
>> workaround for us)
>>
>> Thanks,
>> Anand Mohan
>>
>> 0: jdbc:hive2://dev-uuppala.sfohi.philips.com> select name,
>> percentile(turnaroundtime,array(0,0.25,0.5,0.75,1)) from exam group by name;
>>
>> Error: org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 1 in stage 25.0 failed 4 times, most recent failure: Lost task 1.3 in
>> stage 25.0 (TID 305, Dev-uuppala.sfohi.philips.com):
>> java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot
>> be cast to [Ljava.lang.Object;
>>
>> org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83)
>>
>> org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259)
>>
>> org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349)
>>
>> org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170)
>>
>> org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342)
>>
>> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
>>
>> org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
>> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)
>> org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)
>>
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> org.apache.spark.scheduler.Task.run(Task.scala:56)
>>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:745)
>> Driver stacktrace: (state=,code=0)
>>
>>
>>
>> --
>> View this message in context: Spark SQL Percentile UDAF
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Percentile-UDAF-tp16092.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>


Spark SQL Percentile UDAF

2014-10-09 Thread Anand Mohan
Hi,

I just noticed the Percentile UDAF PR being merged into trunk and decided
to test it.
So I pulled in today's trunk and tested the percentile queries.
They work marvelously. Thanks a lot for bringing this into Spark SQL.

However, Hive's percentile UDAF also supports an array mode wherein you can
give the list of percentiles that you want and it returns an array of
double values, one for each requested percentile.
This query fails with the below error. However, a query with the
individual percentiles, like
percentile(turnaroundtime,0.25),percentile(turnaroundtime,0.5),percentile(turnaroundtime,0.75),
works (and so this issue is not of high priority, as there is this
workaround for us).

Thanks,
Anand Mohan

0: jdbc:hive2://dev-uuppala.sfohi.philips.com> select name,
percentile(turnaroundtime,array(0,0.25,0.5,0.75,1)) from exam group by name;

Error: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 1 in stage 25.0 failed 4 times, most recent failure: Lost task 1.3 in
stage 25.0 (TID 305, Dev-uuppala.sfohi.philips.com):
java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer cannot
be cast to [Ljava.lang.Object;

org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83)

org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259)

org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349)

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170)

org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)

org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)

org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
org.apache.spark.scheduler.Task.run(Task.scala:56)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace: (state=,code=0)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Percentile-UDAF-tp16092.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark SQL HiveContext Projection Pushdown

2014-10-09 Thread Anand Mohan Tumuluri
Thanks a lot. It is working like a charm now. Even predicate pushdown on
optional fields is working.
Awesome!!!
Is there any plan to support windowing queries? I know that Shark supported
them in its last release and had expected them to already be included.

Best regards,
Anand Mohan
On Oct 8, 2014 11:57 AM, "Michael Armbrust"  wrote:

> We are working to improve the integration here, but I can recommend the
> following when running spark 1.1:  create an external table and
> set spark.sql.hive.convertMetastoreParquet=true
>
> Note that even with a HiveContext we don't support window functions yet.
>
>
> On Wed, Oct 8, 2014 at 10:41 AM, Anand Mohan  wrote:
>
>> We have our analytics infra built on Spark and Parquet.
>> We are trying to replace some of our queries based on the direct Spark
>> RDD API to SQL based either on Spark SQL/HiveQL.
>> Our motivation was to take advantage of the transparent projection &
>> predicate pushdown that's offered by Spark SQL and eliminate the need for 
>> persisting
>> the RDD in memory. (Cache invalidation turned out to be a big problem for
>> us)
>>
>> The below tests are done with Spark 1.1.0 on CDH 5.1.0
>>
>>
>> 1. Spark SQL's (SQLContext) Parquet support was excellent for our case.
>> The ability to query in SQL and apply scala functions as UDFs in the SQL is
>> extremely convenient. Project pushdown works flawlessly, not much sure
>> about predicate pushdown
>> (we have 90% optional fields in our dataset and I remember Michael
>> Armbrust telling me that this is a bug in Parquet in that it doesnt allow
>> predicate pushdown for optional fields.)
>> However we have timestamp based duplicate removal which requires
>> windowing queries which are not working in SQLContext.sql parsing mode.
>>
>> 2. We then tried HiveQL using HiveContext by creating a Hive external
>> table backed by the same Parquet data. However, in this mode, projection
>> pushdown doesnt seem to work and it ends up reading the whole Parquet data
>> for each query.(which slows down a lot)
>> Please see attached the screenshot of this.
>> Hive itself doesnt seem to have any issues with the projection pushdown.
>> So this is weird. Is this due to any configuration problem?
>>
>> Thanks in advance,
>> Anand Mohan
>>
>> *SparkSQLHiveParquet.png* (316K) Download Attachment
>> <http://apache-spark-user-list.1001560.n3.nabble.com/attachment/15953/0/SparkSQLHiveParquet.png>
>>
>> --
>> View this message in context: Spark SQL HiveContext Projection Pushdown
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-HiveContext-Projection-Pushdown-tp15953.html>
>> Sent from the Apache Spark User List mailing list archive
>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>
>
>
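
A minimal sketch of the workaround Michael describes above (an external Hive
table backed by the same Parquet files; the table and column names reuse the
earlier percentile example and are otherwise assumptions):

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")  // use Spark's native Parquet scan
hc.sql("SELECT name, turnaroundtime FROM exam WHERE name = 'abc'").collect()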


Spark SQL HiveContext Projection Pushdown

2014-10-08 Thread Anand Mohan
We have our analytics infra built on Spark and Parquet.
We are trying to replace some of our queries based on the direct Spark RDD
API with SQL, using either Spark SQL or HiveQL.
Our motivation was to take advantage of the transparent projection &
predicate pushdown that's offered by Spark SQL and eliminate the need
for persisting
the RDD in memory. (Cache invalidation turned out to be a big problem for
us)

The below tests are done with Spark 1.1.0 on CDH 5.1.0


1. Spark SQL's (SQLContext) Parquet support was excellent for our case. The
ability to query in SQL and apply Scala functions as UDFs in the SQL is
extremely convenient. Projection pushdown works flawlessly; I'm not so sure
about predicate pushdown
(we have 90% optional fields in our dataset, and I remember Michael Armbrust
telling me that this is a bug in Parquet in that it doesn't allow predicate
pushdown for optional fields).
However, we have timestamp-based duplicate removal, which requires windowing
queries that are not working in SQLContext.sql parsing mode.

2. We then tried HiveQL using HiveContext by creating a Hive external table
backed by the same Parquet data. However, in this mode, projection pushdown
doesn't seem to work, and it ends up reading the whole Parquet dataset for each
query (which slows things down a lot).
Please see the attached screenshot of this.
Hive itself doesn't seem to have any issues with projection pushdown,
so this is weird. Is this due to a configuration problem?

Thanks in advance,
Anand Mohan


SparkSQLHiveParquet.png (316K) 
<http://apache-spark-user-list.1001560.n3.nabble.com/attachment/15953/0/SparkSQLHiveParquet.png>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-HiveContext-Projection-Pushdown-tp15953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: SparkContext startup time out

2014-07-26 Thread Anand Avati
I am bumping into this problem as well. I am trying to move to akka 2.3.x
from 2.2.x in order to port to Scala 2.11 - only akka 2.3.x is available in
Scala 2.11. All akka 2.2.x versions work fine, and all akka 2.3.x versions give the
following exception in "new SparkContext". Still investigating why.

  java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at akka.remote.Remoting.start(Remoting.scala:180)
  at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
  at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
  at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)




On Fri, May 30, 2014 at 6:33 AM, Pierre B <
pierre.borckm...@realimpactanalytics.com> wrote:

> I was annoyed by this as well.
> It appears that just permuting the order of decencies inclusion solves this
> problem:
>
> first spark, than your cdh hadoop distro.
>
> HTH,
>
> Pierre
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p6582.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark on other parallel filesystems

2014-04-04 Thread Anand Avati
On Fri, Apr 4, 2014 at 5:12 PM, Matei Zaharia wrote:

> As long as the filesystem is mounted at the same path on every node, you
> should be able to just run Spark and use a file:// URL for your files.
>
> The only downside with running it this way is that Lustre won't expose
> data locality info to Spark, the way HDFS does. That may not matter if it's
> a network-mounted file system though.
>

Is the locality querying mechanism specific to HDFS mode, or is it possible
to implement plugins in Spark to query location in other ways on other
filesystems? I ask because glusterfs can expose the data location of a file
through virtual extended attributes, and I would be interested in making
Spark exploit that locality when the file location is specified as
glusterfs:// (or in querying the xattr blindly for file://). How much of a
difference does data locality make for Spark use cases anyway (since most
of the computation happens in memory)? Any sort of numbers?

Thanks!
Avati


>
>
Matei
>
> On Apr 4, 2014, at 4:56 PM, Venkat Krishnamurthy 
> wrote:
>
>  All
>
>  Are there any drawbacks or technical challenges (or any information,
> really) related to using Spark directly on a global parallel filesystem
>  like Lustre/GPFS?
>
>  Any idea of what would be involved in doing a minimal proof of concept?
> Is it just possible to run Spark unmodified (without the HDFS substrate)
> for a start, or will that not work at all? I do know that it's possible to
> implement Tachyon on Lustre and get the HDFS interface - just looking at
> other options.
>
>  Venkat
>
>
>
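
A minimal sketch of the file:// usage Matei describes above (the mount point
/mnt/lustre is an assumption; it must exist at the same path on every worker):

val events = sc.textFile("file:///mnt/lustre/datasets/events.log")
println(events.count())  // no HDFS involved; locality info is simply not available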