Re: Apply ML to grouped dataframe

2016-08-22 Thread Nirmal Fernando
On Tue, Aug 23, 2016 at 10:56 AM, Wen Pei Yu  wrote:

> We can group a dataframe by one column like
>
> df.groupBy(df.col("gender"))
>

On top of this DF, use a filter to extract each group as a separate DF. Then you
can apply ML on top of each DF.

eg: xyzDF.filter(col("x").equalTo(x))
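
For example, a rough sketch of that approach (the DataFrame df, the grouping
column "gender" and the choice of estimator are placeholders; any spark.ml
estimator that expects the usual "features"/"label" columns would slot in the
same way):

import org.apache.spark.sql.functions.col
import org.apache.spark.ml.classification.LogisticRegression

// One sub-DataFrame per distinct value of the grouping column, one model per group.
val groups = df.select("gender").distinct().collect().map(_.getString(0))

val modelPerGroup = groups.map { g =>
  val groupDF = df.filter(col("gender") === g)
  g -> new LogisticRegression().fit(groupDF)   // any Estimator could be substituted here
}.toMap

Note that each fit() is a separate Spark job, so with many groups you will
probably want to cache df first.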

>
> It is like splitting a dataframe into multiple dataframes. Currently, we can only
> apply simple SQL functions to this GroupedData, like agg, max, etc.
>
> What we want is to apply an ML algorithm to each group.
>
> Regards.
>
> From: Nirmal Fernando 
> To: Wen Pei Yu/China/IBM@IBMCN
> Cc: User 
> Date: 08/23/2016 01:14 PM
>
> Subject: Re: Apply ML to grouped dataframe
> --
>
>
>
> Hi Wen,
>
> AFAIK Spark MLlib implements its machine learning algorithms on top of
> Spark dataframe API. What did you mean by a grouped dataframe?
>
> On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
> > wrote:
>
>Hi Nirmal
>
>I didn't get your point.
>Can you tell me more about how to apply MLlib to a grouped dataframe?
>
>Regards.
>Wenpei.
>
>
>From: Nirmal Fernando <*nir...@wso2.com* >
>To: Wen Pei Yu/China/IBM@IBMCN
>Cc: User <*user@spark.apache.org* >
>Date: 08/23/2016 10:26 AM
>Subject: Re: Apply ML to grouped dataframe
>--
>
>
>
>
>You can use Spark MLlib
>
> http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api
>
> 
>
>On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
>> wrote:
>   Hi
>
>  We have a dataframe; we want to group it and apply an ML algorithm
>  or statistics (say, a t-test) to each group. Is there an efficient way for this
>  situation?
>
>  Currently, we switch to PySpark, use groupByKey, and apply a
>  numpy function to each array. But this isn't an efficient way, right?
>
>  Regards.
>  Wenpei.
>
>
>
>
>--
>
>Thanks & regards,
>Nirmal
>
>Team Lead - WSO2 Machine Learner
>Associate Technical Lead - Data Technologies Team, WSO2 Inc.
>Mobile: +94715779733
>Blog: http://nirmalfdo.blogspot.com/
>
>
>
>
>
>
>
> --
>
> Thanks & regards,
> Nirmal
>
> Team Lead - WSO2 Machine Learner
> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
> Mobile: +94715779733
> Blog: http://nirmalfdo.blogspot.com/
>
>
>
>


-- 

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/


Re: Apply ML to grouped dataframe

2016-08-22 Thread Wen Pei Yu

We can group a dataframe by one column like

df.groupBy(df.col("gender"))

It is like splitting a dataframe into multiple dataframes. Currently, we can only
apply simple SQL functions to this GroupedData, like agg, max, etc.

What we want is to apply an ML algorithm to each group.

Regards.



From:   Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date:   08/23/2016 01:14 PM
Subject:Re: Apply ML to grouped dataframe



Hi Wen,

AFAIK Spark MLlib implements its machine learning algorithms on top of
Spark dataframe API. What did you mean by a grouped dataframe?

On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu  wrote:
  Hi Nirmal

  I didn't get your point.
  Can you tell me more about how to apply MLlib to a grouped dataframe?

  Regards.
  Wenpei.



  From: Nirmal Fernando 
  To: Wen Pei Yu/China/IBM@IBMCN
  Cc: User 
  Date: 08/23/2016 10:26 AM
  Subject: Re: Apply ML to grouped dataframe




  You can use Spark MLlib
  
http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api


  On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu  wrote:
Hi

We have a dataframe; we want to group it and apply an ML algorithm or
statistics (say, a t-test) to each group. Is there an efficient way for
this situation?

Currently, we switch to PySpark, use groupByKey, and apply a numpy
function to each array. But this isn't an efficient way, right?

Regards.
Wenpei.




  --

  Thanks & regards,
  Nirmal

  Team Lead - WSO2 Machine Learner
  Associate Technical Lead - Data Technologies Team, WSO2 Inc.
  Mobile: +94715779733
  Blog: http://nirmalfdo.blogspot.com/








--

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/




Re: Apply ML to grouped dataframe

2016-08-22 Thread Nirmal Fernando
Hi Wen,

AFAIK Spark MLlib implements its machine learning algorithms on top of
Spark dataframe API. What did you mean by a grouped dataframe?

On Tue, Aug 23, 2016 at 10:42 AM, Wen Pei Yu  wrote:

> Hi Nirmal
>
> I didn't get your point.
> Can you tell me more about how to apply MLlib to a grouped dataframe?
>
> Regards.
> Wenpei.
>
>
> From: Nirmal Fernando 
> To: Wen Pei Yu/China/IBM@IBMCN
> Cc: User 
> Date: 08/23/2016 10:26 AM
> Subject: Re: Apply ML to grouped dataframe
> --
>
>
>
> You can use Spark MLlib
> http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api
> 
>
> On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu <*yuw...@cn.ibm.com*
> > wrote:
>
>Hi
>
>    We have a dataframe; we want to group it and apply an ML algorithm or
>    statistics (say, a t-test) to each group. Is there an efficient way for this
>    situation?
>
>    Currently, we switch to PySpark, use groupByKey, and apply a numpy
>    function to each array. But this isn't an efficient way, right?
>
>Regards.
>Wenpei.
>
>
>
>
>
> --
>
> Thanks & regards,
> Nirmal
>
> Team Lead - WSO2 Machine Learner
> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
> Mobile: +94715779733
> Blog: http://nirmalfdo.blogspot.com/
>
>
>
>


-- 

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/


Re: Apply ML to grouped dataframe

2016-08-22 Thread Wen Pei Yu

Hi Nirmal

I didn't get your point.
Can you tell me more about how to apply MLlib to a grouped dataframe?

Regards.
Wenpei.



From:   Nirmal Fernando 
To: Wen Pei Yu/China/IBM@IBMCN
Cc: User 
Date:   08/23/2016 10:26 AM
Subject:Re: Apply ML to grouped dataframe



You can use Spark MLlib
http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api

On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu  wrote:
  Hi

  We have a dataframe; we want to group it and apply an ML algorithm or
  statistics (say, a t-test) to each group. Is there an efficient way for this
  situation?

  Currently, we switch to PySpark, use groupByKey, and apply a numpy
  function to each array. But this isn't an efficient way, right?

  Regards.
  Wenpei.





--

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/




Re: Spark 2.0 - Join statement compile error

2016-08-22 Thread Deepak Sharma
Hi Subhajit
Try this in your join:

val df = sales_demand.join(product_master,
  sales_demand("INVENTORY_ITEM_ID") === product_master("INVENTORY_ITEM_ID"), "inner")

On Tue, Aug 23, 2016 at 2:30 AM, Subhajit Purkayastha 
wrote:

> All,
>
> I have the following dataFrames and the temp table.
>
> I am trying to create a new DF, the following statement is not compiling:
>
> val df = sales_demand.join(product_master,
>   (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")
>
> What am I doing wrong?
>
> ==Code===
>
> var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER, INVENTORY_ITEM_ID, ORGANIZATION_ID,
>   from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd'), '-MM-dd') AS schedule_date
>   FROM sales_order_demand
>   WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd') >= $planning_start_date  limit 10"""
>
> val sales_demand = spark.sql(sales_order_sql_stmt)
>
> // print the data
> sales_demand.collect().foreach { println }
>
> val product_sql_stmt = "select SEGMENT1, INVENTORY_ITEM_ID, ORGANIZATION_ID from product limit 10"
> val product_master = spark.sql(product_sql_stmt)
>
> // print the data
> product_master.collect().foreach { println }
>
> val df = sales_demand.join(product_master,
>   (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")
>
> spark.stop()
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Apply ML to grouped dataframe

2016-08-22 Thread Nirmal Fernando
You can use Spark MLlib
http://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api

On Tue, Aug 23, 2016 at 7:34 AM, Wen Pei Yu  wrote:

> Hi
>
> We have a dataframe; we want to group it and apply an ML algorithm or
> statistics (say, a t-test) to each group. Is there an efficient way for this
> situation?
>
> Currently, we switch to PySpark, use groupByKey, and apply a numpy function
> to each array. But this isn't an efficient way, right?
>
> Regards.
> Wenpei.
>



-- 

Thanks & regards,
Nirmal

Team Lead - WSO2 Machine Learner
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/


Apply ML to grouped dataframe

2016-08-22 Thread Wen Pei Yu

Hi

We have a dataframe; we want to group it and apply an ML algorithm or
statistics (say, a t-test) to each group. Is there an efficient way for this
situation?

Currently, we switch to PySpark, use groupByKey, and apply a numpy function
to each array. But this isn't an efficient way, right?

Regards.
Wenpei.


Re: Spark with Parquet

2016-08-22 Thread shamu
Create a hive table x.
Load your csv data into table x (LOAD DATA INPATH 'file/path' INTO TABLE x;).

Create a hive table y with the same structure as x, except add STORED AS PARQUET;
then: INSERT OVERWRITE TABLE y SELECT * FROM x;

This would get you Parquet files under /user/hive/warehouse/y (as an example);
you can use this file path for your processing...
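
A rough sketch of the same steps driven from Spark SQL (table definitions and
paths here are placeholders, and it assumes a Hive-enabled SparkSession):

val spark = org.apache.spark.sql.SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Staging table backed by the raw CSV, then a Parquet-backed copy of it.
spark.sql("CREATE TABLE x (id INT, name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
spark.sql("LOAD DATA INPATH '/path/to/data.csv' INTO TABLE x")
spark.sql("CREATE TABLE y (id INT, name STRING) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE y SELECT * FROM x")

// The resulting Parquet files (e.g. under /user/hive/warehouse/y) can be read directly:
val parquetDF = spark.read.parquet("/user/hive/warehouse/y")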




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-Parquet-tp4923p27584.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: word count on parquet file

2016-08-22 Thread shamu
I changed the code to below...

JavaPairRDD<NullWritable, String> rdd = sc.newAPIHadoopFile(inputFile,
    ParquetInputFormat.class, NullWritable.class, String.class, mrConf);
JavaRDD<String> words = rdd.values().flatMap(
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
            return Arrays.asList(x.split(","));
        }
    });
With this I get the error below:
 java.lang.NullPointerException
at
org.apache.parquet.hadoop.ParquetInputFormat.getReadSupportInstance(ParquetInputFormat.java:280)
at
org.apache.parquet.hadoop.ParquetInputFormat.getReadSupport(ParquetInputFormat.java:257)
at
org.apache.parquet.hadoop.ParquetInputFormat.createRecordReader(ParquetInputFormat.java:245)

My input file is a simple comma-separated employee record. I created a hive
table with STORED AS PARQUET and then loaded the table from another hive
table... I can treat the records as simple lines, and I just need to do a word
count. So, do my Key class and Value class make sense?

Thanks a lot for your support.
Best..



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/word-count-on-parquet-file-tp27581p27583.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



DataFrameWriter bug after RandomSplit?

2016-08-22 Thread evanzamir
Trying to build an ML model using LogisticRegression, I ran into the following
unexplainable issue. Here's a snippet of the code which demonstrates it:
training, testing = data.randomSplit([0.8, 0.2], seed=42)
print("number of rows in testing = {}".format(testing.count()))
print("number of rows in training = {}".format(training.count()))
testing.coalesce(1).write.json('testing')
training.coalesce(1).write.json('training')

The first two print statements output the following (there should be 1390 total
samples or rows in the data set):
number of rows in testing = 290
number of rows in training = 1100

The thing I can't explain is that in the json files for testing and training
there are 805 and 585 rows (json objects), respectively. That adds up to the
expected 1390, but somehow after coalescing and printing the number of
objects in each data set has changed! I have no clue why. Is this a bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrameWriter-bug-after-RandomSplit-tp27582.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Combining multiple models in Spark-ML 2.0

2016-08-22 Thread janardhan shetty
Hi,

Are there any pointers or links on stacking multiple models on Spark
dataframes? What strategies can be employed if we need to combine more
than 2 models?
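
For context, the kind of thing I have in mind is a naive stacking pipeline like
the sketch below (the train/test DataFrames and the "features"/"label" columns
are assumptions, and it ignores the out-of-fold predictions a proper stacking
setup would use): fit the base models, feed their prediction columns into a
VectorAssembler, and train a meta-model on that. Adding more base models just
means more prediction columns in the assembler.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.feature.VectorAssembler

// Base (level-0) models, each writing to its own output columns to avoid clashes.
val lr = new LogisticRegression()
  .setPredictionCol("lrPred").setRawPredictionCol("lrRaw").setProbabilityCol("lrProb")
val rf = new RandomForestClassifier()
  .setPredictionCol("rfPred").setRawPredictionCol("rfRaw").setProbabilityCol("rfProb")

// Meta (level-1) model trained on the base models' predictions.
val metaFeatures = new VectorAssembler()
  .setInputCols(Array("lrPred", "rfPred"))
  .setOutputCol("metaFeatures")
val meta = new LogisticRegression().setFeaturesCol("metaFeatures")

val stacked = new Pipeline().setStages(Array(lr, rf, metaFeatures, meta)).fit(train)
val predictions = stacked.transform(test)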


Re: Spark 2.0 - Join statement compile error

2016-08-22 Thread Vishal Maru
Try putting the join condition as a String.
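
For example, something like this (one reading of that suggestion, using the
DataFrames from the quoted code below; joining on the shared column name
avoids the Column-expression syntax entirely):

val df = sales_demand.join(product_master, "INVENTORY_ITEM_ID")

// or, with an explicit join type:
val df2 = sales_demand.join(product_master, Seq("INVENTORY_ITEM_ID"), "inner")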

On Mon, Aug 22, 2016 at 5:00 PM, Subhajit Purkayastha 
wrote:

> All,
>
> I have the following dataFrames and the temp table.
>
> I am trying to create a new DF, the following statement is not compiling:
>
> val df = sales_demand.join(product_master,
>   (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")
>
> What am I doing wrong?
>
> ==Code===
>
> var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER, INVENTORY_ITEM_ID, ORGANIZATION_ID,
>   from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd'), '-MM-dd') AS schedule_date
>   FROM sales_order_demand
>   WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd') >= $planning_start_date  limit 10"""
>
> val sales_demand = spark.sql(sales_order_sql_stmt)
>
> // print the data
> sales_demand.collect().foreach { println }
>
> val product_sql_stmt = "select SEGMENT1, INVENTORY_ITEM_ID, ORGANIZATION_ID from product limit 10"
> val product_master = spark.sql(product_sql_stmt)
>
> // print the data
> product_master.collect().foreach { println }
>
> val df = sales_demand.join(product_master,
>   (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")
>
> spark.stop()
>


Re: word count on parquet file

2016-08-22 Thread ayan guha
You are missing the input. mrConf is not the way to add input files. In Spark,
try the DataFrame read functions or the sc.textFile function.
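
For example, a rough sketch of the DataFrame route (the path and column name
are placeholders, and spark is the SparkSession):

val df = spark.read.parquet("/path/to/employee.parquet")

val counts = df.select("name")                          // any string column
  .rdd
  .flatMap(row => row.getString(0).split(","))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.take(10).foreach(println)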

Best
Ayan
On 23 Aug 2016 07:12, "shamu"  wrote:

> Hi All,
> I am a newbie to Spark/Hadoop.
> I want to read a parquet file and a perform a simple word-count. Below is
> my
> code, however I get an error:
> Exception in thread "main" java.io.IOException: No input paths specified in
> job
> at
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
> listStatus(FileInputFormat.java:239)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.listStatus(
> ParquetInputFormat.java:349)
> at
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
> getSplits(FileInputFormat.java:387)
> at
> org.apache.parquet.hadoop.ParquetInputFormat.getSplits(
> ParquetInputFormat.java:304)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(
> NewHadoopRDD.scala:120)
>
> Below is my code. I guess I am missing some core concepts wrt hadoop
> InputFormats and making it working with spark. Coul d you please explain
> the
> cause and solution to get this working/
> -code
> snippet-
> JavaSparkContext sc = new JavaSparkContext(conf);
> org.apache.hadoop.conf.Configuration mrConf = new Configuration();
> mrConf.addResource(inputFile);
> JavaPairRDD textInputFormatObjectJavaPairRDD =
> sc.newAPIHadoopRDD(mrConf, ParquetInputFormat.class, String.class,
> String.class);
> JavaRDD words = textInputFormatObjectJavaPairRDD.values().flatMap(
> new FlatMapFunction() {
> public Iterable call(String x) {
> return Arrays.asList(x.split(","));
> }
> });
> long x = words.count();
>
> --thanks!
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/word-count-on-parquet-file-tp27581.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


word count on parquet file

2016-08-22 Thread shamu
Hi All,
I am a newbie to Spark/Hadoop.
I want to read a parquet file and perform a simple word-count. Below is my
code; however, I get an error:
Exception in thread "main" java.io.IOException: No input paths specified in
job
at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:239)
at
org.apache.parquet.hadoop.ParquetInputFormat.listStatus(ParquetInputFormat.java:349)
at
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
at
org.apache.parquet.hadoop.ParquetInputFormat.getSplits(ParquetInputFormat.java:304)
at 
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)

Below is my code. I guess I am missing some core concepts wrt hadoop
InputFormats and making them work with Spark. Could you please explain the
cause and a solution to get this working?
-code snippet-
JavaSparkContext sc = new JavaSparkContext(conf);
org.apache.hadoop.conf.Configuration mrConf = new Configuration();
mrConf.addResource(inputFile);
JavaPairRDD<String, String> textInputFormatObjectJavaPairRDD =
    sc.newAPIHadoopRDD(mrConf, ParquetInputFormat.class, String.class, String.class);
JavaRDD<String> words = textInputFormatObjectJavaPairRDD.values().flatMap(
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
            return Arrays.asList(x.split(","));
        }
    });
long x = words.count();

--thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/word-count-on-parquet-file-tp27581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.0 - Join statement compile error

2016-08-22 Thread Subhajit Purkayastha
All,

 

I have the following dataFrames and the temp table. 

 

I am trying to create a new DF , the following statement is not compiling

 

val df = sales_demand.join(product_master,
  (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")



 



 

What am I doing wrong?

 

==Code===

 

var sales_order_sql_stmt = s"""SELECT ORDER_NUMBER , INVENTORY_ITEM_ID,
ORGANIZATION_ID,

  from_unixtime(unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd'),
'-MM-dd') AS schedule_date

  FROM sales_order_demand 

  WHERE unix_timestamp(SCHEDULE_SHIP_DATE,'-MM-dd') >=
$planning_start_date  limit 10"""

  

val sales_demand = spark.sql (sales_order_sql_stmt)



//print the data

sales_demand.collect().foreach { println }





val product_sql_stmt = "select
SEGMENT1,INVENTORY_ITEM_ID,ORGANIZATION_ID from product limit 10"

val product_master = spark.sql (product_sql_stmt)



//print the data

product_master.collect().foreach { println }

  

val df = sales_demand.join(product_master,
  (sales_demand.INVENTORY_ITEM_ID == product_master.INVENTORY_ITEM_ID), joinType="inner")



 

 

   spark.stop()



Re: spark.lapply in SparkR: Error in writeBin(batch, con, endian = "big")

2016-08-22 Thread Felix Cheung
How big is the output from score()?

Also could you elaborate on what you want to broadcast?





On Mon, Aug 22, 2016 at 11:58 AM -0700, "Cinquegrana, Piero" 
> wrote:

Hello,

I am using the new R API in SparkR, spark.lapply (Spark 2.0). I am defining a
complex function to be run across executors and I have to send the entire
dataset, but there is not (that I could find) a way to broadcast the variable
in SparkR. I am thus reading the dataset in each executor from disk, but I am
getting the following error:

Error in writeBin(batch, con, endian = "big") :
  attempting to add too many elements to raw vector

Any idea why this is happening?

Pseudo code:

scoreModel <- function(parameters){
   library(data.table)   # fread() comes from data.table
   dat <- data.frame(fread("file.csv"))
   score(dat, parameters)
}

parameterList <- lapply(1:numModels, function(i) getParameters(i))

modelScores <- spark.lapply(parameterList, scoreModel)


Piero Cinquegrana
MarketShare: A Neustar Solution / Data Science
Mobile: +39.329.17.62.539 / www.neustar.biz
Reduce your environmental footprint. Print only if necessary.
The information contained in this email message is intended only for the use of 
the recipient(s) named above and may contain confidential and/or privileged 
information. If you are not the intended recipient you have received this email 
message in error and any review, dissemination, distribution, or copying of 
this message is strictly prohibited. If you have received this communication in 
error, please notify us immediately and delete the original message.






spark.lapply in SparkR: Error in writeBin(batch, con, endian = "big")

2016-08-22 Thread Cinquegrana, Piero
Hello,

I am using the new R API in SparkR, spark.lapply (Spark 2.0). I am defining a
complex function to be run across executors and I have to send the entire
dataset, but there is not (that I could find) a way to broadcast the variable
in SparkR. I am thus reading the dataset in each executor from disk, but I am
getting the following error:

Error in writeBin(batch, con, endian = "big") :
  attempting to add too many elements to raw vector

Any idea why this is happening?

Pseudo code:

scoreModel <- function(parameters){
   library(data.table)   # fread() comes from data.table
   dat <- data.frame(fread("file.csv"))
   score(dat, parameters)
}

parameterList <- lapply(1:numModels, function(i) getParameters(i))

modelScores <- spark.lapply(parameterList, scoreModel)


Piero Cinquegrana
MarketShare: A Neustar Solution / Data Science
Mobile: +39.329.17.62.539 / www.neustar.biz
Reduce your environmental footprint. Print only if necessary.
The information contained in this email message is intended only for the use of 
the recipient(s) named above and may contain confidential and/or privileged 
information. If you are not the intended recipient you have received this email 
message in error and any review, dissemination, distribution, or copying of 
this message is strictly prohibited. If you have received this communication in 
error, please notify us immediately and delete the original message.






Re: Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-22 Thread Jacek Laskowski
Hi,

I've not heard that. Moreover, I see Kryo supported in Encoders
(SerDes) in Spark 2.0:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala#L151
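
For example, a minimal sketch (the Legacy class is made up; the point is just
that Encoders.kryo supplies an Encoder for a type Spark has no built-in
encoder for):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

class Legacy(val id: Int, val blob: Array[Byte])   // made-up type with no built-in encoder

val spark = SparkSession.builder().master("local[*]").appName("kryo-encoders").getOrCreate()
implicit val legacyEnc: Encoder[Legacy] = Encoders.kryo[Legacy]

val ds = spark.createDataset(Seq(new Legacy(1, Array[Byte](1, 2, 3))))
println(ds.count())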

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Aug 22, 2016 at 8:00 PM, Eric Ho  wrote:
> I heard that Kryo will get phased out at some point, but I'm not sure in which
> Spark release.
> I'm using PySpark; does anyone have any docs on how to call / use the Kryo
> serializer in PySpark?
>
> Thanks.
>
> --
>
> -eric ho
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Do we still need to use Kryo serializer in Spark 1.6.2 ?

2016-08-22 Thread Eric Ho
I heard that Kryo will get phased out at some point, but I'm not sure in which
Spark release.
I'm using PySpark; does anyone have any docs on how to call / use the Kryo
serializer in PySpark?

Thanks.

-- 

-eric ho


Re: Disable logger in SparkR

2016-08-22 Thread Felix Cheung
You should be able to do that with log4j.properties
http://spark.apache.org/docs/latest/configuration.html#configuring-logging

Or programmatically
https://spark.apache.org/docs/2.0.0/api/R/setLogLevel.html
_
From: Yogesh Vyas >
Sent: Monday, August 22, 2016 6:12 AM
Subject: Disable logger in SparkR
To: user >


Hi,

Is there any way of disabling the logging on console in SparkR ?

Regards,
Yogesh

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: Vector size mismatch in logistic regression - Spark ML 2.0

2016-08-22 Thread janardhan shetty
Thanks, Nick.
This Jira seems to have been in a stagnant state for a while; any update on when
this will be released?

On Mon, Aug 22, 2016 at 5:07 AM, Nick Pentreath 
wrote:

> I believe it may be because of this issue (https://issues.apache.org/
> jira/browse/SPARK-13030). OHE is not an estimator - hence in cases where
> the number of categories differ between train and test, it's not usable in
> the current form.
>
> It's tricky to work around, though one option is to use feature hashing
> instead of the StringIndexer -> OHE combo (see https://lists.apache.org/
> thread.html/a7e06426fd958665985d2c4218ea2f9bf9ba136ddefe83e1ad6f1727@%
> 3Cuser.spark.apache.org%3E for some details).
>
>
>
> On Mon, 22 Aug 2016 at 03:20 janardhan shetty 
> wrote:
>
>> Thanks Krishna for your response.
>> Features in the training set has more categories than test set so when
>> vectorAssembler is used these numbers are usually different and I believe
>> it is as expected right ?
>>
>> Test dataset usually will not have so many categories in their features
>> as Train is the belief here.
>>
>> On Sun, Aug 21, 2016 at 4:44 PM, Krishna Sankar 
>> wrote:
>>
>>> Hi,
>>>Just after I sent the mail, I realized that the error might be with
>>> the training-dataset not the test-dataset.
>>>
>>>1. it might be that you are feeding the full Y vector for training.
>>>2. Which could mean, you are using ~50-50 training-test split.
>>>3. Take a good look at the code that does the data split and the
>>>datasets where they are allocated to.
>>>
>>> Cheers
>>> 
>>>
>>> On Sun, Aug 21, 2016 at 4:37 PM, Krishna Sankar 
>>> wrote:
>>>
 Hi,
   Looks like the test-dataset has different sizes for X & Y. Possible
 steps:

1. What is the test-data-size ?
   - If it is 15,909, check the prediction variable vector - it is
   now 29,471, should be 15,909
   - If you expect it to be 29,471, then the X Matrix is not right.
   2. It is also probable that the size of the test-data is
something else. If so, check the data pipeline.
3. If you print the count() of the various vectors, I think you can
find the error.

 Cheers & Good Luck
 

 On Sun, Aug 21, 2016 at 3:16 PM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Hi,
>
> I have built the logistic regression model using training-dataset.
> When I am predicting on a test-dataset, it is throwing the below error
> of size mismatch.
>
> Steps done:
> 1. String indexers on categorical features.
> 2. One hot encoding on these indexed features.
>
> Any help is appreciated to resolve this issue or is it a bug ?
>
> SparkException: *Job aborted due to stage failure: Task 0 in stage
> 635.0 failed 1 times, most recent failure: Lost task 0.0 in stage 635.0
> (TID 19421, localhost): java.lang.IllegalArgumentException: requirement
> failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching
> sizes: x.size = 15909, y.size = 29471* at
> scala.Predef$.require(Predef.scala:224) at 
> org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:104)
> at org.apache.spark.ml.classification.LogisticRegressionModel$$
> anonfun$19.apply(LogisticRegression.scala:505) at org.apache.spark.ml.
> classification.LogisticRegressionModel$$anonfun$19.apply(LogisticRegression.scala:504)
> at org.apache.spark.ml.classification.LogisticRegressionModel.
> predictRaw(LogisticRegression.scala:594) at org.apache.spark.ml.
> classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:484)
> at org.apache.spark.ml.classification.ProbabilisticClassificationMod
> el$$anonfun$1.apply(ProbabilisticClassifier.scala:112) at
> org.apache.spark.ml.classification.ProbabilisticClassificationMod
> el$$anonfun$1.apply(ProbabilisticClassifier.scala:111) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.evalExpr137$(Unknown Source) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply(Unknown Source) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply(Unknown Source) at
> scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>


>>>
>>


[pyspark] How to ensure rdd.takeSample produce the same set everytime?

2016-08-22 Thread Chua Jie Sheng
Hi all,

I have been trying on different machines to make rdd.takeSample produce the
same set, but failed.
I have seeded the method with the same value on different machines, but the
result is different.
Any idea why?

Best Regards
Jie Sheng

Important: This email is confidential and may be privileged. If you are not
the intended recipient, please delete it and notify us immediately; you
should not copy or use it for any purpose, nor disclose its contents to any
other person. Thank you.


Re: Spark 2.0 regression when querying very wide data frames

2016-08-22 Thread mhornbech
I don't think that's the issue. It sounds very much like this:
https://issues.apache.org/jira/browse/SPARK-16664

Morten

> On 20 Aug 2016, at 21:24, ponkin [via Apache Spark User List] wrote:
>
> Did you try to load a wide CSV file or Parquet, for example? Maybe the
> problem is in spark-cassandra-connector, not Spark itself? Are you using
> spark-cassandra-connector (https://github.com/datastax/spark-cassandra-connector)?
>  
> 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-regression-when-querying-very-wide-data-frames-tp27567p27580.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Below is the source code for parsing an XML RDD in which each record is single-line XML data.

import scala.xml.XML
import scala.xml.Elem
import scala.collection.mutable.ArrayBuffer
import scala.xml.Text
import scala.xml.Node

var dataArray = new ArrayBuffer[String]()

def processNode(node: Node, fp1: String): Unit = node match {
  case Elem(prefix, label, attribs, scope, Text(text)) =>
    dataArray.+=:("Cust.001.001.03-" + fp1 + "," + text)
  case _ =>
    for (n <- node.child) {
      val fp = fp1 + "/" + n.label
      processNode(n, fp)
    }
}

val dataDF = xmlData
  .map { x =>
    val p = XML.loadString(x.get(0).toString.mkString)
    val xsd = utils.getXSD(p)
    println("xsd -- ", xsd)
    val f = "/" + p.label
    val msgId = (p \\ "Fnd" \ "Mesg" \ "Paid" \ "Record" \ "CustInit" \ "GroupFirst" \ "MesgId").text
    processNode(p, f)
    (msgId, utils.dataArray, x.get(1).toString())
  }
  .flatMap { x =>
    val msgId = x._1
    val y = x._2.toIterable.map { x1 =>
      (msgId, x1.split(",").apply(0), x1.split(",").apply(1), x._3)
    }
    y
  }.toDF("key", "attribute", "value", "type")

On Mon, Aug 22, 2016 at 4:34 PM, Hyukjin Kwon  wrote:

> Do you mind sharing your code and sample data? It should be okay with
> single-line XML, if I remember this correctly.
>
> 2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com>:
>
>> Hi Darin,
>>
>> Are you using this utility to parse single-line XML?
>>
>>
>> Sent from Samsung Mobile.
>>
>>
>>  Original message 
>> From: Darin McBeath 
>> Date:21/08/2016 17:44 (GMT+05:30)
>> To: Hyukjin Kwon , Jörn Franke 
>>
>> Cc: Diwakar Dhanuskodi , Felix Cheung <
>> felixcheun...@hotmail.com>, user 
>> Subject: Re: Best way to read XML data from RDD
>>
>> Another option would be to look at spark-xml-utils.  We use this
>> extensively in the manipulation of our XML content.
>>
>> https://github.com/elsevierlabs-os/spark-xml-utils
>>
>>
>>
>> There are quite a few examples.  Depending on your preference (and what
>> you want to do), you could use xpath, xquery, or xslt to transform,
>> extract, or filter.
>>
>> Like mentioned below, you want to initialize the parser in a
>> mapPartitions call (one of the examples shows this).
>>
>> Hope this is helpful.
>>
>> Darin.
>>
>>
>>
>>
>>
>> 
>> From: Hyukjin Kwon 
>> To: Jörn Franke 
>> Cc: Diwakar Dhanuskodi ; Felix Cheung <
>> felixcheun...@hotmail.com>; user 
>> Sent: Sunday, August 21, 2016 6:10 AM
>> Subject: Re: Best way to read XML data from RDD
>>
>>
>>
>> Hi Diwakar,
>>
>> Spark XML library can take RDD as source.
>>
>> ```
>> val df = new XmlReader()
>>   .withRowTag("book")
>>   .xmlRdd(sqlContext, rdd)
>> ```
>>
>> If performance is critical, I would also recommend to take care of
>> creation and destruction of the parser.
>>
>> If the parser is not serializble, then you can do the creation for each
>> partition within mapPartition just like
>>
>> https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a
>> 48fed9bb188140423/sql/core/src/main/scala/org/apache/
>> spark/sql/DataFrameReader.scala#L322-L325
>>
>>
>> I hope this is helpful.
>>
>>
>>
>>
>> 2016-08-20 15:10 GMT+09:00 Jörn Franke :
>>
>> I fear the issue is that this will create and destroy a XML parser object
>> 2 mio times, which is very inefficient - it does not really look like a
>> parser performance issue. Can't you do something about the format choice?
>> Ask your supplier to deliver another format (ideally avro or sth like
>> this?)?
>> >Otherwise you could just create one XML Parser object / node, but
>> sharing this among the parallel tasks on the same node is tricky.
>> >The other possibility could be simply more hardware ...
>> >
>> >On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>> >
>> >
>> >Yes . It accepts a xml file as source but not RDD. The XML data
>> embedded  inside json is streamed from kafka cluster.  So I could get it as
>> RDD.
>> >>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD
>> map function  but  performance  wise I am not happy as it takes 4 minutes
>> to parse XML from 2 million messages in a 3 nodes 100G 4 cpu each
>> environment.
>> >>
>> >>
>> >>
>> >>
>> >>Sent 

mutable.LinkedHashMap kryo serialization issues

2016-08-22 Thread Rahul Palamuttam
Hi,

Just sending this again to see if others have had this issue.

I recently switched to using kryo serialization and I've been running into 
errors
with the mutable.LinkedHashMap class.

If I don't register the mutable.LinkedHashMap class then I get an 
ArrayStoreException seen below.
If I do register the class, then when the LinkedHashMap is collected on the 
driver, it does not contain any elements.

Here is the snippet of code I used : 
val sc = new SparkContext(new SparkConf()
  .setMaster("local[*]")
  .setAppName("Sample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))

val collect = sc.parallelize(0 to 10)
  .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", 
"bonjour"), ("good", "bueno")))

val mapSideSizes = collect.map(p => p.size).collect()(0)
val driverSideSizes = collect.collect()(0).size

println("The sizes before collect : " + mapSideSizes)
println("The sizes after collect : " + driverSideSizes)

** The following only occurs if I did not register the mutable.LinkedHashMap 
class **
16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
java.lang.ArrayStoreException: scala.collection.mutable.HashMap
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at 
org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I hope this is a known issue and/or I'm missing something important in my setup.
Appreciate any help or advice!

Best,

Rahul Palamuttam

UDTRegistration (in Java)

2016-08-22 Thread raghukiran
Hi,

After moving to Spark 2.0, the UDTRegistration is giving me some issues. I
am trying the following (in Java):

UDTRegistration.register(userclassName, udtclassName);

After this, when I try creating a DataFrame, it throws an exception
that the userclassName is not registered. Can anyone point me to what I am
missing?

Regards,
Raghu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UDTRegistration-in-Java-tp27579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using spark to distribute jobs to standalone servers

2016-08-22 Thread Larry White
Hi,

I have a bit of an unusual use-case and would *greatly* *appreciate* some
feedback as to whether it is a good fit for spark.

I have a network of compute/data servers configured as a tree as shown below

   - controller
   - server 1
  - server 2
  - server 3
  - etc.

There are ~20 servers, but the number is increasing to ~100.

Each server contains a different dataset, all in the same format. Each is
hosted by a different organization, and the data on every individual server
is unique to that organization.

Data *cannot* be replicated across servers using RDDs or any other means,
for privacy/ownership reasons.

Data *cannot* be retrieved to the controller, except in aggregate form, as
the result of a query, for example.

Because of this, there are currently no operations that treat the data as
if it were a single data set: we could run a classifier on each site
individually, but we cannot, for legal reasons, pull all the data into a single
*physical* dataframe to run the classifier on all of it together.

The servers are located across a wide geographic region (1,000s of miles)

We would like to send jobs from the controller to be executed in parallel
on all the servers, and retrieve the results to the controller. The jobs
would consist of SQL-Heavy Java code for 'production' queries, and python
or R code for ad-hoc queries and predictive modeling.

Spark seems to have the capability to meet many of the individual
requirements, but is it a reasonable platform overall for building this
application?

Thank you very much for your assistance.

Larry


Re: Best way to read XML data from RDD

2016-08-22 Thread Darin McBeath
Yes, you can use it for single-line XML or even multi-line XML.
In our typical mode of operation, we have sequence files (where the value is 
the XML).  We then run operations over the XML to extract certain values or to 
transform the XML into another format (such as json).
If I understand your question, your content is in json. Some of the values
within this json are XML strings. You should be able to use spark-xml-utils to
parse these strings and filter/evaluate the result of an xpath expression (or
xquery/xslt).
One limitation of spark-xml-utils when using the evaluate operation is that it 
returns a string.  So, you have to be a little creative when returning multiple 
values (such as delimiting the values with a special character and then 
splitting on this delimiter).  
Darin.

  From: Diwakar Dhanuskodi 
 To: Darin McBeath ; Hyukjin Kwon ; 
Jörn Franke  
Cc: Felix Cheung ; user 
 Sent: Monday, August 22, 2016 6:53 AM
 Subject: Re: Best way to read XML data from RDD
   
Hi Darin, 
Are you using this utility to parse single-line XML?

Sent from Samsung Mobile.

 Original message From: Darin McBeath  
Date:21/08/2016 17:44 (GMT+05:30) To: Hyukjin Kwon , Jörn 
Franke  Cc: Diwakar Dhanuskodi 
, Felix Cheung , user 
 Subject: Re: Best way to read XML data from RDD 
Another option would be to look at spark-xml-utils.  We use this extensively in 
the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon 
To: Jörn Franke  
Cc: Diwakar Dhanuskodi ; Felix Cheung 
; user 
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.




2016-08-20 15:10 GMT+09:00 Jörn Franke :

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi  
>wrote:
>
>
>Yes . It accepts a xml file as source but not RDD. The XML data embedded  
>inside json is streamed from kafka cluster.  So I could get it as RDD. 
>>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
>>function  but  performance  wise I am not happy as it takes 4 minutes to 
>>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 
>>
>>
>>
>>
>>Sent from Samsung Mobile.
>>
>>
>> Original message 
>>From: Felix Cheung  
>>Date:20/08/2016  09:49  (GMT+05:30) 
>>To: Diwakar Dhanuskodi  , user 
>> 
>>Cc: 
>>Subject: Re: Best way to read XML data from RDD 
>>
>>
>>Have you tried
>>
>>https://github.com/databricks/ spark-xml
>>?
>>
>>
>>
>>
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
>> wrote:
>>
>>
>>Hi,  
>>
>>
>>There is a RDD with json data. I could read json data using rdd.read.json . 
>>The json data has XML data in couple of key-value paris. 
>>
>>
>>Which is the best method to read and parse XML from rdd. Is there any 
>>specific xml libraries for spark. Could anyone help on this.
>>
>>
>>Thanks.


  

Disable logger in SparkR

2016-08-22 Thread Yogesh Vyas
Hi,

Is there any way of disabling the logging on console in SparkR ?

Regards,
Yogesh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-22 Thread Ben Teeuwen
I don’t think scaling RAM is a sane strategy for fixing these problems with
the dataframe / transformer approach to creating large sparse vectors.

One, though it will delay the point at which it fails, it will still fail. In the
original case I emailed about I tried this, and after waiting 50 minutes, it
still broke.

Second, if you don’t use dataframes / transformers, but write your own
functions to do one-hot encoding and create sparse vectors, it will easily
work on small boxes. E.g. build up a dictionary with unique index numbers for
all unique values, and access these indexes when creating sparse vectors:

def makeDict(df, columnName):
    dict = df.select(columnName).map(lambda x: unicode(x[0])) \
        .distinct().zipWithIndex().collectAsMap()
    dict["missing"] = len(dict)
    return dict

def encodeOneHot(x, column):
    key = "missing"
    if unicode(x) in mappings_bc[column]:
        key = unicode(x)
    return Vectors.sparse(len(mappings_bc[column]), [mappings_bc[column][key]], [1.0])

Ben

> On Aug 19, 2016, at 11:34 PM, Davies Liu  wrote:
> 
> The OOM happen in driver, you may also need more memory for driver.
> 
> On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu  wrote:
>> You are using lots of tiny executors (128 executor with only 2G
>> memory), could you try with bigger executor (for example 16G x 16)?
>> 
>> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>>> 
>>> So I wrote some code to reproduce the problem.
>>> 
>>> I assume here that a pipeline should be able to transform a categorical 
>>> feature with a few million levels.
>>> So I create a dataframe with the categorical feature (‘id’), apply a 
>>> StringIndexer and OneHotEncoder transformer, and run a loop where I 
>>> increase the amount of levels.
>>> It breaks at 1.276.000 levels.
>>> 
>>> Shall I report this as a ticket in JIRA?
>>> 
>>> 
>>> 
>>> 
>>> from pyspark.sql.functions import rand
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>>> from pyspark.ml import Pipeline
>>> 
>>> start_id = 10
>>> n = 500
>>> step = (n - start_id) / 25
>>> 
>>> for i in xrange(start_id,start_id + n,step):
>>>print "#\n {}".format(i)
>>>dfr = (sqlContext
>>>   .range(start_id, start_id + i)
>>>   .withColumn(‘label', rand(seed=10))
>>>   .withColumn('feat2', rand(seed=101))
>>>#.withColumn('normal', randn(seed=27))
>>>   ).repartition(32).cache()
>>># dfr.select("id", rand(seed=10).alias("uniform"), 
>>> randn(seed=27).alias("normal")).show()
>>>dfr.show(1)
>>>print "This dataframe has {0} rows (and therefore {0} levels will be one 
>>> hot encoded)".format(dfr.count())
>>> 
>>>categorical_feature  = ['id']
>>>stages = []
>>> 
>>>for c in categorical_feature:
>>>stages.append(StringIndexer(inputCol=c, 
>>> outputCol="{}Index".format(c)))
>>>stages.append(OneHotEncoder(dropLast= False, inputCol = 
>>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>> 
>>>columns = ["{}OHE".format(x) for x in categorical_feature]
>>>columns.append('feat2')
>>> 
>>>assembler = VectorAssembler(
>>>inputCols=columns,
>>>outputCol="features")
>>>stages.append(assembler)
>>> 
>>>df2 = dfr
>>> 
>>>pipeline = Pipeline(stages=stages)
>>>pipeline_fitted = pipeline.fit(df2)
>>>df3 = pipeline_fitted.transform(df2)
>>>df3.show(1)
>>>dfr.unpersist()
>>> 
>>> 
>>> 
>>> 
>>> Output:
>>> 
>>> 
>>> #
>>> 10
>>> +--+---+---+
>>> |id|label  |  feat2|
>>> +--+---+---+
>>> |183601|0.38693226548356197|0.04485291680169634|
>>> +--+---+---+
>>> only showing top 1 row
>>> 
>>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>>> encoded)
>>> +--+---+---+---+++
>>> |id|label  |  feat2|idIndex|
>>>idOHE|features|
>>> +--+---+---+---+++
>>> |183601|
>>> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
>>> +--+---+---+---+++
>>> only showing top 1 row
>>> 
>>> #
>>> 296000
>>> +--+---+---+
>>> |id|label  |  feat2|
>>> +--+---+---+
>>> |137008| 0.2996020619810592|0.38693226548356197|
>>> +--+---+---+
>>> only showing top 1 row
>>> 
>>> This 

Avoid RDD shuffling in a join after Distributed Matrix operation

2016-08-22 Thread Tharindu
Hi,
Just wanted to get your input on how to avoid RDD shuffling in a join after a
distributed matrix operation in Spark.

Following is what my app would look like 

1. Created a dense matrix as an input to calculate the cosine distance between
columns:


val rowMarixIn = sc.textFile("input.csv").map{ line =>
val values = line.split(" ").map(_.toDouble)
Vectors.dense(values)
}

2. Extracted a set of entries from the coordinate matrix after the cosine
calculations:

val coMatrix = new RowMatrix(rowMarixIn)
val similerRows = coMatrix.columnSimilarities()

// extract entries over a specific threshold
val rowIndices = similerRows.entries
  .filter { case MatrixEntry(row, col, sim) => sim > someThreshold }
  .map { case MatrixEntry(row, col, sim) => (col, sim) }

3. We have another RDD, rdd2(key, Val2).

We just want to join the two RDDs, rowIndices(key, Val) and rdd2(key, Val2):

   val joinedRDD = rowIndices.join(rdd2)

It's evident that this will result in a shuffle.

What are the best practices to follow in order to avoid the shuffle? Any
suggestion on a better approach to handle a RowMatrix calculation and
utilize the result after that would be much appreciated.
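
One option I am considering, if rdd2 is small enough to fit in memory on each
executor, is a map-side (broadcast) join, which should avoid the shuffle a
plain join() triggers. A rough sketch, reusing rowIndices and rdd2 from above:

// Broadcast the small side once, then look keys up locally in each task.
val rdd2Map = sc.broadcast(rdd2.collectAsMap())

val joined = rowIndices.flatMap { case (key, sim) =>
  rdd2Map.value.get(key).map(v2 => (key, (sim, v2)))
}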



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Avoid-RDD-shuffling-in-a-join-after-Distributed-Matrix-operation-tp27574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fwd: How to avoid RDD shuffling in join after Distributed Matrix calculation

2016-08-22 Thread Tharindu Thundeniya
Hi,
Just wanted to get your input on how to avoid RDD shuffling in a join after a
distributed matrix operation in Spark.

Following is what my app would look like

1. Created a dense matrix as an input to calculate the cosine distance between
columns:


val rowMarixIn = sc.textFile("input.csv").map{ line =>
val values = line.split(" ").map(_.toDouble)
Vectors.dense(values)
}

2. Extracted a set of entries from the coordinate matrix after the cosine
calculations:

val coMatrix = new RowMatrix(rowMarixIn)
val similerRows = coMatrix.columnSimilarities()

// extract entries over a specific threshold
val rowIndices = similerRows.entries
  .filter { case MatrixEntry(row, col, sim) => sim > someThreshold }
  .map { case MatrixEntry(row, col, sim) => (col, sim) }

3. We have another RDD, rdd2(key, Val2).

We just want to join the two RDDs, rowIndices(key, Val) and rdd2(key, Val2):

   val joinedRDD = rowIndices.join(rdd2)

It's evident that this will result in a shuffle.

What are the best practices to follow in order to avoid the shuffle? Any
suggestion on a better approach to handle a RowMatrix calculation and
utilize the result after that would be much appreciated.

Thanks,
Tharindu


Re: Vector size mismatch in logistic regression - Spark ML 2.0

2016-08-22 Thread Nick Pentreath
I believe it may be because of this issue (
https://issues.apache.org/jira/browse/SPARK-13030). OHE is not an estimator
- hence in cases where the number of categories differ between train and
test, it's not usable in the current form.

It's tricky to work around, though one option is to use feature hashing
instead of the StringIndexer -> OHE combo (see
https://lists.apache.org/thread.html/a7e06426fd958665985d2c4218ea2f9bf9ba136ddefe83e1ad6f1727@%3Cuser.spark.apache.org%3E
for some details).
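
As a rough sketch of the hashing idea (the column names and number of features
are placeholders): wrap the categorical column in an array and run it through
HashingTF, so every category, including ones never seen at training time, lands
in the same fixed-size vector space:

import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.functions.{array, col}

val hasher = new HashingTF()
  .setInputCol("categoryArr")
  .setOutputCol("categoryVec")
  .setNumFeatures(1 << 18)            // fixed dimensionality, identical for train and test

val hashed = hasher.transform(df.withColumn("categoryArr", array(col("category"))))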



On Mon, 22 Aug 2016 at 03:20 janardhan shetty 
wrote:

> Thanks Krishna for your response.
> Features in the training set has more categories than test set so when
> vectorAssembler is used these numbers are usually different and I believe
> it is as expected right ?
>
> Test dataset usually will not have so many categories in their features as
> Train is the belief here.
>
> On Sun, Aug 21, 2016 at 4:44 PM, Krishna Sankar 
> wrote:
>
>> Hi,
>>Just after I sent the mail, I realized that the error might be with
>> the training-dataset not the test-dataset.
>>
>>1. it might be that you are feeding the full Y vector for training.
>>2. Which could mean, you are using ~50-50 training-test split.
>>3. Take a good look at the code that does the data split and the
>>datasets where they are allocated to.
>>
>> Cheers
>> 
>>
>> On Sun, Aug 21, 2016 at 4:37 PM, Krishna Sankar 
>> wrote:
>>
>>> Hi,
>>>   Looks like the test-dataset has different sizes for X & Y. Possible
>>> steps:
>>>
>>>1. What is the test-data-size ?
>>>   - If it is 15,909, check the prediction variable vector - it is
>>>   now 29,471, should be 15,909
>>>   - If you expect it to be 29,471, then the X Matrix is not right.
>>>   2. It is also probable that the size of the test-data is
>>>something else. If so, check the data pipeline.
>>>3. If you print the count() of the various vectors, I think you can
>>>find the error.
>>>
>>> Cheers & Good Luck
>>> 
>>>
>>> On Sun, Aug 21, 2016 at 3:16 PM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
 Hi,

 I have built the logistic regression model using training-dataset.
 When I am predicting on a test-dataset, it is throwing the below error
 of size mismatch.

 Steps done:
 1. String indexers on categorical features.
 2. One hot encoding on these indexed features.

 Any help is appreciated to resolve this issue or is it a bug ?

 SparkException: *Job aborted due to stage failure: Task 0 in stage
 635.0 failed 1 times, most recent failure: Lost task 0.0 in stage 635.0
 (TID 19421, localhost): java.lang.IllegalArgumentException: requirement
 failed: BLAS.dot(x: Vector, y:Vector) was given Vectors with non-matching
 sizes: x.size = 15909, y.size = 29471* at
 scala.Predef$.require(Predef.scala:224) at
 org.apache.spark.ml.linalg.BLAS$.dot(BLAS.scala:104) at
 org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$19.apply(LogisticRegression.scala:505)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel$$anonfun$19.apply(LogisticRegression.scala:504)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:594)
 at 
 org.apache.spark.ml.classification.LogisticRegressionModel.predictRaw(LogisticRegression.scala:484)
 at 
 org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:112)
 at 
 org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:111)
 at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr137$(Unknown
 Source) at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source) at
 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

>>>
>>>
>>
>


RE: Best way to read XML data from RDD

2016-08-22 Thread Puneet Tripathi
I was building a small app to stream messages from Kafka via Spark. Each message
is an XML document, one per message. I wrote a simple app to do so [note: this app
expects the XML to be a single line]:

from __future__ import print_function
from pyspark.sql import Row
import xml.etree.ElementTree as ET
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## This is where you parse the XML
dict = {}

def create_dict(rt, new=None):
    global parent_tag

    for child in rt:
        if new is None:
            parent_tag = child.tag
        else:
            parent_tag = parent_tag

        if child.getchildren():
            create_dict(child, parent_tag)
        else:
            # if child.tag in dict.keys():
            #     tag = tag + child.tag
            # else:
            #     tag = child.tag
            dict[parent_tag] = child.text
    return dict

def parse_xml_to_row(xmlString):
    dct = {}
    root = ET.fromstring(xmlString.encode('utf-8'))
    dct = create_dict(root)
    return Row(**dct)

def toCSVLine(data):
    return ','.join(str(d) for d in data)

## Parsing code part ends here

# sc.stop()
# Configure Spark
conf = SparkConf().setAppName("PythonStreamingKafkaWordCount")
conf = conf.setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 10)

zkQuorum, topic = 'localhost:2182', 'topic-name'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer",
                              {topic: 1})
lines = kvs.map(lambda x: x[1]).map(parse_xml_to_row).map(toCSVLine)
# lines.pprint()
lines.saveAsTextFiles('where you want to write the file ')

ssc.start()
ssc.awaitTerminationOrTimeout(50)
ssc.stop()

Hope this is helpful.

Puneet

From: Hyukjin Kwon [mailto:gurwls...@gmail.com]
Sent: Monday, August 22, 2016 4:34 PM
To: Diwakar Dhanuskodi
Cc: Darin McBeath; Jörn Franke; Felix Cheung; user
Subject: Re: Best way to read XML data from RDD

Do you mind sharing your code and sample data? It should be okay with single-line
XML, if I remember correctly.

2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi:
Hi Darin,

Are you using this utility to parse single-line XML?


Sent from Samsung Mobile.

 Original message 
From: Darin McBeath
Date: 21/08/2016 17:44 (GMT+05:30)
To: Hyukjin Kwon, Jörn Franke
Cc: Diwakar Dhanuskodi, Felix Cheung, user
Subject: Re: Best way to read XML data from RDD

Another option would be to look at spark-xml-utils.  We use this extensively in 
the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon
To: Jörn Franke
Cc: Diwakar Dhanuskodi; Felix Cheung; user
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.
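
A minimal sketch of that per-partition pattern, assuming the input is an
RDD[String] where every record is one complete single-line XML document (the
<book>/<title> structure below is invented for illustration):

```scala
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory

// Toy input; in the real job this would come from Kafka / the JSON extraction.
// `sc` is assumed to be an existing SparkContext, e.g. in spark-shell.
val xmlRdd = sc.parallelize(Seq(
  "<book><title>Spark</title></book>",
  "<book><title>Kafka</title></book>"))

val titles = xmlRdd.mapPartitions { iter =>
  // Build the (non-serializable) parser once per partition, not once per record.
  val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
  iter.map { xml =>
    builder.reset() // safe reuse of the same builder across records
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes("UTF-8")))
    doc.getElementsByTagName("title").item(0).getTextContent
  }
}

titles.collect().foreach(println) // prints: Spark, Kafka
```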




2016-08-20 15:10 GMT+09:00 Jörn Franke:

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...

Re: Populating tables using hive and spark

2016-08-22 Thread Mich Talebzadeh
Ok This is my test

1) create table in Hive and populate it with two rows

hive> create table testme (col1 int, col2 string);
OK
hive> insert into testme values (1,'London');
Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
OK
hive> insert into testme values (2,'NY');
Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
OK
hive> select * from testme;
OK
1   London
2   NY

So the rows are there

Now use  Spark to create two more rows

scala> case class columns (col1: Int, col2: String)
defined class columns
scala> val df =sc.parallelize(Array((3,"California"),(4,"Dehli"))).map(p =>
columns(p._1.toString.toInt, p._2.toString)).toDF()
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
scala> df.show
+----+----------+
|col1|      col2|
+----+----------+
|   3|California|
|   4|     Dehli|
+----+----------+

// register it as tempTable
scala> df.registerTempTable("tmp")
scala> sql("insert into test.testme select * from tmp")
res9: org.apache.spark.sql.DataFrame = []
scala> sql("select * from testme").show
+----+----------+
|col1|      col2|
+----+----------+
|   1|    London|
|   2|        NY|
|   3|California|
|   4|     Dehli|
+----+----------+
So the rows are there.

Let me go to Hive again now


hive>  select * from testme;
OK
1   London
2   NY
3   California
4   Dehli

hive> analyze table testme compute statistics for columns;

So is there any issue here?

Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 22 August 2016 at 11:51, Nitin Kumar  wrote:

> Hi Furcy,
>
> If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS"
> before checking the count from hive, Hive returns the correct count albeit
> it does not spawn a map-reduce job for computing the count.
>
> I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1
>
> If others can concur we can go ahead and report it as a bug.
>
> Regards,
> Nitin
>
>
>
> On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin  wrote:
>
>> Hi Nitin,
>>
>> I confirm that there is something odd here.
>>
>> I did the following test :
>>
>> create table test_orc (id int, name string, dept string) stored as ORC;
>> insert into table test_orc values (1, 'abc', 'xyz');
>> insert into table test_orc values (2, 'def', 'xyz');
>> insert into table test_orc values (3, 'pqr', 'xyz');
>> insert into table test_orc values (4, 'ghi', 'xyz');
>>
>>
>> I ended up with 4 files on hdfs:
>>
>> 00_0
>> 00_0_copy_1
>> 00_0_copy_2
>> 00_0_copy_3
>>
>>
>> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) =
>> 4 with hive.
>> So this is not a file name issue.
>>
>> I then removed one of the files, and I got this :
>>
>> > SELECT COUNT(1) FROM test_orc ;
>> +------+--+
>> | _c0  |
>> +------+--+
>> | 4    |
>> +------+--+
>>
>> > SELECT * FROM test_orc ;
>> +--------------+----------------+----------------+--+
>> | test_orc.id  | test_orc.name  | test_orc.dept  |
>> +--------------+----------------+----------------+--+
>> | 1            | abc            | xyz            |
>> | 2            | def            | xyz            |
>> | 4            | ghi            | xyz            |
>> +--------------+----------------+----------------+--+
>> 3 rows selected (0.162 seconds)
>>
>> So, my guess is that when Hive inserts data, it must keep somewhere in
>> the metastore the number of rows in the table.
>> However, if the files are modified by someone else than Hive itself,
>> (either manually or with Spark), you end up with an inconsistency.
>>
>> So I guess we can call it a bug:
>>
>> Hive should detect that the files changed and invalidate its
>> pre-calculated count.
>> Optionally, Spark should be nice with Hive and update the count when
>> inserting.
>>
>> I don't know if this bug has already been reported, and I tested on Hive
>> 1.1.0, so perhaps it is already solved in later releases.
>>
>> Regards,
>>
>> Furcy
>>
>>
>> On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar 
>> wrote:
>>
>>> Hi!
>>>
>>> I've noticed that hive has problems in registering new data records if
>>> the same table is written to using both the hive terminal and spark sql.
>>> The problem is demonstrated through the commands listed below
>>>
>>> 
>>> hive> use default;
>>> hive> create table test_orc (id int, name string, dept string) stored as ORC;

Re: Best way to read XML data from RDD

2016-08-22 Thread Hyukjin Kwon
Do you mind sharing your code and sample data? It should be okay with single-line
XML, if I remember correctly.

2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi:

> Hi Darin,
>
> Are you using this utility to parse single-line XML?
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Darin McBeath 
> Date:21/08/2016 17:44 (GMT+05:30)
> To: Hyukjin Kwon , Jörn Franke 
>
> Cc: Diwakar Dhanuskodi , Felix Cheung <
> felixcheun...@hotmail.com>, user 
> Subject: Re: Best way to read XML data from RDD
>
> Another option would be to look at spark-xml-utils.  We use this
> extensively in the manipulation of our XML content.
>
> https://github.com/elsevierlabs-os/spark-xml-utils
>
>
>
> There are quite a few examples.  Depending on your preference (and what
> you want to do), you could use xpath, xquery, or xslt to transform,
> extract, or filter.
>
> Like mentioned below, you want to initialize the parser in a mapPartitions
> call (one of the examples shows this).
>
> Hope this is helpful.
>
> Darin.
>
>
>
>
>
> 
> From: Hyukjin Kwon 
> To: Jörn Franke 
> Cc: Diwakar Dhanuskodi ; Felix Cheung <
> felixcheun...@hotmail.com>; user 
> Sent: Sunday, August 21, 2016 6:10 AM
> Subject: Re: Best way to read XML data from RDD
>
>
>
> Hi Diwakar,
>
> Spark XML library can take RDD as source.
>
> ```
> val df = new XmlReader()
>   .withRowTag("book")
>   .xmlRdd(sqlContext, rdd)
> ```
>
> If performance is critical, I would also recommend to take care of
> creation and destruction of the parser.
>
> If the parser is not serializble, then you can do the creation for each
> partition within mapPartition just like
>
> https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9b
> b188140423/sql/core/src/main/scala/org/apache/spark/sql/
> DataFrameReader.scala#L322-L325
>
>
> I hope this is helpful.
>
>
>
>
> 2016-08-20 15:10 GMT+09:00 Jörn Franke :
>
> I fear the issue is that this will create and destroy a XML parser object
> 2 mio times, which is very inefficient - it does not really look like a
> parser performance issue. Can't you do something about the format choice?
> Ask your supplier to deliver another format (ideally avro or sth like
> this?)?
> >Otherwise you could just create one XML Parser object / node, but sharing
> this among the parallel tasks on the same node is tricky.
> >The other possibility could be simply more hardware ...
> >
> >On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
> >
> >
> >Yes . It accepts a xml file as source but not RDD. The XML data embedded
> inside json is streamed from kafka cluster.  So I could get it as RDD.
> >>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map
> function  but  performance  wise I am not happy as it takes 4 minutes to
> parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment.
> >>
> >>
> >>
> >>
> >>Sent from Samsung Mobile.
> >>
> >>
> >> Original message 
> >>From: Felix Cheung 
> >>Date:20/08/2016  09:49  (GMT+05:30)
> >>To: Diwakar Dhanuskodi  , user <
> user@spark.apache.org>
> >>Cc:
> >>Subject: Re: Best way to read XML data from RDD
> >>
> >>
> >>Have you tried
> >>
> >>https://github.com/databricks/spark-xml
> >>?
> >>
> >>
> >>
> >>
> >>
> >>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" <
> diwakar.dhanusk...@gmail.com> wrote:
> >>
> >>
> >>Hi,
> >>
> >>
> >>There is a RDD with json data. I could read json data using
> rdd.read.json. The json data has XML data in a couple of key-value pairs.
> >>
> >>
> >>Which is the best method to read and parse XML from rdd. Is there any
> specific xml libraries for spark. Could anyone help on this.
> >>
> >>
> >>Thanks.
>


Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Hi Darin, 

Are you using this utility to parse single-line XML?


Sent from Samsung Mobile.

 Original message 
From: Darin McBeath
Date: 21/08/2016 17:44 (GMT+05:30)
To: Hyukjin Kwon, Jörn Franke
Cc: Diwakar Dhanuskodi, Felix Cheung, user
Subject: Re: Best way to read XML data from RDD
Another option would be to look at spark-xml-utils.  We use this 
extensively in the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon 
To: Jörn Franke  
Cc: Diwakar Dhanuskodi ; Felix Cheung 
; user 
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.




2016-08-20 15:10 GMT+09:00 Jörn Franke :

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi  
>wrote:
>
>
>Yes . It accepts a xml file as source but not RDD. The XML data embedded  
>inside json is streamed from kafka cluster.  So I could get it as RDD. 
>>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
>>function  but  performance  wise I am not happy as it takes 4 minutes to 
>>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 
>>
>>
>>
>>
>>Sent from Samsung Mobile.
>>
>>
>> Original message 
>>From: Felix Cheung  
>>Date:20/08/2016  09:49  (GMT+05:30) 
>>To: Diwakar Dhanuskodi  , user 
>> 
>>Cc: 
>>Subject: Re: Best way to read XML data from RDD 
>>
>>
>>Have you tried
>>
>>https://github.com/databricks/spark-xml
>>?
>>
>>
>>
>>
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
>> wrote:
>>
>>
>>Hi,  
>>
>>
>>There is an RDD with json data. I could read json data using rdd.read.json.
>>The json data has XML data in a couple of key-value pairs.
>>
>>
>>Which is the best method to read and parse XML from rdd. Is there any 
>>specific xml libraries for spark. Could anyone help on this.
>>
>>
>>Thanks.


Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Hi Franke, 
The source format cannot be changed as of now, and it is a pretty standard
format that has been working for years.
Yeah, creating just one parser is something I can try out.

Sent from Samsung Mobile.

 Original message 
From: Jörn Franke
Date: 20/08/2016 11:40 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: Felix Cheung, user
Subject: Re: Best way to read XML data from RDD
I fear the issue is that this will create and destroy a XML parser object 
2 mio times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
Otherwise you could just create one XML Parser object / node, but sharing this 
among the parallel tasks on the same node is tricky.
The other possibility could be simply more hardware ...

On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi  
wrote:

Yes . It accepts a xml file as source but not RDD. The XML data embedded  
inside json is streamed from kafka cluster.  So I could get it as RDD. 
Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
function  but  performance  wise I am not happy as it takes 4 minutes to parse 
XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 


Sent from Samsung Mobile.


 Original message 
From: Felix Cheung 
Date:20/08/2016 09:49 (GMT+05:30)
To: Diwakar Dhanuskodi , user 

Cc:
Subject: Re: Best way to read XML data from RDD

Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
 wrote:

Hi, 

There is an RDD with json data. I could read json data using rdd.read.json. The
json data has XML data in a couple of key-value pairs.

Which is the best method to read and parse XML from rdd. Is there any specific 
xml libraries for spark. Could anyone help on this.

Thanks. 

Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi

Hi Kwon, 

I was trying out the Spark XML library. I keep on getting errors in inferring the
schema. It looks like it cannot infer single-line XML data.

Sent from Samsung Mobile.

 Original message 
From: Hyukjin Kwon 
Date:21/08/2016 15:40 (GMT+05:30)
To: Jörn Franke 
Cc: Diwakar Dhanuskodi , Felix Cheung 
, user 
Subject: Re: Best way to read XML data from RDD

Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325

I hope this is helpful.



2016-08-20 15:10 GMT+09:00 Jörn Franke :
I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
Otherwise you could just create one XML Parser object / node, but sharing this 
among the parallel tasks on the same node is tricky.
The other possibility could be simply more hardware ...

On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi  
wrote:

Yes . It accepts a xml file as source but not RDD. The XML data embedded  
inside json is streamed from kafka cluster.  So I could get it as RDD. 
Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
function  but  performance  wise I am not happy as it takes 4 minutes to parse 
XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 


Sent from Samsung Mobile.


 Original message 
From: Felix Cheung 
Date:20/08/2016 09:49 (GMT+05:30) 
To: Diwakar Dhanuskodi , user 

Cc:
Subject: Re: Best way to read XML data from RDD

Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
 wrote:

Hi, 

There is an RDD with json data. I could read json data using rdd.read.json. The
json data has XML data in a couple of key-value pairs.

Which is the best method to read and parse XML from rdd. Is there any specific 
xml libraries for spark. Could anyone help on this.

Thanks. 



Re: Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi Furcy,

If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS" before
checking the count from hive, Hive returns the correct count albeit it does
not spawn a map-reduce job for computing the count.
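
For anyone hitting the same thing, a rough sketch of that workaround in one place
(this assumes the stale count comes from Hive answering COUNT(*) out of metastore
statistics, e.g. when hive.compute.query.using.stats is enabled; the table names
reuse the ones from this thread, and `sc` is an existing SparkContext):

```scala
import org.apache.spark.sql.hive.HiveContext

// Spark 1.6-style insert into the Hive table.
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

val dataDF = Seq((3, "pqr", "xyz"), (4, "ghi", "xyz")).toDF("id", "name", "dept")
dataDF.registerTempTable("temp_table")
hiveContext.sql("insert into table default.test_orc select * from temp_table")

// Afterwards, in the Hive CLI / beeline (kept outside Spark here because
// ANALYZE support differs between Spark versions):
//   ANALYZE TABLE default.test_orc COMPUTE STATISTICS;
// or make Hive scan the files instead of trusting possibly-stale statistics:
//   SET hive.compute.query.using.stats=false;
//   SELECT COUNT(*) FROM default.test_orc;
```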

I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1

If others can concur we can go ahead and report it as a bug.

Regards,
Nitin



On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin  wrote:

> Hi Nitin,
>
> I confirm that there is something odd here.
>
> I did the following test :
>
> create table test_orc (id int, name string, dept string) stored as ORC;
> insert into table test_orc values (1, 'abc', 'xyz');
> insert into table test_orc values (2, 'def', 'xyz');
> insert into table test_orc values (3, 'pqr', 'xyz');
> insert into table test_orc values (4, 'ghi', 'xyz');
>
>
> I ended up with 4 files on hdfs:
>
> 00_0
> 00_0_copy_1
> 00_0_copy_2
> 00_0_copy_3
>
>
> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) = 4
> with hive.
> So this is not a file name issue.
>
> I then removed one of the files, and I got this :
>
> > SELECT COUNT(1) FROM test_orc ;
> +------+--+
> | _c0  |
> +------+--+
> | 4    |
> +------+--+
>
> > SELECT * FROM test_orc ;
> +--------------+----------------+----------------+--+
> | test_orc.id  | test_orc.name  | test_orc.dept  |
> +--------------+----------------+----------------+--+
> | 1            | abc            | xyz            |
> | 2            | def            | xyz            |
> | 4            | ghi            | xyz            |
> +--------------+----------------+----------------+--+
> 3 rows selected (0.162 seconds)
>
> So, my guess is that when Hive inserts data, it must keep somewhere in the
> metastore the number of rows in the table.
> However, if the files are modified by someone else than Hive itself,
> (either manually or with Spark), you end up with an inconsistency.
>
> So I guess we can call it a bug:
>
> Hive should detect that the files changed and invalidate its
> pre-calculated count.
> Optionally, Spark should be nice with Hive and update the count when
> inserting.
>
> I don't know if this bug has already been reported, and I tested on Hive
> 1.1.0, so perhaps it is already solved in later releases.
>
> Regards,
>
> Furcy
>
>
> On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar 
> wrote:
>
>> Hi!
>>
>> I've noticed that hive has problems in registering new data records if
>> the same table is written to using both the hive terminal and spark sql.
>> The problem is demonstrated through the commands listed below
>>
>> 
>> hive> use default;
>> hive> create table test_orc (id int, name string, dept string) stored as
>> ORC;
>> hive> insert into table test_orc values (1, 'abc', 'xyz');
>> hive> insert into table test_orc values (2, 'def', 'xyz');
>> hive> select count(*) from test_orc;
>> OK
>> 2
>> hive> select distinct(name) from test_orc;
>> OK
>> abc
>> def
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>>
>>
>> >>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>> >>> column_names = ['identity_id', 'emp_name', 'dept_name']
>> >>> data_df = sqlContext.createDataFrame(data_points, column_names)
>> >>> data_df.show()
>>
>> +-----------+--------+---------+
>> |identity_id|emp_name|dept_name|
>> +-----------+--------+---------+
>> |          3|     pqr|      xyz|
>> |          4|     ghi|      xyz|
>> +-----------+--------+---------+
>>
>> >>> data_df.registerTempTable('temp_table')
>> >>> sqlContext.sql('insert into table default.test_orc select * from
>> temp_table')
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>> hive> select count(*) from test_orc; (Does not launch map-reduce job)
>> OK
>> 2
>> hive> select distinct(name) from test_orc; (Launches map-reduce job)
>> abc
>> def
>> ghi
>> pqr
>>
>> hive> create table test_orc_new like test_orc stored as ORC;
>> hive> insert into table test_orc_new select * from test_orc;
>> hive> select count(*) from test_orc_new;
>> OK
>> 4
>> ==
>>
>> Even if I restart the hive services I cannot get the proper count output
>> from hive. This problem only occurs if the table is written to using both
>> hive and spark. If only spark is used to insert records into the table
>> multiple times, the count query in the hive terminal works perfectly fine.
>>
>> This problem occurs for tables stored with different storage formats as
>> well (textFile etc.)
>>
>> Is this because of the different naming conventions used by hive and
>> spark to write records to hdfs? Or maybe it is not a recommended practice
>> to write tables using different services?
>>
>> Your thoughts and comments on this matter would be highly appreciated!
>>
>> Thanks!
>> Nitin
>>
>>
>>
>


updateStateByKey for window batching

2016-08-22 Thread Dávid Szakállas
Hi!

I’m curious about the fault-tolerance properties of stateful streaming 
operations. I am specifically interested about updateStateByKey.
What happens if a node fails during processing? Is the state recoverable?

Our use case is the following: we have messages arriving from a message queue,
each requesting an update to the resource specified in the message. When such an
update request arrives, we wait a specific amount of time, and if another update
message for the same resource arrives within that window, we batch them together;
once that time has elapsed since the first message in the window, we apply the
batched update to the resource. We thought about using updateStateByKey with the
resource identifier as the key.
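
For reference, a minimal sketch of that pattern with updateStateByKey (the Update
type, the five-minute delay and the flush side effect are all illustrative, and
the state is only recoverable if checkpointing is enabled on the StreamingContext):

```scala
import org.apache.spark.streaming.dstream.DStream

case class Update(payload: String)
case class Pending(firstSeen: Long, updates: Seq[Update])

val flushDelayMs = 5 * 60 * 1000L

def track(newUpdates: Seq[Update], state: Option[Pending]): Option[Pending] = {
  val now = System.currentTimeMillis()
  val merged = state match {
    case Some(p) => p.copy(updates = p.updates ++ newUpdates)
    case None    => Pending(now, newUpdates)
  }
  if (now - merged.firstSeen >= flushDelayMs) {
    // Apply the batched update here, ideally via an idempotent write, because
    // this function may be re-executed after a failure; returning None drops
    // the state and "closes" the window for this key.
    None
  } else {
    Some(merged)
  }
}

// events is assumed to be a DStream[(String, Update)] keyed by resource id;
// ssc.checkpoint("...") must be set for updateStateByKey to work.
def batchedUpdates(events: DStream[(String, Update)]): DStream[(String, Pending)] =
  events.updateStateByKey(track _)
```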

It is important to guarantee exactly once processing for the messages so every 
update should happen, and no more than once.

Is it a good way to go?

Cheers,

--
David Szakallas | Software Engineer, RisingStack
Monitoring with Trace: http://trace.risingstack.com 

http://risingstack.com  | http://blog.risingstack.com 

Twitter: @szdavid92









Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-22 Thread Bedrytski Aliaksandr
Hi Everett,

HiveContext is initialized only once as a lazy val, so if you mean
initializing different jvms for each (or a group of) test(s), then in
this case the context will not, obviously, be shared.

But specs2 (by default) launches specs (inside of tests classes) in
parallel threads and in this case the context is shared.

To sum up, tests are launched sequentially, but specs inside of tests
are launched in parallel. We don't have anything specific in our .sbt
file in regards to the parallel test execution and hive context is
initialized only once.

In my opinion (correct me if I'm wrong), if you already have more than one spec
per test class, the CPU will already be saturated, so running the test classes
themselves in parallel will not give additional gains.
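
For what it's worth, a rough sketch of that shared-session pattern (the object and
method names are illustrative; shown with Spark 2.0's SparkSession, but the same
shape works with a HiveContext):

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SparkSession

object SharedSparkSession {
  // Initialised lazily, once per JVM, by whichever spec touches it first.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("unit-tests")
    .getOrCreate()

  private val tableCounter = new AtomicInteger(0)

  // Unique, thread-safe temp table names so parallel specs never collide,
  // e.g. uniqueTableName("events") => "events_3".
  def uniqueTableName(prefix: String): String =
    s"${prefix}_${tableCounter.incrementAndGet()}"
}

trait SparkSpec {
  def spark: SparkSession = SharedSparkSession.spark
}
```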

Regards
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Sun, Aug 21, 2016, at 18:30, Everett Anderson wrote:
>
>
> On Sun, Aug 21, 2016 at 3:08 AM, Bedrytski Aliaksandr
>  wrote:
>> __
>> Hi,
>>
>> we share the same spark/hive context between tests (executed in
>> parallel), so the main problem is that the temporary tables are
>> overwritten each time they are created, this may create race
>> conditions
>> as these tempTables may be seen as global mutable shared state.
>>
>> So each time we create a temporary table, we add an unique,
>> incremented,
>> thread safe id (AtomicInteger) to its name so that there are only
>> specific, non-shared temporary tables used for a test.
>
> Makes sense.
>
> But when you say you're sharing the same spark/hive context between
> tests, I'm assuming that's between the same tests within one test
> class, but you're not sharing across test classes (which a build tool
> like Maven or Gradle might have executed in separate JVMs).
>
> Is that right?
>
>
>
>>
>>
>> --
>>   Bedrytski Aliaksandr
>>   sp...@bedryt.ski
>>
>>
>>
>>> On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote:
>>> Hi!
>>>
>>> Just following up on this --
>>>
>>> When people talk about a shared session/context for testing
>>> like this,
>>> I assume it's still within one test class. So it's still the
>>> case that
>>> if you have a lot of test classes that test Spark-related
>>> things, you
>>> must configure your build system to not run in them in parallel.
>>> You'll get the benefit of not creating and tearing down a Spark
>>> session/context between test cases with a test class, though.
>>>
>>> Is that right?
>>>
>>> Or have people figured out a way to have sbt (or Maven/Gradle/etc)
>>> share Spark sessions/contexts across integration tests in a
>>> safe way?
>>>
>>>
>>> On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau
>>>  wrote:
>>> Thats a good point - there is an open issue for spark-testing-
>>> base to
>>> support this shared sparksession approach - but I haven't had the
>>> time ( https://github.com/holdenk/spark-testing-base/issues/123 ).
>>> I'll try and include this in the next release :)
>>>
>>> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers
>>>  wrote:
>>> we share a single single sparksession across tests, and they can run
>>> in parallel. is pretty fast
>>>
>>> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson
>>>  wrote:
>>> Hi,
>>>
>>> Right now, if any code uses DataFrame/Dataset, I need a test setup
>>> that brings up a local master as in this article[1].
>>>
>>>
>>> That's a lot of overhead for unit testing and the tests can't run
>>> in parallel, so testing is slow -- this is more like what I'd call
>>> an integration test.
>>>
>>> Do people have any tricks to get around this? Maybe using spy mocks
>>> on fake DataFrame/Datasets?
>>>
>>> Anyone know if there are plans to make more traditional unit
>>> testing possible with Spark SQL, perhaps with a stripped down in-
>>> memory implementation? (I admit this does seem quite hard since
>>> there's so much functionality in these classes!)
>>>
>>> Thanks!
>>>
>>>
>>> - Everett
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>


Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi!

I've noticed that hive has problems in registering new data records if the
same table is written to using both the hive terminal and spark sql. The
problem is demonstrated through the commands listed below


hive> use default;
hive> create table test_orc (id int, name string, dept string) stored as
ORC;
hive> insert into table test_orc values (1, 'abc', 'xyz');
hive> insert into table test_orc values (2, 'def', 'xyz');
hive> select count(*) from test_orc;
OK
2
hive> select distinct(name) from test_orc;
OK
abc
def

*** files in hdfs path in warehouse for the created table ***



>>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>>> column_names = ['identity_id', 'emp_name', 'dept_name']
>>> data_df = sqlContext.createDataFrame(data_points, column_names)
>>> data_df.show()

+-----------+--------+---------+
|identity_id|emp_name|dept_name|
+-----------+--------+---------+
|          3|     pqr|      xyz|
|          4|     ghi|      xyz|
+-----------+--------+---------+

>>> data_df.registerTempTable('temp_table')
>>> sqlContext.sql('insert into table default.test_orc select * from
temp_table')

*** files in hdfs path in warehouse for the created table ***

hive> select count(*) from test_orc; (Does not launch map-reduce job)
OK
2
hive> select distinct(name) from test_orc; (Launches map-reduce job)
abc
def
ghi
pqr

hive> create table test_orc_new like test_orc stored as ORC;
hive> insert into table test_orc_new select * from test_orc;
hive> select count(*) from test_orc_new;
OK
4
==

Even if I restart the hive services I cannot get the proper count output
from hive. This problem only occurs if the table is written to using both
hive and spark. If only spark is used to insert records into the table
multiple times, the count query in the hive terminal works perfectly fine.

This problem occurs for tables stored with different storage formats as
well (textFile etc.)

Is this because of the different naming conventions used by hive and spark
to write records to hdfs? Or maybe it is not a recommended practice to
write tables using different services?

Your thoughts and comments on this matter would be highly appreciated!

Thanks!
Nitin


Re: [Spark2] Error writing "complex" type to CSV

2016-08-22 Thread Hyukjin Kwon
Whether it writes the data as garbage or as a string representation, it cannot be
loaded back. So I'd say both behaviours are wrong and are bugs.

I think it'd be great if we can write and read back CSV in its own format
but I guess we can't for now.
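
As a possible workaround sketch for the case discussed below: flatten (or drop)
the nested struct column before writing, since CSV has no representation for it.
The "debug" field and its sub-fields here are invented for illustration, not
taken from the original code.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("csv-flatten").getOrCreate()
import spark.implicits._

case class Debug(source: String, ts: Long)
case class Record(a: String, b: String, debug: Debug)

val ds = Seq(Record("x", "y", Debug("job-1", 1L))).toDS()

// Option 1: drop the struct column entirely before writing.
ds.drop("debug").write.csv("/tmp/records_no_debug")

// Option 2: promote the struct's fields to top-level columns, then write.
ds.select("a", "b", "debug.*").write.csv("/tmp/records_flat")
```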


2016-08-20 2:54 GMT+09:00 Efe Selcuk :

> Okay so this is partially PEBKAC. I just noticed that there's a debugging
> field at the end that's another case class with its own simple fields -
> *that's* the struct that was showing up in the error, not the entry
> itself.
>
> This raises a different question. What has changed that this is no longer
> possible? The pull request said that it prints garbage. Was that some
> regression in 2.0? The same code prints fine in 1.6.1. The field prints as
> an array of the values of its fields.
>
> On Thu, Aug 18, 2016 at 5:56 PM, Hyukjin Kwon  wrote:
>
>> Ah, BTW, there is an issue, SPARK-16216, about printing dates and
>> timestamps here. So please ignore the integer values for dates
>>
>> 2016-08-19 9:54 GMT+09:00 Hyukjin Kwon :
>>
>>> Ah, sorry, I should have read this carefully. Do you mind if I ask your
>>> codes to test?
>>>
>>> I would like to reproduce.
>>>
>>>
>>> I just tested this by myself but I couldn't reproduce as below (is this
>>> what your doing, right?):
>>>
>>> case class ClassData(a: String, b: Date)
>>>
>>> val ds: Dataset[ClassData] = Seq(
>>>   ("a", Date.valueOf("1990-12-13")),
>>>   ("a", Date.valueOf("1990-12-13")),
>>>   ("a", Date.valueOf("1990-12-13"))
>>> ).toDF("a", "b").as[ClassData]
>>> ds.write.csv("/tmp/data.csv")
>>> spark.read.csv("/tmp/data.csv").show()
>>>
>>> prints as below:
>>>
>>> +---+----+
>>> |_c0| _c1|
>>> +---+----+
>>> |  a|7651|
>>> |  a|7651|
>>> |  a|7651|
>>> +---+----+
>>>
>>> ​
>>>
>>> 2016-08-19 9:27 GMT+09:00 Efe Selcuk :
>>>
 Thanks for the response. The problem with that thought is that I don't
 think I'm dealing with a complex nested type. It's just a dataset where
 every record is a case class with only simple types as fields, strings and
 dates. There's no nesting.

 That's what confuses me about how it's interpreting the schema. The
 schema seems to be one complex field rather than a bunch of simple fields.

 On Thu, Aug 18, 2016, 5:07 PM Hyukjin Kwon  wrote:

> Hi Efe,
>
> If my understanding is correct, writing and reading complex types is not
> supported because the CSV format can't represent nested types in its own
> format.
>
> I guess supporting them in writing in external CSV is rather a bug.
>
> I think it'd be great if we can write and read back CSV in its own
> format but I guess we can't.
>
> Thanks!
>
> On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:
>
>> We have an application working in Spark 1.6. It uses the databricks
>> csv library for the output format when writing out.
>>
>> I'm attempting an upgrade to Spark 2. When writing with both the
>> native DataFrameWriter#csv() method and with first specifying the
>> "com.databricks.spark.csv" format (I suspect underlying format is the 
>> same
>> but I don't know how to verify), I get the following error:
>>
>> java.lang.UnsupportedOperationException: CSV data source does not
>> support struct<[bunch of field names and types]> data type
>>
>> There are 20 fields, mostly plain strings with a couple of dates. The
>> source object is a Dataset[T] where T is a case class with various fields
>> The line just looks like: someDataset.write.csv(outputPath)
>>
>> Googling returned this fairly recent pull request:
>> https://mail-archives.apache.org/mod_mbox/spark-com
>> mits/201605.mbox/%3c65d35a72bd05483392857098a2635...@git.apa
>> che.org%3E
>>
>> If I'm reading that correctly, the schema shows that each record has
>> one field of this complex struct type? And the validation thinks it's
>> something that it can't serialize. I would expect the schema to have a
>> bunch of fields in it matching the case class, so maybe there's something
>> I'm misunderstanding.
>>
>> Efe
>>
>
>>>
>>
>