Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

2017-08-31 Thread James Baker
I think that makes sense. I didn't understand that backcompat was the primary 
driver. I actually don't care right now about aggregations on the datasource 
I'm integrating with - I just care about receiving all the filters (and ideally 
also the desired sort order) at the same time. I am mostly fine with anything 
else, but getting the filters all at once is important for me and doesn't seem 
overly contentious (it's compatible with datasources v1, for example). Ideally 
I'd also get the sort orders _after_ the filters.

That said, an unstable API that gets me the query plan would be appreciated by 
plenty of people, I'm sure :) (and it would make my implementation more 
straightforward - the state management is painful at the moment).

James

On Wed, 30 Aug 2017 at 14:56 Reynold Xin <r...@databricks.com> wrote:
Sure that's good to do (and as discussed earlier a good compromise might be to 
expose an interface for the source to decide which part of the logical plan 
they want to accept).

To me everything is about cost vs benefit.

In my mind, the biggest issue with the existing data source API is backward and 
forward compatibility. All the data sources written for Spark 1.x broke in 
Spark 2.x, and that is one of the biggest values V2 can bring. To me it's far 
more important for data sources implemented in 2017 to still work in 2027, in 
Spark 10.x.

You are basically arguing for creating a new API that is capable of arbitrary 
expression, aggregation, and join pushdowns (you have only mentioned aggregation 
so far, but I've talked to enough database people to know that once Spark gives 
them aggregation pushdown, they will come back for join pushdown). We can do 
that using unstable APIs, but creating stable APIs for it would be extremely 
difficult (still doable, it would just take a long time to design and implement). 
As mentioned earlier, it basically involves creating a stable representation of 
the entire logical plan, which is a lot of work. I think we should still work 
towards that (for other reasons as well), but I'd consider it out of scope for 
the current proposal. Otherwise we probably wouldn't release anything for the 
next 2 or 3 years.
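
(For illustration only: a minimal sketch of the kind of unstable "accept part of 
the plan" hook being discussed, with entirely made-up names - this is not the 
proposed API.)

interface PlanFragment {}  // stand-in for some plan representation; not a real Spark class

// Hypothetical, deliberately unstable hook: Spark hands the source a fragment of the
// logical plan, and the source returns whatever part of it it cannot handle, which
// Spark then executes itself.
public interface UnstablePlanPushDown {
    // Returning the fragment unchanged means "push nothing down"; returning a smaller
    // fragment means the rest was accepted by the source.
    PlanFragment pushDown(PlanFragment fragment);
}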





On Wed, Aug 30, 2017 at 11:50 PM, James Baker <j.ba...@outlook.com> wrote:
I guess I was more suggesting that by coding up the powerful mode as the API, 
it becomes easy for someone to layer an easy mode beneath it to enable simpler 
datasources to be integrated (and that simple mode should be the out of scope 
thing).

Taking a small step back here, one of the places where I think I'm missing some 
context is in understanding the target consumers of these interfaces. I've done 
some amount (though likely not enough) of research about the places where 
people have had issues of API surface in the past - the concrete tickets I've 
seen have been based on Cassandra integration where you want to indicate 
clustering, and SAP HANA where they want to push down more complicated queries 
through Spark. This proposal supports the former, but the amount of change 
required to support clustering in the current API is not obviously high - 
whilst the current proposal for V2 seems to make it very difficult to add 
support for pushing down plenty of aggregations in the future (I've found the 
question of how to add GROUP BY to be pretty tricky to answer for the current 
proposal).

Googling around for implementations of the current PrunedFilteredScan, I 
basically find a lot of databases, which seems reasonable - SAP HANA, 
Elasticsearch, Solr, MongoDB, Apache Phoenix, etc. I've talked to people who've 
used (some of) these connectors, and the sticking point has generally been that 
Spark needs to pull a lot of data out of the datasource in order to compute 
aggregations that could have been pushed down into it very efficiently.
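
(As a concrete illustration of that sticking point, consider the query below 
against any V1 connector - the format name and columns are invented. The filter 
and the column pruning can be pushed down through the V1 hooks, but the 
aggregation cannot, so every matching row is shipped to Spark just to be counted.)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AggregationNotPushedDown {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("agg-example").getOrCreate();

        // "my.datasource" stands in for any V1 connector (Cassandra, HANA, Elasticsearch, ...).
        Dataset<Row> orders = spark.read().format("my.datasource").load();

        // The filter (and pruning down to the referenced columns) can be pushed into the
        // source; the groupBy/count runs in Spark over all of the fetched rows.
        orders.filter("country = 'DE'")
              .groupBy("city")
              .count()
              .show();

        spark.stop();
    }
}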

So, with this proposal it appears that we're optimising towards making it easy 
to write one-off datasource integrations, with some amount of pluggability for 
people who want to do more complicated things (the most interesting being 
bucketing integration). However, my guess is that this isn't what the current 
major integrations suffer from; they suffer mostly from restrictions in what 
they can push down (which broadly speaking are not going to go away).

So the place where I'm confused is that the current integrations can be made 
incrementally better as a consequence of this, but the backing data systems 
have the features which enable a step change, and this API makes that step 
change harder to achieve in the future. Which group of users benefits the most 
from this change - who is the target consumer here? My personal slant is that 
it's more important to improve support for other datastores than it is to lower 
the barrier to entry - this is why I've been pushing here.

James

On Wed, 30 Aug 2017 at 09:37 Ryan Blue <rb...@netflix.com> wrote:

-1 (non

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

2017-08-30 Thread James Baker
I guess I was more suggesting that by coding up the powerful mode as the API, 
it becomes easy for someone to layer an easy mode beneath it to enable simpler 
datasources to be integrated (and that simple mode should be the out of scope 
thing).

Taking a small step back here, one of the places where I think I'm missing some 
context is in understanding the target consumers of these interfaces. I've done 
some amount (though likely not enough) of research about the places where 
people have had issues of API surface in the past - the concrete tickets I've 
seen have been based on Cassandra integration where you want to indicate 
clustering, and SAP HANA where they want to push down more complicated queries 
through Spark. This proposal supports the former, but the amount of change 
required to support clustering in the current API is not obviously high - 
whilst the current proposal for V2 seems to make it very difficult to add 
support for pushing down plenty of aggregations in the future (I've found the 
question of how to add GROUP BY to be pretty tricky to answer for the current 
proposal).

Googling around for implementations of the current PrunedFilteredScan, I 
basically find a lot of databases, which seems reasonable - SAP HANA, 
Elasticsearch, Solr, MongoDB, Apache Phoenix, etc. I've talked to people who've 
used (some of) these connectors, and the sticking point has generally been that 
Spark needs to pull a lot of data out of the datasource in order to compute 
aggregations that could have been pushed down into it very efficiently.

So, with this proposal it appears that we're optimising towards making it easy 
to write one-off datasource integrations, with some amount of pluggability for 
people who want to do more complicated things (the most interesting being 
bucketing integration). However, my guess is that this isn't what the current 
major integrations suffer from; they suffer mostly from restrictions in what 
they can push down (which broadly speaking are not going to go away).

So the place where I'm confused is that the current integrations can be made 
incrementally better as a consequence of this, but the backing data systems 
have the features which enable a step change, and this API makes that step 
change harder to achieve in the future. Which group of users benefits the most 
from this change - who is the target consumer here? My personal slant is that 
it's more important to improve support for other datastores than it is to lower 
the barrier to entry - this is why I've been pushing here.

James

On Wed, 30 Aug 2017 at 09:37 Ryan Blue <rb...@netflix.com> wrote:

-1 (non-binding)

Sometimes it takes a VOTE thread to get people to actually read and comment, so 
thanks for starting this one… but there’s still discussion happening on the 
prototype API, and the proposal hasn’t been updated to reflect it. I’d like to 
see the proposal shaped by the ongoing discussion so that we have a better, more 
concrete plan. I think that’s going to produce a better SPIP.

The second reason for -1 is that I think the read- and write-side proposals 
should be separated. The PR (https://github.com/cloud-fan/spark/pull/10) 
currently has “write path” listed as a TODO item and most of the discussion 
I’ve seen is on the read side. I think it would be better to separate the read 
and write APIs so we can focus on them individually.

An example of why we should focus on the write path separately is that the 
proposal says this:

Ideally partitioning/bucketing concept should not be exposed in the Data Source 
API V2, because they are just techniques for data skipping and 
pre-partitioning. However, these 2 concepts are already widely used in Spark, 
e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be 
consistent, we need to add partitioning/bucketing to Data Source V2 . . .

Essentially, some of the APIs mix DDL and DML operations. I’d like to consider 
ways to fix that problem instead of carrying it forward into Data Source V2. We 
can solve this by adding a high-level API for DDL and a better write/insert API 
that works well with it. Clearly, that discussion is independent of the read 
path, which is why I think separating the two proposals would be a win.
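
(To make the mixing concrete: today the same partitioning concept is expressed 
both through the write API and through SQL DDL, as in the sketch below. The 
paths and table name are invented, and the ALTER TABLE assumes a matching 
partitioned table already exists.)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitioningTwoWays {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("partitioning-example").getOrCreate();

        Dataset<Row> events = spark.read().parquet("/data/events");  // hypothetical input

        // Partitioning expressed on the write path (the "DML-ish" surface).
        events.write().partitionBy("event_date").parquet("/tables/events");

        // The same concept expressed through DDL.
        spark.sql("ALTER TABLE events ADD PARTITION (event_date='2017-08-30')");

        spark.stop();
    }
}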

rb

​

On Wed, Aug 30, 2017 at 4:28 AM, Reynold Xin <r...@databricks.com> wrote:
That might be good to do, but seems like orthogonal to this effort itself. It 
would be a completely different interface.

On Wed, Aug 30, 2017 at 1:10 PM Wenchen Fan <cloud0...@gmail.com> wrote:
OK, I agree. How about we add a new interface to push down the query plan, based 
on the current framework? We can mark the query-plan-push-down interface as 
unstable, to save the effort of designing a stable representation of the query 
plan and maintaining forward compatibility.

On Wed, Aug 30, 2017 at 10:53 AM, James Baker 
<j.ba...@o

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

2017-08-29 Thread James Baker
I'll just focus on the one-by-one thing for now - it's the thing that blocks me 
the most.

I think the place where we're most confused here is on the cost of determining 
whether I can push down a filter. For me, in order to work out whether I can 
push down a filter or satisfy a sort, I might have to read plenty of data. That 
said, it's worth me doing this because I can use this information to avoid 
reading _that_ much data.

If you give me all the orderings, I will have to read that data many times (we 
stream it to avoid keeping it in memory).

There's also a thing where our typical use cases have many filters (20+ is 
common). So, it's likely not going to work to pass us all the combinations. 
That said, if I can tell you a cost, then I know what optimal looks like - so 
why can't I just pick that myself?

The current design is friendly to simple datasources, but does not have the 
potential to support this.

So the main problem we have with datasources v1 is that it's essentially 
impossible to leverage a bunch of Spark features - I don't get to use bucketing 
or row batches or all the nice things that I really want to use to get decent 
performance. Provided I can leverage these in a moderately supported way which 
won't break in any given commit, I'll be pretty happy with anything that lets 
me opt out of the restrictions.

My suggestion here is that if you make a mode which works well for complicated 
use cases, you end up being able to write simple mode in terms of it very 
easily. So we could actually provide two APIs, one that lets people who have 
more interesting datasources leverage the cool Spark features, and one that 
lets people who just want to implement basic features do that - I'd try to 
include some kind of layering here. I could probably sketch out something here 
if that'd be useful?
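
(A rough sketch of the layering being suggested, with invented types: the 
"powerful" interface sees the whole pushdown request at once, and a "simple" 
adapter implements it for sources that only care about filters.)

import java.util.List;

// All of the types below are hypothetical; they only illustrate the layering idea.
interface Filter {}
interface SortOrder {}

final class PushDownRequest {
    final List<Filter> filters;
    final List<SortOrder> sortOrders;

    PushDownRequest(List<Filter> filters, List<SortOrder> sortOrders) {
        this.filters = filters;
        this.sortOrders = sortOrders;
    }
}

// "Powerful" mode: the source receives everything at once and returns the part it
// cannot handle; Spark executes that remainder itself.
interface FullPushDownScan {
    PushDownRequest pushDown(PushDownRequest request);
}

// "Simple" mode, layered on top: a source only implements acceptFilters (returning
// the filters it cannot handle) and gets the full interface for free.
abstract class SimpleFilterScan implements FullPushDownScan {
    protected abstract List<Filter> acceptFilters(List<Filter> filters);

    @Override
    public final PushDownRequest pushDown(PushDownRequest request) {
        List<Filter> unhandled = acceptFilters(request.filters);
        return new PushDownRequest(unhandled, request.sortOrders);  // sorts are never pushed
    }
}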

James

On Tue, 29 Aug 2017 at 18:59 Wenchen Fan <cloud0...@gmail.com> wrote:
Hi James,

Thanks for your feedback! I think your concerns are all valid, but we need to 
make a tradeoff here.

> Explicitly here, what I'm looking for is a convenient mechanism to accept a 
> fully specified set of arguments

The problem with this approach is: 1) if we want to add more arguments in the 
future, it's really hard to do without changing the existing interface; 2) if a 
user wants to implement a very simple data source, they have to look at all the 
arguments and understand them, which may be a burden for them.
I don't have a solution to these two problems; comments are welcome.


> There are loads of cases like this - you can imagine someone being able to 
> push down a sort before a filter is applied, but not afterwards. However, 
> maybe the filter is so selective that it's better to push down the filter and 
> not handle the sort. I don't get to make this decision, Spark does (but 
> doesn't have good enough information to do it properly, whilst I do). I want 
> to be able to choose the parts I push down given knowledge of my datasource - 
> as defined the APIs don't let me do that, they're strictly more restrictive 
> than the V1 APIs in this way.

This is true: the current framework applies pushdowns one by one, incrementally. 
If a data source wants to go back and accept a sort pushdown after it has 
accepted a filter pushdown, that's impossible with the current data source V2.
Fortunately, we have a solution for this problem. On the Spark side, we actually 
do have a fully specified set of arguments waiting to be pushed down, but Spark 
doesn't know the best order in which to push them into the data source. Spark 
can try every combination and ask the data source to report a cost, then pick 
the combination with the lowest cost. This can also be implemented as a 
cost-report interface, so that an advanced data source can implement it for 
optimal performance, while a simple data source doesn't need to care about it 
and can stay simple.
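
(A minimal sketch of what such a cost-report hook might look like, with invented 
names - just the shape being described, not a concrete proposal.)

import java.util.List;

// Hypothetical stand-ins for Spark's filter and sort representations.
interface Filter {}
interface SortOrder {}

interface CostReportingScan {
    // Spark proposes one candidate combination of pushdowns; the source returns an
    // estimated relative cost. Spark would try candidates and keep the cheapest one.
    double estimateCost(List<Filter> filters, List<SortOrder> sortOrders);
}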


The current design is very friendly to simple data sources, and it has the 
potential to support complex data sources too, so I prefer the current design 
over the plan-pushdown one. What do you think?


On Wed, Aug 30, 2017 at 5:53 AM, James Baker <j.ba...@outlook.com> wrote:
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty 
intractable, it restricts the modifications that you can do in the future too 
much. That said, it shouldn't be as hard if you restrict yourself to the parts 
of the plan which are supported by the datasources V2 API (which after all, 
need to be translatable properly into the future to support the mixins 
proposed). This should have a pretty small scope in comparison. As long as the 
user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable 
query plan.

Explicitly here, what I'm looking for is a convenient mechani

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

2017-08-29 Thread James Baker
Yeah, for sure.

With the stable representation - agree that in the general case this is pretty 
intractable, it restricts the modifications that you can do in the future too 
much. That said, it shouldn't be as hard if you restrict yourself to the parts 
of the plan which are supported by the datasources V2 API (which after all, 
need to be translatable properly into the future to support the mixins 
proposed). This should have a pretty small scope in comparison. As long as the 
user can bail out of nodes they don't understand, they should be ok, right?

That said, what would also be fine for us is a place to plug into an unstable 
query plan.

Explicitly here, what I'm looking for is a convenient mechanism to accept a 
fully specified set of arguments (of which I can choose to ignore some), and 
return the information as to which of them I'm ignoring. Taking a query plan of 
sorts is a way of doing this which IMO is intuitive to the user. It also 
provides a convenient location to plug in things like stats. Not at all married 
to the idea of using a query plan here; it just seemed convenient.

Regarding the users who just want to be able to pump data into Spark, my 
understanding is that replacing isolated nodes in a query plan is easy. That 
said, our goal here is to be able to push down as much as possible into the 
underlying datastore.

To your second question:

The issue is that if you build up pushdowns incrementally rather than all at 
once, you end up having to reject pushdowns and filters that you actually could 
have handled, which unnecessarily increases overhead.

For example, the dataset

a b c
1 2 3
1 3 3
1 3 4
2 1 1
2 0 1

can efficiently push down sort(b, c) if I have already applied the filter a = 
1, but otherwise will force a sort in Spark. On the PR I detail a case I see 
where I can push down two equality filters iff I am given them at the same 
time, whilst not being able to do so one at a time.
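
(In code, that kind of data-dependent decision might look something like the 
check below - purely illustrative, with invented types: the sort on (b, c) is 
accepted only once an equality filter on column a has been pushed.)

import java.util.List;

final class SortAcceptanceCheck {
    // Stand-in for an equality filter; not a real Spark class.
    static final class EqualityFilter {
        final String column;
        final Object value;

        EqualityFilter(String column, Object value) {
            this.column = column;
            this.value = value;
        }
    }

    // The sort on (b, c) is cheap to satisfy only when the pushed filters pin column a
    // to a single value, because the data is then already ordered by (b, c).
    static boolean canPushSortOnBAndC(List<EqualityFilter> pushedFilters) {
        return pushedFilters.stream().anyMatch(f -> f.column.equals("a"));
    }
}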

There are loads of cases like this - you can imagine someone being able to push 
down a sort before a filter is applied, but not afterwards. However, maybe the 
filter is so selective that it's better to push down the filter and not handle 
the sort. I don't get to make this decision, Spark does (but doesn't have good 
enough information to do it properly, whilst I do). I want to be able to choose 
the parts I push down given knowledge of my datasource - as defined the APIs 
don't let me do that, they're strictly more restrictive than the V1 APIs in 
this way.

The pattern of not considering things that can be done in bulk bites us in 
other ways. The retrieval methods end up being trickier to implement than is 
necessary because frequently a single operation provides the result of many of 
the getters, but the state is mutable, so you end up with odd caches.

For example, the work I need to do to answer unhandledFilters in V1 is roughly 
the same as the work I need to do to buildScan, so I want to cache it. This 
means that I end up with code that looks like:

import java.util.List;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import static java.util.Collections.emptyList;

public final class CachingFoo implements Foo {
    private final Foo delegate;

    // The filters most recently passed in, plus a memoized computation for them.
    private List<Filter> currentFilters = emptyList();
    private Supplier<Bar> barSupplier = newSupplier(currentFilters);

    public CachingFoo(Foo delegate) {
        this.delegate = delegate;
    }

    private Supplier<Bar> newSupplier(List<Filter> filters) {
        return Suppliers.memoize(() -> delegate.computeBar(filters));
    }

    @Override
    public Bar computeBar(List<Filter> filters) {
        // Recompute only when the filters change; otherwise reuse the cached result.
        if (!filters.equals(currentFilters)) {
            currentFilters = filters;
            barSupplier = newSupplier(filters);
        }

        return barSupplier.get();
    }
}

which caches the result required by unhandledFilters, on the expectation that 
Spark will call buildScan afterwards and get to use the result.

This kind of cache becomes more prominent, but harder to deal with in the new 
APIs. As one example here, the state I will need in order to compute accurate 
column stats internally will likely be a subset of the work required in order 
to get the read tasks, tell you if I can handle filters, etc, so I'll want to 
cache them for reuse. However, the cached information needs to be appropriately 
invalidated when I add a new filter or sort order or limit, and this makes 
implementing the APIs harder and more error-prone.

One thing that'd be great is a defined contract of the order in which Spark 
calls the methods on your datasource (ideally this contract could be implied by 
the way the Java class structure works, but otherwise I can just throw).

James

On Tue, 29 Aug 2017 at 02:56 Reynold Xin <r...@databricks.com> wrote:
James,

Thanks for the comment. I think you just pointed out a trade-off between 
expressiveness on the one hand and API simplicity, compatibility, and 
evolvability on the other. For maximum expressiveness, we'd want the ability to 
expose full query plans and let the data source decide which part of the query 
plan can be pushed down.

The downsid

Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2

2017-08-28 Thread James Baker
Copying from the code review comments I just submitted on the draft API 
(https://github.com/cloud-fan/spark/pull/10#pullrequestreview-59088745):

Context here is that I've spent some time implementing a Spark datasource and 
have had some issues with the current API which are made worse in V2.

The general conclusion I’ve come to here is that this is very hard to actually 
implement (similar to DataSource V1, but more aggressively so, because of the 
extra methods and dimensions we get in V2).

In the DataSources V1 PrunedFilteredScan, the issue is that you are passed the 
filters with the buildScan method, and then passed them again with the 
unhandledFilters method.

However, the filters that you can’t handle might be data dependent, which the 
current API does not handle well. Suppose I can handle filter A some of the 
time, and filter B some of the time. If I’m passed both, then either A and B 
are both unhandled, or only A, or only B, or neither. The work I have to do to 
figure this out is essentially the same as the work I have to do while actually 
generating my RDD (essentially I have to generate my partitions), so I end up 
doing some weird caching work.
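
(For reference, the V1 shape being described, paraphrased into a Java-style 
outline - the real API is a pair of Scala traits, so treat this as a sketch 
rather than the actual signatures. The point is that the same filters arrive 
twice, through two separate calls.)

// Filter and RowRDD are stand-ins so that the sketch is self-contained; they are
// not real Spark classes.
interface Filter {}
interface RowRDD {}

interface V1ScanOutline {
    // "Which of these filters can you NOT handle?"
    Filter[] unhandledFilters(Filter[] filters);

    // "Now build the scan, given the required columns and the same filters again."
    RowRDD buildScan(String[] requiredColumns, Filter[] filters);
}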

This V2 API proposal has the same issues, but perhaps more so. In 
PrunedFilteredScan, there is essentially one degree of freedom for pruning 
(filters), so you just have to implement caching between unhandledFilters and 
buildScan. However, here we have many degrees of freedom: sorts, individual 
filters, clustering, sampling, maybe aggregations eventually - and these 
operations are not all commutative, so computing my support one by one can 
easily end up being more expensive than computing it all in one go.

For some trivial examples:

- After filtering, I might be sorted, whilst before filtering I might not be.

- Filtering with certain filters might affect my ability to push down others.

- Filtering with aggregations (as mooted) might not be possible to push down.

And with the API as currently mooted, I need to be able to go back and change 
my results because they might change later.

Really what would be good here is to pass all of the filters and sorts etc all 
at once, and then I return the parts I can’t handle.

I’d prefer in general that this be implemented by passing some kind of query 
plan to the datasource which enables this kind of replacement. Explicitly don’t 
want to give the whole query plan - that sounds painful - would prefer we push 
down only the parts of the query plan we deem to be stable. With the mix-in 
approach, I don’t think we can guarantee the properties we want without a 
two-phase thing - I’d really love to be able to just define a straightforward 
union type which is our supported pushdown stuff, and then the user can 
transform and return it.
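
To make the "union type" idea concrete, here is a hedged sketch (Scala; the 
names are hypothetical and not taken from the proposal):

```scala
// Hedged sketch: Spark passes every pushdown it would like in one collection;
// the source transforms it and hands back only the subset it cannot handle.
sealed trait Pushdown
case class FilterPushdown(expr: String) extends Pushdown     // String stands in for a real Filter/Expression
case class SortPushdown(column: String, ascending: Boolean) extends Pushdown
case class LimitPushdown(n: Int) extends Pushdown

trait SupportsPushdown {
  /** Return the pushdowns that could NOT be handled; everything else is owned by the source. */
  def pushdown(requested: Seq[Pushdown]): Seq[Pushdown]
}
```

Spark would build the full list once, the source returns whatever it cannot 
honour, and there is no ordering or invalidation problem because there is only 
one call.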

I think this ends up being a more elegant API for consumers, and also far more 
intuitive.

James

On Mon, 28 Aug 2017 at 18:00 蒋星博 
<jiangxb1...@gmail.com<mailto:jiangxb1...@gmail.com>> wrote:
+1 (Non-binding)

Xiao Li <gatorsm...@gmail.com<mailto:gatorsm...@gmail.com>>于2017年8月28日 
周一下午5:38写道:
+1

2017-08-28 12:45 GMT-07:00 Cody Koeninger 
<c...@koeninger.org<mailto:c...@koeninger.org>>:
Just wanted to point out that because the jira isn't labeled SPIP, it
won't have shown up linked from

http://spark.apache.org/improvement-proposals.html

On Mon, Aug 28, 2017 at 2:20 PM, Wenchen Fan 
<cloud0...@gmail.com<mailto:cloud0...@gmail.com>> wrote:
> Hi all,
>
> It has been almost 2 weeks since I proposed the data source V2 for
> discussion, and we already got some feedbacks on the JIRA ticket and the
> prototype PR, so I'd like to call for a vote.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit
>
> Note that, this vote should focus on high-level design/framework, not
> specified APIs, as we can always change/improve specified APIs during
> development.
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!

-
To unsubscribe e-mail: 
dev-unsubscr...@spark.apache.org<mailto:dev-unsubscr...@spark.apache.org>




RE: [VOTE] Release Apache Spark 2.0.0 (RC4)

2016-07-15 Thread james
-1

This bug, SPARK-16515, in Spark 2.0 breaks our use cases, which run fine on 1.6.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-2-0-0-RC4-tp18317p18341.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



How Spark SQL correctly connect hive metastore database with Spark 2.0 ?

2016-05-12 Thread james
Hi Spark guys,
I am trying to run Spark SQL using bin/spark-sql with Spark 2.0 master
code (commit ba181c0c7a32b0e81bbcdbe5eed94fc97b58c83e), but I ran across an
issue where it always connects to the local Derby database and can't connect to my
existing Hive metastore database. Could you help me check what the root
cause is? What is the specific configuration for integrating with the Hive metastore in
Spark 2.0? BTW, this case works fine in Spark 1.6.

Build package command:
./dev/make-distribution.sh --tgz -Pyarn -Phadoop-2.6
-Dhadoop.version=2.6.0-cdh5.5.1 -Phive -Phive-thriftserver -DskipTests

Key configurations in spark-defaults.conf:
spark.sql.hive.metastore.version=1.1.0
spark.sql.hive.metastore.jars=/usr/lib/hive/lib/*:/usr/lib/hadoop/client/*
spark.executor.extraClassPath=/etc/hive/conf
spark.driver.extraClassPath=/etc/hive/conf
spark.yarn.jars=local:/usr/lib/spark/jars/*

There is an existing Hive metastore database named "test_sparksql". I always
get the error "metastore.ObjectStore: Failed to get database test_sparksql,
returning NoSuchObjectException" after issuing 'use test_sparksql'. Please
see the steps below for details.
 
$ /usr/lib/spark/bin/spark-sql --master yarn --deploy-mode client

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/avro/avro-tools-1.7.6-cdh5.5.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/05/12 22:23:28 WARN conf.HiveConf: HiveConf of name
hive.enable.spark.execution.engine does not exist
16/05/12 22:23:30 INFO metastore.HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/05/12 22:23:30 INFO metastore.ObjectStore: ObjectStore, initialize called
16/05/12 22:23:30 WARN DataNucleus.General: Plugin (Bundle)
"org.datanucleus.store.rdbms" is already registered. Ensure you dont have
multiple JAR versions of the same plugin in the classpath. The URL
"file:/usr/lib/hive/lib/datanucleus-rdbms-3.2.9.jar" is already registered,
and you are trying to register an identical plugin located at URL
"file:/usr/lib/spark/jars/datanucleus-rdbms-3.2.9.jar."
16/05/12 22:23:30 WARN DataNucleus.General: Plugin (Bundle)
"org.datanucleus" is already registered. Ensure you dont have multiple JAR
versions of the same plugin in the classpath. The URL
"file:/usr/lib/hive/lib/datanucleus-core-3.2.10.jar" is already registered,
and you are trying to register an identical plugin located at URL
"file:/usr/lib/spark/jars/datanucleus-core-3.2.10.jar."
16/05/12 22:23:30 WARN DataNucleus.General: Plugin (Bundle)
"org.datanucleus.api.jdo" is already registered. Ensure you dont have
multiple JAR versions of the same plugin in the classpath. The URL
"file:/usr/lib/spark/jars/datanucleus-api-jdo-3.2.6.jar" is already
registered, and you are trying to register an identical plugin located at
URL "file:/usr/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar."
16/05/12 22:23:30 INFO DataNucleus.Persistence: Property
datanucleus.cache.level2 unknown - will be ignored
16/05/12 22:23:30 INFO DataNucleus.Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/05/12 22:23:31 WARN conf.HiveConf: HiveConf of name
hive.enable.spark.execution.engine does not exist
16/05/12 22:23:31 INFO metastore.ObjectStore: Setting MetaStore object pin
classes with
hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/05/12 22:23:32 INFO DataNucleus.Datastore: The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
16/05/12 22:23:32 INFO DataNucleus.Datastore: The class
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only"
so does not have its own datastore table.
16/05/12 22:23:33 INFO DataNucleus.Datastore: The class
"org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
"embedded-only" so does not have its own datastore table.
16/05/12 22:23:33 INFO DataNucleus.Datastore: The class
"org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only"
so does not have its own datastore table.
16/05/12 22:23:33 INFO metastore.MetaStoreDirectSql: Using direct SQL,
underlying DB is DERBY
16/05/12 22:23:33 INFO metastore.ObjectStore: Initialized ObjectStore
16/05/12 22:23:33 WARN metastore.ObjectStore: Version information not found
in metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
16/05/12 22:23:33 WARN metastore.ObjectStore: Failed to get database
default, returning NoSuchObjectException
16/05/12 22:23:34 INFO 
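
The "Using direct SQL, underlying DB is DERBY" and "Version information not found
in metastore" lines above are consistent with Spark not seeing hive-site.xml and
silently falling back to a local Derby metastore. A hedged way to check this from
code (assuming the Spark 2.0 SparkSession API and the -Phive build used above;
this is a diagnostic sketch, not a confirmed fix):

```scala
import org.apache.spark.sql.SparkSession

// Build a Hive-enabled session and see which databases the catalog can reach.
val spark = SparkSession.builder()
  .appName("metastore-check")
  .enableHiveSupport()   // requires the -Phive build used above
  .getOrCreate()

// If hive-site.xml is picked up, this should list test_sparksql; if it only
// shows "default", Spark fell back to a local Derby metastore.
spark.sql("show databases").show()
```

If only "default" shows up, the next thing to verify is that hive-site.xml is
visible to the driver (for example, in $SPARK_HOME/conf) before the session starts.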

Re: dataframe udf function will be executed twice when filter on new column created by withColumn

2016-05-11 Thread James Hammerton
This may be related to: https://issues.apache.org/jira/browse/SPARK-13773

Regards,

James

On 11 May 2016 at 15:49, Ted Yu <yuzhih...@gmail.com> wrote:

> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>
> On Wed, May 11, 2016 at 6:55 AM, Tony Jin <linbojin...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have a problem with Spark DataFrame. My Spark version is 1.6.1.
>> Basically, I used a udf and df.withColumn to create a "new" column, and
>> then I filtered the values on this new column and called show() (an action). I see
>> the udf (which withColumn uses to create the new column)
>> being called twice (duplicated). If I filter on the "old" column instead, the udf only runs
>> once, which is expected. I attached the example code; the filteredOnNewColumnDF
>> part below shows the problem.
>>
>>  Does anyone know the internal reason? Can you give me any advice? Thank you
>> very much.
>>
>>
>>
>> scala> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.functions._
>>
>> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", 
>> "b1"))).toDF("old","old1")
>> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
>>
>> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
>> udfFunc: org.apache.spark.sql.UserDefinedFunction = 
>> UserDefinedFunction(<function1>,StringType,List(StringType))
>>
>> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
>> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: 
>> string]
>>
>> scala> newDF.show
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> | a1|  b1| a1|
>> +---+----+---+
>>
>>
>> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
>> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
>> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> filteredOnNewColumnDF.show
>> running udf(a)
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>> scala> filteredOnOldColumnDF.show
>> running udf(a)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>>
>> Best wishes.
>> By Linbo
>>
>>
>
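
Regarding the duplicated evaluation shown in the thread above: one hedged
workaround (Spark 1.6-era DataFrame API; the names mirror the shell session
quoted above) is to materialize the derived column once, so the later filter is
answered from the cached data instead of re-running the udf:

```scala
// Hedged sketch: cache the DataFrame that carries the udf-derived column, force
// it to materialize, and only then filter on the new column.
val cachedDF = newDF.cache()
cachedDF.count()                          // udf runs once here, during materialization
cachedDF.filter("new <> 'a1'").show()     // served from the cached data
```

This does not address the underlying planner behaviour; it just avoids
re-evaluating the udf for this particular job.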


Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-22 Thread james
I guess a different workload causes a different result?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-OutOfMemoryError-Unable-to-acquire-bytes-of-memory-tp16773p16789.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-22 Thread james
Hi,
I also hit the 'Unable to acquire memory' issue using Spark 1.6.1 with dynamic
allocation on YARN. My case happened after setting
spark.sql.shuffle.partitions larger than 200. From the error stack, it differs
from the issue reported by Nezih, and I'm not sure whether they share the same root cause.

Thanks 
James

16/03/17 16:02:11 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 1912805 bytes
16/03/17 16:02:12 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 1 to hw-node3:55062
16/03/17 16:02:12 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 1912805 bytes
16/03/17 16:02:16 INFO scheduler.TaskSetManager: Starting task 280.0 in
stage 153.0 (TID 9390, hw-node5, partition 280,PROCESS_LOCAL, 2432 bytes)
16/03/17 16:02:16 WARN scheduler.TaskSetManager: Lost task 170.0 in stage
153.0 (TID 9280, hw-node5): java.lang.OutOfMemoryError: Unable to acquire
1073741824 bytes of memory, got 1060110796
at
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:295)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:330)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:91)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:168)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:90)
at org.apache.spark.sql.execution.Sort$$anonfun$1.apply(Sort.scala:64)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
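
Since the failure only shows up above 200 shuffle partitions, a hedged way to
bisect the threshold (Spark 1.6 SQLContext API; a diagnostic sketch, not a fix
for the underlying memory-acquisition issue):

```scala
// Set the value under test explicitly, then rerun the failing query.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")
```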



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-OutOfMemoryError-Unable-to-acquire-bytes-of-memory-tp16773p16787.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: ORC file writing hangs in pyspark

2016-02-24 Thread James Barney
Thank you for the suggestions. We looked at the live Spark UI and the YARN app
logs and found what we think is the issue: in Spark 1.5.2, the FPGrowth
algorithm doesn't require you to specify the number of partitions for your
input data, but without specifying it, FPGrowth puts all of its data into one
partition. Consequently, only one executor is responsible for
writing the ORC file from the resulting dataframe that FPGrowth puts out.
That's what was causing it to hang.

After specifying the number of partitions in FPGrowth, the
writing step continues and finishes quickly.
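
For reference, a hedged sketch of the knob being described (shown with the Scala
MLlib API; the pyspark FPGrowth.train call exposes the same numPartitions
argument; the values and the transactions RDD below are illustrative only):

```scala
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

// transactions: RDD[Array[String]] built from the queried Hive data (assumed).
def runFPGrowth(transactions: RDD[Array[String]]) = {
  new FPGrowth()
    .setMinSupport(0.2)      // illustrative value, not from the thread
    .setNumPartitions(200)   // spreads the itemset work, and hence the later ORC write
    .run(transactions)
}
```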

Thank you again for the suggestions

On Tue, Feb 23, 2016 at 9:28 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> Hi James,
>
> You can try to write with other format, e.g., parquet to see whether it is
> a orc specific issue or more generic issue.
>
> Thanks.
>
> Zhan Zhang
>
> On Feb 23, 2016, at 6:05 AM, James Barney <jamesbarne...@gmail.com> wrote:
>
> I'm trying to write an ORC file after running the FPGrowth algorithm on a
> dataset of around just 2GB in size. The algorithm performs well and can
> display results if I take(n) the freqItemSets() of the result after
> converting that to a DF.
>
> I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.
>
> I get the results from querying a Hive table, also ORC format, running a
> number of maps, joins, and filters on the data.
>
> When the program attempts to write the files:
> result.write.orc('/data/staged/raw_result')
>   size_1_buckets.write.orc('/data/staged/size_1_results')
>   filter_size_2_buckets.write.orc('/data/staged/size_2_results')
>
> The first path, /data/staged/raw_result, is created with a _temporary
> folder, but the data is never written. The job hangs at this point,
> apparently indefinitely.
>
> Additionally, no logs are recorded or available for the jobs on the
> history server.
>
> What could be the problem?
>
>
>


ORC file writing hangs in pyspark

2016-02-23 Thread James Barney
I'm trying to write an ORC file after running the FPGrowth algorithm on a
dataset of around just 2GB in size. The algorithm performs well and can
display results if I take(n) the freqItemSets() of the result after
converting that to a DF.

I'm using Spark 1.5.2 on HDP 2.3.4 and Python 3.4.2 on Yarn.

I get the results from querying a Hive table, also ORC format, running a
number of maps, joins, and filters on the data.

When the program attempts to write the files:
result.write.orc('/data/staged/raw_result')
  size_1_buckets.write.orc('/data/staged/size_1_results')
  filter_size_2_buckets.write.orc('/data/staged/size_2_results')

The first path, /data/staged/raw_result, is created with a _temporary
folder, but the data is never written. The job hangs at this point,
apparently indefinitely.

Additionally, no logs are recorded or available for the jobs on the history
server.

What could be the problem?


Re: [VOTE] Release Apache Spark 1.5.1 (RC1)

2015-09-28 Thread james
+1 

1) Build binary instruction: ./make-distribution.sh --tgz --skip-java-test
-Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver
-DskipTests
2) Run Spark SQL with YARN client mode

This 1.5.1 RC1 package has better test results than the previous 1.5.0, except
for the open issues SPARK-10484 and SPARK-4266.





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-5-1-RC1-tp14310p14388.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [VOTE] Release Apache Spark 1.5.0 (RC3)

2015-09-07 Thread james
Adding a critical bug: https://issues.apache.org/jira/browse/SPARK-10474
(Aggregation failed with unable to acquire memory).



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-5-0-RC3-tp13928p13987.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [VOTE] Release Apache Spark 1.5.0 (RC3)

2015-09-06 Thread james
I saw a new "spark.shuffle.manager=tungsten-sort" implemented in
https://issues.apache.org/jira/browse/SPARK-7081, but it can't be found its
corresponding description in
http://people.apache.org/~pwendell/spark-releases/spark-1.5.0-rc3-docs/configuration.html(Currenlty
there are only 'sort' and 'hash' two options).



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-5-0-RC3-tp13928p13984.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Came across Spark SQL hang/Error issue with Spark 1.5 Tungsten feature

2015-08-03 Thread james
Based on the latest Spark code (commit
608353c8e8e50461fafff91a2c885dca8af3aaa8), I used the same Spark SQL query
to test two groups of combined configurations, and from the results below it
seems that it currently doesn't work with the tungsten-sort shuffle manager:

*Test 1# (PASSED)*
spark.shuffle.manager=sort
spark.sql.codegen=true
spark.sql.unsafe.enabled=true 

*Test 2#(FAILED)*
spark.shuffle.manager=tungsten-sort
spark.sql.codegen=true
spark.sql.unsafe.enabled=true 

15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode4:50313
15/08/03 16:46:02 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 3 is 586 bytes
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:60490
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:56319
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:58179
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:32816
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:55840
15/08/03 16:46:02 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:46874
15/08/03 16:46:02 WARN scheduler.TaskSetManager: Lost task 42.0 in stage
158.0 (TID 1548, bignode4): java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:118)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:107)
at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:167)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:140)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:120)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Came-across-Spark-SQL-hang-Error-issue-with-Spark-1-5-Tungsten-feature-tp13537p13563.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Came across Spark SQL hang/Error issue with Spark 1.5 Tungsten feature

2015-08-02 Thread james
Thank you for your reply!
Do you mean that currently, if I want to use this Tungsten feature, we have to
set the sort shuffle manager (spark.shuffle.manager=sort), right? However, I
saw a slide, "Deep Dive into Project Tungsten: Bringing Spark Closer to Bare
Metal", published at Spark Summit 2015, and it seems to recommend the
'tungsten-sort' manager.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Came-across-Spark-SQL-hang-Error-issue-with-Spark-1-5-Tungsten-feature-tp13537p13561.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Came across Spark SQL hang issue with Spark 1.5 Tungsten feature

2015-07-31 Thread james
I tried to enable Tungsten with Spark SQL and set the 3 parameters below, but I
found that Spark SQL always hangs at the point below. Could you please point me
to the potential cause? I'd appreciate any input.
spark.shuffle.manager=tungsten-sort
spark.sql.codegen=true
spark.sql.unsafe.enabled=true

15/07/31 15:19:46 INFO scheduler.TaskSetManager: Starting task 110.0 in
stage 131.0 (TID 280, bignode3, PROCESS_LOCAL, 1446 bytes)
15/07/31 15:19:46 INFO scheduler.TaskSetManager: Starting task 111.0 in
stage 131.0 (TID 281, bignode2, PROCESS_LOCAL, 1446 bytes)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode3:38948 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode3:57341 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode1:33229 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode1:42261 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode2:44033 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode2:42863 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO storage.BlockManagerInfo: Added broadcast_132_piece0
in memory on bignode4:58639 (size: 7.4 KB, free: 1766.4 MB)
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:46462
15/07/31 15:19:46 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 3 is 71847 bytes
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:38803
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:35241
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:48323
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:56697
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode4:55810
15/07/31 15:19:46 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:37386




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Came-across-Spark-SQL-hang-issue-with-Spark-1-5-Tungsten-feature-tp13537.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Came across Spark SQL hang/Error issue with Spark 1.5 Tungsten feature

2015-07-31 Thread james
Another error:
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:40443
15/07/31 16:15:28 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 3 is 583 bytes
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode1:40474
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:34052
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:46929
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode3:50890
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode2:47795
15/07/31 16:15:28 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send
map output locations for shuffle 3 to bignode4:35120
15/07/31 16:15:28 INFO scheduler.TaskSetManager: Finished task 32.0 in stage
151.0 (TID 1203) in 155 ms on bignode3 (1/50)
15/07/31 16:15:28 INFO scheduler.TaskSetManager: Finished task 35.0 in stage
151.0 (TID 1204) in 157 ms on bignode2 (2/50)
15/07/31 16:15:28 INFO scheduler.TaskSetManager: Finished task 8.0 in stage
151.0 (TID 1196) in 168 ms on bignode3 (3/50)
15/07/31 16:15:28 WARN scheduler.TaskSetManager: Lost task 46.0 in stage
151.0 (TID 1184, bignode1): java.lang.NegativeArraySizeException
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:314)
at
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getUTF8String(UnsafeRow.java:297)
at SC$SpecificProjection.apply(Unknown Source)
at
org.apache.spark.sql.catalyst.expressions.FromUnsafeProjection.apply(Projection.scala:152)
at
org.apache.spark.sql.catalyst.expressions.FromUnsafeProjection.apply(Projection.scala:140)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:148)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:71)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Came-across-Spark-SQL-hang-Error-issue-with-Spark-1-5-Tungsten-feature-tp13537p13538.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



graph.mapVertices() function obtains edge triplets with null attributes

2015-02-26 Thread James
My code

```
// Initialize the graph, assigning each vertex a counter that contains the
// vertex id only
var anfGraph = graph.mapVertices { case (vid, _) =>
  val counter = new HyperLogLog(5)
  counter.offer(vid)
  counter
}

val nullVertex = anfGraph.triplets.filter(edge => edge.srcAttr == null).first
// There is an edge whose src attr is null

anfGraph.vertices.filter(_._1 == nullVertex.srcId).first
// I could see that the vertex has a non-null attribute

// messages = anfGraph.aggregateMessages(msgFun, mergeMessage)   // => NullPointerException

```
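
For reference, a hedged sketch of the aggregateMessages call with the triplet
fields made explicit (GraphX 1.2 API; the send/merge bodies are placeholders,
and addAll is assumed from stream-lib's HyperLogLog), which sidesteps any
bytecode-based detection of which vertex attributes the send function needs:

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.graphx._

// Declare explicitly that the send function reads both vertex attributes,
// so GraphX ships srcAttr and dstAttr with every triplet.
val messages: VertexRDD[HyperLogLog] = anfGraph.aggregateMessages[HyperLogLog](
  ctx => {
    // placeholder send logic, e.g. ctx.sendToDst(ctx.srcAttr)
  },
  (a, b) => { a.addAll(b); a },   // merge two counters (addAll from stream-lib, assumed)
  TripletFields.All               // ship both srcAttr and dstAttr
)
```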

My Spark version: 1.2.0

Alcaid


Re: Why a program would receive null from send message of mapReduceTriplets

2015-02-13 Thread James
I have a question:

*How could the attributes of the triplets of a graph get updated after the
mapVertices() func?*

My code

```
// Initialize the graph, assigning each vertex a counter that contains the
// vertex id only
var anfGraph = graph.mapVertices { case (vid, _) =>
  val counter = new HyperLogLog(5)
  counter.offer(vid)
  counter
}

val nullVertex = anfGraph.triplets.filter(edge => edge.srcAttr == null).first

anfGraph.vertices.filter(_._1 == nullVertex.srcId).first
// I could see that the vertex has a non-null attribute

// messages = anfGraph.aggregateMessages(msgFun, mergeMessage)   // => NullPointerException

```

I found that the vertex attributes in some triplets are null, but not in all
of them.


Alcaid


2015-02-13 14:50 GMT+08:00 Reynold Xin r...@databricks.com:

 Then maybe you actually had a null in your vertex attribute?


 On Thu, Feb 12, 2015 at 10:47 PM, James alcaid1...@gmail.com wrote:

 I changed the mapReduceTriplets() func to aggregateMessages(), but it
 still failed.


 2015-02-13 6:52 GMT+08:00 Reynold Xin r...@databricks.com:

 Can you use the new aggregateNeighbors method? I suspect the null is
 coming from automatic join elimination, which detects bytecode to see if
 you need the src or dst vertex data. Occasionally it can fail to detect. In
 the new aggregateNeighbors API, the caller needs to explicitly specifying
 that, making it more robust.


 On Thu, Feb 12, 2015 at 6:26 AM, James alcaid1...@gmail.com wrote:

 Hello,

 When I am running the code on a much bigger size graph, I met
 NullPointerException.

 I found that is because the sendMessage() function receive a triplet
 that
 edge.srcAttr or edge.dstAttr is null. Thus I wonder why it will happen
 as I
 am sure every vertices have a attr.

 Any returns is appreciated.

 Alcaid


 2015-02-11 19:30 GMT+08:00 James alcaid1...@gmail.com:

  Hello,
 
  Recently  I am trying to estimate the average distance of a big graph
  using spark with the help of [HyperAnf](
  http://dl.acm.org/citation.cfm?id=1963493).
 
  It works like Connect Componenet algorithm, while the attribute of a
  vertex is a HyperLogLog counter that at k-th iteration it estimates
 the
  number of vertices it could reaches less than k hops.
 
  I have successfully run the code on a graph with 20M vertices. But I
 still
  need help:
 
 
  *I think the code could work more efficiently especially the Send
  message function, but I am not sure about what will happen if a
 vertex
  receive no message at a iteration.*
 
  Here is my code: https://github.com/alcaid1801/Erdos
 
  Any returns is appreciated.
 







Re: Why a program would receive null from send message of mapReduceTriplets

2015-02-12 Thread James
I am trying to run the data in spark-shell mode to find out whether there is
something wrong in the code or the data, as I can only reproduce the error on
a 50B-edge graph.

2015-02-13 14:50 GMT+08:00 Reynold Xin r...@databricks.com:

 Then maybe you actually had a null in your vertex attribute?


 On Thu, Feb 12, 2015 at 10:47 PM, James alcaid1...@gmail.com wrote:

 I changed the mapReduceTriplets() func to aggregateMessages(), but it
 still failed.


 2015-02-13 6:52 GMT+08:00 Reynold Xin r...@databricks.com:

 Can you use the new aggregateNeighbors method? I suspect the null is
 coming from automatic join elimination, which detects bytecode to see if
 you need the src or dst vertex data. Occasionally it can fail to detect. In
 the new aggregateNeighbors API, the caller needs to explicitly specifying
 that, making it more robust.


 On Thu, Feb 12, 2015 at 6:26 AM, James alcaid1...@gmail.com wrote:

 Hello,

 When I am running the code on a much bigger size graph, I met
 NullPointerException.

 I found that is because the sendMessage() function receive a triplet
 that
 edge.srcAttr or edge.dstAttr is null. Thus I wonder why it will happen
 as I
 am sure every vertices have a attr.

 Any returns is appreciated.

 Alcaid


 2015-02-11 19:30 GMT+08:00 James alcaid1...@gmail.com:

  Hello,
 
  Recently  I am trying to estimate the average distance of a big graph
  using spark with the help of [HyperAnf](
  http://dl.acm.org/citation.cfm?id=1963493).
 
  It works like Connect Componenet algorithm, while the attribute of a
  vertex is a HyperLogLog counter that at k-th iteration it estimates
 the
  number of vertices it could reaches less than k hops.
 
  I have successfully run the code on a graph with 20M vertices. But I
 still
  need help:
 
 
  *I think the code could work more efficiently especially the Send
  message function, but I am not sure about what will happen if a
 vertex
  receive no message at a iteration.*
 
  Here is my code: https://github.com/alcaid1801/Erdos
 
  Any returns is appreciated.
 







Re: Why a program would receive null from send message of mapReduceTriplets

2015-02-12 Thread James
I changed the mapReduceTriplets() func to aggregateMessages(), but it still
failed.


2015-02-13 6:52 GMT+08:00 Reynold Xin r...@databricks.com:

 Can you use the new aggregateNeighbors method? I suspect the null is
 coming from automatic join elimination, which detects bytecode to see if
 you need the src or dst vertex data. Occasionally it can fail to detect. In
 the new aggregateNeighbors API, the caller needs to explicitly specifying
 that, making it more robust.


 On Thu, Feb 12, 2015 at 6:26 AM, James alcaid1...@gmail.com wrote:

 Hello,

 When I am running the code on a much bigger size graph, I met
 NullPointerException.

 I found that is because the sendMessage() function receive a triplet that
 edge.srcAttr or edge.dstAttr is null. Thus I wonder why it will happen as
 I
 am sure every vertices have a attr.

 Any returns is appreciated.

 Alcaid


 2015-02-11 19:30 GMT+08:00 James alcaid1...@gmail.com:

  Hello,
 
  Recently  I am trying to estimate the average distance of a big graph
  using spark with the help of [HyperAnf](
  http://dl.acm.org/citation.cfm?id=1963493).
 
  It works like Connect Componenet algorithm, while the attribute of a
  vertex is a HyperLogLog counter that at k-th iteration it estimates the
  number of vertices it could reaches less than k hops.
 
  I have successfully run the code on a graph with 20M vertices. But I
 still
  need help:
 
 
  *I think the code could work more efficiently especially the Send
  message function, but I am not sure about what will happen if a vertex
  receive no message at a iteration.*
 
  Here is my code: https://github.com/alcaid1801/Erdos
 
  Any returns is appreciated.
 





Why a program would receive null from send message of mapReduceTriplets

2015-02-12 Thread James
Hello,

When I run the code on a much bigger graph, I get a
NullPointerException.

I found that this is because the sendMessage() function receives a triplet
whose edge.srcAttr or edge.dstAttr is null. I wonder why this happens, as I
am sure every vertex has an attribute.

Any response is appreciated.

Alcaid


2015-02-11 19:30 GMT+08:00 James alcaid1...@gmail.com:

 Hello,

 Recently  I am trying to estimate the average distance of a big graph
 using spark with the help of [HyperAnf](
 http://dl.acm.org/citation.cfm?id=1963493).

 It works like Connect Componenet algorithm, while the attribute of a
 vertex is a HyperLogLog counter that at k-th iteration it estimates the
 number of vertices it could reaches less than k hops.

 I have successfully run the code on a graph with 20M vertices. But I still
 need help:


 *I think the code could work more efficiently especially the Send
 message function, but I am not sure about what will happen if a vertex
 receive no message at a iteration.*

 Here is my code: https://github.com/alcaid1801/Erdos

 Any returns is appreciated.



[GraphX] Estimating Average distance of a big graph using GraphX

2015-02-11 Thread James
Hello,

Recently I have been trying to estimate the average distance of a big graph using
Spark with the help of [HyperANF](http://dl.acm.org/citation.cfm?id=1963493).

It works like the Connected Components algorithm, except that the attribute of a vertex
is a HyperLogLog counter that, at the k-th iteration, estimates the number of
vertices it can reach in fewer than k hops.

I have successfully run the code on a graph with 20M vertices. But I still
need help:


*I think the code could work more efficiently, especially the send-message
function, but I am not sure what will happen if a vertex receives no
message at an iteration.*

Here is my code: https://github.com/alcaid1801/Erdos

Any response is appreciated.


not found: type LocalSparkContext

2015-01-20 Thread James
Hi all,

When I was trying to write a test for my Spark application I got

```
Error:(14, 43) not found: type LocalSparkContext
class HyperANFSuite extends FunSuite with LocalSparkContext {
```

In the source code of spark-core I could not find LocalSparkContext,
so I wonder how to write a test like [this](
https://github.com/apache/spark/blob/master/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
)

Alcaid


Re: not found: type LocalSparkContext

2015-01-20 Thread James
I could not correctly import org.apache.spark.LocalSparkContext.

I use sbt in IntelliJ for development; here is my build.sbt:

```
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "1.2.0"

libraryDependencies += "com.clearspring.analytics" % "stream" % "2.7.0"

libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
```

I think maybe I have made some mistakes in the library settings. As a new
developer of Spark applications, I wonder what the standard procedure for
developing a Spark application is.

Any reply is appreciated.
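
For what it's worth, here is a hedged sketch of the kind of trait Will's reply
below suggests defining in the application's own test sources, since Spark's
test classes are not published (assumes ScalaTest and the Spark 1.2 API; this
LocalSparkContext is our own definition, not Spark's internal one):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterEach, Suite}

// A minimal stand-in for Spark's internal LocalSparkContext test trait.
trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def beforeEach(): Unit = {
    super.beforeEach()
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterEach(): Unit = {
    if (sc != null) { sc.stop(); sc = null }
    super.afterEach()
  }
}
```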


Alcaid


2015-01-21 2:05 GMT+08:00 Will Benton wi...@redhat.com:

 It's declared here:


 https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/LocalSparkContext.scala

 I assume you're already importing LocalSparkContext, but since the test
 classes aren't included in Spark packages, you'll also need to package them
 up in order to use them in your application (viz., outside of Spark).



 best,
 wb

 - Original Message -
  From: James alcaid1...@gmail.com
  To: dev@spark.apache.org
  Sent: Tuesday, January 20, 2015 6:35:07 AM
  Subject: not found: type LocalSparkContext
 
  Hi all,
 
  When I was trying to write a test on my spark application I met
 
  ```
  Error:(14, 43) not found: type LocalSparkContext
  class HyperANFSuite extends FunSuite with LocalSparkContext {
  ```
 
  At the source code of spark-core I could not found LocalSparkContext,
  thus I wonder how to write a test like [this] (
 
 https://github.com/apache/spark/blob/master/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
  )
 
  Alcaid
 



Using graphx to calculate average distance of a big graph

2015-01-04 Thread James
Recently we have wanted to use Spark to calculate the average shortest-path
distance between each reachable pair of nodes in a very big graph.

Has anyone ever tried this? We would like to discuss the problem.


Re: will/when Spark/SparkSQL will support ORCFile format

2014-10-09 Thread James Yu
Performance-wise, will foreign data format support be the same as for native ones?

Thanks,
James


On Wed, Oct 8, 2014 at 11:03 PM, Cheng Lian lian.cs@gmail.com wrote:

 The foreign data source API PR also matters here
 https://www.github.com/apache/spark/pull/2475

 Foreign data source like ORC can be added more easily and systematically
 after this PR is merged.

 On 10/9/14 8:22 AM, James Yu wrote:

 Thanks Mark! I will keep eye on it.

 @Evan, I saw people use both format, so I really want to have Spark
 support
 ORCFile.


 On Wed, Oct 8, 2014 at 11:12 AM, Mark Hamstra m...@clearstorydata.com
 wrote:

  https://github.com/apache/spark/pull/2576



 On Wed, Oct 8, 2014 at 11:01 AM, Evan Chan velvia.git...@gmail.com
 wrote:

  James,

 Michael at the meetup last night said there was some development
 activity around ORCFiles.

 I'm curious though, what are the pros and cons of ORCFiles vs Parquet?

 On Wed, Oct 8, 2014 at 10:03 AM, James Yu jym2...@gmail.com wrote:

 Didn't see anyone asked the question before, but I was wondering if

 anyone

 knows if Spark/SparkSQL will support ORCFile format soon? ORCFile is
 getting more and more popular hi Hive world.

 Thanks,
 James

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org






Re: will/when Spark/SparkSQL will support ORCFile format

2014-10-09 Thread James Yu
Sounds great, thanks!



On Thu, Oct 9, 2014 at 2:22 PM, Michael Armbrust mich...@databricks.com
wrote:

 Yes, the foreign sources work is only about exposing a stable set of APIs
 for external libraries to link against (to avoid the spark assembly
 becoming a dependency mess).  The code path these APIs use will be the same
 as that for datasources included in the core spark sql library.

 Michael

 On Thu, Oct 9, 2014 at 2:18 PM, James Yu jym2...@gmail.com wrote:

 For performance, will foreign data format support, same as native ones?

 Thanks,
 James


 On Wed, Oct 8, 2014 at 11:03 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  The foreign data source API PR also matters here
  https://www.github.com/apache/spark/pull/2475
 
  Foreign data source like ORC can be added more easily and systematically
  after this PR is merged.
 
  On 10/9/14 8:22 AM, James Yu wrote:
 
  Thanks Mark! I will keep eye on it.
 
  @Evan, I saw people use both format, so I really want to have Spark
  support
  ORCFile.
 
 
  On Wed, Oct 8, 2014 at 11:12 AM, Mark Hamstra m...@clearstorydata.com
 
  wrote:
 
   https://github.com/apache/spark/pull/2576
 
 
 
  On Wed, Oct 8, 2014 at 11:01 AM, Evan Chan velvia.git...@gmail.com
  wrote:
 
   James,
 
  Michael at the meetup last night said there was some development
  activity around ORCFiles.
 
  I'm curious though, what are the pros and cons of ORCFiles vs
 Parquet?
 
  On Wed, Oct 8, 2014 at 10:03 AM, James Yu jym2...@gmail.com wrote:
 
  Didn't see anyone asked the question before, but I was wondering if
 
  anyone
 
  knows if Spark/SparkSQL will support ORCFile format soon? ORCFile is
  getting more and more popular hi Hive world.
 
  Thanks,
  James
 
  -
  To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
  For additional commands, e-mail: dev-h...@spark.apache.org
 
 
 
 





will/when Spark/SparkSQL will support ORCFile format

2014-10-08 Thread James Yu
I didn't see anyone ask this question before, but I was wondering if anyone
knows whether Spark/SparkSQL will support the ORCFile format soon? ORCFile is
getting more and more popular in the Hive world.

Thanks,
James