How to Fill Sparse Data With the Previous Non-Empty Value in a Spark Dataset

2017-06-28 Thread Carlo Allocca

Dear All,

I am trying to propagate the last valid observation (i.e. non-null) to the null
values in a dataset.

Below is my partial solution:

Dataset<Row> tmp800 = tmp700.select("uuid", "eventTime", "Washer_rinseCycles");
WindowSpec wspec = Window.partitionBy(tmp800.col("uuid"))
                         .orderBy(tmp800.col("uuid"), tmp800.col("eventTime"));
Column c1 = org.apache.spark.sql.functions.lag(tmp800.col("Washer_rinseCycles"), 1).over(wspec);
Dataset<Row> tmp900 = tmp800.withColumn("Washer_rinseCyclesFilled",
        when(tmp800.col("Washer_rinseCycles").isNull(), c1)
        .otherwise(tmp800.col("Washer_rinseCycles")));
However, it does not solve the entire problem, because lag(col, 1) returns the
value from the row immediately before the current one even if that value is NULL
(see the table in the inline image below).

Is there a method in Spark similar to Pandas' "backfill" for DataFrames?

Is it possible to do this with the Spark API? How?

Many Thanks in advance.
Best Regards,
Carlo

[Inline image showing the example table]
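
For what it's worth, this kind of forward fill can usually be written with
last(..., ignoreNulls = true) over a window that runs from the start of the
partition up to the current row, instead of lag(): last() skips the intermediate
NULLs, so the previous non-null value is carried forward. A minimal sketch
against the columns used above (assuming Spark 2.x; the window bounds are given
as raw longs so it also compiles on 2.0.0, where Long.MIN_VALUE means
"unbounded preceding"):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.last;

// Carry the last non-null Washer_rinseCycles forward within each uuid, ordered by eventTime.
WindowSpec wspec = Window.partitionBy(tmp800.col("uuid"))
                         .orderBy(tmp800.col("eventTime"))
                         .rowsBetween(Long.MIN_VALUE, 0);  // unbounded preceding .. current row
Column lastNonNull = last(tmp800.col("Washer_rinseCycles"), true).over(wspec);
Dataset<Row> tmp900 = tmp800.withColumn("Washer_rinseCyclesFilled", lastNonNull);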


How to Fill Sparse Data With the Previous Non-Empty Value in a Spark Dataset

2017-06-25 Thread Carlo . Allocca
Dear All,

I need to apply a dataset transformation that replaces null values with the
previous non-null value.
As an example, I report the following:

from:

id | col1
---------
 1 | null
 1 | null
 2 | 4
 2 | null
 2 | null
 3 | 5
 3 | null
 3 | null

to:

id | col1
---------
 1 | null
 1 | null
 2 | 4
 2 | 4
 2 | 4
 3 | 5
 3 | 5
 3 | 5

I am using Spark SQL 2 with the Dataset API.

I searched on Google but I only found solutions in the context of databases, e.g.
(https://blog.jooq.org/2015/12/17/how-to-fill-sparse-data-with-the-previous-non-empty-value-in-sql/)

Please, any help on how to implement this in Spark? I understand that I should
use window functions and lag, but I cannot put them together.


Thank you in advance for your help.

Best Regards,
Carlo
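
For reference, the same forward fill can also be expressed directly in Spark SQL
with a window function; a sketch, assuming the table also carries an ordering
column (here a hypothetical ts), since without one "previous" is not well
defined:

// Sketch (hypothetical view/column names): forward-fill col1 per id using
// last(..., true), which ignores nulls, over an ordered running window.
df.createOrReplaceTempView("t");
Dataset<Row> filled = spark.sql(
        "SELECT id, last(col1, true) OVER (" +
        "           PARTITION BY id ORDER BY ts" +
        "           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_filled " +
        "FROM t");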










Re: using spark-xml_2.10 to extract data from XML file

2017-02-15 Thread Carlo . Allocca
Hi Hyukjin,

Thank you very much for this.

Sure I am going to do it today based on data + java code.

Many Thanks for the support.
Best Regards,
Carlo


On 15 Feb 2017, at 00:22, Hyukjin Kwon <gurwls...@gmail.com> wrote:

Hi Carlo,


There was a bug in lower versions of the library when accessing nested values.

Otherwise, I suspect another issue, such as parsing malformed XML.

Could you maybe open an issue in https://github.com/databricks/spark-xml/issues 
with your sample data?

I will stick with it until it is solved.


Thanks.



2017-02-15 5:04 GMT+09:00 Carlo.Allocca <carlo.allo...@open.ac.uk>:
more specifically:

Given the following XML data structure:

This is the Structure of the XML file:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)


CASE 1:

String rowTag = "abstracts";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_").withRowTag(rowTag)
        .xmlFile(sqlContext, localxml);
df.select(df.col("abstract.ce:para"),
        df.col("abstract._original"), df.col("abstract._lang")).show();

I got the right values.

CASE 2:

String rowTag = "xocs:doc";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_").withRowTag(rowTag)
        .xmlFile(sqlContext, localxml);
df.select(df.col("xocs:item.item.bibrecord.head.abstracts.abstract.ce:para")).show();

I got null values.


My question is: how can I get it to work with String rowTag = "xocs:doc" and
still get the right values for ….abstract.ce:para, etc.? What am I doing wrong?

Many Thanks in advance.
Best Regards,
Carlo
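
One way to narrow this down is to let spark-xml infer the schema for rowTag
"xocs:doc" and then compare it (via printSchema) with the hand-built
rexploreXMLDataSchema from the earlier message below: nulls on a nested select
usually mean the supplied schema, or the column path, does not match what the
parser actually produced. A sketch, reusing the sqlContext and localxml from the
code above (the exact select path has to follow whatever the inferred schema
shows):

// Read with the same rowTag but without a user-supplied schema, so spark-xml infers it.
Dataset<Row> inferred = sqlContext.read()
        .format("com.databricks.spark.xml")
        .option("rowTag", "xocs:doc")
        .option("attributePrefix", "_")
        .load(localxml);
inferred.printSchema();  // diff this against the hand-built StructType
inferred.select(inferred.col("xocs:item.bibrecord.head.abstracts.abstract.ce:para")).show(false);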



On 14 Feb 2017, at 17:35, carlo allocca <ca6...@open.ac.uk> wrote:

Dear All,

I would like to ask for your help with the following issue when using
spark-xml_2.10:

Given an XML file with the following structure:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)

Using the code below to extract all the info from the abstract:

1) I got "null" for all three values (_original, _lang and ce:para) when I used
rowTag = "xocs:doc".
2) I got the right values when I used rowTag = "abstracts".

Of course, I need to write a general parser that works at the xocs:doc level.
I have been reading the documentation at
https://github.com/databricks/spark-xml, but it did not help me much with the
above issue.

Am I doing something wrong, or could it be related to a bug in the library I am using?

Please, could you advise?

Many Thanks,
Best Regards,
carlo





=== Code:
public static void main(String arr[]) {

// xocs:item/item/bibrecord/head/abstracts  section
StructType _abstract = new StructType(new StructField[]{
new StructField("_original", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("_lang", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("ce:para", DataTypes.StringType, true, 
Metadata.empty())
});
StructType _abstracts = new StructType(new StructField[]{
new StructField("abstract", _abstract, true, Metadata.empty())
});

StructType _head = new StructType(new StructField[]{
new StructField("abstracts", _abstracts, true, Metadata.empty())
});

StructType _bibrecord = new StructType(new StructField[]{
new StructField("head", _head, true, Metadata.empty())

});

StructType _item = new StructType(new StructField[]{
new StructField("bibrecord", _bibrecord, true, Metadata.empty())
});

StructType _xocs_item = new StructType(new StructField[]{
new StructField("item", _item, true, Metadata.empty()),});

StructType rexploreXMLDataSchema = new StructType(new StructField[]{
new StructField("xocs:item", _xocs_item, true, Metadata.empty()),});

String localxml = "..filename.xml";

SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("Data

Re: using spark-xml_2.10 to extract data from XML file

2017-02-14 Thread Carlo . Allocca
more specifically:

Given the following XML data structure:

This is the Structure of the XML file:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)


CASE 1:

String rowTag = "abstracts";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_").withRowTag(rowTag)
        .xmlFile(sqlContext, localxml);
df.select(df.col("abstract.ce:para"),
        df.col("abstract._original"), df.col("abstract._lang")).show();

I got the right values.

CASE 2:

String rowTag = "xocs:doc";
Dataset<Row> df = (new XmlReader()).withAttributePrefix("_").withRowTag(rowTag)
        .xmlFile(sqlContext, localxml);
df.select(df.col("xocs:item.item.bibrecord.head.abstracts.abstract.ce:para")).show();

I got null values.


My question is: how can I get it to work with String rowTag = "xocs:doc" and
still get the right values for ….abstract.ce:para, etc.? What am I doing wrong?

Many Thanks in advance.
Best Regards,
Carlo



On 14 Feb 2017, at 17:35, carlo allocca <ca6...@open.ac.uk> wrote:

Dear All,

I would like to ask for your help with the following issue when using
spark-xml_2.10:

Given an XML file with the following structure:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)

Using the code below to extract all the info from the abstract:

1) I got "null" for all three values (_original, _lang and ce:para) when I used
rowTag = "xocs:doc".
2) I got the right values when I used rowTag = "abstracts".

Of course, I need to write a general parser that works at the xocs:doc level.
I have been reading the documentation at
https://github.com/databricks/spark-xml, but it did not help me much with the
above issue.

Am I doing something wrong, or could it be related to a bug in the library I am using?

Please, could you advise?

Many Thanks,
Best Regards,
carlo





=== Code:
public static void main(String arr[]) {

// xocs:item/item/bibrecord/head/abstracts  section
StructType _abstract = new StructType(new StructField[]{
new StructField("_original", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("_lang", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("ce:para", DataTypes.StringType, true, 
Metadata.empty())
});
StructType _abstracts = new StructType(new StructField[]{
new StructField("abstract", _abstract, true, Metadata.empty())
});

StructType _head = new StructType(new StructField[]{
new StructField("abstracts", _abstracts, true, Metadata.empty())
});

StructType _bibrecord = new StructType(new StructField[]{
new StructField("head", _head, true, Metadata.empty())

});

StructType _item = new StructType(new StructField[]{
new StructField("bibrecord", _bibrecord, true, Metadata.empty())
});

StructType _xocs_item = new StructType(new StructField[]{
new StructField("item", _item, true, Metadata.empty()),});

StructType rexploreXMLDataSchema = new StructType(new StructField[]{
new StructField("xocs:item", _xocs_item, true, Metadata.empty()),});

String localxml = "..filename.xml";

SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("DatasetForCaseNew")
.getOrCreate();

String rowTag = "xocs:doc";



SQLContext sqlContext = new SQLContext(spark);
Dataset df = sqlContext.read()
.format("com.databricks.spark.xml")
.option("rowTag", rowTag)
.option("attributePrefix", "_")
.schema(rexploreXMLDataSchema)
.load(localxml);

df.printSchema();

df.select(

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getFie

Re: using spark-xml_2.10 to extract data from XML file

2017-02-14 Thread Carlo . Allocca
Dear All,

I would like to ask for your help with the following issue when using
spark-xml_2.10:

Given an XML file with the following structure:

xocs:doc
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)
 |    |    |    |    |    |-- ce:para: string (nullable = true)

Using the code below to extract all the info from the abstract:

1) I got "null" for all three values (_original, _lang and ce:para) when I used
rowTag = "xocs:doc".
2) I got the right values when I used rowTag = "abstracts".

Of course, I need to write a general parser that works at the xocs:doc level.
I have been reading the documentation at
https://github.com/databricks/spark-xml, but it did not help me much with the
above issue.

Am I doing something wrong, or could it be related to a bug in the library I am using?

Please, could you advise?

Many Thanks,
Best Regards,
carlo





=== Code:
public static void main(String arr[]) {

// xocs:item/item/bibrecord/head/abstracts  section
StructType _abstract = new StructType(new StructField[]{
new StructField("_original", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("_lang", DataTypes.StringType, true, 
Metadata.empty()),
new StructField("ce:para", DataTypes.StringType, true, 
Metadata.empty())
});
StructType _abstracts = new StructType(new StructField[]{
new StructField("abstract", _abstract, true, Metadata.empty())
});

StructType _head = new StructType(new StructField[]{
new StructField("abstracts", _abstracts, true, Metadata.empty())
});

StructType _bibrecord = new StructType(new StructField[]{
new StructField("head", _head, true, Metadata.empty())

});

StructType _item = new StructType(new StructField[]{
new StructField("bibrecord", _bibrecord, true, Metadata.empty())
});

StructType _xocs_item = new StructType(new StructField[]{
new StructField("item", _item, true, Metadata.empty()),});

StructType rexploreXMLDataSchema = new StructType(new StructField[]{
new StructField("xocs:item", _xocs_item, true, Metadata.empty()),});

String localxml = "..filename.xml";

SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("DatasetForCaseNew")
.getOrCreate();

String rowTag = "xocs:doc";



SQLContext sqlContext = new SQLContext(spark);
Dataset df = sqlContext.read()
.format("com.databricks.spark.xml")
.option("rowTag", rowTag)
.option("attributePrefix", "_")
.schema(rexploreXMLDataSchema)
.load(localxml);

df.printSchema();

df.select(

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("_original"),

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getItem("_lang"),

df.col("xocs:item").getField("item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("ce:para")
).show();

//df.select(
//df.col("_original"),
//df.col("_lang"),
//df.col("ce:para")
//
//).show();

//df.select(
//df.col("abstract").getField("_original"),
//df.col("abstract").getField("_lang"),
//df.col("abstract").getField("ce:para")
//
//).show();


//df.select(
//
df.col("head").getField("abstracts").getField("abstract").getField("_original"),
//
df.col("head").getField("abstracts").getField("abstract").getField("_lang"),
//
df.col("head").getField("abstracts").getField("abstract").getField("ce:para")
//
//).show();




}




On 13 Feb 2017, at 18:17, Carlo.Allocca 
> wrote:

Dear All,

I am using spark-xml_2.10 to parse and extract some data from XML files.

I got the issue of getting null value whereas the XML file contains actually 
values.

++--++
|xocs:item.bibrecord.head.abstracts.abstract._original 

using spark-xml_2.10 to extract data from XML file

2017-02-13 Thread Carlo . Allocca
Dear All,

I am using spark-xml_2.10 to parse and extract some data from XML files.

I have the issue of getting null values even though the XML file actually
contains values.

+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+
|xocs:item.bibrecord.head.abstracts.abstract._original |xocs:item.bibrecord.head.abstracts.abstract._lang  |xocs:item.bibrecord.head.abstracts.abstract.ce:para |
+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+
|                                                  null|                                               null|                                                null|
+------------------------------------------------------+---------------------------------------------------+----------------------------------------------------+

Below, I report an example of the XML I am processing and the code I am using to
parse it.

What am I doing wrong?

Please, any help on this would be very appreciated.

Many Thanks in advance,
Best Regards,
Carlo




= An example


SPARK prints the following schema:

root
 |-- xocs:item: struct (nullable = true)
 |    |-- bibrecord: struct (nullable = true)
 |    |    |-- head: struct (nullable = true)
 |    |    |    |-- abstracts: struct (nullable = true)
 |    |    |    |    |-- abstract: struct (nullable = true)
 |    |    |    |    |    |-- _original: string (nullable = true)
 |    |    |    |    |    |-- _lang: string (nullable = true)



 XML file example

XML file structure:


 
   
 

df = sqlContext.read()
.format("com.databricks.spark.xml")
.option("rowTag", rowTag)
.option("attributePrefix", "_")
.schema(rexploreXMLDataSchema)
.load(localxml);

df.printSchema();

df.select(

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("_original"),

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getItem("_lang"),

df.col("xocs:item").getField("bibrecord").getItem("head").getField("abstracts").getField("abstract").getField("ce:para")
).show();

}

}


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-09 Thread Carlo . Allocca
Hi Masood,

Thanks for the answer.
Sure. I will do as suggested.

Many Thanks,
Best Regards,
Carlo
On 8 Nov 2016, at 17:19, Masood Krohy 
> wrote:

labels

-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-08 Thread Carlo . Allocca
Hi Masood,

Thank you again for your suggestion.
I have a question about the following:

"For prediction on new samples, you need to scale each sample first before
making predictions using your trained model."


When applying the ML linear model as suggested above, it means that the
predicted value is scaled. My question: does it need to be scaled back? I mean,
do I need to apply the inverse of "calculate the average and std for each
feature, deduct the avg, then divide by std" to the predicted value?
In practice, (predicted value * std) + avg?

Is that correct? Am I missing anything?

Many Thanks in advance.
Best Regards,
Carlo
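
On the scale-back question: if only the features are standardised and the label
is left untouched (as in the recipe quoted below), the model's predictions
already come out in the original units of the label, so no inverse transform is
needed; (predicted value * std) + avg would only be required if the label itself
had been standardised. A minimal sketch with MLlib's StandardScaler (hypothetical
JavaSparkContext jsc and toy data):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.StandardScaler;
import org.apache.spark.mllib.feature.StandardScalerModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Standardise the features only; the label stays in its original units.
JavaRDD<LabeledPoint> data = jsc.parallelize(Arrays.asList(
        new LabeledPoint(10.0, Vectors.dense(1.0, 200.0)),
        new LabeledPoint(20.0, Vectors.dense(2.0, 400.0))));
StandardScalerModel scaler = new StandardScaler(true, true)   // withMean, withStd
        .fit(data.map(LabeledPoint::features).rdd());
JavaRDD<LabeledPoint> scaled = data.map(
        p -> new LabeledPoint(p.label(), scaler.transform(p.features())));
// A model trained on `scaled` predicts in the original label units, so there is
// nothing to scale back; only a standardised label would need label*std + avg.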


On 7 Nov 2016, at 17:14, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

I found it just google 
http://sebastianraschka.com/Articles/2014_about_feature_scaling.html

Thanks.
Carlo
On 7 Nov 2016, at 17:12, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am beginners, Is there any paper/book that would explain the suggested 
approaches? I would love to read.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>
A :Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>>
Cc :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>, Mohit Jaggi 
<mohitja...@gmail.com<mailto:mohitja...@gmail.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date :2016-11-07 10:50
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is very a good point as I am getting very 
bed result so far.

If I understood well what you suggest is to scale the date below (it is part of 
my dataset) before applying linear regression SGD.

is it correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>
A :Mohit Jaggi <mohitja...@gmail.com<mailto:mohitja...@gmail.com>>
Cc :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date :2016-11-04 03:39
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> <mohitja...@gmail.com<mailto:mohitja...@gmail.com>> wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
&g

Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Carlo . Allocca
I found it just by googling:
http://sebastianraschka.com/Articles/2014_about_feature_scaling.html

Thanks.
Carlo
On 7 Nov 2016, at 17:12, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am beginners, Is there any paper/book that would explain the suggested 
approaches? I would love to read.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>
A :Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>>
Cc :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>, Mohit Jaggi 
<mohitja...@gmail.com<mailto:mohitja...@gmail.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date :2016-11-07 10:50
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is very a good point as I am getting very 
bed result so far.

If I understood well what you suggest is to scale the date below (it is part of 
my dataset) before applying linear regression SGD.

is it correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
<masood.kr...@intact.net<mailto:masood.kr...@intact.net>> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>
A :Mohit Jaggi <mohitja...@gmail.com<mailto:mohitja...@gmail.com>>
Cc :Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Date :2016-11-04 03:39
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> <mohitja...@gmail.com<mailto:mohitja...@gmail.com>> wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
>> <carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>> wrote:
>>
>> Hi All,
>>
>> I am using SPARK and in particular the MLib library.
>>
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>>
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>>
>> I checked the documentation and it seems that does not provide such methods.
>>
>> Am I missing anything?  Please, could you provide any help on thi

Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-07 Thread Carlo . Allocca
Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am a beginner, is there any paper/book that explains the suggested
approaches? I would love to read it.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
>
A :Masood Krohy 
>
Cc :Carlo.Allocca 
>, Mohit Jaggi 
>, 
"user@spark.apache.org" 
>
Date :2016-11-07 10:50
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is very a good point as I am getting very 
bed result so far.

If I understood well what you suggest is to scale the date below (it is part of 
my dataset) before applying linear regression SGD.

is it correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



De :Carlo.Allocca 
>
A :Mohit Jaggi >
Cc :Carlo.Allocca 
>, 
"user@spark.apache.org" 
>
Date :2016-11-04 03:39
Objet :Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> > wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
>> > wrote:
>>
>> Hi All,
>>
>> I am using SPARK and in particular the MLib library.
>>
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>>
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>>
>> I checked the documentation and it seems that does not provide such methods.
>>
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>>
>> Many Thanks in advance,
>>
>> Best Regards,
>> Carlo
>>
>>
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>> 038302). The Open University is authorised and regulated by the Financial 
>> Conduct Authority.
>>
>> -
>> To unsubscribe e-mail: 
>> user-unsubscr...@spark.apache.org
>>
>



Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-04 Thread Carlo . Allocca
Hi Robin,

On 4 Nov 2016, at 09:19, Robin East 
> wrote:

Hi

Do you mean the test of significance that you usually get with R output?
Yes, exactly.

I don’t think there is anything implemented in the standard MLLib libraries 
however I believe that the sparkR version provides that. See 
http://spark.apache.org/docs/1.6.2/sparkr.html#gaussian-glm-model

Glad to hear that, as it means I am not missing much.

Many Thanks.

Best Regards,
Carlo

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action





On 4 Nov 2016, at 07:38, Carlo.Allocca 
> wrote:

Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


On 3 Nov 2016, at 20:41, Mohit Jaggi 
> wrote:

For linear regression, it should be fairly easy. Just sort the co-efficients :)

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
> wrote:

Hi All,

I am using SPARK and in particular the MLib library.

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

For my problem I am using the LinearRegressionWithSGD and I would like to 
perform a “Rank Features By Importance”.

I checked the documentation and it seems that does not provide such methods.

Am I missing anything?  Please, could you provide any help on this?
Should I change the approach?

Many Thanks in advance,

Best Regards,
Carlo


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-04 Thread Carlo . Allocca
Hi Mohit, 

Thank you for your reply. 
OK, that means coefficients with a high score are more important than those with
a low score…

Many Thanks,
Best Regards,
Carlo
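
A small sketch of that ranking (hypothetical variable names, and assuming the
features were standardised before training, as Masood recommends elsewhere in
this thread):

import java.util.Arrays;
import java.util.Comparator;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

// Rank features by the absolute value of the learned weights; the ranking is
// only meaningful if the features were scaled to comparable ranges beforehand.
LinearRegressionModel model =
        LinearRegressionWithSGD.train(trainingData.rdd(), numIterations, stepSize);
final double[] w = model.weights().toArray();
Integer[] order = new Integer[w.length];
for (int i = 0; i < w.length; i++) order[i] = i;
Arrays.sort(order, Comparator.comparingDouble(i -> -Math.abs(w[i])));
for (int i : order) System.out.println("feature " + i + " -> weight " + w[i]);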


> On 3 Nov 2016, at 20:41, Mohit Jaggi  wrote:
> 
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
> 
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
> 
> 
> 
> 
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca  wrote:
>> 
>> Hi All,
>> 
>> I am using SPARK and in particular the MLib library.
>> 
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>> 
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>> 
>> I checked the documentation and it seems that does not provide such methods.
>> 
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>> 
>> Many Thanks in advance,
>> 
>> Best Regards,
>> Carlo
>> 
>> 
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>> 038302). The Open University is authorised and regulated by the Financial 
>> Conduct Authority.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



LinearRegressionWithSGD and Rank Features By Importance

2016-11-03 Thread Carlo . Allocca
Hi All,

I am using Spark and in particular the MLlib library.

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

For my problem I am using the LinearRegressionWithSGD and I would like to 
perform a “Rank Features By Importance”.

I checked the documentation and it seems that it does not provide such a method.

Am I missing anything?  Please, could you provide any help on this?
Should I change the approach?

Many Thanks in advance,

Best Regards,
Carlo


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ClassNotFoundException org.apache.spark.Logging

2016-08-05 Thread Carlo . Allocca
Thanks Marcelo.
Problem solved.

Best,
Carlo


Hi Marcelo,

Thanks you for your help.
Problem solved as you suggested.

Best Regards,
Carlo

> On 5 Aug 2016, at 18:34, Marcelo Vanzin  wrote:
>
> On Fri, Aug 5, 2016 at 9:53 AM, Carlo.Allocca  
> wrote:
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-core_2.10</artifactId>
>>   <version>2.0.0</version>
>>   <type>jar</type>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-sql_2.10</artifactId>
>>   <version>2.0.0</version>
>>   <type>jar</type>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-mllib_2.10</artifactId>
>>   <version>1.3.0</version>
>>   <type>jar</type>
>> </dependency>
>>
>>
>>
>
> One of these is not like the others...
>
> --
> Marcelo
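
For completeness: the odd one out is the spark-mllib_2.10 1.3.0 entry sitting
next to the 2.0.0 spark-core/spark-sql artifacts. org.apache.spark.Logging is no
longer a public class in Spark 2.x (it only survives as a private internal
trait, as Ted Yu's quoted snippet elsewhere in this thread shows), so an MLlib
1.3.0 jar compiled against the old public trait fails at runtime. Aligning the
versions, which is presumably what "problem solved" refers to, looks like:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>2.0.0</version>  <!-- match the spark-core/spark-sql version above -->
</dependency>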

-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ClassNotFoundException org.apache.spark.Logging

2016-08-05 Thread Carlo . Allocca
I have also executed:

mvn dependency:tree |grep log
[INFO] |  | +- com.esotericsoftware:minlog:jar:1.3.0:compile
[INFO] +- log4j:log4j:jar:1.2.17:compile
[INFO] +- org.slf4j:slf4j-log4j12:jar:1.7.16:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.1.3:compile


and the POM reports the above libraries.

Many Thanks for your help.

Carlo


On 5 Aug 2016, at 18:17, Carlo.Allocca 
> wrote:

Please Sean, could you detail the version mismatch?

Many thanks,
Carlo
On 5 Aug 2016, at 18:11, Sean Owen 
> wrote:

You also seem to have a
version mismatch here.


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: ClassNotFoundException org.apache.spark.Logging

2016-08-05 Thread Carlo . Allocca
Please Sean, could you detail the version mismatch?

Many thanks,
Carlo
On 5 Aug 2016, at 18:11, Sean Owen 
> wrote:

You also seem to have a
version mismatch here.

-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: ClassNotFoundException org.apache.spark.Logging

2016-08-05 Thread Carlo . Allocca
Hi Ted,

Thanks for the promptly answer.
It is not yet clear to me what I should do.

How to fix it?

Many thanks,
Carlo

On 5 Aug 2016, at 17:58, Ted Yu 
> wrote:

private[spark] trait Logging {

-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


ClassNotFoundException org.apache.spark.Logging

2016-08-05 Thread Carlo . Allocca
Dear All,

I would like to ask for your help with the following issue:
java.lang.ClassNotFoundException: org.apache.spark.Logging

I checked and the class Logging is not present.
Moreover, the line of code where the exception is thrown is:

final org.apache.spark.mllib.regression.LinearRegressionModel lrModel
= LinearRegressionWithSGD.train(a, numIterations, stepSize);


My POM is reported below.


What am I doing wrong or missing? How can I fix it?

Many Thanks in advance for your support.

Best,
Carlo



 POM




<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.0.0</version>
    <type>jar</type>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
    <type>jar</type>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.16</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-core</artifactId>
    <version>1.3</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.3.0</version>
    <type>jar</type>
</dependency>




-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataset and JavaRDD: how to eliminate the header.

2016-08-03 Thread Carlo . Allocca

On 3 Aug 2016, at 22:01, Mich Talebzadeh 
> wrote:

ok in other words the result set of joining two dataset ends up with 
inconsistent result as a header from one DS is joined with another row from 
another DS?
I am not 100% sure I got this point. Let me check if I got it right:

1) DS1 has got the schema 
2) DS2 has got the Schema 

The join operation is defined as follows:

Dataset<Row> DS1_Join_DS2 = DS1.join(DS2, DS1.col("PRD_Id").equalTo(DS2.col("ORD_id")), "inner");

My understanding is that this operation should generate DS1_Join_DS2 with the
following schema:

Where is the inconsistency that you mentioned before?


You really need to get rid of headers one way or other before joining.
If I do that, my next question is: how does Spark know which column is which?

or try to register them as temp table before join to see where the failure is.
I need to try this.

Many Thanks for your help Mich.

Carlo.

HTH

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 3 August 2016 at 21:49, Carlo.Allocca 
> wrote:
Hi Mich,

Thanks again.
My issue is not when I read the csv from a file.
It is when you have a Dataset that is output of some join operations.

Any help on that?

Many Thanks,
Best,
Carlo

On 3 Aug 2016, at 21:43, Mich Talebzadeh 
> wrote:

hm odd.

Otherwise you can try using databricks to read the CSV file. This is scala 
example

//val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"false").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")


HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 3 August 2016 at 20:01, Carlo.Allocca 
> wrote:
One more:

it seems that the steps

== Step 1: transform the Dataset  into JavaRDD
JavaRDD dataPointsWithHeader =dataset1_Join_dataset2.toJavaRDD();


and

  List someRows = dataPointsWithHeader.collect();
 someRows.forEach(System.out::println);

do not print the header.

So, Could I assume that any transformation I apply to a JavaRDD, the 
header is not considered by default?

Many thanks,
Best Regards,
Carlo


On 3 Aug 2016, at 18:58, Carlo.Allocca 
> wrote:

Thanks Mich.

Yes, I know both headers (categoryRankSchema, categorySchema ) as expressed 
below:

this.dataset1 = 
d1_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);

   this.dataset2 = d2_DFR.schema(categorySchema).csv(categoryFilePath);

Can you use filter to get rid of the header from both CSV files before joining 
them?
Well I can give a try. But my case is a bit more complex than joining two 
datasets. It joins data from at least six datasets which are processed 
separately (to clean, and extract the need info) and, only at the end I do join 
of three datasets for which I know the headers.

I do believe that there should be anther way to achieve my goal.

Any other suggestion would be very appreciated.

Many Thanks.
Best Regards,
carlo



On 3 Aug 2016, at 18:45, Mich Talebzadeh 
> wrote:

Do you know the headers?

Can you use filter to get rid of the header from both CSV files before joining 
them?



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on 

Re: Dataset and JavaRDD: how to eliminate the header.

2016-08-03 Thread Carlo . Allocca
Hi Mich,

Thanks again.
My issue is not when I read the CSV from a file; it is when I have a Dataset
that is the output of some join operations.

Any help on that?

Many Thanks,
Best,
Carlo

On 3 Aug 2016, at 21:43, Mich Talebzadeh 
> wrote:

hm odd.

Otherwise you can try using databricks to read the CSV file. This is scala 
example

//val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"true").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")
val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", 
"false").load("hdfs://rhes564:9000/data/stg/accounts/ll/18740868")


HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 3 August 2016 at 20:01, Carlo.Allocca 
> wrote:
One more:

it seems that the steps

== Step 1: transform the Dataset  into JavaRDD
JavaRDD dataPointsWithHeader =dataset1_Join_dataset2.toJavaRDD();


and

  List someRows = dataPointsWithHeader.collect();
 someRows.forEach(System.out::println);

do not print the header.

So, Could I assume that any transformation I apply to a JavaRDD, the 
header is not considered by default?

Many thanks,
Best Regards,
Carlo


On 3 Aug 2016, at 18:58, Carlo.Allocca 
> wrote:

Thanks Mich.

Yes, I know both headers (categoryRankSchema, categorySchema ) as expressed 
below:

this.dataset1 = 
d1_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);

   this.dataset2 = d2_DFR.schema(categorySchema).csv(categoryFilePath);

Can you use filter to get rid of the header from both CSV files before joining 
them?
Well I can give a try. But my case is a bit more complex than joining two 
datasets. It joins data from at least six datasets which are processed 
separately (to clean, and extract the need info) and, only at the end I do join 
of three datasets for which I know the headers.

I do believe that there should be anther way to achieve my goal.

Any other suggestion would be very appreciated.

Many Thanks.
Best Regards,
carlo



On 3 Aug 2016, at 18:45, Mich Talebzadeh 
> wrote:

Do you know the headers?

Can you use filter to get rid of the header from both CSV files before joining 
them?



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 3 August 2016 at 18:32, Carlo.Allocca 
> wrote:
Hi Aseem,

Thank you very much for your help.

Please, allow me to be more specific for my case (to some extent I already do 
what you suggested):

Let us imagine that I two csv datasets d1 and d2. I generate the Dataset 
as in the following:

== Reading d1:

sparkSession=spark;

options = new HashMap();
options.put("header", "true");
options.put("delimiter", delimiter);
options.put("nullValue", nullValue);
DataFrameReader d1_DFR = spark.read().options(options);
this.dataset1 = 
d1_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);

== Reading d2

sparkSession=spark;

options = new HashMap();
options.put("header", "true");
options.put("delimiter", delimiter);
options.put("nullValue", nullValue);
DataFrameReader d2_DFR = spark.read().options(options);
this.dataset2 = 
d2_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);


So far, I have the header set to true.

Now, let us imagine that we need to do a Join between the two dataset:

Dataset dataset1_Join_dataset2 = dataset1.join(dataset2, “some condition”);

All the below process, Step1, Step2 and Step3, starts from 
dataset1_Join_dataset2. And, in particular, I realised that the steps

== Step 1: transform the Dataset  into JavaRDD
JavaRDD dataPointsWithHeader =dataset1_Join_dataset2.toJavaRDD();

== Step 2: take the first row (I was thinking 

Re: Dataset and JavaRDD: how to eliminate the header.

2016-08-03 Thread Carlo . Allocca
One more:

it seems that the steps

== Step 1: transform the Dataset<Row> into JavaRDD<Row>
JavaRDD<Row> dataPointsWithHeader = dataset1_Join_dataset2.toJavaRDD();


and

  List someRows = dataPointsWithHeader.collect();
 someRows.forEach(System.out::println);

do not print the header.

So, can I assume that, for any transformation I apply to a JavaRDD<Row>, the
header is not included by default?

Many thanks,
Best Regards,
Carlo
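
For what it's worth: when the CSV files are read with option("header", "true"),
the header line is consumed by the reader and never becomes a Row, so neither
the join output nor toJavaRDD() contains it. Only if a file were read with
header=false, or a header-like row existed inside the data, would it need to be
filtered out; a sketch (illustrative column name, and assuming that column is
never null in real rows):

// Drop any row whose PRD_Id field literally equals the column name "PRD_Id".
Dataset<Row> noHeader = dataset1_Join_dataset2
        .filter(org.apache.spark.sql.functions.col("PRD_Id").notEqual("PRD_Id"));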


On 3 Aug 2016, at 18:58, Carlo.Allocca 
> wrote:

Thanks Mich.

Yes, I know both headers (categoryRankSchema, categorySchema ) as expressed 
below:

this.dataset1 = 
d1_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);

   this.dataset2 = d2_DFR.schema(categorySchema).csv(categoryFilePath);

Can you use filter to get rid of the header from both CSV files before joining 
them?
Well I can give a try. But my case is a bit more complex than joining two 
datasets. It joins data from at least six datasets which are processed 
separately (to clean, and extract the need info) and, only at the end I do join 
of three datasets for which I know the headers.

I do believe that there should be anther way to achieve my goal.

Any other suggestion would be very appreciated.

Many Thanks.
Best Regards,
carlo



On 3 Aug 2016, at 18:45, Mich Talebzadeh 
> wrote:

Do you know the headers?

Can you use filter to get rid of the header from both CSV files before joining 
them?



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 3 August 2016 at 18:32, Carlo.Allocca 
> wrote:
Hi Aseem,

Thank you very much for your help.

Please, allow me to be more specific for my case (to some extent I already do 
what you suggested):

Let us imagine that I two csv datasets d1 and d2. I generate the Dataset 
as in the following:

== Reading d1:

sparkSession=spark;

options = new HashMap();
options.put("header", "true");
options.put("delimiter", delimiter);
options.put("nullValue", nullValue);
DataFrameReader d1_DFR = spark.read().options(options);
this.dataset1 = 
d1_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);

== Reading d2

sparkSession=spark;

options = new HashMap();
options.put("header", "true");
options.put("delimiter", delimiter);
options.put("nullValue", nullValue);
DataFrameReader d2_DFR = spark.read().options(options);
this.dataset2 = 
d2_DFR.schema(categoryRankSchema).csv(categoryrankFilePath);


So far, I have the header set to true.

Now, let us imagine that we need to do a Join between the two dataset:

Dataset dataset1_Join_dataset2 = dataset1.join(dataset2, “some condition”);

All the below process, Step1, Step2 and Step3, starts from 
dataset1_Join_dataset2. And, in particular, I realised that the steps

== Step 1: transform the Dataset  into JavaRDD
JavaRDD dataPointsWithHeader =dataset1_Join_dataset2.toJavaRDD();

== Step 2: take the first row (I was thinking that it was the header)
Row header= dataPointsWithHeader.first();

the header is not the first().

 So my question still is:

Is the an efficient way to access to the header and eliminate it ?

Many Thanks in advance for your support.

Best Regards,
Carlo




On 3 Aug 2016, at 18:13, Aseem Bansal 
> wrote:

Hi

Depending on how how you reading the data in the first place, can you  simply 
use the header as header instead of a row?

http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv(scala.collection.Seq)

See the header option

On Wed, Aug 3, 2016 at 10:14 PM, Carlo.Allocca 
> wrote:
Hi All,

I would like to apply a  regression to my data. One of the workflow is the 
prepare my data as a JavaRDD  starting from a Dataset with 
its header.  So, what I did was the following:

== Step 1: transform the Dataset  into JavaRDD
JavaRDD dataPointsWithHeader =modelDS.toJavaRDD();


== Step 2: take the first row (I was thinking that it was the header)
Row header= dataPointsWithHeader.first();

== Step 3: eliminate the row header by
JavaRDD dataPointsWithoutHeader = dataPointsWithHeader.filter((Row row) -> 
{
return !row.equals(header);
});

The issue with the above approach are:

a) the result of the 

Re: converting a Dataset into JavaRDD

2016-08-03 Thread Carlo . Allocca
problem solved.

The package org.apache.spark.api.java.function.Function was missing.

Thanks.
Carlo

On 3 Aug 2016, at 12:14, Carlo.Allocca 
> wrote:

Hi All,

I am trying to convert a Dataset into JavaRDD  in order to 
apply a linear regression.
I am using spark-core_2.10, version2.0.0 with Java 1.8.

My current approach is:

== Step 1:  convert the Dataset into JavaRDD
   JavaRDD dataPoints =modelDS.toJavaRDD();

== Step 2: convert  JavaRDD into JavaRDD
JavaRDD dataLabeledPoints = dataPoints.map(new Function() {
 @Override
public LabeledPoint call(Row line) throws Exception {
String lineAsString=line.toString();

String[] fields =lineAsString.split("\t");
LabeledPoint labeledPoint = new 
LabeledPoint(Integer.valueOf(fields[0]), 
Vectors.dense(Double.valueOf(fields[1]),Double.valueOf(fields[2]));
return labeledPoint;
}
});


The have got two compile time errors:




Please, I would like to ask  what I am doing wrong?
Any suggestion is very appreciated.

Thanks in advance.
Best,
Carlo




-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


converting a Dataset into JavaRDD

2016-08-03 Thread Carlo . Allocca
Hi All,

I am trying to convert a Dataset<Row> into a JavaRDD<LabeledPoint> in order to
apply a linear regression.
I am using spark-core_2.10, version 2.0.0, with Java 1.8.

My current approach is:

== Step 1: convert the Dataset<Row> into JavaRDD<Row>
JavaRDD<Row> dataPoints = modelDS.toJavaRDD();

== Step 2: convert JavaRDD<Row> into JavaRDD<LabeledPoint>
JavaRDD<LabeledPoint> dataLabeledPoints = dataPoints.map(new Function<Row, LabeledPoint>() {
    @Override
    public LabeledPoint call(Row line) throws Exception {
        String lineAsString = line.toString();
        String[] fields = lineAsString.split("\t");
        LabeledPoint labeledPoint = new LabeledPoint(Integer.valueOf(fields[0]),
                Vectors.dense(Double.valueOf(fields[1]), Double.valueOf(fields[2])));
        return labeledPoint;
    }
});


I have got two compile-time errors:

[inline screenshots of the two compile-time errors]


Please, I would like to ask  what I am doing wrong?
Any suggestion is very appreciated.

Thanks in advance.
Best,
Carlo
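
As an aside, once the missing org.apache.spark.api.java.function.Function import
is in place (see the follow-up above), parsing line.toString() with split("\t")
is still fragile; the fields can be read straight from the Row by position. A
sketch, assuming column 0 holds the label and columns 1 and 2 hold numeric
features:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.Row;

// Build LabeledPoints by reading the Row fields by position instead of parsing
// row.toString(); adjust the indices and types to the real schema.
JavaRDD<LabeledPoint> dataLabeledPoints = modelDS.toJavaRDD().map(
        row -> new LabeledPoint(
                ((Number) row.get(0)).doubleValue(),
                Vectors.dense(((Number) row.get(1)).doubleValue(),
                              ((Number) row.get(2)).doubleValue())));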



-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.


Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
Solved!!
The solution is using date_format with the “u” option.

Thank you very much.
Best,
Carlo
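
In code, that presumably looks something like the following (column names taken
from the snippet quoted below); date_format(..., "u") returns the day of week as
"1" (Monday) through "7" (Sunday), so the custom UDF is no longer needed:

Dataset<Row> tmp6 = tmp5.withColumn("ORD_DAYOFWEEK",
        org.apache.spark.sql.functions.date_format(
                tmp5.col("ORD_time_window_per_hour#3").getItem("start"), "u")
        .cast(DataTypes.IntegerType));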

On 28 Jul 2016, at 18:59, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

Hi Mark,

Thanks for the suggestion.
I changed the maven entries as follows

spark-core_2.10
2.0.0

and
spark-sql_2.10
2.0.0

As a result, it worked when I removed the following line of code, which computes
DAYOFWEEK (Monday -> 1, etc.):

Dataset<Row> tmp6 = tmp5.withColumn("ORD_DAYOFWEEK", callUDF("computeDayOfWeek",
        tmp5.col("ORD_time_window_per_hour#3").getItem("start").cast(DataTypes.StringType)));

this.spark.udf().register("computeDayOfWeek", new UDF1<String, Integer>() {
    @Override
    public Integer call(String myDate) throws Exception {
        Date date = new SimpleDateFormat(dateFormat).parse(myDate);
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        int dayOfWeek = c.get(Calendar.DAY_OF_WEEK);
        return dayOfWeek; // myDate.length();
    }
}, DataTypes.IntegerType);



And the full stack trace is reported below.

Is there another way to compute DAYOFWEEK from a date formatted as "yyyy-MM-dd
HH:mm:ss" using a built-in function? I have checked date_format, but it does not
do it.

Any Suggestion?

Many Thanks,
Carlo




Test set: org.mksmart.amaretto.ml.DatasetPerHourVerOneTest
---
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 32.658 sec <<< 
FAILURE!
testBuildDatasetNew(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 32.581 sec  <<< ERROR!
org.apache.spark.SparkException: Task not serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:128)
at 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at 
org.mksmart.amaretto.ml.DatasetPerHourVerOneTest.testBuildDatasetNew(DatasetPerHourVerOneTest.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.Ru

Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
eys=[time_window_per_hour#3#739, asin#94], 
functions=[count(time_window_per_hour#3#739), count(qty_shipped#98), 
count(qty_ordered#97), min(item_price#99), max(item_price#99), 
avg(cast(item_price#99 as double)), stddev_pop(cast(item_price#99 as double))], 
output=[ORD_time_window_per_hour#3#748, ORD_asin#749, ORD_num_of_order#758L, 
ORD_tot_qty_shipped#761L, ORD_tot_qty_ordered#764L, ORD_min_ord_item_price#766, 
ORD_max_ord_item_price#768, ORD_avg_ord_item_price#770, 
ORD_std_ord_item_price#780, ORD_YEAR#781, ORD_MONTH#782, ORD_DAYOFMONTH#783, 
ORD_HOUR#784, ORD_DAYOFYEAR#785, ORD_WEEKOFYEAR#786, ORD_DAYOFWEEK#787])
+- Exchange hashpartitioning(time_window_per_hour#3#739, asin#94, 200)
   +- *HashAggregate(keys=[time_window_per_hour#3#739, asin#94], 
functions=[partial_count(time_window_per_hour#3#739), 
partial_count(qty_shipped#98), partial_count(qty_ordered#97), 
partial_min(item_price#99), partial_max(item_price#99), 
partial_avg(cast(item_price#99 as double)), 
partial_stddev_pop(cast(item_price#99 as double))], 
output=[time_window_per_hour#3#739, asin#94, count#846L, count#847L, 
count#848L, min#849, max#850, sum#851, count#852L, n#833, avg#834, m2#835])
  +- *Project [window#746 AS time_window_per_hour#3#739, asin#94, 
qty_shipped#98, item_price#99, qty_ordered#97]
 +- *Filter ((isnotnull(purchased#102) && (cast(purchased#102 as 
timestamp) >= window#746.start)) && (cast(purchased#102 as timestamp) < 
window#746.end))
+- *Expand [List(named_struct(start, 
CEIL((cast((precisetimestamp(cast(purchased#102 as timestamp)) - 0) as 
double) / 3.6E9)) + 0) - 1) * 36) + 0), end, 
(CEIL((cast((precisetimestamp(cast(purchased#102 as timestamp)) - 0) as 
double) / 3.6E9)) + 0) - 1) * 36) + 0) + 36)), asin#94, 
qty_ordered#97, qty_shipped#98, item_price#99, purchased#102), 
List(named_struct(start, CEIL((cast((precisetimestamp(cast(purchased#102 as 
timestamp)) - 0) as double) / 3.6E9)) + 1) - 1) * 36) + 0), end, 
(CEIL((cast((precisetimestamp(cast(purchased#102 as timestamp)) - 0) as 
double) / 3.6E9)) + 1) - 1) * 36) + 0) + 36)), asin#94, 
qty_ordered#97, qty_shipped#98, item_price#99, purchased#102)], [window#746, 
asin#94, qty_ordered#97, qty_shipped#98, item_price#99, purchased#102]
   +- *Project [asin#94, qty_ordered#97, qty_shipped#98, 
item_price#99, purchased#102]
  +- *Filter (((isnotnull(grade#96) && 
isnotnull(item_price#99)) && (grade#96 = New Item)) && isnotnull(purchased#102))
 +- *Scan csv 
[asin#94,grade#96,qty_ordered#97,qty_shipped#98,item_price#99,purchased#102] 
Format: CSV, InputPaths: 
file:/Users/carloallocca/Desktop/NSLDataset/20160706/order.csv, PushedFilters: 
[IsNotNull(grade), IsNotNull(item_price), EqualTo(grade,New Item), 
IsNotNull(purchased)], ReadSchema: 
struct<asin:string,grade:string,qty_ordered:int,qty_shipped:string,item_price:float,purchased:str...
)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 6)
- field (class: 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: 
references$1, type: class [Ljava.lang.Object;)
- object (class 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, )
at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 61 more






On 28 Jul 2016, at 17:39, Mark Hamstra 
<m...@clearstorydata.com<mailto:m...@clearstorydata.com>> wrote:

Don't use Spark 2.0.0-preview.  That was a preview release with known issues, 
and was intended to be used only for early, pre-release testing purposes. Spark 
2.0.0 is now released, and you should be using that.

On Thu, Jul 28, 2016 at 3:48 AM, Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>> wrote:
and, of course I am using

 
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0-preview</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0-preview</version>
    <type>jar</type>
</dependency>



Is the problem/issue below related to the experimental version of SPARK 2.0.0?

Many Thanks for your help and support.

Best Regards,
carlo

On 28 Jul 2016, at 11:14, Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>> wrote:

I have also found the following two related links:

1) 
https://github.com/apache/spark/commit/947b9020b0d621bc97661a0a056297e6889936d3
2) https://github.com/apache/spark/pull/12433

which both explain why it happens but nothing about what to do to solve it.

Do you have any suggestion/recommendation?

Many thanks.
Carlo

Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
and, of course I am using

 
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0-preview</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0-preview</version>
    <type>jar</type>
</dependency>



Is the problem/issue below related to the experimental version of SPARK 2.0.0?

Many Thanks for your help and support.

Best Regards,
carlo

On 28 Jul 2016, at 11:14, Carlo.Allocca 
<carlo.allo...@open.ac.uk<mailto:carlo.allo...@open.ac.uk>> wrote:

I have also found the following two related links:

1) 
https://github.com/apache/spark/commit/947b9020b0d621bc97661a0a056297e6889936d3
2) https://github.com/apache/spark/pull/12433

which both explain why it happens but nothing about what to do to solve it.

Do you have any suggestion/recommendation?

Many thanks.
Carlo

On 28 Jul 2016, at 11:06, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

Hi Rui,

Thanks for the prompt reply.
No, I am not using Mesos.

OK. I am writing code to build a suitable dataset for my needs, as follows:

== Session configuration:

 SparkSession spark = SparkSession
.builder()
.master("local[6]") //
.appName("DatasetForCaseNew")
.config("spark.executor.memory", "4g")
.config("spark.shuffle.blockTransferService", "nio")
.getOrCreate();


public Dataset<Row> buildDataset() {
...

// STEP A
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS
= res1
  .join(res2, 
(res1.col("PRD_asin#100")).equalTo(res2.col("CMP_asin")), "inner");

prdDS_Join_cmpDS.take(1);

// STEP B
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS_Join
= prdDS_Join_cmpDS
  .join(res3, 
prdDS_Join_cmpDS.col("PRD_asin#100").equalTo(res3.col("ORD_asin")), "inner");
prdDS_Join_cmpDS_Join.take(1);
prdDS_Join_cmpDS_Join.show();

}


The exception is thrown when the computation reaches STEP B; STEP A completes fine.

Is there anything wrong or missing?

Thanks for your help in advance.

Best Regards,
Carlo





=== STACK TRACE

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 422.102 sec <<< 
FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 421.994 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.consume(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.doProduce(SortMergeJoinExec.scala:565)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.produce(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenEx

Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
I have also found the following two related links:

1) 
https://github.com/apache/spark/commit/947b9020b0d621bc97661a0a056297e6889936d3
2) https://github.com/apache/spark/pull/12433

which both explain why it happens but nothing about what to do to solve it.

Do you have any suggestion/recommendation?

Many thanks.
Carlo

On 28 Jul 2016, at 11:06, carlo allocca 
<ca6...@open.ac.uk<mailto:ca6...@open.ac.uk>> wrote:

Hi Rui,

Thanks for the prompt reply.
No, I am not using Mesos.

OK. I am writing code to build a suitable dataset for my needs, as follows:

== Session configuration:

 SparkSession spark = SparkSession
.builder()
.master("local[6]") //
.appName("DatasetForCaseNew")
.config("spark.executor.memory", "4g")
.config("spark.shuffle.blockTransferService", "nio")
.getOrCreate();


public Dataset<Row> buildDataset() {
...

// STEP A
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS
= res1
  .join(res2, 
(res1.col("PRD_asin#100")).equalTo(res2.col("CMP_asin")), "inner");

prdDS_Join_cmpDS.take(1);

// STEP B
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS_Join
= prdDS_Join_cmpDS
  .join(res3, 
prdDS_Join_cmpDS.col("PRD_asin#100").equalTo(res3.col("ORD_asin")), "inner");
prdDS_Join_cmpDS_Join.take(1);
prdDS_Join_cmpDS_Join.show();

}


The exception is thrown when the computation reaches STEP B; STEP A completes fine.

Is there anything wrong or missing?

Thanks for your help in advance.

Best Regards,
Carlo





=== STACK TRACE

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 422.102 sec <<< 
FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 421.994 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.consume(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.doProduce(SortMergeJoinExec.scala:565)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.produce(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
at 
org.apache.spark.sql.execution.WholeStageCod

Re: SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
Hi Rui,

Thanks for the prompt reply.
No, I am not using Mesos.

OK. I am writing code to build a suitable dataset for my needs, as follows:

== Session configuration:

 SparkSession spark = SparkSession
.builder()
.master("local[6]") //
.appName("DatasetForCaseNew")
.config("spark.executor.memory", "4g")
.config("spark.shuffle.blockTransferService", "nio")
.getOrCreate();


public Dataset<Row> buildDataset() {
...

// STEP A
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS
= res1
  .join(res2, 
(res1.col("PRD_asin#100")).equalTo(res2.col("CMP_asin")), "inner");

prdDS_Join_cmpDS.take(1);

// STEP B
// Join prdDS with cmpDS
Dataset<Row> prdDS_Join_cmpDS_Join
= prdDS_Join_cmpDS
  .join(res3, 
prdDS_Join_cmpDS.col("PRD_asin#100").equalTo(res3.col("ORD_asin")), "inner");
prdDS_Join_cmpDS_Join.take(1);
prdDS_Join_cmpDS_Join.show();

}


The exception is thrown when the computation reaches STEP B; STEP A completes fine.

Is there anything wrong or missing?

Thanks for your help in advance.

Best Regards,
Carlo
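
One way to see which physical join Spark planned for STEP B, and whether a broadcast exchange is involved (that is where awaitResult is thrown), is to print the query plan before calling an action; a small sketch using the dataset names above:

// Prints the logical and physical plans; look for BroadcastHashJoin vs. SortMergeJoin
// in the physical plan.
prdDS_Join_cmpDS_Join.explain(true);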





=== STACK TRACE

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 422.102 sec <<< 
FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 421.994 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.consume(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.doProduce(SortMergeJoinExec.scala:565)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec.produce(SortMergeJoinExec.scala:35)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 

SPARK Exception thrown in awaitResult

2016-07-28 Thread Carlo . Allocca
Hi All,

I am running SPARK locally, and when running d3 = join(d1, d2) and d5 = join(d3, d4) I am getting the following exception: "org.apache.spark.SparkException: Exception thrown in awaitResult".
Googling for it, the closest answer I found is https://issues.apache.org/jira/browse/SPARK-16522, which mentions that it is a bug in SPARK 2.0.0.

Is it correct or am I missing anything?

Many Thanks for your answer and help.

Best Regards,
Carlo
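
The thread does not record a resolution at this point, but two configuration knobs are commonly tried for broadcast-related awaitResult failures; treat the following as a sketch to experiment with (assuming a local session like the ones shown elsewhere in the thread), not a confirmed fix:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
        .builder()
        .master("local[6]")
        .appName("DatasetForCaseNew")
        // give the broadcast exchange more time than the 300s default ...
        .config("spark.sql.broadcastTimeout", "1200")
        // ... or disable automatic broadcast joins so the planner falls back to sort-merge joins
        .config("spark.sql.autoBroadcastJoinThreshold", "-1")
        .getOrCreate();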



SPARK UDF related issue

2016-07-25 Thread Carlo . Allocca
Hi All, 

I am using SPARK 2.0 and I have got the following issue: 

I am able to run steps 1-5 (see below) but not step 6, which uses a UDF. Steps 1-5 take a few seconds, whereas step 6 looks like it never ends.

Is there anything wrong? How should I address it?

Any suggestion/feedback would be much appreciated.

Many Thanks in advance for your help. 

Best regards, 
Carlo


 

== Code 


=== STEP 1
SparkSession spark = SparkSession
.builder()
.master("local[2]")
.appName("DatasetForCaseNew")
.config("spark.executor.memory", "3g")
.getOrCreate();


=== STEP 2
this.spark.udf().register("computeBindingValue", new UDF1<String, Integer>() {
    @Override
    public Integer call(String newBindingValue) throws Exception {
        if (newBindingValue.contains("Paperback")) return 1;
        return 2;
    }
}, DataTypes.IntegerType);

=== STEP 3
Dataset<Row> cmptDS_TMP = cmptDS
        .select(window(cmptDS.col("created"), "1 hour").as("PRD_TimeWindow#1"),
                cmptDS.col("asin").as("PRD_asin#1"),
                cmptDS.col("sale_rank").as("PRD_global_sale_rank"));

=== STEP 4
Dataset<Row> resultProd = prdDS
        .select(prdDS.col("asin").alias("PRD_asin#300"),
                prdDS.col("rppprice").alias("PRD_rppprice"),
                prdDS.col("binding").alias("PRD_binding"))
        .distinct().sort("PRD_asin#300");

=== STEP 5
Dataset<Row> cmptDS_TMP_join_resultProd = cmptDS_TMP
        .join(resultProd,
              cmptDS_TMP.col("PRD_asin#1").equalTo(resultProd.col("PRD_asin#300")),
              "inner");
cmptDS_TMP_join_resultProd.show();

=== STEP 6
Dataset<Row> prodWithBindingValue = cmptDS_TMP_join_resultProd
        .withColumn("PRD_bindingValue",
                callUDF("computeBindingValue", cmptDS_TMP_join_resultProd.col("PRD_binding")));
prodWithBindingValue.show();
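
As an aside, the same binding value can be computed with built-in expressions instead of a UDF; a sketch, assuming the semantics above (a binding containing "Paperback" maps to 1, everything else to 2) are what is wanted:

import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.lit;

// when/otherwise keeps the whole expression inside Catalyst, with no user code invoked per row.
Dataset<Row> prodWithBindingValue2 = cmptDS_TMP_join_resultProd
        .withColumn("PRD_bindingValue",
                when(cmptDS_TMP_join_resultProd.col("PRD_binding").contains("Paperback"), lit(1))
                        .otherwise(lit(2)));
prodWithBindingValue2.show();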






SPARK SQL and join pipeline issue

2016-07-25 Thread Carlo . Allocca
Dear All,

I have the following question:

I am using SPARK SQL 2.0 and, in particular, I am doing some joins in a pipeline with the following pattern (d3 = d1 join d2, d4 = d5 join d6, d7 = d3 join d4).

When running my code, I realised that the building of d7 generates an issue as 
reported below.

The typical code that I am using for building a join is:

// We make a join of the first two above
Dataset<Row> d3 = d1
        .join(d2,
              d2.col("PRD_asin#2").equalTo(d1.col("PRD_asin#1")),
              "inner");

Is there something that I am doing wrong?

Please, any help would be very appreciated.

Thank you in advance.

Best Regards,
Carlo
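
Spelled out, the pipeline described above looks roughly like this (d1 ... d7 and the join keys key1, key2, key5, key6 are placeholders, not the actual column names of the job):

// d3 = d1 join d2, d4 = d5 join d6, d7 = d3 join d4
Dataset<Row> d3 = d1.join(d2, d1.col("key1").equalTo(d2.col("key2")), "inner");
Dataset<Row> d4 = d5.join(d6, d5.col("key5").equalTo(d6.col("key6")), "inner");
Dataset<Row> d7 = d3.join(d4, d3.col("key1").equalTo(d4.col("key5")), "inner");
d7.show();   // the failure reported below appears when d7 is materialised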




 ISSUE TRACK:

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 319.666 sec <<< 
FAILURE!
testBuildDataset(org.mksmart.amaretto.ml.DatasetPerHourVerOneTest)  Time 
elapsed: 319.265 sec  <<< ERROR!
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
at 
org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at org.apache.spark.sql.execution.ExpandExec.consume(ExpandExec.scala:36)
at org.apache.spark.sql.execution.ExpandExec.doConsume(ExpandExec.scala:198)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
at 
org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:146)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:211)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:146)
at 
org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at