Re: Identifying specific persisted DataFrames via getPersistentRDDs()

2018-05-03 Thread Reynold Xin
Why do you need the underlying RDDs? Can't you just unpersist the
dataframes that you don't need?


On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas 
wrote:

> This seems to be an underexposed part of the API. My use case is this: I
> want to unpersist all DataFrames except a specific few. I want to do this
> because I know at a specific point in my pipeline that I have a handful of
> DataFrames that I need, and everything else is no longer needed.
>
> The problem is that there doesn’t appear to be a way to identify specific
> DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(),
> which is the only way I’m aware of to ask Spark for all currently persisted
> RDDs:
>
> >>> a = spark.range(10).persist()
> >>> a.rdd.id()
> 8
> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
> [(3, JavaObject id=o36)]
>
> As you can see, the id of the persisted RDD, 8, doesn’t match the id
> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
> returned by getPersistentRDDs() and know which ones I want to keep.
>
> id() itself appears to be an undocumented method of the RDD API, and in
> PySpark getPersistentRDDs() is buried behind the Java sub-objects, so I know I’m
> reaching here. But is there a way to do what I want in PySpark without
> manually tracking everything I’ve persisted myself?
>
> And more broadly speaking, do we want to add additional APIs, or formalize
> currently undocumented APIs like id(), to make this use case possible?
>
> Nick
> ​
>
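
For reference, below is a minimal Scala sketch of the manual-tracking approach Nick mentions (and that Reynold's suggestion implies): register each DataFrame as you persist it, then unpersist everything outside a keep-set, without going through getPersistentRDDs() at all. The registry object and its names are hypothetical, not an existing Spark API.

```
import scala.collection.mutable
import org.apache.spark.sql.DataFrame

// Hypothetical helper: remember every DataFrame we persist ourselves so that,
// at a given point in the pipeline, we can unpersist all but a chosen few.
object PersistRegistry {
  private val persisted = mutable.LinkedHashMap.empty[String, DataFrame]

  def persist(name: String, df: DataFrame): DataFrame = {
    persisted(name) = df.persist()
    df
  }

  def unpersistAllExcept(keep: Set[String]): Unit = {
    val toDrop = persisted.keys.filterNot(keep.contains).toList
    toDrop.foreach { name =>
      persisted(name).unpersist()
      persisted.remove(name)
    }
  }
}

// Example usage (e.g. in spark-shell):
// val a = PersistRegistry.persist("a", spark.range(10).toDF())
// val b = PersistRegistry.persist("b", spark.range(20).toDF())
// PersistRegistry.unpersistAllExcept(Set("a"))   // only "b" gets unpersisted
```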


Re: [Structured streaming, V2] commit on ContinuousReader

2018-05-03 Thread Joseph Torres
In the master branch, we currently call this method in
ContinuousExecution.commit().

Note that the ContinuousReader API is experimental and undergoing active
design work. We will definitely include some kind of functionality to
commit data back to the source once it's been processed, but the handle we
eventually stabilize won't necessarily be `commit(end: Offset)`.

On Thu, May 3, 2018 at 10:43 AM, Jiří Syrový  wrote:

> Version: 2.3, DataSourceV2, ContinuousReader
>
> Hi,
>
> We're creating a new data source to fetch data from a streaming source that
> requires committing received data. We would like to commit data once in a
> while, after it has been retrieved and correctly processed, and then fetch
> more.
>
> One option could be to rely on Spark committing already-read data using
> *commit(end: Offset)*, which is present in *ContinuousReader
> (v2.reader.streaming)*, but it seems that this method is never called.
>
> The question is whether this method, *commit(end: Offset)*, is ever used,
> and when? I went through part of the Spark code base but haven't really
> found any place where it could be called.
>
> Thanks,
> Jiri
>
>


Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-03 Thread Ryan Blue
Yes, you can usually use a broadcast join to avoid skew problems.

On Wed, May 2, 2018 at 8:57 PM, Pralabh Kumar 
wrote:

> I am performing a join operation. If I convert the reduce-side join to a
> map-side join (so no shuffle will happen), I assume this error shouldn't
> occur. Let me know if this understanding is correct.
>
> On Tue, May 1, 2018 at 9:37 PM, Ryan Blue  wrote:
>
>> This is usually caused by skew. Sometimes you can work around it by
>> increasing the number of partitions like you tried, but when that doesn’t
>> work you need to change the partitioning that you’re using.
>>
>> If you’re aggregating, try adding an intermediate aggregation. For
>> example, if your query is select sum(x), a from t group by a, then try select
>> sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
>> group by a.
>>
>> rb
>> ​
>>
>> On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar 
>> wrote:
>>
>>> Hi
>>>
>>> I am getting the above error in Spark SQL. I have increased the number of
>>> partitions (to 5000) but am still getting the same error.
>>>
>>> My data most probably is skewed.
>>>
>>>
>>>
>>> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>>> at 
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>>> at 
>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix
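
For illustration, here is a minimal Scala sketch of both suggestions: a broadcast (map-side) join and a two-level aggregation. The tables are made up, and a deterministic salt column derived from an existing column stands in for the extra grouping key `b` in Ryan's example; this is a sketch, not code from the thread.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical skewed fact table and a small dimension table.
val large = spark.range(0, 1000000L).select(($"id" % 10).as("key"), $"id".as("x"))
val small = Seq((0L, "a"), (1L, "b")).toDF("key", "label")

// Broadcast join: `small` is shipped to every executor, so `large` is never
// shuffled and no single oversized shuffle block is built for the skewed key.
val joined = large.join(broadcast(small), Seq("key"))

// Two-level aggregation for a skewed GROUP BY key: first aggregate on
// (key, salt) to spread the hot key over many partitions, then on key alone.
val partial = large
  .groupBy($"key", ($"x" % 16).as("salt"))
  .agg(sum($"x").as("partial"))
val result = partial.groupBy($"key").agg(sum($"partial").as("sum_x"))
```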


[Structured streaming, V2] commit on ContinuousReader

2018-05-03 Thread Jiří Syrový
Version: 2.3, DataSourceV2, ContinuousReader

Hi,

We're creating a new data source to fetch data from a streaming source that
requires committing received data. We would like to commit data once in a
while, after it has been retrieved and correctly processed, and then fetch
more.

One option could be to rely on Spark committing already-read data using
*commit(end: Offset)*, which is present in *ContinuousReader
(v2.reader.streaming)*, but it seems that this method is never called.

The question is whether this method, *commit(end: Offset)*, is ever used, and
when? I went through part of the Spark code base but haven't really found any
place where it could be called.

Thanks,
Jiri


Re: SparkR test failures in PR builder

2018-05-03 Thread Xiao Li
Thank you for working on this. It helps a lot!

Xiao

2018-05-03 8:42 GMT-07:00 Felix Cheung :

> This is resolved.
>
> Please see https://issues.apache.org/jira/browse/SPARK-24152
>
> --
> *From:* Kazuaki Ishizaki 
> *Sent:* Wednesday, May 2, 2018 4:51:11 PM
> *To:* dev
> *Cc:* Joseph Bradley; Hossein Falaki
> *Subject:* Re: SparkR test failures in PR builder
>
> I am not familiar with SparkR or CRAN. However, I remember that we had a
> similar situation before.
>
> Here is a great piece of work from that time. Having just visited this PR, I
> think that we have a similar situation (i.e. a format error) again.
> https://github.com/apache/spark/pull/20005
>
> Any other comments are appreciated.
>
> Regards,
> Kazuaki Ishizaki
>
>
>
> From: Joseph Bradley
> To: dev
> Cc: Hossein Falaki
> Date: 2018/05/03 07:31
> Subject: SparkR test failures in PR builder
> --
>
>
>
> Hi all,
>
> Does anyone know why the PR builder keeps failing on SparkR's CRAN
> checks?  I've seen this in a lot of unrelated PRs.  E.g.:
> https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90065/console
> 
>
> Hossein spotted this line:
> ```
> * checking CRAN incoming feasibility ...Error in
> .check_package_CRAN_incoming(pkgdir) :
>   dims [product 24] do not match the length of object [0]
> ```
> and suggested that it could be CRAN flakiness.  I'm not familiar with
> CRAN, but do others have thoughts about how to fix this?
>
> Thanks!
> Joseph
>
> --
> Joseph Bradley
> Software Engineer - Machine Learning
> Databricks, Inc.
> 
>
>


Re: SparkR test failures in PR builder

2018-05-03 Thread Felix Cheung
This is resolved.

Please see https://issues.apache.org/jira/browse/SPARK-24152


From: Kazuaki Ishizaki 
Sent: Wednesday, May 2, 2018 4:51:11 PM
To: dev
Cc: Joseph Bradley; Hossein Falaki
Subject: Re: SparkR test failures in PR builder

I am not familiar with SparkR or CRAN. However, I remember that we had a
similar situation before.

Here is a great piece of work from that time. Having just visited this PR, I
think that we have a similar situation (i.e. a format error) again.
https://github.com/apache/spark/pull/20005

Any other comments are appreciated.

Regards,
Kazuaki Ishizaki



From: Joseph Bradley
To: dev
Cc: Hossein Falaki
Date: 2018/05/03 07:31
Subject: SparkR test failures in PR builder




Hi all,

Does anyone know why the PR builder keeps failing on SparkR's CRAN checks?  
I've seen this in a lot of unrelated PRs.  E.g.: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90065/console

Hossein spotted this line:
```
* checking CRAN incoming feasibility ...Error in 
.check_package_CRAN_incoming(pkgdir) :
  dims [product 24] do not match the length of object [0]
```
and suggested that it could be CRAN flakiness.  I'm not familiar with CRAN, but 
do others have thoughts about how to fix this?

Thanks!
Joseph

--
Joseph Bradley
Software Engineer - Machine Learning
Databricks, Inc.




Re: Custom datasource as a wrapper for existing ones?

2018-05-03 Thread Jörn Franke
It changed from 2.0 to 2.1 to 2.2...
Not much, but it still changed. I somewhat agree that this is still manageable.

> On 3. May 2018, at 16:46, Wenchen Fan  wrote:
> 
> Hi Jakub,
> 
> Yea I think data source would be the most elegant way to solve your problem. 
> Unfortunately in Spark 2.3 the only stable data source API is data source v1, 
> which can't be used to implement high-performance data source. Data source v2 
> is still a preview version in Spark 2.3 and may change in the next release.
> 
> For now I'd suggest you take a look at `FileFormat`, which is the API for the 
> Spark builtin file-based data source like parquet. It's an internal API but 
> has not been changed for a long time. In the future, data source v2 would be 
> the best solution.
> 
> Thanks,
> Wenchen
> 
>> On Thu, May 3, 2018 at 4:17 AM, Jakub Wozniak  wrote:
>> Hello,
>> 
>> Thanks a lot for your answers. 
>> 
>> We normally look for some stability so the use of internal APIs that are a 
>> subject to change with no warning are somewhat questionable. 
>> As to the approach of putting this functionality on top of Spark instead of 
>> a datasource - this works but poses a problem for Python. 
>> In Python we would like to reuse the code written in Java. An external lib 
>> in Java has to proxy to Python and Spark proxies as well. 
>> This means passing over objects (like SparkSession) back and forth from one 
>> jvm to the other. Not surprisingly this did not work for us in the past 
>> (although we did not push much hoping for the datasource).
>> All in all if we don’t find another solution we might go for an external 
>> library that most likely have to be reimplemented twice in Python… 
>> Or there might be a way to force our lib execution in the same JVM as Spark 
>> uses. To be seen… Again the most elegant way would be the datasource.
>> 
>> Cheers,
>> Jakub
>> 
>> 
>> > On 2 May 2018, at 21:07, Jörn Franke  wrote:
>> > 
>> > Some note on the internal API - it used to change with each release which 
>> > was quiet annoying because  other data sources (Avro, HadoopOffice etc) 
>> > had to follow up in this. In the end it is an internal API and thus does 
>> > not guarantee to be stable. If you want to have something stable you have 
>> > to use the official data source APIs with some disadvantages.
>> > 
>> >> On 2. May 2018, at 18:49, jwozniak  wrote:
>> >> 
>> >> Hello,
>> >> 
>> >> At CERN we are developing a Big Data system called NXCALS that uses Spark 
>> >> as
>> >> Extraction API.
>> >> We have implemented a custom datasource that was wrapping 2 existing ones
>> >> (parquet and Hbase) in order to hide the implementation details (location 
>> >> of
>> >> the parquet files, hbase tables, etc) and to provide an abstraction layer 
>> >> to
>> >> our users. 
>> >> We have entered a stage where we execute some performance tests on our 
>> >> data
>> >> and we have noticed that this approach did not provide the expected
>> >> performance observed using pure Spark. In other words reading a parquet 
>> >> file
>> >> with some simple predicates behaves 15 times slower if the same code is
>> >> executed from within a custom datasource (that just uses Spark to read
>> >> parquet). 
>> >> After some investigation we've learnt that Spark did not apply the same
>> >> optimisations for both. 
>> >> We could see that in Spark 2.3.0 there was a new V2 version that abstracts
>> >> from SparkSession and focuses on low level Row API. 
>> >> Could you give us some suggestions of how to correctly implement our
>> >> datasource using the V2 API? 
>> >> Is this a correct way of doing it at all? 
>> >> 
>> >> What we want to achieve is to join existing datasources with some level of
>> >> additional abstraction on top. 
>> >> At the same time we want to profit from all catalyst & parquet 
>> >> optimisations
>> >> that exist for the original ones.
>> >> We also don't want to reimplement access to parquet files or Hbase at the
>> >> low level (like Row) but just profit from the Dataset API. 
>> >> We could have achieved the same by providing an external library on top of
>> >> Spark but the datasource approach looked like a more elegant solution. 
>> >> Only
>> >> the performance is still far from the desired one. 
>> >> 
>> >> Any help or direction in that matter would be greatly appreciated as we 
>> >> have
>> >> only started to build our Spark expertise yet.  
>> >> 
>> >> Best regards,
>> >> Jakub Wozniak
>> >> Software Engineer
>> >> CERN
>> >> 
>> >> 
>> >> 
>> >> --
>> >> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>> >> 
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >> 
>> 
> 


Re: Custom datasource as a wrapper for existing ones?

2018-05-03 Thread Jakub Wozniak
Hi Wenchen,

Thanks for your reply! We will have a look at the FileFormat.

Actually, looking at the V2 APIs I still don’t see how you can use existing
datasources (like Parquet + HBase) and wrap them up in another one.
Imagine you would like to load some files from Parquet and some tables
from HBase and present them together (like we do).
First of all, you no longer have access to any high-level Spark APIs in the
datasource, so it is not easy to use an existing datasource inside another
one.
Secondly, even if you had, you would still have to translate a Dataset back
into partitions/filters/expressions/Rows/etc. to meet the V2 API requirements.

Please correct me if I’m wrong. I might not see the full picture…

Cheers,
Jakub





On 3 May 2018, at 16:46, Wenchen Fan 
> wrote:

Hi Jakub,

Yea I think data source would be the most elegant way to solve your problem. 
Unfortunately in Spark 2.3 the only stable data source API is data source v1, 
which can't be used to implement high-performance data source. Data source v2 
is still a preview version in Spark 2.3 and may change in the next release.

For now I'd suggest you take a look at `FileFormat`, which is the API for the 
Spark builtin file-based data source like parquet. It's an internal API but has 
not been changed for a long time. In the future, data source v2 would be the 
best solution.

Thanks,
Wenchen

On Thu, May 3, 2018 at 4:17 AM, Jakub Wozniak 
> wrote:
Hello,

Thanks a lot for your answers.

We normally look for some stability, so the use of internal APIs that are
subject to change with no warning is somewhat questionable.
As to the approach of putting this functionality on top of Spark instead of a 
datasource - this works but poses a problem for Python.
In Python we would like to reuse the code written in Java. An external lib in 
Java has to proxy to Python and Spark proxies as well.
This means passing over objects (like SparkSession) back and forth from one jvm 
to the other. Not surprisingly this did not work for us in the past (although 
we did not push much hoping for the datasource).
All in all, if we don’t find another solution we might go for an external
library that would most likely have to be reimplemented in Python…
Or there might be a way to force our lib execution in the same JVM as Spark 
uses. To be seen… Again the most elegant way would be the datasource.

Cheers,
Jakub


> On 2 May 2018, at 21:07, Jörn Franke 
> > wrote:
>
> Some note on the internal API - it used to change with each release, which was
> quite annoying because other data sources (Avro, HadoopOffice etc) had to
> follow up in this. In the end it is an internal API and thus does not 
> guarantee to be stable. If you want to have something stable you have to use 
> the official data source APIs with some disadvantages.
>
>> On 2. May 2018, at 18:49, jwozniak 
>> > wrote:
>>
>> Hello,
>>
>> At CERN we are developing a Big Data system called NXCALS that uses Spark as
>> Extraction API.
>> We have implemented a custom datasource that was wrapping 2 existing ones
>> (parquet and Hbase) in order to hide the implementation details (location of
>> the parquet files, hbase tables, etc) and to provide an abstraction layer to
>> our users.
>> We have entered a stage where we execute some performance tests on our data
>> and we have noticed that this approach did not provide the expected
>> performance observed using pure Spark. In other words reading a parquet file
>> with some simple predicates behaves 15 times slower if the same code is
>> executed from within a custom datasource (that just uses Spark to read
>> parquet).
>> After some investigation we've learnt that Spark did not apply the same
>> optimisations for both.
>> We could see that in Spark 2.3.0 there was a new V2 version that abstracts
>> from SparkSession and focuses on low level Row API.
>> Could you give us some suggestions of how to correctly implement our
>> datasource using the V2 API?
>> Is this a correct way of doing it at all?
>>
>> What we want to achieve is to join existing datasources with some level of
>> additional abstraction on top.
>> At the same time we want to profit from all catalyst & parquet optimisations
>> that exist for the original ones.
>> We also don't want to reimplement access to parquet files or Hbase at the
>> low level (like Row) but just profit from the Dataset API.
>> We could have achieved the same by providing an external library on top of
>> Spark but the datasource approach looked like a more elegant solution. Only
>> the performance is still far from the desired one.
>>
>> Any help or direction in that matter would be greatly appreciated, as we have
>> only started to build our Spark expertise.
>>
>> Best regards,
>> Jakub Wozniak
>> Software Engineer
>> CERN
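
For context, below is a minimal Scala sketch of what a wrapping datasource looks like on the stable V1 API (RelationProvider + TableScan) that Wenchen mentions above. It is not NXCALS code; the point is only that buildScan() hands Spark an opaque RDD[Row], which is one plausible reason a wrapper loses the pushdown and codegen the inner parquet scan would otherwise get. The option key and path are made up.

```
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType

// A hypothetical V1 datasource that merely wraps a parquet read.
class WrappingSource extends RelationProvider {
  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new BaseRelation with TableScan {
      // The wrapper hides the location behind an option; the key is made up.
      private val inner = ctx.read.parquet(parameters("path"))

      override def sqlContext: SQLContext = ctx
      override def schema: StructType = inner.schema

      // Everything Spark sees from here is an opaque RDD[Row]: filters and
      // column pruning are applied on top of it rather than pushed into the
      // inner parquet scan, which is why a wrapper like this can run much
      // slower than reading the files directly.
      override def buildScan(): RDD[Row] = inner.rdd
    }
}

// Usage (hypothetical): spark.read.format(classOf[WrappingSource].getName)
//   .option("path", "/data/nxcals/some_system").load()
```

V1 also offers PrunedFilteredScan, which at least lets a wrapper forward column pruning and simple filters to the inner plan, but it still works at the Row level rather than reusing the inner source's optimized scan end to end.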

Re: Custom datasource as a wrapper for existing ones?

2018-05-03 Thread Wenchen Fan
Hi Jakub,

Yea, I think a data source would be the most elegant way to solve your
problem. Unfortunately, in Spark 2.3 the only stable data source API is data
source v1, which can't be used to implement a high-performance data source.
Data source v2 is still a preview version in Spark 2.3 and may change in
the next release.

For now I'd suggest you take a look at `FileFormat`, which is the API for
the Spark builtin file-based data sources like parquet. It's an internal API
but has not changed for a long time. In the future, data source v2 would be
the best solution.

Thanks,
Wenchen

On Thu, May 3, 2018 at 4:17 AM, Jakub Wozniak  wrote:

> Hello,
>
> Thanks a lot for your answers.
>
> We normally look for some stability so the use of internal APIs that are a
> subject to change with no warning are somewhat questionable.
> As to the approach of putting this functionality on top of Spark instead
> of a datasource - this works but poses a problem for Python.
> In Python we would like to reuse the code written in Java. An external lib
> in Java has to proxy to Python and Spark proxies as well.
> This means passing over objects (like SparkSession) back and forth from
> one jvm to the other. Not surprisingly this did not work for us in the past
> (although we did not push much hoping for the datasource).
> All in all if we don’t find another solution we might go for an external
> library that most likely have to be reimplemented twice in Python…
> Or there might be a way to force our lib execution in the same JVM as
> Spark uses. To be seen… Again the most elegant way would be the datasource.
>
> Cheers,
> Jakub
>
>
> > On 2 May 2018, at 21:07, Jörn Franke  wrote:
> >
> > Some note on the internal API - it used to change with each release
> which was quiet annoying because  other data sources (Avro, HadoopOffice
> etc) had to follow up in this. In the end it is an internal API and thus
> does not guarantee to be stable. If you want to have something stable you
> have to use the official data source APIs with some disadvantages.
> >
> >> On 2. May 2018, at 18:49, jwozniak  wrote:
> >>
> >> Hello,
> >>
> >> At CERN we are developing a Big Data system called NXCALS that uses
> Spark as
> >> Extraction API.
> >> We have implemented a custom datasource that was wrapping 2 existing
> ones
> >> (parquet and Hbase) in order to hide the implementation details
> (location of
> >> the parquet files, hbase tables, etc) and to provide an abstraction
> layer to
> >> our users.
> >> We have entered a stage where we execute some performance tests on our
> data
> >> and we have noticed that this approach did not provide the expected
> >> performance observed using pure Spark. In other words reading a parquet
> file
> >> with some simple predicates behaves 15 times slower if the same code is
> >> executed from within a custom datasource (that just uses Spark to read
> >> parquet).
> >> After some investigation we've learnt that Spark did not apply the same
> >> optimisations for both.
> >> We could see that in Spark 2.3.0 there was a new V2 version that
> abstracts
> >> from SparkSession and focuses on low level Row API.
> >> Could you give us some suggestions of how to correctly implement our
> >> datasource using the V2 API?
> >> Is this a correct way of doing it at all?
> >>
> >> What we want to achieve is to join existing datasources with some level
> of
> >> additional abstraction on top.
> >> At the same time we want to profit from all catalyst & parquet
> optimisations
> >> that exist for the original ones.
> >> We also don't want to reimplement access to parquet files or Hbase at
> the
> >> low level (like Row) but just profit from the Dataset API.
> >> We could have achieved the same by providing an external library on top
> of
> >> Spark but the datasource approach looked like a more elegant solution.
> Only
> >> the performance is still far from the desired one.
> >>
> >> Any help or direction in that matter would be greatly appreciated as we
> have
> >> only started to build our Spark expertise yet.
> >>
> >> Best regards,
> >> Jakub Wozniak
> >> Software Engineer
> >> CERN
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>
>
>
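
As a point of comparison with the datasource route, here is a minimal Scala sketch of the "external library on top of Spark" approach discussed earlier in the thread: a small facade that hides storage details but composes plain DataFrames, so each leaf source keeps its own optimizations. The paths, the HBase connector format name, and the option key are hypothetical, and the sketch assumes both storages expose the same logical schema.

```
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical facade: callers never see where the data lives, but because it
// returns ordinary DataFrames, parquet pushdown and catalyst optimizations of
// each underlying source are preserved.
object NxcalsData {
  private val ParquetRoot = "/data/nxcals/parquet"    // assumed layout
  private val HbaseFormat = "org.example.hbase"       // assumed connector name

  def read(spark: SparkSession, system: String): DataFrame = {
    val recent = spark.read
      .format(HbaseFormat)
      .option("table", s"nxcals_$system")             // assumed option name
      .load()
    val historical = spark.read.parquet(s"$ParquetRoot/$system")
    // Present both storages as one logical dataset (schemas assumed to match).
    historical.unionByName(recent)
  }
}
```

The trade-off, as noted above, is that such a library has to be exposed separately to Python, whereas a datasource would be reachable from any language binding via `spark.read.format(...)`.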


Re: AccumulatorV2 vs AccumulableParam (V1)

2018-05-03 Thread Wenchen Fan
Hi Sergey,

Thanks for your valuable feedback!

For 1: yea this is definitely a bug and I have sent a PR to fix it.
For 2: I have left my comments on the JIRA ticket.
For 3: I don't quite understand it, can you give some concrete examples?
For 4: yea this is a problem, but I think it's not a big deal, and we
couldn't find a better solution at that time.
For 5: I think this is a real problem. It looks to me that we can merge
`isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
basically just the `copyAndReset`. If there is a way to fix this without
breaking the existing API, I'm really happy to do it.
For 6: same as 4. It's a problem but not a big deal.

In general, I think accumulator v2 sacrifices some flexibility to simplify
the framework and improve the performance. Users can still use accumulator
v1 if flexibility is more important to them. We can keep improving
accumulator v2 without breaking backward compatibility.

Thanks,
Wenchen

On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky 
wrote:

> Hello guys,
>
> I've started to migrate my Spark jobs which use Accumulators V1 to
> AccumulatorV2 and faced with the following issues:
>
> 1. LegacyAccumulatorWrapper now requires the resulting type of
> AccumulableParam to implement equals. In other case the
> AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
> will fail with AssertionError (SPARK-23697 [1]).
>
> 2. Existing AccumulatorV2 classes are hard to extend easily and
> correctly (SPARK-24154 [2]) due to their "copy" method, which is called
> during serialization and usually loses the type information of descendant
> classes that don't override "copy" (it's easier to implement an
> accumulator from scratch than to override it correctly).
>
> 3. The same instance of AccumulatorV2 cannot be used with the same
> SparkContext multiple times (unlike AccumulableParam) failing with
> "IllegalStateException: Cannot register an Accumulator twice" even
> after "reset" method called. So it's impossible to unregister already
> registered accumulator from user code.
>
> 4. AccumulableParam (V1) implementations are usually more or less
> stateless, while AccumulatorV2 implementations are almost always
> stateful, leading to (unnecessary?) type checks (unlike
> AccumulableParam). For example typical "merge" method of AccumulatorV2
> requires to check whether current accumulator is of an appropriate
> type, like here [3]
>
> 5. AccumulatorV2 is more difficult to implement correctly than
> AccumulableParam. For example, in the case of AccumulableParam I have to
> implement just 3 methods (addAccumulator, addInPlace, zero), in the case
> of AccumulatorParam just 2 methods (addInPlace, zero), and in the case of
> AccumulatorV2 6 methods (isZero, copy, reset, add, merge, value).
>
> 6. AccumulatorV2 classes are hardly possible to be anonymous classes,
> because of their "copy" and "merge" methods which typically require a
> concrete class to make a type check.
>
> I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
> just wondering whether there is a way to simplify the API of
> AccumulatorV2 to meet the points described above and to be less error
> prone?
>
>
> [1] https://issues.apache.org/jira/browse/SPARK-23697
> [2] https://issues.apache.org/jira/browse/SPARK-24154
> [3] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
> [4] https://issues.apache.org/jira/browse/SPARK-14654
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
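
For readers following along, here is a minimal Scala sketch of the six-method AccumulatorV2 contract Sergey is comparing against the two or three methods of the V1 params. The accumulator itself (a simple max) is made up for illustration; it also shows the type check in merge that point 4 refers to.

```
import org.apache.spark.util.AccumulatorV2

// A simple "max" accumulator implementing all six methods the V2 API requires:
// isZero, copy, reset, add, merge, value.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue

  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = _max = Long.MinValue

  override def add(v: Long): Unit = _max = math.max(_max, v)

  // The runtime type check point 4 above describes: merge only accepts
  // another MaxAccumulator.
  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
    case o: MaxAccumulator => _max = math.max(_max, o._max)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Long = _max
}

// Each instance must be registered exactly once (point 3 above):
// val acc = new MaxAccumulator
// spark.sparkContext.register(acc, "maxSeen")
// spark.sparkContext.parallelize(1L to 100L).foreach(acc.add)
// acc.value  // 100
```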