Re: Spark writing API

2023-08-16 Thread Andrew Melo
Hello Wenchen,

On Wed, Aug 16, 2023 at 23:33 Wenchen Fan  wrote:

> > is there a way to hint to the downstream users on the number of rows
> expected to write?
>
> It will be very hard to do. Spark pipelines the execution (within shuffle
> boundaries) and we can't predict the number of final output rows.
>

Perhaps I don't understand -- even in the case of multiple shuffles, you
can assume that there is exactly one shuffle boundary before the write
operation, and that shuffle boundary knows the number of input rows for
that shuffle. That number of rows has to be, by construction, the upper
bound on the number of rows that will be passed to the writer.

If the writer could be given that bound as a hint, it could do something
smart when allocating (memory or disk). By comparison, the current API just
hands over rows/batches one at a time, and in the case of off-heap
allocation (like Arrow's off-heap storage), it's crazy inefficient to try to
do the equivalent of realloc() to grow the buffer size.
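
To make the cost concrete, here is a minimal Python sketch. It is not Spark's API: `HintedWriter`, `expected_rows` and the doubling growth policy are all hypothetical names and choices made up for illustration. It compares a writer that grows its buffer as rows arrive with one that is handed an upper bound up front.

```python
import struct


class HintedWriter:
    """Toy fixed-width column writer. NOT a Spark class; purely illustrative."""

    ROW_SIZE = 8  # one 64-bit integer per row

    def __init__(self, expected_rows=None):
        # With a hint we allocate once; without one we start small and grow.
        capacity = expected_rows if expected_rows else 16
        self.buf = bytearray(capacity * self.ROW_SIZE)
        self.rows = 0
        self.reallocations = 0

    def write(self, value):
        offset = self.rows * self.ROW_SIZE
        if offset + self.ROW_SIZE > len(self.buf):
            # Stand-in for realloc(): double the buffer and copy everything.
            grown = bytearray(len(self.buf) * 2)
            grown[: len(self.buf)] = self.buf
            self.buf = grown
            self.reallocations += 1
        struct.pack_into("<q", self.buf, offset, value)
        self.rows += 1


def count_reallocations(n_rows, hint=None):
    """Write n_rows integers and report how often the buffer had to grow."""
    writer = HintedWriter(expected_rows=hint)
    for i in range(n_rows):
        writer.write(i)
    return writer.reallocations
```

Even an upper bound (rather than an exact count) is enough for the hinted writer to skip every grow-and-copy step, at the price of possibly over-allocating.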

Thanks
Andrew



> On Mon, Aug 7, 2023 at 8:27 PM Steve Loughran 
> wrote:
>
>>
>>
>> On Thu, 1 Jun 2023 at 00:58, Andrew Melo  wrote:
>>
>>> Hi all
>>>
>>> I've been developing for some time a Spark DSv2 plugin "Laurelin" (
>>> https://github.com/spark-root/laurelin
>>> ) to read the ROOT (https://root.cern) file format (which is used in
>>> high energy physics). I've recently presented my work in a conference (
>>> https://indico.jlab.org/event/459/contributions/11603/).
>>>
>>>
>> nice paper given the esoteric nature of HEP file formats.
>>
>>> All of that to say,
>>>
>>> A) is there any reason that the built-in (e.g. Parquet) data sources can't
>>> consume the external APIs? It's hard to write a plugin that has to use a
>>> specific API when you're competing with another source that gets access to
>>> the internals directly.
>>>
>>> B) What is the Spark-approved API to code against for writes? There is
>>> a mess of *ColumnWriter classes in the Java namespace, and since there is
>>> no documentation, it's unclear which is preferred by the core (maybe
>>> ArrowWriterColumnVector?). We can give a zero-copy write if the API
>>> describes it.
>>>
>>
>> There's a dangerous tendency for things that libraries need to be tagged
>> private[spark], normally worked around by people putting their code into
>> org.apache.spark packages. Really everyone who does that should try to get
>> a longer term fix in, as well as that quick-and-effective workaround.
>> Knowing where problems lie would be a good first step. spark sub-modules
>> are probably a place to get insight into where those low-level internal
>> operations are considered important, although many uses may be for historic
>> "we wrote it that way a long time ago" reasons
>>
>>
>>>
>>> C) Putting aside everything above, is there a way to hint to the
>>> downstream users on the number of rows expected to write? Any smart writer
>>> will use off-heap memory to write to disk/memory, so the current API that
>>> shoves rows in doesn't do the trick. You don't want to keep reallocating
>>> buffers constantly.
>>>
>>> D) What is Spark's plan for using Arrow-based columnar data
>>> representations? I see that there are a lot of external efforts whose only
>>> option is to inject themselves in the CLASSPATH. The regular DSv2 API is
>>> already crippled for reads and for writes it's even worse. Is there a
>>> commitment from the Spark core to bring the API to parity? Or is it
>>> instead just a YMMV commitment?
>>>
>>
>> No idea, I'm afraid. I do think arrow makes a good format for processing,
>> and it'd be interesting to see how well it actually works as a wire format
>> to replace other things (e.g hive's protocol), especially on RDMA networks
>> and the like. I'm not up to date with ongoing work there -if anyone has
>> pointers that'd be interesting.
>>
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>>
>>>
>>>
>>> --
>>> It's dark in this basement.
>>>
>> --
It's dark in this basement.


Re: Spark writing API

2023-08-02 Thread Andrew Melo
Hello Spark Devs

Could anyone help me with this?

Thanks,
Andrew

On Wed, May 31, 2023 at 20:57 Andrew Melo  wrote:

> Hi all
>
> I've been developing for some time a Spark DSv2 plugin "Laurelin" (
> https://github.com/spark-root/laurelin
> ) to read the ROOT (https://root.cern) file format (which is used in high
> energy physics). I've recently presented my work in a conference (
> https://indico.jlab.org/event/459/contributions/11603/).
>
> All of that to say,
>
> A) is there any reason that the built-in (e.g. Parquet) data sources can't
> consume the external APIs? It's hard to write a plugin that has to use a
> specific API when you're competing with another source that gets access to
> the internals directly.
>
> B) What is the Spark-approved API to code against for writes? There is a
> mess of *ColumnWriter classes in the Java namespace, and since there is no
> documentation, it's unclear which is preferred by the core (maybe
> ArrowWriterColumnVector?). We can give a zero-copy write if the API
> describes it.
>
> C) Putting aside everything above, is there a way to hint to the
> downstream users on the number of rows expected to write? Any smart writer
> will use off-heap memory to write to disk/memory, so the current API that
> shoves rows in doesn't do the trick. You don't want to keep reallocating
> buffers constantly.
>
> D) What is Spark's plan for using Arrow-based columnar data
> representations? I see that there are a lot of external efforts whose only
> option is to inject themselves in the CLASSPATH. The regular DSv2 API is
> already crippled for reads and for writes it's even worse. Is there a
> commitment from the Spark core to bring the API to parity? Or is it instead
> just a YMMV commitment?
>
> Thanks!
> Andrew
>
>
>
>
>
> --
> It's dark in this basement.
>
-- 
It's dark in this basement.


Spark writing API

2023-05-31 Thread Andrew Melo
Hi all

I've been developing for some time a Spark DSv2 plugin "Laurelin" (
https://github.com/spark-root/laurelin
) to read the ROOT (https://root.cern) file format (which is used in high
energy physics). I've recently presented my work in a conference (
https://indico.jlab.org/event/459/contributions/11603/).

All of that to say,

A) is there any reason that the built-in (e.g. Parquet) data sources can't
consume the external APIs? It's hard to write a plugin that has to use a
specific API when you're competing with another source that gets access to
the internals directly.

B) What is the Spark-approved API to code against for writes? There is a
mess of *ColumnWriter classes in the Java namespace, and since there is no
documentation, it's unclear which is preferred by the core (maybe
ArrowWriterColumnVector?). We can give a zero-copy write if the API
describes it.

C) Putting aside everything above, is there a way to hint to the downstream
users on the number of rows expected to write? Any smart writer will use
off-heap memory to write to disk/memory, so the current API that shoves
rows in doesn't do the trick. You don't want to keep reallocating buffers
constantly.

D) What is Spark's plan for using Arrow-based columnar data representations?
I see that there are a lot of external efforts whose only option is to inject
themselves in the CLASSPATH. The regular DSv2 API is already crippled for
reads and for writes it's even worse. Is there a commitment from the Spark
core to bring the API to parity? Or is it instead just a YMMV commitment?

Thanks!
Andrew





-- 
It's dark in this basement.


Re: Spark on Kube (virtual) coffee/tea/pop times

2023-02-07 Thread Andrew Melo
I'm Central US time (AKA UTC -6:00)

On Tue, Feb 7, 2023 at 5:32 PM Holden Karau  wrote:
>
> Awesome, I guess I should have asked folks for timezones that they’re in.
>
> On Tue, Feb 7, 2023 at 3:30 PM Andrew Melo  wrote:
>>
>> Hello Holden,
>>
>> We are interested in Spark on k8s and would like the opportunity to
>> speak with devs about what we're looking for slash better ways to use
>> spark.
>>
>> Thanks!
>> Andrew
>>
>> On Tue, Feb 7, 2023 at 5:24 PM Holden Karau  wrote:
>> >
>> > Hi Folks,
>> >
>> > It seems like we could maybe use some additional shared context around 
>> > Spark on Kube so I’d like to try and schedule a virtual coffee session.
>> >
>> > Who all would be interested in virtual adventures around Spark on Kube 
>> > development?
>> >
>> > No pressure if the idea of hanging out in a virtual chat with coffee and 
>> > Spark devs does not sound like your thing, just trying to make something 
>> > informal so we can have a better understanding of everyone’s goals here.
>> >
>> > Cheers,
>> >
>> > Holden :)
>> > --
>> > Twitter: https://twitter.com/holdenkarau
>> > Books (Learning Spark, High Performance Spark, etc.): 
>> > https://amzn.to/2MaRAG9
>> > YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark on Kube (virtual) coffee/tea/pop times

2023-02-07 Thread Andrew Melo
Hello Holden,

We are interested in Spark on k8s and would like the opportunity to
speak with devs about what we're looking for slash better ways to use
spark.

Thanks!
Andrew

On Tue, Feb 7, 2023 at 5:24 PM Holden Karau  wrote:
>
> Hi Folks,
>
> It seems like we could maybe use some additional shared context around Spark 
> on Kube so I’d like to try and schedule a virtual coffee session.
>
> Who all would be interested in virtual adventures around Spark on Kube 
> development?
>
> No pressure if the idea of hanging out in a virtual chat with coffee and 
> Spark devs does not sound like your thing, just trying to make something 
> informal so we can have a better understanding of everyone’s goals here.
>
> Cheers,
>
> Holden :)




Re: Apache Spark 3.2.2 Release?

2022-07-07 Thread Andrew Ray
+1 (non-binding) Thanks!

On Thu, Jul 7, 2022 at 7:00 AM Yang,Jie(INF)  wrote:

> +1 (non-binding) Thank you Dongjoon ~
>
>
>
> *From:* Ruifeng Zheng
> *Date:* Thursday, July 7, 2022 16:28
> *To:* dev
> *Subject:* Re: Apache Spark 3.2.2 Release?
>
>
>
> +1 thank you Dongjoon!
>
>
> --
>
>
> Ruifeng Zheng
>
> ruife...@foxmail.com
>
>
>
>
>
>
>
> -- Original --
>
> *From:* "Yikun Jiang" ;
>
> *Date:* Thu, Jul 7, 2022 04:16 PM
>
> *To:* "Mridul Muralidharan";
>
> *Cc:* "Gengliang Wang"; "Cheng Su"; "Maxim Gekk"; "Wenchen Fan"; "Xiao Li";
> "Xinrong Meng"; "Yuming Wang"; "dev";
>
> *Subject:* Re: Apache Spark 3.2.2 Release?
>
>
>
> +1  (non-binding)
>
>
>
> Thanks!
>
>
> Regards,
>
> Yikun
>
>
>
>
>
> On Thu, Jul 7, 2022 at 1:57 PM Mridul Muralidharan 
> wrote:
>
> +1
>
>
>
> Thanks for driving this Dongjoon !
>
>
>
> Regards,
>
> Mridul
>
>
>
> On Thu, Jul 7, 2022 at 12:36 AM Gengliang Wang  wrote:
>
> +1.
>
> Thank you, Dongjoon.
>
>
>
> On Wed, Jul 6, 2022 at 10:21 PM Wenchen Fan  wrote:
>
> +1
>
>
>
> On Thu, Jul 7, 2022 at 10:41 AM Xinrong Meng
>  wrote:
>
> +1
>
>
> Thanks!
>
>
>
> Xinrong Meng
>
> Software Engineer
>
> Databricks
>
>
>
>
>
> On Wed, Jul 6, 2022 at 7:25 PM Xiao Li  wrote:
>
> +1
>
>
>
> Xiao
>
>
>
> On Wed, Jul 6, 2022 at 19:16, Cheng Su wrote:
>
> +1 (non-binding)
>
>
>
> Thanks,
>
> Cheng Su
>
>
>
> On Wed, Jul 6, 2022 at 6:01 PM Yuming Wang  wrote:
>
> +1
>
>
>
> On Thu, Jul 7, 2022 at 5:53 AM Maxim Gekk
>  wrote:
>
> +1
>
>
>
> On Thu, Jul 7, 2022 at 12:26 AM John Zhuge  wrote:
>
> +1  Thanks for the effort!
>
>
>
> On Wed, Jul 6, 2022 at 2:23 PM Bjørn Jørgensen 
> wrote:
>
> +1
>
>
>
> On Wed, Jul 6, 2022 at 23:05, Hyukjin Kwon wrote:
>
> Yeah +1
>
>
>
> On Thu, Jul 7, 2022 at 5:40 AM Dongjoon Hyun 
> wrote:
>
> Hi, All.
>
> Since Apache Spark 3.2.1 tag creation (Jan 19), new 197 patches
> including 11 correctness patches arrived at branch-3.2.
>
> Shall we make a new release, Apache Spark 3.2.2, as the third release
> at 3.2 line? I'd like to volunteer as the release manager for Apache
> Spark 3.2.2. I'm thinking about starting the first RC next week.
>
> $ git log --oneline v3.2.1..HEAD | wc -l
>  197
>
> # Correctness issues
>
> SPARK-38075 Hive script transform with order by and limit will
> return fake rows
> SPARK-38204 All state operators are at a risk of inconsistency
> between state partitioning and operator partitioning
> SPARK-38309 SHS has incorrect percentiles for shuffle read bytes
> and shuffle total blocks metrics
> SPARK-38320 (flat)MapGroupsWithState can timeout groups which just
> received inputs in the same microbatch
> SPARK-38614 After Spark update, df.show() shows incorrect
> F.percent_rank results
> SPARK-38655 OffsetWindowFunctionFrameBase cannot find the offset
> row whose input is not null
> SPARK-38684 Stream-stream outer join has a possible correctness
> issue due to weakly read consistent on outer iterators
> SPARK-39061 Incorrect results or NPE when using Inline function
> against an array of dynamically created structs
> SPARK-39107 Silent change in regexp_replace's handling of empty strings
> SPARK-39259 Timestamps returned by now() and equivalent functions
> are not consistent in subqueries
> SPARK-39293 The accumulator of ArrayAggregate should copy the
> intermediate result if string, struct, array, or map
>
> Best,
> Dongjoon.
>
>
> --
>
> John Zhuge
>
>


Re: Apache Spark 3.3 Release

2022-03-16 Thread Andrew Melo
Hello,

I've been trying for a bit to get the following two PRs merged and
into a release, and I'm having some difficulty moving them forward:

https://github.com/apache/spark/pull/34903 - This passes the current
python interpreter to spark-env.sh to allow some currently-unavailable
customization to happen
https://github.com/apache/spark/pull/31774 - This fixes a bug in the
SparkUI reverse proxy-handling code where it does a greedy match for
"proxy" in the URL, and will mistakenly replace the App-ID in the
wrong place.
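
For readers who have not hit this class of bug, here is a small Python sketch of the general failure mode. It does not reproduce Spark's actual handler code: the URL layout, the gateway prefix and both function names are assumptions made up for illustration.

```python
import re


def app_id_naive(path):
    # Naive: take whatever follows the first occurrence of "proxy" anywhere
    # in the path, even if it is only part of a longer path segment.
    idx = path.find("proxy")
    rest = path[idx + len("proxy"):].lstrip("/")
    return rest.split("/", 1)[0]


def app_id_anchored(path, prefix=""):
    # Stricter: only accept "<prefix>/proxy/<id>" where "proxy" is a whole
    # path segment directly after the known (hypothetical) gateway prefix.
    match = re.match(re.escape(prefix) + r"/proxy/([^/]+)(/|$)", path)
    return match.group(1) if match else None


# Hypothetical deployment where the gateway's own path contains "proxy".
path = "/myproxy-gateway/proxy/app-20210101-0001/jobs/"
```

With this path the naive lookup picks up part of the gateway prefix instead of the application ID, while the anchored lookup returns `app-20210101-0001`.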

I'm not exactly sure how to get attention for PRs that have been
sitting around for a while, but these are really important to our
use-cases, and it would be nice to have them merged in.

Cheers
Andrew

On Wed, Mar 16, 2022 at 6:21 PM Holden Karau  wrote:
>
> I'd like to add/backport the logging in 
> https://github.com/apache/spark/pull/35881 PR so that when users submit 
> issues with dynamic allocation we can better debug what's going on.
>
> On Wed, Mar 16, 2022 at 3:45 PM Chao Sun  wrote:
>>
>> There is one item on our side that we want to backport to 3.3:
>> - vectorized DELTA_BYTE_ARRAY/DELTA_LENGTH_BYTE_ARRAY encodings for
>> Parquet V2 support (https://github.com/apache/spark/pull/35262)
>>
>> It's already reviewed and approved.
>>
>> On Wed, Mar 16, 2022 at 9:13 AM Tom Graves  
>> wrote:
>> >
>> > It looks like the version hasn't been updated on master and still shows 
>> > 3.3.0-SNAPSHOT, can you please update that.
>> >
>> > Tom
>> >
>> > On Wednesday, March 16, 2022, 01:41:00 AM CDT, Maxim Gekk 
>> >  wrote:
>> >
>> >
>> > Hi All,
>> >
>> > I have created the branch for Spark 3.3:
>> > https://github.com/apache/spark/commits/branch-3.3
>> >
>> > Please, backport important fixes to it, and if you have some doubts, ping 
>> > me in the PR. Regarding new features, we are still building the allow list 
>> > for branch-3.3.
>> >
>> > Best regards,
>> > Max Gekk
>> >
>> >
>> > On Wed, Mar 16, 2022 at 5:51 AM Dongjoon Hyun  
>> > wrote:
>> >
>> > Yes, I agree with you for your whitelist approach for backporting. :)
>> > Thank you for summarizing.
>> >
>> > Thanks,
>> > Dongjoon.
>> >
>> >
>> > On Tue, Mar 15, 2022 at 4:20 PM Xiao Li  wrote:
>> >
>> > I think I finally got your point. What you want to keep unchanged is the 
>> > branch cut date of Spark 3.3. Today? or this Friday? This is not a big 
>> > deal.
>> >
>> > My major concern is whether we should keep merging the feature work or the 
>> > dependency upgrade after the branch cut. To make our release time more 
>> > predictable, I am suggesting we should finalize the exception PR list 
>> > first, instead of merging them in an ad hoc way. In the past, we spent a 
>> > lot of time on the revert of the PRs that were merged after the branch 
>> > cut. I hope we can minimize unnecessary arguments in this release. Do you 
>> > agree, Dongjoon?
>> >
>> >
>> >
>> > On Tue, Mar 15, 2022 at 15:55, Dongjoon Hyun wrote:
>> >
>> > That is not totally fine, Xiao. It sounds like you are asking for a change of 
>> > plan without a proper reason.
>> >
>> > Although we cut the branch today according to our plan, you still can collect 
>> > the list and make a list of exceptions. I'm not blocking what you want to 
>> > do.
>> >
>> > Please let the community start to ramp down as we agreed before.
>> >
>> > Dongjoon
>> >
>> >
>> >
>> > On Tue, Mar 15, 2022 at 3:07 PM Xiao Li  wrote:
>> >
>> > Please do not get me wrong. If we don't cut a branch, we are allowing all 
>> > patches to land Apache Spark 3.3. That is totally fine. After we cut the 
>> > branch, we should avoid merging the feature work. In the next three days, 
>> > let us collect the actively developed PRs that we want to make an 
>> > exception (i.e., merged to 3.3 after the upcoming branch cut). Does that 
>> > make sense?
>> >
>> > On Tue, Mar 15, 2022 at 14:54, Dongjoon Hyun wrote:
>> >
>> > Xiao. You are working against what you are saying.
>> > If you don't cut a branch, it means you are allowing all patches to land 
>> > Apache Spark 3.3. No?
>> >
>> > > we need to avoid backporting the feature work that are not being well 
>> > > discussed.
>> >
>> >
>> >

Re: Time to start publishing Spark Docker Images?

2021-08-17 Thread Andrew Melo
Hi Mich,

By default, pip caches downloaded binaries to somewhere like
$HOME/.cache/pip. So after doing any "pip install", you'll want to either
delete that directory, or pass the "--no-cache-dir" option to pip to
prevent the downloaded binaries from being added to the image.

HTH
Andrew

On Tue, Aug 17, 2021 at 2:29 PM Mich Talebzadeh 
wrote:

> Hi Andrew,
>
> Can you please elaborate on blowing pip cache before committing the layer?
>
> Thanks,
>
> Mich
>
> On Tue, 17 Aug 2021 at 16:57, Andrew Melo  wrote:
>
>> Silly Q, did you blow away the pip cache before committing the layer?
>> That always trips me up.
>>
>> Cheers
>> Andrew
>>
>> On Tue, Aug 17, 2021 at 10:56 Mich Talebzadeh 
>> wrote:
>>
>>> With no additional python packages etc we get 1.4GB compared to 2.19GB
>>> before
>>>
>>> REPOSITORY       TAG                                       IMAGE ID       CREATED                  SIZE
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8only    faee4dbb95dd   Less than a second ago   1.41GB
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8        ba3c17bc9337   4 hours ago              2.19GB
>>>
>>> root@233a81199b43:/opt/spark/work-dir# pip list
>>> Package        Version
>>> -------------- -------
>>> asn1crypto     0.24.0
>>> cryptography   2.6.1
>>> entrypoints    0.3
>>> keyring        17.1.1
>>> keyrings.alt   3.1.1
>>> pip            21.2.4
>>> pycrypto       2.6.1
>>> PyGObject      3.30.4
>>> pyxdg          0.25
>>> SecretStorage  2.3.1
>>> setuptools     57.4.0
>>> six            1.12.0
>>> wheel          0.32.3
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 17 Aug 2021 at 16:24, Mich Talebzadeh 
>>> wrote:
>>>
>>>> Yes, I will double check. it includes java 8 in addition to base java
>>>> 11.
>>>>
>>>> in addition it has these Python packages for now (added for my own
>>>> needs for now)
>>>>
>>>> root@ce6773017a14:/opt/spark/work-dir# pip list
>>>> Package        Version
>>>> -------------- -------
>>>> asn1crypto     0.24.0
>>>> cryptography   2.6.1
>>>> cx-Oracle      8.2.1
>>>> entrypoints    0.3
>>>> keyring        17.1.1
>>>> keyrings.alt   3.1.1
>>>> numpy          1.21.2
>>>> pip            21.2.4
>>>> py4j           0.10.9
>>>> pycrypto       2.6.1
>>>> PyGObject      3.30.4
>>>> pyspark        3.1.2
>>>> pyxdg          0.25
>>>> PyYAML         5.4.1
>>>> SecretStorage  2.3.1
>>>> setuptools     57.4.0
>>>> six            1.12.0
>>>> wheel          0.32.3
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 17 Aug 2021 at 16:17, Maciej  wrote:
>>>>
>>>>> Quick question ‒ is this the actual output? If so, do we know what
>>>>> accounts for the 1.5GB overhead of the PySpark image? Even without
>>>>> --no-install-recommends this seems like a lot (if I recall correctly
>>>>> it was around 400MB for existing images).
>>>>>
>>>>>
>>>>> On 8/17/21 2:24 PM, Mich Talebzadeh wrote:

Re: Time to start publishing Spark Docker Images?

2021-08-17 Thread Andrew Melo
Silly Q, did you blow away the pip cache before committing the layer? That
always trips me up.

Cheers
Andrew

On Tue, Aug 17, 2021 at 10:56 Mich Talebzadeh 
wrote:

> With no additional python packages etc we get 1.4GB compared to 2.19GB
> before
>
> REPOSITORY       TAG                                       IMAGE ID       CREATED                  SIZE
> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8only    faee4dbb95dd   Less than a second ago   1.41GB
> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8        ba3c17bc9337   4 hours ago              2.19GB
>
> root@233a81199b43:/opt/spark/work-dir# pip list
> Package        Version
> -------------- -------
> asn1crypto     0.24.0
> cryptography   2.6.1
> entrypoints    0.3
> keyring        17.1.1
> keyrings.alt   3.1.1
> pip            21.2.4
> pycrypto       2.6.1
> PyGObject      3.30.4
> pyxdg          0.25
> SecretStorage  2.3.1
> setuptools     57.4.0
> six            1.12.0
> wheel          0.32.3
>
>
> HTH
>
>
>
>
>
>
> On Tue, 17 Aug 2021 at 16:24, Mich Talebzadeh 
> wrote:
>
>> Yes, I will double check. it includes java 8 in addition to base java 11.
>>
>> in addition it has these Python packages for now (added for my own needs
>> for now)
>>
>> root@ce6773017a14:/opt/spark/work-dir# pip list
>> Package        Version
>> -------------- -------
>> asn1crypto     0.24.0
>> cryptography   2.6.1
>> cx-Oracle      8.2.1
>> entrypoints    0.3
>> keyring        17.1.1
>> keyrings.alt   3.1.1
>> numpy          1.21.2
>> pip            21.2.4
>> py4j           0.10.9
>> pycrypto       2.6.1
>> PyGObject      3.30.4
>> pyspark        3.1.2
>> pyxdg          0.25
>> PyYAML         5.4.1
>> SecretStorage  2.3.1
>> setuptools     57.4.0
>> six            1.12.0
>> wheel          0.32.3
>>
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> On Tue, 17 Aug 2021 at 16:17, Maciej  wrote:
>>
>>> Quick question ‒ is this the actual output? If so, do we know what accounts
>>> for the 1.5GB overhead of the PySpark image? Even without --no-install-recommends
>>> this seems like a lot (if I recall correctly it was around 400MB for
>>> existing images).
>>>
>>>
>>> On 8/17/21 2:24 PM, Mich Talebzadeh wrote:
>>>
>>> Examples:
>>>
>>> *docker images*
>>>
>>> REPOSITORY       TAG                                  IMAGE ID       CREATED          SIZE
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8   ba3c17bc9337   2 minutes ago    2.19GB
>>> spark            3.1.1-scala_2.12-java11              4595c4e78879   18 minutes ago   635MB
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, 17 Aug 2021 at 10:31, Mich Talebzadeh 
>>> wrote:
>>>
>>>> 3.1.2_sparkpy_3.7-scala_2.12-java11
>>>>
>>>> 3.1.2_sparkR_3.6-scala_2.12-java11
>>>> Yes let us go with that and remember that we can change the tags
>>>> anytime. The accompanying release note should detail what is inside the

Re: WholeStageCodeGen + DSv2

2021-05-19 Thread Andrew Melo
As it turns out, I also commented on the same Jira (and forgot about
it until just now).

On Wed, May 19, 2021 at 8:32 AM Shubham Chaurasia
 wrote:
>
> Hi,
>
> I remember creating one for a similar scenario in the past - 
> https://issues.apache.org/jira/browse/SPARK-29372.
>
> Thanks,
> Shubham
>
> On Wed, May 19, 2021 at 5:34 PM Takeshi Yamamuro  
> wrote:
>>
>> hi, Andrew,
>>
>> Welcome any improvement proposal for that.
>> Could you file an issue in jira first to show us your idea and an example 
>> query
>> to reproduce the issue you described?
>>
>> Bests,
>> Takeshi
>>
>> On Wed, May 19, 2021 at 11:38 AM Andrew Melo  wrote:
>>>
>>> Hello,
>>>
>>> When reading a very wide (> 1000 cols) input, WholeStageCodeGen blows
>>> past the 64kB source limit and fails. Looking at the generated code, a
>>> big part of the code is simply the DSv2 convention that the codegen'd
>>> variable names are the same as the columns instead of something more
>>> compact like 'c1', 'c2', etc.
>>>
>>> Would there be any interest in accepting a patch that shortens these
>>> variable names to try and stay under the limit?
>>>
>>> Thanks
>>> Andrew
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro




WholeStageCodeGen + DSv2

2021-05-18 Thread Andrew Melo
Hello,

When reading a very wide (> 1000 cols) input, WholeStageCodeGen blows
past the 64kB source limit and fails. Looking at the generated code, a
big part of the code is simply the DSv2 convention that the codegen'd
variable names are the same as the columns instead of something more
compact like 'c1', 'c2', etc.

Would there be any interest in accepting a patch that shortens these
variable names to try and stay under the limit?
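
As a rough illustration of the idea, here is a toy Python sketch. It is not Spark's code generator: `generate_source`, the emitted Java-like lines and the example column names are all hypothetical. It only measures how much emitted text shrinks when per-column variables get short positional names; whether that is enough to stay under the real limit would need measuring against the actual generated code.

```python
def generate_source(column_names, compact=False):
    """Emit toy 'generated code' with one local variable per column."""
    lines = []
    for i, name in enumerate(column_names):
        # Either reuse the (long) column name or a short positional name.
        var = f"c{i}" if compact else f"value_{name}"
        lines.append(f"long {var} = row.getLong({i});")
        lines.append(f"writer.write({i}, {var});")
    return "\n".join(lines)


# Hypothetical wide schema with long, descriptive column names.
columns = [f"muon_isolation_track_pt_corrected_{i}" for i in range(1000)]
verbose = generate_source(columns)
compact = generate_source(columns, compact=True)
saved_bytes = len(verbose) - len(compact)
```

Since each variable name appears at least twice per column, the savings scale with both the number of columns and the length of their names.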

Thanks
Andrew




Secrets store for DSv2

2021-05-18 Thread Andrew Melo
Hello,

When implementing a DSv2 datasource, where is an appropriate place to
store/transmit secrets from the driver to the executors? Is there
built-in spark functionality for that, or is my best bet to stash it
as a member variable in one of the classes that gets sent to the
executors?

Thanks!
Andrew




Re: [DISCUSS] Support pandas API layer on PySpark

2021-03-16 Thread Andrew Melo
Hi,

Integrating Koalas with pyspark might help enable a richer integration
between the two. Something that would be useful with a tighter
integration is support for custom column array types. Currently, Spark
takes dataframes, converts them to arrow buffers then transmits them
over the socket to Python. On the other side, pyspark takes the arrow
buffer and converts it to a Pandas dataframe. Unfortunately, the
default Pandas representation of a list-type column turns what were
contiguous value/offset arrays in Arrow into deserialized Python
objects for each row. Obviously, this kills performance.
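
To show what gets lost, here is a minimal Python sketch of the two layouts using only the standard library. It does not use pyarrow or pandas, and `to_offsets_values` and `row_slice` are hypothetical helper names. A jagged column stored as one contiguous values array plus an offsets array is compared with the same data held as one Python list per row.

```python
from array import array


def to_offsets_values(rows):
    """Flatten a jagged list of float lists into (offsets, values)."""
    offsets = array("q", [0])   # 64-bit offsets, one more entry than rows
    values = array("d")         # all elements, stored back to back
    for row in rows:
        values.extend(row)
        offsets.append(len(values))
    return offsets, values


def row_slice(offsets, values, i):
    """Recover row i as a slice of the contiguous values array."""
    return values[offsets[i]:offsets[i + 1]]


jagged = [[1.0, 2.0], [], [3.0, 4.0, 5.0]]   # per-row Python objects
offsets, values = to_offsets_values(jagged)  # two flat buffers
```

In the flat layout a whole column is two buffers regardless of row count, whereas the per-row layout creates a separate object for every row.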

A PR to extend the pyspark API to elide the pandas conversion
(https://github.com/apache/spark/pull/26783) was submitted and
rejected, which is unfortunate, but perhaps this proposed integration
would provide the hooks via Pandas' ExtensionArray interface to allow
Spark to performantly interchange jagged/ragged lists to/from python
UDFs.

Cheers
Andrew

On Tue, Mar 16, 2021 at 8:15 PM Hyukjin Kwon  wrote:
>
> Thank you guys for all your feedback. I will start working on SPIP with 
> Koalas team.
> I would expect the SPIP can be sent late this week or early next week.
>
>
> I inlined and answered the questions unanswered as below:
>
> Is the community developing the pandas API layer for Spark interested in 
> being part of Spark or do they prefer having their own release cycle?
>
> Yeah, the Koalas team used to have its own release cycle to develop and
> move quickly.
> It has now become pretty mature, having reached 1.7.0, and the team thinks
> that it’s now fine to have less frequent releases, and they are happy to
> work together with Spark by contributing to it. The active contributors in
> the Koalas community will continue to make contributions in Spark.
>
> How about test code? Does it fit into the PySpark test framework?
>
> Yes, this will be one of the places where it needs some efforts. Koalas 
> currently uses pytest
> with various dependency version combinations (e.g., Python version, conda vs 
> pip) whereas
> PySpark uses the plain unittests with less dependency version combinations.
>
> For pytest in Koalas <> unittests in PySpark:
>
>   I am currently thinking we will have to convert the Koalas tests to use 
> unittests to match
>   with PySpark for now.
>   It is a feasible option for PySpark to migrate to pytest too but it will 
> need extra effort to
>   make it working with our own PySpark testing framework seamlessly.
>   Koalas team (presumably and likely I) will take a look in any event.
>
> For the combinations of dependency versions:
>
>   Due to the lack of the resources in GitHub Actions, I currently plan to 
> just add the
>   Koalas tests into the matrix PySpark is currently using.
>
> one question I have; what’s an initial goal of the proposal?
> Is that to port all the pandas interfaces that Koalas has already implemented?
> Or, the basic set of them?
>
> The goal of the proposal is to port all of Koalas project into PySpark.
> For example,
>
> import koalas
>
> will be equivalent to
>
> # Names, etc. might change in the final proposal or during the review
> from pyspark.sql import pandas
>
> Koalas supports pandas APIs with a separate layer to cover a bit of 
> difference between
> DataFrame structures in pandas and PySpark, e.g.) other types as column names 
> (labels),
> index (something like row number in DBMSs) and so on. So I think it would 
> make more sense
> to port the whole layer instead of a subset of the APIs.
>
>
>
>
>
> On Wed, Mar 17, 2021 at 12:32 AM, Wenchen Fan wrote:
>>
>> +1, it's great to have Pandas support in Spark out of the box.
>>
>> On Tue, Mar 16, 2021 at 10:12 PM Takeshi Yamamuro  
>> wrote:
>>>
>>> +1; the pandas interfaces are pretty popular and supporting them in pyspark 
>>> looks promising, I think.
>>> one question I have; what's an initial goal of the proposal?
>>> Is that to port all the pandas interfaces that Koalas has already 
>>> implemented?
>>> Or, the basic set of them?
>>>
>>> On Tue, Mar 16, 2021 at 1:44 AM Ismaël Mejía  wrote:
>>>>
>>>> +1
>>>>
>>>> Bringing a Pandas API for pyspark to upstream Spark will only bring
>>>> benefits for everyone (more eyes to use/see/fix/improve the API) as
>>>> well as better alignment with core Spark improvements, the extra
>>>> weight looks manageable.
>>>>
>>>> On Mon, Mar 15, 2021 at 4:45 PM Nicholas Chammas
>>>>  wrote:
>>>> >
>>>> > On Mon, Mar 15, 2021 at 2:12 AM Reynold Xin  wrote:
>>>> >

Re: [DISCUSS] SPIP: FunctionCatalog

2021-02-16 Thread Andrew Melo
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include support for aggregation functions, as well as
support for processing ColumnarBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue  wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear 
> path forward for calling functions. Even without a prototype, the `invoke` 
> plans show that Wenchen's suggested optimization can be done, and 
> incorporating it as an optional extension to this proposal solves many of the 
> unknowns.
>
> With that area now understood, is there any discussion about other parts of 
> the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun  wrote:
>>
>> This is an important feature which can unblock several other projects 
>> including bucket join support for DataSource v2, complete support for 
>> enforcing DataSource v2 distribution requirements on the write path, etc. I 
>> like Ryan's proposals which look simple and elegant, with nice support on 
>> function overloading and variadic arguments. On the other hand, I think 
>> Wenchen made a very good point about performance. Overall, I'm excited to 
>> see active discussions on this topic and believe the community will come to 
>> a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon  wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> On Sat, Feb 13, 2021 at 12:18 PM, Liang-Chi Hsieh wrote:
>>>>
>>>> Basically I think the proposal makes sense to me and I'd like to support
>>>> the SPIP as it looks like we have a strong need for this important feature.
>>>>
>>>> Thanks Ryan for working on this and I do also look forward to Wenchen's
>>>> implementation. Thanks for the discussion too.
>>>>
>>>> Actually I think the SupportsInvoke proposed by Ryan looks like a good
>>>> alternative to me. Besides Wenchen's alternative implementation, is there a
>>>> chance we also have the SupportsInvoke for comparison?
>>>>
>>>>
>>>> John Zhuge wrote
>>>> > Excited to see our Spark community rallying behind this important 
>>>> > feature!
>>>> >
>>>> > The proposal lays a solid foundation of minimal feature set with careful
>>>> > considerations for future optimizations and extensions. Can't wait to see
>>>> > it leading to more advanced functionalities like views with shared custom
>>>> > functions, function pushdown, lambda, etc. It has already borne fruit 
>>>> > from
>>>> > the constructive collaborations in this thread. Looking forward to
>>>> > Wenchen's prototype and further discussions including the SupportsInvoke
>>>> > extension proposed by Ryan.
>>>> >
>>>> >
>>>> > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley wrote:
>>>> >
>>>> >> I think this proposal is a very good thing giving Spark a standard way
>>>> >> of getting to and calling UDFs.
>>>> >>
>>>> >> I like having the ScalarFunction as the API to call the UDFs. It is
>>>> >> simple, yet covers all of the polymorphic type cases well. I think it
>>>> >> would also simplify using the functions in other contexts like pushing
>>>> >> down filters into the ORC & Parquet readers although there are a lot of
>>>> >> details that would need to be considered there.
>>>> >>
>>>> >> .. Owen
>>>> >>
>>>> >>
>>>> >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen wrote:
>>>> >>
>>>> >>> I agree that there is a strong need for a FunctionCatalog within Spark
>>>> >>> to provide support for shareable UDFs, as well as make movement towards
>>>> >>> more advanced functionality like views which themselves depend on UDFs,
>>>> >>> so I
>>>> >>> 

Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2020-06-24 Thread Andrew Melo
Hello,

On Wed, Jun 24, 2020 at 2:13 PM Holden Karau  wrote:
>
> So I thought our theory for the pypi packages was it was for local 
> developers, they really shouldn't care about the Hadoop version. If you're 
> running on a production cluster you ideally pip install from the same release 
> artifacts as your production cluster to match.

That's certainly one use of PyPI packages, but not the only one. In
our case, we provide clusters for our users, with SPARK_CONF
preconfigured with (e.g.) the master connection URL. But the analyses
they're doing are their own and unique, so they work in their own
personal Python virtual environments. There are no "release artifacts"
to publish, per se, since each user is working independently and can
install whatever they'd like into their virtual environments.
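
As a rough illustration of how the bundled Hadoop version can leak into
such a user's environment (the hadoop-aws case raised elsewhere in this
thread), here is a minimal Python sketch. The helper names are made up for
illustration, and it assumes the user already knows which Hadoop version
their pip-installed PySpark bundles; only the org.apache.hadoop:hadoop-aws
coordinate and the --packages flag come from the thread.

```python
from typing import List


def hadoop_aws_coordinate(hadoop_version: str) -> str:
    """Maven coordinate for hadoop-aws matching the bundled Hadoop version."""
    return f"org.apache.hadoop:hadoop-aws:{hadoop_version}"


def pyspark_command(hadoop_version: str) -> List[str]:
    """Hypothetical helper: argv for launching pyspark with a matching hadoop-aws."""
    return ["pyspark", "--packages", hadoop_aws_coordinate(hadoop_version)]


# A Hadoop 2.7 build and a Hadoop 3.2 build need different coordinates.
print(" ".join(pyspark_command("2.7.7")))
print(" ".join(pyspark_command("3.2.1")))
```

The point is only that the version string has to follow whatever Hadoop
the installed package bundles, so a changed default changes this line.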

Cheers
Andrew

>
> On Wed, Jun 24, 2020 at 12:11 PM Wenchen Fan  wrote:
>>
>> Shall we start a new thread to discuss the bundled Hadoop version in 
>> PySpark? I don't have a strong opinion on changing the default, as users can 
>> still download the Hadoop 2.7 version.
>>
>> On Thu, Jun 25, 2020 at 2:23 AM Dongjoon Hyun  
>> wrote:
>>>
>>> To Xiao.
>>> Why should Apache project releases be blocked by PyPI / CRAN? It's
>>> completely optional, isn't it?
>>>
>>> > let me repeat my opinion:  the top priority is to provide two options 
>>> for PyPi distribution
>>>
>>> IIRC, Apache Spark 3.0.0 failed to upload to CRAN and this is not the first
>>> incident. Apache Spark already has a history of missed SparkR uploads.
>>> We don't say Spark 3.0.0 failed because of CRAN uploading or other non-Apache
>>> distribution channels. In short, non-Apache distribution channels cannot be
>>> a `blocker` for Apache project releases. We only do our best for the
>>> community.
>>>
>>> SPARK-32017 (Make Pyspark Hadoop 3.2+ Variant available in PyPI) is really 
>>> irrelevant to this PR. If someone wants to do that and the PR is ready, why 
>>> don't we do it in `Apache Spark 3.0.1 timeline`? Why do we wait for 
>>> December? Is there a reason why we need to wait?
>>>
>>> To Sean
>>> Thanks!
>>>
>>> To Nicholas.
>>> Do you think `pip install pyspark` is version-agnostic? In the Python 
>>> world, `pip install somepackage` fails frequently. In production, you 
>>> should use `pip install somepackage==specificversion`. I don't think the 
>>> production pipeline has non-versioned Python package installation.
>>>
>>> The bottom line is that the PR doesn't change PyPi uploading, the AS-IS PR 
>>> keeps Hadoop 2.7 on PySpark due to Xiao's comments. I don't think there is 
>>> a blocker for that PR.
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>> On Wed, Jun 24, 2020 at 10:54 AM Nicholas Chammas 
>>>  wrote:
>>>>
>>>> To rephrase my earlier email, PyPI users would care about the bundled 
>>>> Hadoop version if they have a workflow that, in effect, looks something 
>>>> like this:
>>>>
>>>> ```
>>>> pip install pyspark
>>>> pyspark --packages org.apache.hadoop:hadoop-aws:2.7.7
>>>> spark.read.parquet('s3a://...')
>>>> ```
>>>>
>>>> I agree that Hadoop 3 would be a better default (again, the s3a support is 
>>>> just much better). But to Xiao's point, if you are expecting Spark to work 
>>>> with some package like hadoop-aws that assumes an older version of Hadoop 
>>>> bundled with Spark, then changing the default may break your workflow.
>>>>
>>>> In the case of hadoop-aws the fix is simple--just bump hadoop-aws:2.7.7 to 
>>>> hadoop-aws:3.2.1. But perhaps there are other PyPI-based workflows that 
>>>> would be more difficult to repair. 🤷‍♂️
>>>>
>>>> On Wed, Jun 24, 2020 at 1:44 PM Sean Owen  wrote:
>>>>>
>>>>> I'm also genuinely curious when PyPI users would care about the
>>>>> bundled Hadoop jars - do we even need two versions? that itself is
>>>>> extra complexity for end users.
>>>>> I do think Hadoop 3 is the better choice for the user who doesn't
>>>>> care, and better long term.
>>>>> OK but let's at least move ahead with changing defaults.
>>>>>
>>>>> On Wed, Jun 24, 2020 at 12:38 PM Xiao Li  wrote:
>>>>> >
>>>>> > Hi, Dongjoon,
>>>>> >
>>>>> > Please do not misinterp

Re: DSv2 & DataSourceRegister

2020-04-16 Thread Andrew Melo
Hi again,

Does anyone have thoughts on either the idea or the implementation?

Thanks,
Andrew

On Thu, Apr 9, 2020 at 11:32 PM Andrew Melo  wrote:
>
> Hi all,
>
> I've opened a WIP PR here https://github.com/apache/spark/pull/28159
> I'm a novice at Scala, so I'm sure the code isn't idiomatic, but it
> behaves functionally how I'd expect. I've added unit tests to the PR,
> but if you would like to verify the intended functionality, I've
> uploaded a fat jar with my datasource to
> http://mirror.accre.vanderbilt.edu/spark/laurelin-both.jar and an
> example input file to
> https://github.com/spark-root/laurelin/raw/master/testdata/stdvector.root.
> The following in spark-shell successfully chooses the proper plugin
> implementation based on the spark version:
>
> spark.read.format("root").option("tree","tvec").load("stdvector.root")
>
> Additionally, I did a very rough POC for spark2.4, which you can find
> at https://github.com/PerilousApricot/spark/tree/feature/registerv2-24
> . The same jar/inputfile works there as well.
>
> Thanks again,
> Andrew
>
> On Wed, Apr 8, 2020 at 10:27 AM Andrew Melo  wrote:
> >
> > On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
> > >
> > > It would be good to support your use case, but I'm not sure how to 
> > > accomplish that. Can you open a PR so that we can discuss it in detail? 
> > > How can `public Class getImplementation();` be 
> > > possible in 3.0 as there is no `DataSourceV2`?
> >
> > You're right, that was a typo. Since the whole point is to separate
> > the (stable) registration interface from the (evolving) DSv2 API, it
> > defeats the purpose to then directly reference the DSv2 API within the
> > registration interface.
> >
> > I'll put together a PR.
> >
> > Thanks again,
> > Andrew
> >
> > >
> > > On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
> > >>
> > >> Hello
> > >>
> > >> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
> > >>>
> > >>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm 
> > >>> not sure this is possible as the DS V2 API is very different in 3.0, 
> > >>> e.g. there is no `DataSourceV2` anymore, and you should implement 
> > >>> `TableProvider` (if you don't have database/table).
> > >>
> > >>
> > >> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a 
> > >> toplevel Root_v24 (implements DataSourceV2) and Root_v30 (implements 
> > >> TableProvider). I can load this jar in both pyspark 2.4 and 3.0 and it
> > >> works well -- as long as I remove the registration from META-INF and 
> > >> pass in the full class name to the DataFrameReader.
> > >>
> > >> Thanks
> > >> Andrew
> > >>
> > >>>
> > >>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  
> > >>> wrote:
> > >>>>
> > >>>> Hi Ryan,
> > >>>>
> > >>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
> > >>>> >
> > >>>> > Hi Andrew,
> > >>>> >
> > >>>> > With DataSourceV2, I recommend plugging in a catalog instead of 
> > >>>> > using DataSource. As you've noticed, the way that you plug in data 
> > >>>> > sources isn't very flexible. That's one of the reasons why we 
> > >>>> > changed the plugin system and made it possible to use named catalogs 
> > >>>> > that load implementations based on configuration properties.
> > >>>> >
> > >>>> > I think it's fine to consider how to patch the registration trait, 
> > >>>> > but I really don't recommend continuing to identify table 
> > >>>> > implementations directly by name.
> > >>>>
> > >>>> Can you be a bit more concrete with what you mean by plugging a
> > >>>> catalog instead of a DataSource? We have been using
> > >>>> sc.read.format("root").load([list of paths]) which works well. Since
> > >>>> we don't have a database or tables, I don't fully understand what's
> > >>>> different between the two interfaces that would make us prefer one or
> > >>>> another.
> > >>>>
> > >>>> That being said, WRT the registration trait, if I'm not misreading
> > >>>> 

Re: DSv2 & DataSourceRegister

2020-04-09 Thread Andrew Melo
Hi all,

I've opened a WIP PR here https://github.com/apache/spark/pull/28159
I'm a novice at Scala, so I'm sure the code isn't idiomatic, but it
behaves functionally how I'd expect. I've added unit tests to the PR,
but if you would like to verify the intended functionality, I've
uploaded a fat jar with my datasource to
http://mirror.accre.vanderbilt.edu/spark/laurelin-both.jar and an
example input file to
https://github.com/spark-root/laurelin/raw/master/testdata/stdvector.root.
The following in spark-shell successfully chooses the proper plugin
implementation based on the spark version:

spark.read.format("root").option("tree","tvec").load("stdvector.root")

Additionally, I did a very rough POC for spark2.4, which you can find
at https://github.com/PerilousApricot/spark/tree/feature/registerv2-24
. The same jar/inputfile works there as well.

Thanks again,
Andrew

On Wed, Apr 8, 2020 at 10:27 AM Andrew Melo  wrote:
>
> On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
> >
> > It would be good to support your use case, but I'm not sure how to 
> > accomplish that. Can you open a PR so that we can discuss it in detail? How 
> > can `public Class getImplementation();` be 
> > possible in 3.0 as there is no `DataSourceV2`?
>
> You're right, that was a typo. Since the whole point is to separate
> the (stable) registration interface from the (evolving) DSv2 API, it
> defeats the purpose to then directly reference the DSv2 API within the
> registration interface.
>
> I'll put together a PR.
>
> Thanks again,
> Andrew
>
> >
> > On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
> >>
> >> Hello
> >>
> >> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
> >>>
> >>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not 
> >>> sure this is possible as the DS V2 API is very different in 3.0, e.g. 
> >>> there is no `DataSourceV2` anymore, and you should implement 
> >>> `TableProvider` (if you don't have database/table).
> >>
> >>
> >> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel 
> >> Root_v24 (implements DataSourceV2) and Root_v30 (implements 
> >> TableProvider). I can load this jar in both pyspark 2.4 and 3.0 and it
> >> works well -- as long as I remove the registration from META-INF and pass 
> >> in the full class name to the DataFrameReader.
> >>
> >> Thanks
> >> Andrew
> >>
> >>>
> >>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
> >>>>
> >>>> Hi Ryan,
> >>>>
> >>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
> >>>> >
> >>>> > Hi Andrew,
> >>>> >
> >>>> > With DataSourceV2, I recommend plugging in a catalog instead of using 
> >>>> > DataSource. As you've noticed, the way that you plug in data sources 
> >>>> > isn't very flexible. That's one of the reasons why we changed the 
> >>>> > plugin system and made it possible to use named catalogs that load 
> >>>> > implementations based on configuration properties.
> >>>> >
> >>>> > I think it's fine to consider how to patch the registration trait, but 
> >>>> > I really don't recommend continuing to identify table implementations 
> >>>> > directly by name.
> >>>>
> >>>> Can you be a bit more concrete with what you mean by plugging a
> >>>> catalog instead of a DataSource? We have been using
> >>>> sc.read.format("root").load([list of paths]) which works well. Since
> >>>> we don't have a database or tables, I don't fully understand what's
> >>>> different between the two interfaces that would make us prefer one or
> >>>> another.
> >>>>
> >>>> That being said, WRT the registration trait, if I'm not misreading
> >>>> createTable() and friends, the "source" parameter is resolved the same
> >>>> way as DataFrameReader.format(), so a solution that helps out
> >>>> registration should help both interfaces.
> >>>>
> >>>> Thanks again,
> >>>> Andrew
> >>>>
> >>>> >
> >>>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  
> >>>> > wrote:
> >>>> >>
> >>>> >> Hi all,
> >>>> >>
> >>>> >> I posted an improveme

Re: DSv2 & DataSourceRegister

2020-04-08 Thread Andrew Melo
On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
>
> It would be good to support your use case, but I'm not sure how to accomplish 
> that. Can you open a PR so that we can discuss it in detail? How can `public 
> Class getImplementation();` be possible in 3.0 as 
> there is no `DataSourceV2`?

You're right, that was a typo. Since the whole point is to separate
the (stable) registration interface from the (evolving) DSv2 API, it
defeats the purpose to then directly reference the DSv2 API within the
registration interface.

I'll put together a PR.

Thanks again,
Andrew

>
> On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
>>
>> Hello
>>
>> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
>>>
>>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not 
>>> sure this is possible as the DS V2 API is very different in 3.0, e.g. there 
>>> is no `DataSourceV2` anymore, and you should implement `TableProvider` (if 
>>> you don't have database/table).
>>
>>
>> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel 
>> Root_v24 (implements DataSourceV2) and Root_v30 (implements TableProvider). 
>> I can load this jar in both pyspark 2.4 and 3.0 and it works well -- as
>> long as I remove the registration from META-INF and pass in the full class 
>> name to the DataFrameReader.
>>
>> Thanks
>> Andrew
>>
>>>
>>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
>>>>
>>>> Hi Ryan,
>>>>
>>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>>>> >
>>>> > Hi Andrew,
>>>> >
>>>> > With DataSourceV2, I recommend plugging in a catalog instead of using 
>>>> > DataSource. As you've noticed, the way that you plug in data sources 
>>>> > isn't very flexible. That's one of the reasons why we changed the plugin 
>>>> > system and made it possible to use named catalogs that load 
>>>> > implementations based on configuration properties.
>>>> >
>>>> > I think it's fine to consider how to patch the registration trait, but I 
>>>> > really don't recommend continuing to identify table implementations 
>>>> > directly by name.
>>>>
>>>> Can you be a bit more concrete with what you mean by plugging a
>>>> catalog instead of a DataSource? We have been using
>>>> sc.read.format("root").load([list of paths]) which works well. Since
>>>> we don't have a database or tables, I don't fully understand what's
>>>> different between the two interfaces that would make us prefer one or
>>>> another.
>>>>
>>>> That being said, WRT the registration trait, if I'm not misreading
>>>> createTable() and friends, the "source" parameter is resolved the same
>>>> way as DataFrameReader.format(), so a solution that helps out
>>>> registration should help both interfaces.
>>>>
>>>> Thanks again,
>>>> Andrew
>>>>
>>>> >
>>>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  
>>>> > wrote:
>>>> >>
>>>> >> Hi all,
>>>> >>
>>>> >> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>>>> >> send an email to the dev list for discussion.
>>>> >>
>>>> >> As the DSv2 API evolves, some breaking changes are occasionally made
>>>> >> to the API. It's possible to split a plugin into a "common" part and
>>>> >> multiple version-specific parts and this works OK to have a single
>>>> >> artifact for users, as long as they write out the fully qualified
>>>> >> classname as the DataFrame format(). The one part that can't be
>>>> >> currently worked around is the DataSourceRegister trait. Since classes
>>>> >> which implement DataSourceRegister must also implement DataSourceV2
>>>> >> (and its mixins), changes to those interfaces cause the ServiceLoader
>>>> >> to fail when it attempts to load the "wrong" DataSourceV2 class.
>>>> >> (there's also an additional prohibition against multiple
>>>> >> implementations having the same ShortName in
>>>> >> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
>>>> >>

Re: DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hello

On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:

> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not
> sure this is possible as the DS V2 API is very different in 3.0, e.g. there
> is no `DataSourceV2` anymore, and you should implement `TableProvider` (if
> you don't have database/table).
>

Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel
Root_v24 (implements DataSourceV2) and Root_v30 (implements TableProvider).
I can load this jar in both pyspark 2.4 and 3.0 and it works well -- as
long as I remove the registration from META-INF and pass in the full class
name to the DataFrameReader.
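
For what it's worth, the user-side workaround looks roughly like the
sketch below. The package name example.laurelin is a placeholder (not the
real one) and the helper is hypothetical; only the Root_v24/Root_v30 class
names come from the description above.

```python
def root_format_for(spark_version: str, package: str = "example.laurelin") -> str:
    """Pick the fully qualified source class for the running Spark version.

    Assumption: Spark 3.x uses Root_v30 (TableProvider) and anything older
    uses Root_v24 (DataSourceV2); the package name is a placeholder.
    """
    major = int(spark_version.split(".")[0])
    class_name = "Root_v30" if major >= 3 else "Root_v24"
    return f"{package}.{class_name}"


# In a real session this would be used roughly as
#   spark.read.format(root_format_for(spark.version)).load(paths)
print(root_format_for("2.4.5"))
print(root_format_for("3.0.0"))
```

This is exactly the kind of per-version branching in user code that a
working short-name registration would make unnecessary.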

Thanks
Andrew


> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
>
>> Hi Ryan,
>>
>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>> >
>> > Hi Andrew,
>> >
>> > With DataSourceV2, I recommend plugging in a catalog instead of using
>> DataSource. As you've noticed, the way that you plug in data sources isn't
>> very flexible. That's one of the reasons why we changed the plugin system
>> and made it possible to use named catalogs that load implementations based
>> on configuration properties.
>> >
>> > I think it's fine to consider how to patch the registration trait, but
>> I really don't recommend continuing to identify table implementations
>> directly by name.
>>
>> Can you be a bit more concrete with what you mean by plugging a
>> catalog instead of a DataSource? We have been using
>> sc.read.format("root").load([list of paths]) which works well. Since
>> we don't have a database or tables, I don't fully understand what's
>> different between the two interfaces that would make us prefer one or
>> another.
>>
>> That being said, WRT the registration trait, if I'm not misreading
>> createTable() and friends, the "source" parameter is resolved the same
>> way as DataFrameReader.format(), so a solution that helps out
>> registration should help both interfaces.
>>
>> Thanks again,
>> Andrew
>>
>> >
>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo 
>> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>> >> send an email to the dev list for discussion.
>> >>
>> >> As the DSv2 API evolves, some breaking changes are occasionally made
>> >> to the API. It's possible to split a plugin into a "common" part and
>> >> multiple version-specific parts and this works OK to have a single
>> >> artifact for users, as long as they write out the fully qualified
>> >> classname as the DataFrame format(). The one part that can't be
>> >> currently worked around is the DataSourceRegister trait. Since classes
>> >> which implement DataSourceRegister must also implement DataSourceV2
>> >> (and its mixins), changes to those interfaces cause the ServiceLoader
>> >> to fail when it attempts to load the "wrong" DataSourceV2 class.
>> >> (there's also an additional prohibition against multiple
>> >> implementations having the same ShortName in
>> >>
>> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
>> >> This means users will need to update their notebooks/code/tutorials if
>> >> they run @ a different site whose cluster is a different version.
>> >>
>> >> To solve this, I proposed in SPARK-31363 a new trait which would
>> >> function the same as the existing DataSourceRegister trait, but adds
>> >> an additional method:
>> >>
>> >> public Class getImplementation();
>> >>
>> >> ...which will allow DSv2 plugins to dynamically choose the appropriate
>> >> DataSourceV2 class based on the runtime environment. This would let us
>> >> release a single artifact for different Spark versions and users could
>> >> use the same artifactID & format regardless of where they were
>> >> executing their code. If there were no services registered with this
>> >> new trait, the functionality would remain the same as before.
>> >>
>> >> I think this functionality will be useful as DSv2 continues to evolve,
>> >> please let me know your thoughts.
>> >>
>> >> Thanks
>> >> Andrew
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > --
>> > Ryan Blue
>> > Software Engineer
>> > Netflix
>>
>>
>>


Re: DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hi Ryan,

On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>
> Hi Andrew,
>
> With DataSourceV2, I recommend plugging in a catalog instead of using 
> DataSource. As you've noticed, the way that you plug in data sources isn't 
> very flexible. That's one of the reasons why we changed the plugin system and 
> made it possible to use named catalogs that load implementations based on 
> configuration properties.
>
> I think it's fine to consider how to patch the registration trait, but I 
> really don't recommend continuing to identify table implementations directly 
> by name.

Can you be a bit more concrete with what you mean by plugging a
catalog instead of a DataSource? We have been using
sc.read.format("root").load([list of paths]) which works well. Since
we don't have a database or tables, I don't fully understand what's
different between the two interfaces that would make us prefer one or
another.

That being said, WRT the registration trait, if I'm not misreading
createTable() and friends, the "source" parameter is resolved the same
way as DataFrameReader.format(), so a solution that helps out
registration should help both interfaces.

Thanks again,
Andrew

>
> On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  wrote:
>>
>> Hi all,
>>
>> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>> send an email to the dev list for discussion.
>>
>> As the DSv2 API evolves, some breaking changes are occasionally made
>> to the API. It's possible to split a plugin into a "common" part and
>> multiple version-specific parts and this works OK to have a single
>> artifact for users, as long as they write out the fully qualified
>> classname as the DataFrame format(). The one part that can't be
>> currently worked around is the DataSourceRegister trait. Since classes
>> which implement DataSourceRegister must also implement DataSourceV2
>> (and its mixins), changes to those interfaces cause the ServiceLoader
>> to fail when it attempts to load the "wrong" DataSourceV2 class.
>> (there's also an additional prohibition against multiple
>> implementations having the same ShortName in
>> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
>> This means users will need to update their notebooks/code/tutorials if
>> they run @ a different site whose cluster is a different version.
>>
>> To solve this, I proposed in SPARK-31363 a new trait which would
>> function the same as the existing DataSourceRegister trait, but adds
>> an additional method:
>>
>> public Class getImplementation();
>>
>> ...which will allow DSv2 plugins to dynamically choose the appropriate
>> DataSourceV2 class based on the runtime environment. This would let us
>> release a single artifact for different Spark versions and users could
>> use the same artifactID & format regardless of where they were
>> executing their code. If there were no services registered with this
>> new trait, the functionality would remain the same as before.
>>
>> I think this functionality will be useful as DSv2 continues to evolve,
>> please let me know your thoughts.
>>
>> Thanks
>> Andrew
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix




DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hi all,

I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
send an email to the dev list for discussion.

As the DSv2 API evolves, some breaking changes are occasionally made
to the API. It's possible to split a plugin into a "common" part and
multiple version-specific parts, and this works OK to have a single
artifact for users, as long as they write out the fully qualified
classname as the DataFrame format(). The one part that can't
currently be worked around is the DataSourceRegister trait. Since classes
which implement DataSourceRegister must also implement DataSourceV2
(and its mixins), changes to those interfaces cause the ServiceLoader
to fail when it attempts to load the "wrong" DataSourceV2 class.
(There's also an additional prohibition against multiple
implementations having the same ShortName in
org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource.)
This means users will need to update their notebooks/code/tutorials if
they run at a different site whose cluster is a different version.

To solve this, I proposed in SPARK-31363 a new trait which would
function the same as the existing DataSourceRegister trait, but add
an additional method:

public Class getImplementation();

...which will allow DSv2 plugins to dynamically choose the appropriate
DataSourceV2 class based on the runtime environment. This would let us
release a single artifact for different Spark versions, and users could
use the same artifactID & format regardless of where they were
executing their code. If no services were registered with this
new trait, the functionality would remain the same as before.
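
To make the idea a bit more concrete, here is a toy model of the lookup in
Python. Everything in it is a stand-in, not Spark's API: the class names,
lookup_data_source(), and in particular the spark_version argument, which
is passed in only to keep the sketch testable (the proposed method takes
no arguments and would inspect the runtime itself).

```python
from typing import Iterable


class RootV24:
    """Stand-in for a source built against the Spark 2.4 DataSourceV2 API."""


class RootV30:
    """Stand-in for a source built against the Spark 3.0 TableProvider API."""


class RootRegistration:
    """Hypothetical analogue of the proposed trait: short name plus chooser."""

    def short_name(self) -> str:
        return "root"

    def get_implementation(self, spark_version: str) -> type:
        # Assumption: the version is an argument here; the proposed
        # getImplementation() would detect the runtime on its own.
        major = int(spark_version.split(".")[0])
        return RootV30 if major >= 3 else RootV24


def lookup_data_source(fmt: str, spark_version: str,
                       registrations: Iterable[RootRegistration]) -> type:
    """Resolve a short name to exactly one implementation class."""
    matches = [r for r in registrations if r.short_name() == fmt]
    if len(matches) != 1:
        raise LookupError(f"expected one source named {fmt!r}, found {len(matches)}")
    return matches[0].get_implementation(spark_version)


print(lookup_data_source("root", "2.4.5", [RootRegistration()]).__name__)
print(lookup_data_source("root", "3.0.0", [RootRegistration()]).__name__)
```

The registration object itself never references either API version, so
only one short name is registered and the version-specific class is loaded
lazily.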

I think this functionality will be useful as DSv2 continues to evolve;
please let me know your thoughts.

Thanks
Andrew




Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-14 Thread Andrew Melo
Hi Sean

On Fri, Mar 13, 2020 at 6:46 PM Sean Owen  wrote:

> Do you really need a new cluster per user? and if so, why specify N
> workers > M machines? I am not seeing a need for that. I don't even
> think 2 workers on the same host makes sense, as they are both
> managing the same resources; it only exists for test purposes AFAICT.
>

Sorry, I'm from a completely different field, so I've inherited a
completely different vocabulary. So thanks for bearing with me :)

I think from reading your response, maybe the confusion is that HTCondor uses
a completely different resource acquisition model from what industry is
familiar with. Unlike AWS, which gives you a whole VM, or k8s, which gives you
a whole container, condor (and most other batch schedulers) splits up a single
bare machine that your job shares with whatever else is on that machine.
You don't get your own machine or even the illusion that you have your own
machine (via containerization).

With these schedulers, it's not that you ask for N workers when there are
only M machines; you request N x 8-core slots when there are M cores
available, and the scheduler packs them wherever there are free resources.

> What you are trying to do sounds like one cluster, not many. JVMs
> can't be shared across users; JVM = executor. But that's a good thing,
> or else there would be all kinds of collisions.
>
> What pools are you referring to?


If you're talking about the 2nd half, let's say I'm running two pyspark
notebooks connected to the system above, and the batch scheduler gives each
of them 2 cores of slaves. Each notebook will have its own set (which I
called a pool earlier) of slaves, so when you're working in one notebook,
the other notebook's slaves are idle. My comment was about the resources
being idle and the desire to increase utilization.
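
Back-of-the-envelope, using the two-notebook example above and assuming
(my simplification) that only one user is actively running a query at a
time:

```python
def utilization(busy_cores: int, allocated_cores: int) -> float:
    """Fraction of allocated cores that are doing work."""
    return busy_cores / allocated_cores


# Dedicated pools: two notebooks hold 2 cores each, one notebook is active.
dedicated = utilization(busy_cores=2, allocated_cores=4)

# Shared pool: the same two users take turns on a single 2-core pool.
shared = utilization(busy_cores=2, allocated_cores=2)

print(f"dedicated={dedicated:.0%} shared={shared:.0%}")
```

With N bursty users the dedicated case gets worse roughly as 1/N, which is
why a shared pool is attractive for us.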

Thanks
Andrew

> Sean
>
> On Fri, Mar 13, 2020 at 6:33 PM Andrew Melo  wrote:
> >
> > Hi Xingbo, Sean,
> >
> > On Fri, Mar 13, 2020 at 12:31 PM Xingbo Jiang 
> wrote:
> >>
> >> Andrew, could you provide more context of your use case please? Is it
> like you deploy homogeneous containers on hosts with available resources,
> and each container launches one worker? Or you deploy workers directly on
> hosts thus you could have multiple workers from the same application on the
> same host?
> >
> >
> > Sure, I describe a bit more detail about the actual workload below [*],
> but the short version is that our computing resources/infrastructure are
> all built around batch submission into (usually) the HTCondor scheduler,
> and we've got a PoC using pyspark to replace the interactive portion of
> data analysis. To run pyspark on our main resources, we use some scripts
> around the standalone mode to spin up N slaves per-user**, which may or may
> not end up on the same host. I understood Xingbo's original mail to mean
> that wouldn't be allowed in the future, but from Sean's response, it seems
> like I'm incorrect.
> >
> > That being said, our use-case is very bursty, and it would be very good
> if there was a way we could have one pool of JVMs that could be shared
> between N different concurrent users instead of having N different pools of
> JVMs that each serve one person. We're already resource constrained, and
> we're expecting our data rates to increase 10x in 2026, so the less idle
> CPU, the better for us.
> >
> > Andrew
> >
> > * I work for one of the LHC experiments at CERN (https://cms.cern/) and
> there's two main "phases" of our data pipeline: production and analysis.
> The analysis half is currently implemented by having users writing some
> software, splitting the input dataset(s) into N parts and then submitting
> those jobs to the batch system (combining the outputs in a manual
> postprocessing step). In terms of scale, there are currently ~100 users
> running ~900 tasks over ~50k cpus. The use case relevant to this context is
> the terminal analysis phase which involves calculating some additional
> columns, applying calibrations, filtering out the 'interesting' events and
> extracting histograms describing those events. Conceptually, it's an
> iterative process of "extract plots, look at plots, change parameters", but
> running in a batch system means the latency is bad, so it can take a long
> time to converge to the right set of params.
> >
> > ** though we have much smaller, dedicated k8s/mesos/yarn clusters we use
> for prototyping
> >
> >>
> >> Thanks,
> >>
> >> Xingbo
> >>
> >> On Fri, Mar 13, 2020 at 10:23 AM Sean Owen  wrote:
> >>>
> >>> You have multiple workers in one Spark (standalone) app? this wouldn't
> >>> prevent N apps from each having a worker on a m

Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-13 Thread Andrew Melo
Hi Xingbo, Sean,

On Fri, Mar 13, 2020 at 12:31 PM Xingbo Jiang  wrote:

> Andrew, could you provide more context of your use case please? Is it like
> you deploy homogeneous containers on hosts with available resources, and
> each container launches one worker? Or you deploy workers directly on hosts
> thus you could have multiple workers from the same application on the same
> host?
>

Sure, I describe a bit more detail about the actual workload below [*], but
the short version is that our computing resources/infrastructure are all
built around batch submission into (usually) the HTCondor scheduler, and
we've got a PoC using pyspark to replace the interactive portion of data
analysis. To run pyspark on our main resources, we use some scripts around
the standalone mode to spin up N slaves per-user**, which may or may not
end up on the same host. I understood Xingbo's original mail to mean that
wouldn't be allowed in the future, but from Sean's response, it seems like
I'm incorrect.

That being said, our use-case is very bursty, and it would be very good if
there was a way we could have one pool of JVMs that could be shared between
N different concurrent users instead of having N different pools of JVMs
that each serve one person. We're already resource constrained, and we're
expecting our data rates to increase 10x in 2026, so the less idle CPU, the
better for us.

Andrew

* I work for one of the LHC experiments at CERN (https://cms.cern/)
and there's two main "phases" of our data pipeline: production and
analysis. The analysis half is currently implemented by having users
writing some software, splitting the input dataset(s) into N parts and then
submitting those jobs to the batch system (combining the outputs in a
manual postprocessing step). In terms of scale, there are currently ~100
users running ~900 tasks over ~50k cpus. The use case relevant to this
context is the terminal analysis phase which involves calculating some
additional columns, applying calibrations, filtering out the 'interesting'
events and extracting histograms describing those events. Conceptually,
it's an iterative process of "extract plots, look at plots, change
parameters", but running in a batch system means the latency is bad, so it
can take a long time to converge to the right set of params.

** though we have much smaller, dedicated k8s/mesos/yarn clusters we use
for prototyping


> Thanks,
>
> Xingbo
>
> On Fri, Mar 13, 2020 at 10:23 AM Sean Owen  wrote:
>
>> You have multiple workers in one Spark (standalone) app? this wouldn't
>> prevent N apps from each having a worker on a machine.
>>
>> On Fri, Mar 13, 2020 at 11:51 AM Andrew Melo 
>> wrote:
>> >
>> > Hello,
>> >
>> > On Fri, Feb 28, 2020 at 13:21 Xingbo Jiang 
>> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Based on my experience, there is no scenario that necessarily requires
>> deploying multiple Workers on the same node with Standalone backend. A
>> worker should book all the resources reserved to Spark on the host it is
>> launched, then it can allocate those resources to one or more executors
>> launched by this worker. Since each executor runs in a separated JVM, we
>> can limit the memory of each executor to avoid long GC pause.
>> >>
>> >> The remaining concern is the local-cluster mode is implemented by
>> launching multiple workers on the local host, we might need to re-implement
>> LocalSparkCluster to launch only one Worker and multiple executors. It
>> should be fine because local-cluster mode is only used in running Spark
>> unit test cases, thus end users should not be affected by this change.
>> >>
>> >> Removing multiple workers on the same host support could simplify the
>> deploy model of Standalone backend, and also reduce the burden to support
>> legacy deploy pattern in the future feature developments. (There is an
>> example in https://issues.apache.org/jira/browse/SPARK-27371 , where we
>> designed a complex approach to coordinate resource requirements from
>> different workers launched on the same host).
>> >>
>> >> The proposal is to update the document to deprecate the support of
>> system environment `SPARK_WORKER_INSTANCES` in Spark 3.0, and remove the
>> support in the next major version (Spark 3.1).
>> >>
>> >> Please kindly let me know if you have use cases relying on this
>> feature.
>> >
>> >
>> > When deploying spark on batch systems (by wrapping the standalone
>> deployment in scripts that can be consumed by the batch scheduler), we
>> typically end up with >1 worker per host. If I understand correctly, this
>> proposal would make our use case unsupported.
>> >
>> > Thanks,
>> > Andrew
>> >
>> >
>> >
>> >>
>> >> Thanks!
>> >>
>> >> Xingbo
>> >
>> > --
>> > It's dark in this basement.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>


Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-13 Thread Andrew Melo
Hello,

On Fri, Feb 28, 2020 at 13:21 Xingbo Jiang  wrote:

> Hi all,
>
> Based on my experience, there is no scenario that necessarily requires
> deploying multiple Workers on the same node with Standalone backend. A
> worker should book all the resources reserved to Spark on the host it is
> launched, then it can allocate those resources to one or more executors
> launched by this worker. Since each executor runs in a separated JVM, we
> can limit the memory of each executor to avoid long GC pause.
>
> The remaining concern is the local-cluster mode is implemented by
> launching multiple workers on the local host, we might need to re-implement
> LocalSparkCluster to launch only one Worker and multiple executors. It
> should be fine because local-cluster mode is only used in running Spark
> unit test cases, thus end users should not be affected by this change.
>
> Removing multiple workers on the same host support could simplify the
> deploy model of Standalone backend, and also reduce the burden to support
> legacy deploy pattern in the future feature developments. (There is an
> example in https://issues.apache.org/jira/browse/SPARK-27371 , where we
> designed a complex approach to coordinate resource requirements from
> different workers launched on the same host).
>
> The proposal is to update the document to deprecate the support of system
> environment `SPARK_WORKER_INSTANCES` in Spark 3.0, and remove the support
> in the next major version (Spark 3.1).
>
> Please kindly let me know if you have use cases relying on this feature.
>

When deploying spark on batch systems (by wrapping the standalone
deployment in scripts that can be consumed by the batch scheduler), we
typically end up with >1 worker per host. If I understand correctly, this
proposal would make our use case unsupported.

Thanks,
Andrew




> Thanks!
>
> Xingbo
>
-- 
It's dark in this basement.


Re: Enabling push-based shuffle in Spark

2020-01-27 Thread Long, Andrew
The easiest would be to create a fork of the code in github.   I can also 
accept diffs.

Cheers Andrew

From: Min Shen 
Date: Monday, January 27, 2020 at 12:48 PM
To: "Long, Andrew" , "dev@spark.apache.org" 

Subject: Re: Enabling push-based shuffle in Spark

Hi Andrew,

We are leveraging SPARK-6237 to control the off-heap memory consumption due to 
Netty.
With that change, the data is processed in a streaming fashion so Netty does 
not buffer an entire RPC in memory before handing it over to RPCHandler.
We tested with our internal stress testing framework, and did not see much 
change in the memory consumption of the shuffle service.
In terms of sharing the code, not sure what would be an effective way to do 
that.
If interested, maybe we can call a meeting to chat in more depth.

Best,
Min

On Mon, Jan 27, 2020 at 11:30 AM Long, Andrew wrote:
Hey Min,

One thing of concern would be off-heap memory utilization due to Netty, 
depending on the number of connections that you create.

Would it be possible to take a look at your code?  My team has a performance 
test harness that I'd like to test it with.

Cheers Andrew



On 1/23/20, 10:25 AM, "mshen" wrote:

Hi Wenchen,

Glad to know that you like this idea.
We also looked into making this pluggable in our early design phase.
While the ShuffleManager API for pluggable shuffle systems does provide
quite some room for customized behaviors for Spark shuffle, we feel that it
is still not enough for this case.

Right now, the shuffle block location information is tracked inside
MapOutputTracker and updated by DAGScheduler.
Since we are relocating the shuffle blocks to improve overall shuffle
throughput and efficiency, being able to update the information tracked
inside MapOutputTracker so reducers can access their shuffle input more
efficiently is thus necessary.
Letting DAGScheduler orchestrate this process also provides the benefit of
better coping with stragglers.
If DAGScheduler has no control or is agnostic of the block push progress, it
does leave a few gaps.

On the shuffle Netty protocol side, there is a lot that can be leveraged
from the existing code.
With improvements in SPARK-24355 and SPARK-30512, the shuffle service Netty
server is becoming much more reliable.
The work in SPARK-6237 also provided quite some leverage for streaming push
of shuffle blocks.
Instead of building all of these from scratch, we took the alternative route
of building on top of the existing Netty protocol to implement the shuffle
block push operation.

We feel that this design has the potential of further improving Spark
shuffle system's scalability and efficiency, making Spark an even better
compute engine.
Would like to explore how we can leverage the shuffle plugin API to make
this design more acceptable.



-
Min Shen
Staff Software Engineer
LinkedIn
--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/





Re: How to implement a "saveAsBinaryFile" function?

2020-01-16 Thread Long, Andrew
Hey Bing,

There’s a couple different approaches you could take.  The quickest and easiest 
would be to use the existing APIs

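import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext

// Stand-in data; in your case this is the RDD[Array[Byte]] you already have.
val bytes = spark.sparkContext.parallelize(Seq("a", "b").map(_.getBytes("UTF-8")))

bytes.foreachPartition { records =>
  // WARNING: anything used in here will need to be serializable.
  // There's some magic to serializing the hadoop conf; see the hadoop wrapper
  // class in the source. This sketch builds a default Configuration on the
  // executor instead, and writes one file per partition so that tasks do not
  // overwrite each other.
  val path = new Path(s"s3://.../part-${TaskContext.getPartitionId()}")
  val writer = path.getFileSystem(new Configuration()).create(path)
  try records.foreach(b => writer.write(b)) finally writer.close()
}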

The more complicated but prettier approach would be to implement a custom 
datasource.

From: "Duan,Bing" 
Date: Thursday, January 16, 2020 at 12:35 AM
To: "dev@spark.apache.org" 
Subject: How to implement a "saveAsBinaryFile" function?

Hi all:

I read binary data (protobuf format) from the filesystem with the binaryFiles 
function into an RDD[Array[Byte]], and that works fine. But when I save it to 
the filesystem with saveAsTextFile, the quotation marks are escaped like this:
"\"20192_1\"",1,24,0,2,"\"S66.000x001\"", which should be 
"20192_1",1,24,0,2,"S66.000x001".

Could anyone give me a tip on implementing a function like saveAsBinaryFile to 
persist the RDD[Array[Byte]]?

Bests!

Bing


Re: SortMergeJoinExec: Utilizing child partitioning when joining

2020-01-07 Thread Long, Andrew
“Where can I find information on how to run standard performance 
tests/benchmarks?“

The gold standard is spark-sql-perf and in particular the tpc-ds benchmark. 
Most of the big optimization teams are using this as the primary benchmark.  
One word of warning is that most groups have also extended this to add entirely 
new types of benchmarks which are not in open source but the core tpc-ds 
benchmark will get you most of the way there.

https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/
https://github.com/databricks/spark-sql-perf
http://www.tpc.org/tpcds/

“Are performance degradations to existing queries that are fixable by new 
equivalent queries not allowed for a new major spark version”

The general rule of thumb for my group (which is NOT databricks) is that as 
long as the tpcds geomean improves you’re fine, provided you don’t break any 
existing queries.  For example regressing a couple queries by 5% is fine BUT 
causing a query that would have previously run to crash is not ok. Additionally 
we have a sample of user queries + ETL processes that we try not to break either.
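
A minimal sketch of that rule of thumb in plain Python. The query names and runtimes are made up, and treating a failed query as a hard error (a runtime of None) is an assumption about how one might encode "don't break existing queries"; spark-sql-perf has its own reporting.

```python
import math


def geomean_speedup(baseline_secs, candidate_secs):
    """Geometric mean of per-query speedups (baseline / candidate).

    Both arguments map query name -> runtime in seconds. A candidate runtime
    of None means the query no longer runs, which is rejected outright no
    matter how good the geomean would otherwise look.
    """
    broken = sorted(q for q in baseline_secs if candidate_secs.get(q) is None)
    if broken:
        raise ValueError(f"queries no longer run: {broken}")
    logs = [math.log(baseline_secs[q] / candidate_secs[q]) for q in baseline_secs]
    return math.exp(sum(logs) / len(logs))


# Made-up numbers, not measurements: q2 regresses by 5%, q1 gets 2x faster.
baseline = {"q1": 100.0, "q2": 40.0, "q3": 10.0}
candidate = {"q1": 50.0, "q2": 42.0, "q3": 10.0}
speedup = geomean_speedup(baseline, candidate)
print(f"geomean speedup: {speedup:.3f}")
```

A value above 1.0 means the change is a net win under this metric even though one query got slightly slower.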

Cheers Andrew

From: Brett Marcott 
Date: Tuesday, January 7, 2020 at 12:00 AM
To: "Long, Andrew" 
Cc: "dev@spark.apache.org" 
Subject: Re: SortMergeJoinExec: Utilizing child partitioning when joining

1. Where can I find information on how to run standard performance 
tests/benchmarks?
2. Are performance degradations to existing queries that are fixable by new 
equivalent queries not allowed for a new major spark version?

On Thu, Jan 2, 2020 at 3:05 PM Brett Marcott wrote:
Thanks for the response Andrew.

1. The approach
The approach I mentioned will not introduce any new skew, so it should only 
worsen performance if the user was relying on the shuffle to fix skew they had 
before.
The user can address this by either not introducing their own skewed partition 
in the first place, or repartitioning with less skew again before the join.
Today the user cannot change partitioning without changing the join condition 
in a hacky way: joinkey1 >= joinkey2 && joinkey1 <= joinkey2

The condition I mentioned below ensures that the same keys on left and right 
formed their respective subsets:
  left and right partition expressions have the same subset (with regards 
to indices) of their respective join keys

I don't believe EnsureRequirements will require any changes, just what the 
Exec's are saying is required.

2. output partitionings
Yea I got as far as you mentioned, but I didn't at first get why for outer 
joins only one side is used.
Now however, I think it makes sense because for outer joins you may be 
introducing nulls for at least one side, which makes that side's partitioning 
invalid, right?

Warm New Year Regards,
Brett

On Thu, Jan 2, 2020 at 2:28 PM Long, Andrew wrote:
“Thoughts on this approach?“

Just to warn you this is a hazardous optimization without cardinality 
information. Removing columns from the hash exchange reduces entropy 
potentially resulting in skew. Also keep in mind that if you reduce the number 
of columns on one side of the join you need to do it on the other. This will 
require you to rewrite EnsureRequirements or add a special case to detect this.

As a word of warning there’s a whole bunch of subtle things that 
EnsureRequirements is doing and it’s really easy to unintentionally create 
performance regressions while making improvements in other areas.

“Could someone help explain why the different join types have different output 
partitionings“

Long story short when a join happens the join exec zips together the partitions 
of the left and right side so that one partition of the join has the elements 
of the left and right.  In the case of an inner join this means that the 
resulting RDD is now partitioned by both the left join keys and the right join 
keys.  I’d suggest taking a look at the join execs, in particular at how they 
build the result RDD from the partitions of the left and right RDDs (see 
doExecute(…)).  left/right outer does look surprising though.

You should see something like…

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>


Cheers Andrew

From: Brett Marcott 
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "dev@spark.apache.org" 
Subject: SortMergeJoinExec: Utilizing child partitioning when joining

Hi all,

I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771

My initial idea for a fix is to change SortMergeJoinExec's (and 
ShuffledHashJoinExec) requiredChildDistribution.

At least if all below conditions are met, we could only require a subset of 
keys for partitioning:
left and right children's output partiti

Re: SortMergeJoinExec: Utilizing child partitioning when joining

2020-01-02 Thread Long, Andrew
“Thoughts on this approach?“

Just to warn you this is a hazardous optimization without cardinality 
information. Removing columns from the hash exchange reduces entropy 
potentially resulting in skew. Also keep in mind that if you reduce the number 
of columns on one side of the join you need to do it on the other. This will 
require you to rewrite EnsureRequirements or add a special case to detect this.
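
A minimal sketch of the skew concern, in plain Python. It uses Python's built-in hash on synthetic rows rather than Spark's own hash partitioning, so the exact bucket numbers are not what Spark would produce; the column cardinalities are assumptions chosen to make the effect obvious.

```python
from collections import Counter

NUM_PARTITIONS = 4


def partition_sizes(rows, key_indices, num_partitions=NUM_PARTITIONS):
    """Count rows per partition when hashing only the chosen key columns."""
    sizes = Counter()
    for row in rows:
        key = tuple(row[i] for i in key_indices)
        sizes[hash(key) % num_partitions] += 1
    return [sizes[p] for p in range(num_partitions)]


# Synthetic rows: column 0 has only two distinct values, column 1 has many.
rows = [(i % 2, i) for i in range(1000)]

both = partition_sizes(rows, key_indices=(0, 1))
first_only = partition_sizes(rows, key_indices=(0,))
print("hash on (col0, col1):", both)
print("hash on (col0,):     ", first_only)
```

Because column 0 has only two distinct values, hashing on it alone can populate at most two partitions however many are configured, which is the loss of entropy described above.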

As a word of warning there’s a whole bunch of subtle things that 
EnsureRequirements is doing and it’s really easy to unintentionally create 
performance regressions while making improvements in other areas.

“Could someone help explain why the different join types have different output 
partitionings“

Long story short when a join happens the join exec zips together the partitions 
of the left and right side so that one partition of the join has the elements 
of the left and right.  In the case of an inner join this means that the 
resulting RDD is now partitioned by both the left join keys and the right join 
keys.  I’d suggest taking a look at the join execs, in particular at how they 
build the result RDD from the partitions of the left and right RDDs (see 
doExecute(…)).  left/right outer does look surprising though.

You should see something like…

left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
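
To illustrate the zipping idea outside of Spark, here is a minimal sketch in plain Python. It is not the Spark implementation: the helper names are hypothetical, partitions are plain lists, and a nested-loop match stands in for the real sort-merge logic.

```python
def hash_partition(rows, key, num_partitions):
    """Split rows (dicts) into num_partitions lists by hashing the join key."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts


def zip_partitions_inner_join(left_parts, right_parts, left_key, right_key):
    """Join partition i of the left side with partition i of the right side only."""
    assert len(left_parts) == len(right_parts)
    joined = []
    for left_iter, right_iter in zip(left_parts, right_parts):
        out = []
        for lrow in left_iter:
            for rrow in right_iter:
                if lrow[left_key] == rrow[right_key]:
                    out.append({**lrow, **rrow})
        joined.append(out)
    return joined


left = [{"a": k, "lval": f"l{k}"} for k in range(6)]
right = [{"b": k, "rval": f"r{k}"} for k in range(0, 6, 2)]
n = 3
result = zip_partitions_inner_join(
    hash_partition(left, "a", n), hash_partition(right, "b", n), "a", "b")

# Every output row sits in the partition that both of its keys hash to.
for i, part in enumerate(result):
    for row in part:
        assert hash(row["a"]) % n == i and hash(row["b"]) % n == i
print(sum(len(p) for p in result), "rows joined")
```

For the inner join every surviving row has equal left and right keys, so the output can be described as partitioned by either set of keys.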


Cheers Andrew

From: Brett Marcott 
Date: Tuesday, December 31, 2019 at 11:49 PM
To: "dev@spark.apache.org" 
Subject: SortMergeJoinExec: Utilizing child partitioning when joining

Hi all,

I found this jira for an issue I ran into recently:
https://issues.apache.org/jira/browse/SPARK-28771

My initial idea for a fix is to change SortMergeJoinExec's (and 
ShuffledHashJoinExec) requiredChildDistribution.

At least if all below conditions are met, we could only require a subset of 
keys for partitioning:
- left and right children's output partitionings are hashpartitioning with same 
  numpartitions
- left and right partition expressions have the same subset (with regards to 
  indices) of their respective join keys

If that subset of keys is returned by requiredChildDistribution, then 
EnsureRequirements.ensureDistributionAndOrdering would not add a shuffle stage, 
hence reusing the children's partitioning.
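
The two conditions above can be sketched as a small check in plain Python. This is only an illustration: the function name is hypothetical, join keys and partition expressions are represented as column-name strings, and none of it is taken from the Spark code base.

```python
def shared_key_subset(left_join_keys, right_join_keys,
                      left_part_exprs, right_part_exprs,
                      left_num_parts, right_num_parts):
    """Return the join-key indices both children are already hash-partitioned
    on, or None if a shuffle would still be needed.

    Order matters: hashing (a, b) and hashing (b, a) place rows differently,
    so the index lists must match position by position.
    """
    if left_num_parts != right_num_parts:
        return None
    try:
        left_idx = [left_join_keys.index(e) for e in left_part_exprs]
        right_idx = [right_join_keys.index(e) for e in right_part_exprs]
    except ValueError:
        # A child is partitioned on something that is not one of its join keys.
        return None
    if not left_idx or left_idx != right_idx:
        return None
    return left_idx


# Join on l.a = r.x AND l.b = r.y
ok = shared_key_subset(["a", "b"], ["x", "y"], ["a"], ["x"], 200, 200)
mismatch = shared_key_subset(["a", "b"], ["x", "y"], ["a"], ["y"], 200, 200)
diff_parts = shared_key_subset(["a", "b"], ["x", "y"], ["a"], ["x"], 200, 100)
print(ok, mismatch, diff_parts)
```

Only the first case would let the existing partitioning be reused; in the other two a shuffle is still required.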

1. Thoughts on this approach?

2. Could someone help explain why the different join types have different 
output partitionings in 
SortMergeJoinExec.outputPartitioning<https://github.com/apache/spark/blob/cdcd43cbf2479b258f4c5cfa0f6306f475d25cf2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L85-L96>?

Thanks,
Brett




Re: how to get partition column info in Data Source V2 writer

2019-12-17 Thread Andrew Melo
Hi Aakash

On Tue, Dec 17, 2019 at 12:42 PM aakash aakash 
wrote:

> Hi Spark dev folks,
>
> First of all, kudos on the new Data Source v2; the API looks simple and it
> makes it easy to develop a new data source and use it.
>
> With my current work, I am trying to implement a new data source V2 writer
> with Spark 2.3 and I was wondering how I will get the info about partition
> by columns. I see that it has been passed to Data Source V1 from
> DataFrameWriter but not for V2.
>

Not directly related to your Q, but just so you're aware, the DSv2 API
evolved from 2.3->2.4 and then again for 2.4->3.0.

Cheers
Andrew


>
>
> Thanks,
> Aakash
>


CR for adding bucket join support to V2 Datasources

2019-11-18 Thread Long, Andrew
Hey Friends,

I recently created a pull request to add optional support for bucket joins 
to V2 Datasources, via a concrete class representing Spark-style hash 
partitioning. If anyone has some free time I'd appreciate a code review.  This 
also adds a concrete implementation of V2 ClusteredDistribution to make 
specifying Clustered Distributions easier.

https://github.com/apache/spark/pull/26511

Cheers Andrew



Re: DSv2 reader lifecycle

2019-11-06 Thread Andrew Melo
Hi Ryan,

Thanks for the pointers

On Thu, Nov 7, 2019 at 8:13 AM Ryan Blue  wrote:

> Hi Andrew,
>
> This is expected behavior for DSv2 in 2.4. A separate reader is configured
> for each operation because the configuration will change. A count, for
> example, doesn't need to project any columns, but a count distinct will.
> Similarly, if your read has different filters we need to apply those to a
> separate reader for each scan.
>

Ah, I presumed that the interaction was slightly different: that there was a
single reader configured and (e.g.) pruneSchema was called repeatedly to
change the desired output schema. I guess for 2.4 it's best for me to
cache/memoize the metadata for paths/files to keep them from being
repeatedly calculated.


>
> The newer API that we are releasing in Spark 3.0 addresses the concern
> that each reader is independent by using Catalog and Table interfaces. In
> the new version, Spark will load a table by name from a persistent catalog
> (loaded once) and use the table to create a reader for each operation. That
> way, you can load common metadata in the table, cache the table, and pass
> its info to readers as they are created.
>

That's good to know, I'll search around JIRA for docs describing that
functionality.

Thanks again,
Andrew


>
> rb
>
> On Tue, Nov 5, 2019 at 4:58 PM Andrew Melo  wrote:
>
>> Hello,
>>
>> During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears
>> that our DataSourceReader is being instantiated multiple times for the same
>> dataframe. For example, the following snippet
>>
>> Dataset<Row> df = spark
>>     .read()
>>     .format("edu.vanderbilt.accre.laurelin.Root")
>>     .option("tree", "Events")
>>     .load("testdata/pristine/2018nanoaod1june2019.root");
>>
>> Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
>> createReader once (as an aside, this seems like a lot for 1000 columns?
>> "CodeGenerator: Code generated in 8162.847517 ms")
>>
>> but then running operations on that dataframe (e.g. df.count()) calls
>> createReader for each call, instead of holding the existing
>> DataSourceReader.
>>
>> Is that the expected behavior? Because of the file format, it's quite
>> expensive to deserialize all the various metadata, so I was holding the
>> deserialized version in the DataSourceReader, but if Spark is repeatedly
>> constructing new ones, then that doesn't help. If this is the expected
>> behavior, how should I handle this as a consumer of the API?
>>
>> Thanks!
>> Andrew
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


DSv2 reader lifecycle

2019-11-05 Thread Andrew Melo
Hello,

During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears that
our DataSourceReader is being instantiated multiple times for the same
dataframe. For example, the following snippet

Dataset<Row> df = spark
    .read()
    .format("edu.vanderbilt.accre.laurelin.Root")
    .option("tree", "Events")
    .load("testdata/pristine/2018nanoaod1june2019.root");

Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
createReader once (as an aside, this seems like a lot for 1000 columns?
"CodeGenerator: Code generated in 8162.847517 ms")

but then running operations on that dataframe (e.g. df.count()) calls
createReader for each call, instead of holding the existing
DataSourceReader.

Is that the expected behavior? Because of the file format, it's quite
expensive to deserialize all the various metadata, so I was holding the
deserialized version in the DataSourceReader, but if Spark is repeatedly
constructing new ones, then that doesn't help. If this is the expected
behavior, how should I handle this as a consumer of the API?

Thanks!
Andrew


Re: Exposing functions to pyspark

2019-10-08 Thread Andrew Melo
Hello again,

Is it possible to grab a handle to the underlying DataSourceReader
backing a DataFrame? I see that there's no nice way to add extra
methods to Dataset, so being able to grab the DataSource backing
the dataframe would be a good escape hatch.

Cheers
Andrew

On Mon, Sep 30, 2019 at 3:48 PM Andrew Melo  wrote:
>
> Hello,
>
> I'm working on a DSv2 implementation with a userbase that is 100% pyspark 
> based.
>
> There's some interesting additional DS-level functionality I'd like to
> expose from the Java side to pyspark -- e.g. I/O metrics, which source
> site provided the data, etc...
>
> Does someone have an example of how to expose that to pyspark? We
> provide a python library for scientists to use, so I can also provide
> the python half, I just don't know where to begin. Part of the mental
> issue I'm having is that when a user does the following in pyspark:
>
> df = spark.read.format('edu.vanderbilt.accre.laurelin.Root') \
> .option("tree", "tree") \
> .load('small-flat-tree.root')
>
> They don't have a reference to any of my DS objects -- "df" is a
> DataFrame object, which I don't own.
>
> Does anyone have a tip?
> Thanks
> Andrew




Exposing functions to pyspark

2019-09-30 Thread Andrew Melo
Hello,

I'm working on a DSv2 implementation with a userbase that is 100% pyspark based.

There's some interesting additional DS-level functionality I'd like to
expose from the Java side to pyspark -- e.g. I/O metrics, which source
site provided the data, etc...

Does someone have an example of how to expose that to pyspark? We
provide a python library for scientists to use, so I can also provide
the python half, I just don't know where to begin. Part of the mental
issue I'm having is that when a user does the following in pyspark:

df = spark.read.format('edu.vanderbilt.accre.laurelin.Root') \
.option("tree", "tree") \
.load('small-flat-tree.root')

They don't have a reference to any of my DS objects -- "df" is a
DataFrame object, which I don't own.

Does anyone have a tip?
Thanks
Andrew




Re: Thoughts on Spark 3 release, or a preview release

2019-09-13 Thread Andrew Melo
Hi Spark Aficionados-

On Fri, Sep 13, 2019 at 15:08 Ryan Blue  wrote:

> +1 for a preview release.
>
> DSv2 is quite close to being ready. I can only think of a couple issues
> that we need to merge, like getting a fix for stats estimation done. I'll
> have a better idea once I've caught up from being away for ApacheCon and
> I'll add this to the agenda for our next DSv2 sync on Wednesday.
>

What does 3.0 mean for the DSv2 API? Does the API freeze at that point, or
would it still be allowed to change? I'm writing a DSv2 plug-in
(GitHub.com/spark-root/laurelin) and there's a couple little API things I
think could be useful, I've just not had time to write here/open a JIRA
about.

Thanks
Andrew


> On Fri, Sep 13, 2019 at 12:26 PM Dongjoon Hyun 
> wrote:
>
>> Ur, Sean.
>>
>> I prefer a full release like 2.0.0-preview.
>>
>> https://archive.apache.org/dist/spark/spark-2.0.0-preview/
>>
>> And, thank you, Xingbo!
>> Could you take a look at website generation? It seems to be broken on
>> `master`.
>>
>> Bests,
>> Dongjoon.
>>
>>
>> On Fri, Sep 13, 2019 at 11:30 AM Xingbo Jiang 
>> wrote:
>>
>>> Hi all,
>>>
>>> I would like to volunteer to be the release manager of Spark 3 preview,
>>> thanks!
>>>
>>> On Fri, Sep 13, 2019 at 11:21 AM Sean Owen wrote:
>>>
>>>> Well, great to hear the unanimous support for a Spark 3 preview
>>>> release. Now, I don't know how to make releases myself :) I would
>>>> first open it up to our revered release managers: would anyone be
>>>> interested in trying to make one? sounds like it's not too soon to get
>>>> what's in master out for evaluation, as there aren't any major
>>>> deficiencies left, although a number of items to consider for the
>>>> final release.
>>>>
>>>> I think we just need one release, targeting Hadoop 3.x / Hive 2.x in
>>>> order to make it possible to test with JDK 11. (We're only on Scala
>>>> 2.12 at this point.)
>>>>
>>>> On Thu, Sep 12, 2019 at 7:32 PM Reynold Xin 
>>>> wrote:
>>>> >
>>>> > +1! Long due for a preview release.
>>>> >
>>>> >
>>>> > On Thu, Sep 12, 2019 at 5:26 PM, Holden Karau 
>>>> wrote:
>>>> >>
>>>> >> I like the idea from the PoV of giving folks something to start
>>>> testing against and exploring so they can raise issues with us earlier in
>>>> the process and we have more time to make calls around this.
>>>> >>
>>>> >> On Thu, Sep 12, 2019 at 4:15 PM John Zhuge 
>>>> wrote:
>>>> >>>
>>>> >>> +1  Like the idea as a user and a DSv2 contributor.
>>>> >>>
>>>> >>> On Thu, Sep 12, 2019 at 4:10 PM Jungtaek Lim 
>>>> wrote:
>>>> >>>>
>>>> >>>> +1 (as a contributor) from me to have preview release on Spark 3
>>>> as it would help to test the feature. When to cut preview release is
>>>> questionable, as major works are ideally to be done before that - if we are
>>>> intended to introduce new features before official release, that should
>>>> work regardless of this, but if we are intended to have opportunity to test
>>>> earlier, ideally it should.
>>>> >>>>
>>>> >>>> As a one of contributors in structured streaming area, I'd like to
>>>> add some items for Spark 3.0, both "must be done" and "better to have". For
>>>> "better to have", I pick some items for new features which committers
>>>> reviewed couple of rounds and dropped off without soft-reject (No valid
>>>> reason to stop). For Spark 2.4 users, only added feature for structured
>>>> streaming is Kafka delegation token. (given we assume revising Kafka
>>>> consumer pool as improvement) I hope we provide some gifts for structured
>>>> streaming users in Spark 3.0 envelope.
>>>> >>>>
>>>> >>>> > must be done
>>>> >>>> * SPARK-26154 Stream-stream joins - left outer join gives
>>>> inconsistent output
>>>> >>>> It's a correctness issue with multiple users reported, being
>>>> reported at Nov. 2018. There's a way to reproduce it consistently, and we
>>>> have a patch submitted at Jan. 2019 to fix it.
>>>> >>>>

Fwd: Custom aggregations: modular and lightweight solutions?

2019-08-21 Thread Andrew Leverentz
Hi All,

Apologies for cross-posting this, but I'm wondering if the dev list might
be a better place for my questions below.  For now, I'm developing a set of
utilities for my own use, but if I can get these utilities working, I'd
like to see if it might be worth contributing them to the Spark project.

To summarize, I'm hoping to come up with a cleaner & more
functional-programming oriented way of defining custom grouping
calculations on Datasets / DataFrames, as described in my first email, from
August 12.  My second email (from Aug 13) provides a smaller and more
self-contained example that I think illustrates the core stumbling block
I'm running into.

Thanks,
~ Andrew


-- Forwarded message -
From: Andrew Leverentz 
Date: Tue, Aug 13, 2019 at 12:59 PM
Subject: Re: Custom aggregations: modular and lightweight solutions?
To: 


Here's a simpler example that I think gets at the heart of what I'm trying
to do: DynamicSchemaExample.scala
<https://gist.github.com/alev000/5be58bffb2dbc64bcdcc45fefb025a6e>.  Here,
I'm dynamically creating a sequence of Rows and also dynamically creating a
corresponding schema (StructType), but the RowEncoder derived from the
schema doesn't seem to handle the nested structure of the Rows.  This
example fails with a similar error (in this case, "scala.Tuple2$mcDD$sp is
not a valid external type for schema of struct<_1:double,_2:double>").

If I could find a way to get this example working (for arbitrary values of
rowSize), I suspect that it would also give me a solution to the
custom-aggregation issue I outlined in my previous email.  Any suggestions
would be much appreciated.

Thanks,
~ Andrew



On Mon, Aug 12, 2019 at 5:31 PM Andrew Leverentz <
andrew.levere...@empiricotx.com> wrote:

> Hi All,
>
> I'm attempting to clean up some Spark code which performs groupByKey /
> mapGroups to compute custom aggregations, and I could use some help
> understanding the Spark APIs necessary to make my code more modular and
> maintainable.
>
> In particular, my current approach is as follows:
>
>    - Start with a Dataset[CaseClass1]
>    - Apply groupByKey(f), where f is a function that extracts a tuple of
>      keys
>    - Apply mapGroups(g), where g computes multiple custom aggregations:
>       - Iterate through the rows in each group, updating a large, mutable
>         CustomState object.
>       - At the end of each group, transform the current key and the
>         CustomState into an instance of CaseClass2.
>
> In other words, we start with a dataset of CaseClass1 objects and end up
> with a dataset of CaseClass2 objects, using instances of a complex
> CustomState class to store the intermediate state during the aggregation.
>
> We have dozens of custom aggregation calculations to perform on this data,
> and I'd like to be able to streamline the process of introducing new
> aggregations and comparing multiple parameterized variations of the same
> aggregations side-by-side.  The current approach requires us to touch
> several tightly coupled pieces of code in order to add simple variations to
> existing aggregate functions.
>
> The UDAF API seems to be designed for this use case, but I've found it to
> be just as cumbersome to create new UDAFs as it is to maintain my current
> code.
>
> To address this, I've tried a couple of approaches (described below),
> although I've run into problems with both of them.
>
> At a high level, both of my approaches require a Dataset[T], a key
> extractor function (T => K), and a collection of instances of a custom
> class GroupingCalculation[T, S, R].  Here, T is the data type of each row
> in the dataset, K is the type of the key by which the rows should be
> grouped, S is the type of the intermediate state during aggregation, and R
> is the result type of each aggregation.  In this context, the data types T
> and K are fixed, but the state and result types (S and R) may vary among
> the GroupingCalculation instances.  The resulting DataFrame will have Rows
> which are basically concatenations of {K, R1, R2, ..., Rn}, where R1, ...,
> Rn are the result types for the GroupingCalculation instances.
>
> (1) My first approach operates by constructing a
> UserDefinedAggregateFunction by applying ScalaReflection.schemaFor[...] to
> T, S, and R.  After some digging and experimentation, I found a way to use
> CatalystTypeConverters and ExpressionEncoders to populate the
> MutableAggregationBuffer.  Unfortunately, once I finally got it running,
> this approach yielded a runtime 10x slower than the original approach
> described above. I suspect that adding an extra encoding/decoding layer on
> top of the UDAF was what slowed it down.  Because of this, I'm setting
> aside this approach for now.
>
> (2) Using a similar API to (1), I replaced

Timeline for Spark 3.0

2019-06-28 Thread Long, Andrew
Hey Friends,

Is there a timeline for spark 3.0 in terms of the first RC and final release?

Cheers Andrew


DSV2 API Question

2019-06-25 Thread Andrew Melo
Hello,

I've (nearly) implemented a DSV2-reader interface to read particle physics
data stored in the ROOT (https://root.cern.ch/) file format. You can think
of these ROOT files as roughly parquet-like: column-wise and nested (i.e. a
column can be of type "float[]", meaning each row in the column is a
variable-length array of floats). The overwhelming majority of our columns
are these variable-length arrays, since they represent physical quantities
that vary widely with each particle collision*.

Exposing these columns via the "SupportsScanColumnarBatch" interface has
raised a question I have about the DSV2 API. I know the interface is
currently Evolving, but I don't know if this is the appropriate place to
ask about it (I presume JIRA is a good place as well, but I had trouble
finding exactly where the best place to join is)

There is no provision in the org.apache.spark.sql.vectorized.ColumnVector
interface to return multiple rows of arrays (i.e. no "getArrays" analogue
to "getArray"). A big use case we have is to pipe these data through UDFs,
so it would be nice to be able to get the data from the file into a UDF
batch without having to convert to an intermediate row-wise representation.
Looking into ColumnarArray, however, it seems like instead of storing a
single offset and length, it could be extended to arrays of "offsets" and
"lengths". The public interface could remain the same by adding a 2nd
constructor which accepts arrays and keeping the existing constructor as a
degenerate case of a 1-length array.


* e.g. "electron_momentum" column will have a different number of entries
each row, one for each electron that is produced in a collision.
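
The "offsets" and "lengths" idea above can be illustrated outside of Spark. The following is only a plain-Java sketch of the layout, not a patch to ColumnarArray: the class name JaggedFloatColumn and its methods are hypothetical, and the real ColumnarArray wraps a ColumnVector rather than a float[].

```java
import java.util.Arrays;

// Hypothetical stand-in for a column of variable-length float arrays.
// All rows share one flat values buffer; row i occupies
// values[offsets[i] .. offsets[i] + lengths[i]).
final class JaggedFloatColumn {
    private final float[] values;
    private final int[] offsets;
    private final int[] lengths;

    // Multi-row constructor: one (offset, length) pair per row.
    JaggedFloatColumn(float[] values, int[] offsets, int[] lengths) {
        if (offsets.length != lengths.length) {
            throw new IllegalArgumentException("offsets and lengths must have the same size");
        }
        this.values = values;
        this.offsets = offsets;
        this.lengths = lengths;
    }

    // Single-array constructor, kept as the degenerate 1-row case.
    JaggedFloatColumn(float[] values, int offset, int length) {
        this(values, new int[] {offset}, new int[] {length});
    }

    int numRows() {
        return offsets.length;
    }

    // Copies out the entries belonging to one row.
    float[] getArray(int row) {
        return Arrays.copyOfRange(values, offsets[row], offsets[row] + lengths[row]);
    }
}
```

With this layout a whole batch of rows can be handed over as three arrays, and a row with no entries (a collision that produced no electrons, say) is simply a length of 0.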


Re: Detect executor core count

2019-06-18 Thread Andrew Melo
On Tue, Jun 18, 2019 at 5:40 PM Steve Loughran 
wrote:

> be aware that older java 8 versions count the #of cores in the host, not
> those allocated for the container they run in
> https://bugs.openjdk.java.net/browse/JDK-8140793
>
>
Ergh, that's good to know. I suppose, though, that in any case, there
should be a SparkSession available if I'm in the executor context, so I can
fall back to something sensible just in case.

Thanks for the help, everyone!
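
For a Java-based reader, the fallback discussed in this thread might look roughly like the sketch below. It only shows the arithmetic (configured executor cores divided by CPUs per task, otherwise the JVM's processor count). The class name ExecutorCores and the use of a plain Map for the configuration are assumptions made so the example runs without Spark; how the configuration values reach the reader is left open.

```java
import java.util.Map;

// Hypothetical helper: estimates how many tasks can run at once in one executor.
final class ExecutorCores {
    // conf holds Spark-style settings; jvmProcessors is the fallback value.
    static int taskSlots(Map<String, String> conf, int jvmProcessors) {
        String cores = conf.get("spark.executor.cores");
        if (cores == null) {
            // Not configured: fall back to what the JVM reports. Older Java 8
            // builds may report host cores rather than the container's share.
            return jvmProcessors;
        }
        int taskCpus = Integer.parseInt(conf.getOrDefault("spark.task.cpus", "1"));
        return Math.max(1, Integer.parseInt(cores) / taskCpus);
    }

    // Convenience overload that asks the running JVM for the fallback.
    static int taskSlots(Map<String, String> conf) {
        return taskSlots(conf, Runtime.getRuntime().availableProcessors());
    }
}
```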


> On Tue, Jun 18, 2019 at 8:13 PM Ilya Matiach 
> wrote:
>
>> Hi Andrew,
>>
>> I tried to do something similar to that in the LightGBM
>> classifier/regressor/ranker in mmlspark package, I try to use the spark
>> conf and if not configured I get the processors from the JVM directly:
>>
>>
>> https://github.com/Azure/mmlspark/blob/master/src/lightgbm/src/main/scala/LightGBMUtils.scala#L172
>>
>>
>>
>> If you know of a better way, please let me know!
>>
>>
>>
>> val spark = dataset.sparkSession
>> try {
>>   val confCores = spark.sparkContext.getConf
>>     .get("spark.executor.cores").toInt
>>   val confTaskCpus = spark.sparkContext.getConf
>>     .get("spark.task.cpus", "1").toInt
>>   confCores / confTaskCpus
>> } catch {
>>   case _: NoSuchElementException =>
>>     // If spark.executor.cores is not defined, get the cores per JVM
>>     import spark.implicits._
>>     val numMachineCores = spark.range(0, 1)
>>       .map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
>>     numMachineCores
>> }
>>
>>
>>
>> Thank you, Ilya
>>
>>
>>
>> *From:* Andrew Melo 
>> *Sent:* Tuesday, June 18, 2019 11:32 AM
>> *To:* dev 
>> *Subject:* Detect executor core count
>>
>>
>>
>> Hello,
>>
>>
>>
>> Is there a way to detect the number of cores allocated for an executor
>> within a java-based InputPartitionReader?
>>
>>
>>
>> Thanks!
>>
>> Andrew
>>
>


Detect executor core count

2019-06-18 Thread Andrew Melo
Hello,

Is there a way to detect the number of cores allocated for an executor
within a java-based InputPartitionReader?

Thanks!
Andrew


Re: DataSourceV2Reader Q

2019-05-21 Thread Andrew Melo
Hi Ryan,

On Tue, May 21, 2019 at 2:48 PM Ryan Blue  wrote:
>
> Are you sure that your schema conversion is correct? If you're running with a 
> recent Spark version, then that line is probably `name.hashCode()`. That file 
> was last updated 6 months ago so I think it is likely that `name` is the null 
> in your version.

Thanks for taking a look -- in my traceback, "line 264" of
attributeReference.hashCode() is:

h = h * 37 + metadata.hashCode()

If I look within the StructType at the top of the schema, each
StructField indeed has null for the metadata, which I was improperly
passing in instead of Metadata.empty().

Thanks again,
Andrew

>
> On Tue, May 21, 2019 at 11:39 AM Andrew Melo  wrote:
>>
>> Hello,
>>
>> I'm developing a DataSourceV2 reader for the ROOT (https://root.cern/)
>> file format to replace a previous DSV1 source that was in use before.
>>
>> I have a bare skeleton of the reader, which can properly load the
>> files and pass their schema into Spark 2.4.3, but any operation on the
>> resulting DataFrame (except for printSchema()) causes an NPE deep in
>> the guts of spark [1]. I'm baffled, though, since both logging
>> statements and coverage say that neither planBatchInputPartitions nor
>> any of the methods in my partition class are called -- the only things
>> called are readSchema and the constructors.
>>
>> I followed the pattern from "JavaBatchDataSourceV2.java" -- is it
>> possible that test-case isn't up to date? Are there any other example
>> Java DSV2 readers out in the wild I could compare against?
>>
>> Thanks!
>> Andrew
>>
>> [1]
>>
>> java.lang.NullPointerException
>> at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:264)
>> at scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:129)
>> at scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
>> at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
>> at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
>> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
>> at scala.collection.mutable.AbstractSet.apply(Set.scala:46)
>> at scala.collection.SeqLike$$anonfun$distinct$1.apply(SeqLike.scala:506)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at scala.collection.SeqLike$class.distinct(SeqLike.scala:505)
>> at scala.collection.AbstractSeq.distinct(Seq.scala:41)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.unique(package.scala:147)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct$lzycompute(package.scala:152)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct(package.scala:151)
>> at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:229)
>> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:892)
>> at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:889)
>> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRefe

DataSourceV2Reader Q

2019-05-21 Thread Andrew Melo
Hello,

I'm developing a DataSourceV2 reader for the ROOT (https://root.cern/)
file format to replace a previous DSV1 source that was in use before.

I have a bare skeleton of the reader, which can properly load the
files and pass their schema into Spark 2.4.3, but any operation on the
resulting DataFrame (except for printSchema()) causes an NPE deep in
the guts of spark [1]. I'm baffled, though, since both logging
statements and coverage say that neither planBatchInputPartitions nor
any of the methods in my partition class are called -- the only things
called are readSchema and the constructors.

I followed the pattern from "JavaBatchDataSourceV2.java" -- is it
possible that test-case isn't up to date? Are there any other example
Java DSV2 readers out in the wild I could compare against?

Thanks!
Andrew

[1]

java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:264)
at scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:129)
at scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.mutable.AbstractSet.apply(Set.scala:46)
at scala.collection.SeqLike$$anonfun$distinct$1.apply(SeqLike.scala:506)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.SeqLike$class.distinct(SeqLike.scala:505)
at scala.collection.AbstractSeq.distinct(Seq.scala:41)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.unique(package.scala:147)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct$lzycompute(package.scala:152)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct(package.scala:151)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:229)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:892)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:889)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:958)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache

Bucketing and catalyst

2019-05-02 Thread Long, Andrew
Hey Friends,

How aware of bucketing is Catalyst? I’ve been trying to piece together how 
Catalyst knows that it can remove a sort and shuffle given that both tables are 
bucketed and sorted the same way. Are there any classes in particular I should 
look at?

Cheers Andrew


Re: Stage 152 contains a task of very large size (12747 KB). The maximum recommended task size is 100 KB

2019-05-01 Thread Long, Andrew
It turned out that I was unintentionally copying multiple copies of the Hadoop 
config to every partition in an rdd. >.<  I was able to debug this by setting a 
break point on the warning message and inspecting the partition object itself.

Cheers Andrew

From: Russell Spitzer 
Date: Thursday, April 25, 2019 at 8:47 AM
To: "Long, Andrew" 
Cc: dev 
Subject: Re: FW: Stage 152 contains a task of very large size (12747 KB). The 
maximum recommended task size is 100 KB

I usually only see that in regards to folks parallelizing very large objects. 
From what I know, it's really just the data inside the "Partition" class of the 
RDD that is being sent back and forth. So usually something like 
spark.parallelize(Seq(reallyBigMap)) or something like that. The parallelize 
function jams all that data into the RDD's Partition metadata so that can 
easily overwhelm the task size.

On Tue, Apr 23, 2019 at 3:57 PM Long, Andrew  
wrote:
Hey Friends,

Is there an easy way of figuring out what's being pulled into the task context?  
I’ve been getting the following message, which I suspect means I’ve 
unintentionally captured some large objects, but figuring out what those objects 
are is stumping me.

19/04/23 13:52:13 WARN org.apache.spark.internal.Logging$class TaskSetManager: 
Stage 152 contains a task of very large size (12747 KB). The maximum 
recommended task size is 100 KB

Cheers Andrew


FW: Stage 152 contains a task of very large size (12747 KB). The maximum recommended task size is 100 KB

2019-04-23 Thread Long, Andrew
Hey Friends,

Is there an easy way of figuring out what's being pulled into the task context?  
I’ve been getting the following message, which I suspect means I’ve 
unintentionally captured some large objects, but figuring out what those objects 
are is stumping me.

19/04/23 13:52:13 WARN org.apache.spark.internal.Logging$class TaskSetManager: 
Stage 152 contains a task of very large size (12747 KB). The maximum 
recommended task size is 100 KB

Cheers Andrew


Sort order in bucketing in a custom datasource

2019-04-16 Thread Long, Andrew
Hey Friends,

Is it possible to specify the sort order or bucketing in a way that can be used 
by the optimizer in spark?

Cheers Andrew


Which parts of a parquet read happen on the driver vs the executor?

2019-04-11 Thread Long, Andrew
Hey Friends,

I’m working on a POC that involves reading and writing parquet files mid dag.  
Writes are working but I’m struggling with getting reads working due to 
serialization issues. I’ve got code that works in master=local but not in yarn. 
So here are my questions.


  1.  Is there an easy way to tell if a particular function in spark will be 
      run on the driver or the executor?  My current system is that if the 
      function uses the spark session it runs on the driver, but….
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver 
      or the executor?  Due to the spark session I was suspecting that it was 
      run on the driver and then the resulting iterator was sent to the executor 
      to run the read, but I’ve been running into serialization issues.

19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: Failed to serialize task 26, not attempting to retry it.
java.io.NotSerializableException: scala.collection.Iterator$$anon$12
Serialization stack:
- object not serializable (class: scala.collection.Iterator$$anon$12, value: non-empty iterator)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6993864a)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(non-empty iterator))
- field (class: com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: class scala.collection.immutable.List)

Is there something I’m missing here?

Here’s the code I’m using to read records.

def read(path: String, spark: SparkSession, fileSchema: StructType,
         requiredSchema: StructType): Iterator[InternalRow] = {
  val partitionSchema = StructType(Seq.empty)
  val status = spark.fs.getFileStatus(path)

  val pFile = new PartitionedFile(
    partitionValues = InternalRow.empty, // This should be empty for non partitioned values
    filePath = path.toString,
    start = 0,
    length = status.getLen
  )

  val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
    new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = spark,
      dataSchema = fileSchema,
      partitionSchema = partitionSchema, // this should be empty for non partitioned fields
      requiredSchema = requiredSchema,
      filters = Seq.empty,
      options = Map.empty,
      // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
      hadoopConf = spark.sparkContext.hadoopConfiguration
    )

  import scala.collection.JavaConverters._

  val i: Iterator[Any] = readFile(pFile)
  val rows = i.flatMap(_ match {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  })

  rows
}


Cheers Andrew
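
The serialization stack above points at the "readers" field of AzuleneSplit holding a live iterator. A minimal plain-Java sketch of that failure mode, and of one possible way around it (ship only the file paths and open the reader where the task runs), is shown below. The names EagerSplit, LazySplit and open() are hypothetical stand-ins, not Spark or Azulene APIs, and the "reader" is faked with an in-memory list.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

final class SplitSerializationDemo {
    // Holds already-opened iterators, like the readers field in the stack above.
    static final class EagerSplit implements Serializable {
        final ArrayList<Iterator<String>> readers = new ArrayList<>();
    }

    // Holds only plain data; the iterator is created on demand after shipping.
    static final class LazySplit implements Serializable {
        final ArrayList<String> paths = new ArrayList<>();

        // Hypothetical stand-in for opening a real file reader.
        Iterator<String> open(String path) {
            return Arrays.asList("row from " + path).iterator();
        }
    }

    // Returns true if Java serialization accepts the object graph.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }
}
```

An EagerSplit with one iterator in it fails canSerialize, while a LazySplit carrying the same file as a path passes.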


DataSourceV2 exceptions

2019-04-08 Thread Andrew Melo
Hello,

I'm developing a (java) DataSourceV2 to read a columnar file format
popular in a number of physical sciences (https://root.cern.ch/). (I
also understand that the API isn't fixed and is subject to change).

My question is -- what is the expected way to transmit exceptions from
the DataSource up to Spark? The DSV2 interface (unless I'm misreading
it) doesn't specify any checked exceptions that can be thrown in the
DS, so should I instead catch/rethrow any exceptions as unchecked
exceptions? If so, is there a recommended hierarchy to throw from?

thanks!
Andrew
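
One way the catch/rethrow option from the question could look in Java is sketched below. This is an illustration only, not a Spark-endorsed pattern: the RawReader interface and the RethrowSketch class are hypothetical, and java.io.UncheckedIOException is just one standard-library choice for carrying an IOException through a method that declares no checked exceptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class RethrowSketch {
    // Hypothetical low-level reader whose calls may fail with a checked exception.
    interface RawReader {
        boolean advance() throws IOException;
    }

    // Wraps the checked exception so callers see an unchecked one,
    // while the original failure stays available via getCause().
    static boolean next(RawReader reader) {
        try {
            return reader.advance();
        } catch (IOException e) {
            throw new UncheckedIOException("failed to advance the reader", e);
        }
    }
}
```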

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [ANNOUNCE] Announcing Apache Spark 2.4.1

2019-04-05 Thread Andrew Melo
On Fri, Apr 5, 2019 at 9:41 AM Jungtaek Lim  wrote:
>
> Thanks Andrew for reporting this. I just submitted the fix. 
> https://github.com/apache/spark/pull/24304

Thanks!

>
> On Fri, Apr 5, 2019 at 3:21 PM Andrew Melo  wrote:
>>
>> Hello,
>>
>> I'm not sure if this is the proper place to report it, but the 2.4.1
>> version of the config docs apparently didn't render right into HTML
>> (scroll down to "Compression and Serialization")
>>
>> https://spark.apache.org/docs/2.4.1/configuration.html#available-properties
>>
>> By comparison, the 2.4.0 version of the docs renders correctly.
>>
>> Cheers
>> Andrew
>>
>> On Fri, Apr 5, 2019 at 7:59 AM DB Tsai  wrote:
>> >
>> > +user list
>> >
>> > We are happy to announce the availability of Spark 2.4.1!
>> >
>> > Apache Spark 2.4.1 is a maintenance release, based on the branch-2.4
>> > maintenance branch of Spark. We strongly recommend all 2.4.0 users to
>> > upgrade to this stable release.
>> >
>> > In Apache Spark 2.4.1, Scala 2.12 support is GA, and it's no longer
>> > experimental.
>> > We will drop Scala 2.11 support in Spark 3.0, so please provide us 
>> > feedback.
>> >
>> > To download Spark 2.4.1, head over to the download page:
>> > http://spark.apache.org/downloads.html
>> >
>> > To view the release notes:
>> > https://spark.apache.org/releases/spark-release-2-4-1.html
>> >
>> > One more thing: to add a little color to this release, it's the
>> > largest RC ever (RC9)!
>> > We tried to incorporate many critical fixes at the last minute, and
>> > hope you all enjoy it.
>> >
>> > We would like to acknowledge all community members for contributing to
>> > this release. This release would not have been possible without you.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior




Re: [ANNOUNCE] Announcing Apache Spark 2.4.1

2019-04-05 Thread Andrew Melo
Hello,

I'm not sure if this is the proper place to report it, but the 2.4.1
version of the config docs apparently didn't render right into HTML
(scroll down to "Compression and Serialization")

https://spark.apache.org/docs/2.4.1/configuration.html#available-properties

By comparison, the 2.4.0 version of the docs renders correctly.

Cheers
Andrew

On Fri, Apr 5, 2019 at 7:59 AM DB Tsai  wrote:
>
> +user list
>
> We are happy to announce the availability of Spark 2.4.1!
>
> Apache Spark 2.4.1 is a maintenance release, based on the branch-2.4
> maintenance branch of Spark. We strongly recommend all 2.4.0 users to
> upgrade to this stable release.
>
> In Apache Spark 2.4.1, Scala 2.12 support is GA, and it's no longer
> experimental.
> We will drop Scala 2.11 support in Spark 3.0, so please provide us feedback.
>
> To download Spark 2.4.1, head over to the download page:
> http://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-1.html
>
> One more thing: to add a little color to this release, it's the
> largest RC ever (RC9)!
> We tried to incorporate many critical fixes at the last minute, and
> hope you all enjoy it.
>
> We would like to acknowledge all community members for contributing to
> this release. This release would not have been possible without you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Manually reading parquet files.

2019-03-21 Thread Long, Andrew
Thanks a ton for the help!

Is there a standardized way of converting the internal row to rows?

I’ve tried this, but I’m getting an exception:

val encoder = RowEncoder(df.schema)
val rows = readFile(pFile).flatMap(_ match {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  })
  .map(encoder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))

at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" 
Cc: "dev@spark.apache.org" , "u...@spark.apache.org" 
, "horizon-...@amazon.com" 
Subject: Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want, but 
the toString representation doesn't match the data for InternalRow.

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew  
wrote:
Hello Friends,

I’m working on a performance improvement that reads additional parquet files in 
the middle of a lambda and I’m running into some issues.  This is what I’d like 
to do:


ds.mapPartitions(x=>{
  //read parquet file in and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1)//skip _success file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // This should be empty for non partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    hadoopConf = spark.sparkContext.hadoopConfiguration
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)

  // This doesn't work. vector mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
//List([0,1,5b,24,66647361])
//??this is wrong I think

Has anyone attempted something similar?

Cheers Andrew



--
Ryan Blue
Software Engineer
Netflix


Manually reading parquet files.

2019-03-21 Thread Long, Andrew
Hello Friends,

I’m working on a performance improvement that reads additional parquet files in 
the middle of a lambda and I’m running into some issues.  This is what I’d like 
to do:


ds.mapPartitions(x=>{
  //read parquet file in and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1)//skip _success file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  // This should be empty for non-partitioned values
  partitionValues = InternalRow.empty,
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    // This should be empty for non-partitioned fields
    partitionSchema = partitionSchema,
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    // Alternative: relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
    hadoopConf = spark.sparkContext.hadoopConfiguration
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)

  // This doesn't work. vector mode is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
//List([0,1,5b,24,66647361])
//??this is wrong I think

Has anyone attempted something similar?

Cheers Andrew



Re: SPIP: Accelerator-aware Scheduling

2019-03-01 Thread Andrew Melo
Hi,

On Fri, Mar 1, 2019 at 9:48 AM Xingbo Jiang  wrote:
>
> Hi Sean,
>
> To support GPU scheduling with YARN cluster, we have to update the hadoop 
> version to 3.1.2+. However, if we decide to not upgrade hadoop to beyond that 
> version for Spark 3.0, then we just have to disable/fallback the GPU 
> scheduling with YARN, users shall still be able to have that feature with 
> Standalone or Kubernetes cluster.
>
> We didn't include the Mesos support in current SPIP because we didn't receive 
> use cases that require GPU scheduling on Mesos cluster, however, we can still 
> add Mesos support in the future if we observe valid use cases.

First time caller, long time listener. We have GPUs in our Mesos-based
Spark cluster, and it would be nice to use them with Spark-based
GPU-enabled frameworks (our use case is deep learning applications).

Cheers
Andrew

>
> Thanks!
>
> Xingbo
>
> On Fri, Mar 1, 2019 at 10:39 PM Sean Owen wrote:
>>
>> Two late breaking questions:
>>
>> This basically requires Hadoop 3.1 for YARN support?
>> Mesos support is listed as a non goal but it already has support for 
>> requesting GPUs in Spark. That would be 'harmonized' with this 
>> implementation even if it's not extended?
>>
>> On Fri, Mar 1, 2019, 7:48 AM Xingbo Jiang  wrote:
>>>
>>> I think we are aligned on the commitment, I'll start a vote thread for this 
>>> shortly.
>>>
>>> On Wed, Feb 27, 2019 at 6:47 AM Xiangrui Meng wrote:
>>>>
>>>> In case there are issues visiting Google doc, I attached PDF files to the 
>>>> JIRA.
>>>>
>>>> On Tue, Feb 26, 2019 at 7:41 AM Xingbo Jiang  wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I want to send a revised SPIP on implementing Accelerator(GPU)-aware 
>>>>> Scheduling. It improves Spark by making it aware of GPUs exposed by 
>>>>> cluster managers, and hence Spark can match GPU resources with user task 
>>>>> requests properly. If you have scenarios that need to run 
>>>>> workloads(DL/ML/Signal Processing etc.) on Spark cluster with GPU nodes, 
>>>>> please help review and check how it fits into your use cases. Your 
>>>>> feedback would be greatly appreciated!
>>>>>
>>>>> # Links to SPIP and Product doc:
>>>>>
>>>>> * Jira issue for the SPIP: 
>>>>> https://issues.apache.org/jira/browse/SPARK-24615
>>>>> * Google Doc: 
>>>>> https://docs.google.com/document/d/1C4J_BPOcSCJc58HL7JfHtIzHrjU0rLRdQM3y7ejil64/edit?usp=sharing
>>>>> * Product Doc: 
>>>>> https://docs.google.com/document/d/12JjloksHCdslMXhdVZ3xY5l1Nde3HRhIrqvzGnK_bNE/edit?usp=sharing
>>>>>
>>>>> Thank you!
>>>>>
>>>>> Xingbo

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hi Ryan,

On Mon, Feb 4, 2019 at 12:17 PM Ryan Blue  wrote:
>
> To partition by a condition, you would need to create a column with the 
> result of that condition. Then you would partition by that column. The sort 
> option would also work here.

We actually do something similar to filter based on physics properties
by running a python UDF to create a column then filtering on that
column. Doing something similar to sort/partition would also require a
shuffle though, right?

>
> I don't think that there is much of a use case for this. You have a set of 
> conditions on which to partition your data, and partitioning is already 
> supported. The idea to use conditions to create separate data frames would 
> actually make that harder because you'd need to create and name tables for 
> each one.

At the end, however, we do need separate dataframes for each of these
subsamples, unless there's something basic I'm missing in how the
partitioning works. After the input datasets are split into signal and
background regions, we still need to perform further (different)
computations on each of the subsamples. e.g. for subsamples with
exactly 2 electrons, we'll need to calculate the sum of their 4-d
momenta, while samples with <2 electrons will need to subtract two
different physical quantities -- several more steps before we get to
the point where we'll histogram the different subsamples for the
outputs.

Cheers
Andrew


>
> On Mon, Feb 4, 2019 at 9:16 AM Andrew Melo  wrote:
>>
>> Hello Ryan,
>>
>> On Mon, Feb 4, 2019 at 10:52 AM Ryan Blue  wrote:
>> >
>> > Andrew, can you give us more information about why partitioning the output 
>> > data doesn't work for your use case?
>> >
>> > It sounds like all you need to do is to create a table partitioned by A 
>> > and B, then you would automatically get the divisions you want. If what 
>> > you're looking for is a way to scale the number of combinations then you 
>> > can use formats that support more partitions, or you could sort by the 
>> > fields and rely on Parquet row group pruning to filter out data you don't 
>> > want.
>> >
>>
>> TBH, I don't understand what that would look like in pyspark and what
>> the consequences would be. Looking at the docs, there doesn't appear to
>> be a syntax for partitioning on a condition (most of our conditions
>> are of the form 'X > 30'). The use of Spark is still somewhat new in
>> our field, so it's possible we're not using it correctly.
>>
>> Cheers
>> Andrew
>>
>> > rb
>> >
>> > On Mon, Feb 4, 2019 at 8:33 AM Andrew Melo  wrote:
>> >>
>> >> Hello
>> >>
>> >> On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>> >> >
>> >> > I've seen many application need to split dataset to multiple datasets 
>> >> > based on some conditions. As there is no method to do it in one place, 
>> >> > developers use filter method multiple times. I think it can be useful 
>> >> > to have method to split dataset based on condition in one iteration, 
>> >> > something like partition method of scala (of-course scala partition 
>> >> > just split list into two list, but something more general can be more 
>> >> > useful).
>> >> > If you think it can be helpful, I can create Jira issue and work on it 
>> >> > to send PR.
>> >>
>> >> This would be a really useful feature for our use case (processing
>> >> collision data from the LHC). We typically want to take some sort of
>> >> input and split into multiple disjoint outputs based on some
>> >> conditions. E.g. if we have two conditions A and B, we'll end up with
>> >> 4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
>> >> combinatorics explode like 2^n, when we could produce them all up
>> >> front with this "multi filter" (or however it would be called).
>> >>
>> >> Cheers
>> >> Andrew
>> >>
>> >> >
>> >> > Best Regards
>> >> > Moein
>> >> >
>> >> > --
>> >> >
>> >> > Moein Hosseini
>> >> > Data Engineer
>> >> > mobile: +98 912 468 1859
>> >> > site: www.moein.xyz
>> >> > email: moein...@gmail.com
>> >> >
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > --
>> > Ryan Blue
>> > Software Engineer
>> > Netflix
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hello Ryan,

On Mon, Feb 4, 2019 at 10:52 AM Ryan Blue  wrote:
>
> Andrew, can you give us more information about why partitioning the output 
> data doesn't work for your use case?
>
> It sounds like all you need to do is to create a table partitioned by A and 
> B, then you would automatically get the divisions you want. If what you're 
> looking for is a way to scale the number of combinations then you can use 
> formats that support more partitions, or you could sort by the fields and 
> rely on Parquet row group pruning to filter out data you don't want.
>

TBH, I don't understand what that would look like in pyspark and what
the consequences would be. Looking at the docs, there doesn't appear to
be a syntax for partitioning on a condition (most of our conditions
are of the form 'X > 30'). The use of Spark is still somewhat new in
our field, so it's possible we're not using it correctly.

Cheers
Andrew

> rb
>
> On Mon, Feb 4, 2019 at 8:33 AM Andrew Melo  wrote:
>>
>> Hello
>>
>> On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>> >
>> > I've seen many application need to split dataset to multiple datasets 
>> > based on some conditions. As there is no method to do it in one place, 
>> > developers use filter method multiple times. I think it can be useful to 
>> > have method to split dataset based on condition in one iteration, 
>> > something like partition method of scala (of-course scala partition just 
>> > split list into two list, but something more general can be more useful).
>> > If you think it can be helpful, I can create Jira issue and work on it to 
>> > send PR.
>>
>> This would be a really useful feature for our use case (processing
>> collision data from the LHC). We typically want to take some sort of
>> input and split into multiple disjoint outputs based on some
>> conditions. E.g. if we have two conditions A and B, we'll end up with
>> 4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
>> combinatorics explode like 2^n, when we could produce them all up
>> front with this "multi filter" (or however it would be called).
>>
>> Cheers
>> Andrew
>>
>> >
>> > Best Regards
>> > Moein
>> >
>> > --
>> >
>> > Moein Hosseini
>> > Data Engineer
>> > mobile: +98 912 468 1859
>> > site: www.moein.xyz
>> > email: moein...@gmail.com
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hello

On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>
> I've seen many applications that need to split a dataset into multiple 
> datasets based on some conditions. As there is no method to do it in one 
> place, developers use the filter method multiple times. I think it would be 
> useful to have a method to split a dataset based on conditions in one 
> iteration, something like the partition method of Scala (of course, Scala's 
> partition just splits a list into two lists, but something more general 
> could be more useful).
> If you think it would be helpful, I can create a Jira issue and work on it 
> to send a PR.

This would be a really useful feature for our use case (processing
collision data from the LHC). We typically want to take some sort of
input and split into multiple disjoint outputs based on some
conditions. E.g. if we have two conditions A and B, we'll end up with
4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
combinatorics explode like 2^n, when we could produce them all up
front with this "multi filter" (or however it would be called).
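
To make the "multi filter" idea concrete, here is a rough sketch in plain Python (not Spark) of splitting records into disjoint outputs in a single pass. Each record is keyed by the tuple of its condition results, so n boolean conditions give at most 2^n buckets (4 for A and B). The function name split_by_conditions, the record fields, and the two conditions are all hypothetical and exist only for illustration.

```python
from collections import defaultdict
from itertools import product


def split_by_conditions(records, conditions):
    """Bucket records by the tuple of condition results, in one pass."""
    buckets = defaultdict(list)
    for record in records:
        key = tuple(bool(cond(record)) for cond in conditions)
        buckets[key].append(record)
    return buckets


# Hypothetical conditions A and B on made-up event records.
conditions = [
    lambda e: e["x"] > 30,            # condition A
    lambda e: e["n_electrons"] == 2,  # condition B
]
events = [
    {"x": 45, "n_electrons": 2},
    {"x": 10, "n_electrons": 2},
    {"x": 50, "n_electrons": 0},
    {"x": 5, "n_electrons": 1},
    {"x": 31, "n_electrons": 2},
]

buckets = split_by_conditions(events, conditions)

# Every possible combination of outcomes: 2 ** len(conditions) keys.
all_keys = list(product([True, False], repeat=len(conditions)))
for key in all_keys:
    print(key, len(buckets[key]))
```

Each record is evaluated against every condition once, rather than rescanning the input for each of the 2^n filters. Whether Spark could do the same without a shuffle or repeated scans is exactly the open question in this thread.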

Cheers
Andrew

>
> Best Regards
> Moein
>
> --
>
> Moein Hosseini
> Data Engineer
> mobile: +98 912 468 1859
> site: www.moein.xyz
> email: moein...@gmail.com
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark data quality bug when reading parquet files from hive metastore

2018-09-07 Thread Long, Andrew
Thanks Fokko,

I will definitely take a look at this.

Cheers Andrew

From: "Driesprong, Fokko" 
Date: Friday, August 24, 2018 at 2:39 AM
To: "reubensaw...@hotmail.com" 
Cc: "dev@spark.apache.org" 
Subject: Re: Spark data quality bug when reading parquet files from hive 
metastore

Hi Andrew,

This blog gives an idea of how the schema is resolved: 
https://blog.godatadriven.com/multiformat-spark-partition There is some 
optimisation going on when reading Parquet using Spark. Hope this helps.

Cheers, Fokko


On Wed, Aug 22, 2018 at 23:59, t4 <reubensaw...@hotmail.com> wrote:
https://issues.apache.org/jira/browse/SPARK-23576 ?



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hi,

I'm a long-time listener, first-time committer to spark, so this is
good to get my feet wet. I'm particularly interested in SPARK-23836,
which is an itch I may want to dive into and scratch myself in the
next month or so since it's pretty painful for our use-case.

Thanks!
Andrew

On Mon, Aug 27, 2018 at 2:20 PM, Holden Karau  wrote:
> Sure, I don't think you should wait on that being merged in. If you want to
> take the JIRA go ahead (although if you're already familiar with the Spark
> code base it might make sense to leave it as a starter issue for someone who
> is just getting started).
>
> On Mon, Aug 27, 2018 at 12:18 PM Andrew Melo  wrote:
>>
>> Hi Holden,
>>
>> I'm agnostic to the approach (though it seems cleaner to have an
>> explicit API for it). If you would like, I can take that JIRA and
>> implement it (should be a 3-line function).
>>
>> Cheers
>> Andrew
>>
>> On Mon, Aug 27, 2018 at 2:14 PM, Holden Karau 
>> wrote:
>> > Seems reasonable. We should probably add `getActiveSession` to the
>> > PySpark
>> > API (filed a starter JIRA
>> > https://issues.apache.org/jira/browse/SPARK-25255
>> > )
>> >
>> > On Mon, Aug 27, 2018 at 12:09 PM Andrew Melo 
>> > wrote:
>> >>
>> >> Hello Sean, others -
>> >>
>> >> Just to confirm, is it OK for client applications to access
>> >> SparkContext._active_spark_context, if it wraps the accesses in `with
>> >> SparkContext._lock:`?
>> >>
>> >> If that's acceptable to Spark, I'll implement the modifications in the
>> >> Jupyter extensions.
>> >>
>> >> thanks!
>> >> Andrew
>> >>
>> >> On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo 
>> >> wrote:
>> >> > Hi Sean,
>> >> >
>> >> > On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> >> >> Ah, python.  How about SparkContext._active_spark_context then?
>> >> >
>> >> > Ah yes, that looks like the right member, but I'm a bit wary about
>> >> > depending on functionality of objects with leading underscores. I
>> >> > assumed that was "private" and subject to change. Is that something I
>> >> > should be unconcerned about?
>> >> >
>> >> > The other thought is that the accesses with SparkContext are
>> >> > protected
>> >> > by "SparkContext._lock" -- should I also use that lock?
>> >> >
>> >> > Thanks for your help!
>> >> > Andrew
>> >> >
>> >> >>
>> >> >> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo 
>> >> >> wrote:
>> >> >>>
>> >> >>> Hi Sean,
>> >> >>>
>> >> >>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> >> >>> > Is SparkSession.getActiveSession what you're looking for?
>> >> >>>
>> >> >>> Perhaps -- though there's not a corresponding python function, and
>> >> >>> I'm
>> >> >>> not exactly sure how to call the scala getActiveSession without
>> >> >>> first
>> >> >>> instantiating the python version and causing a JVM to start.
>> >> >>>
>> >> >>> Is there an easy way to call getActiveSession that doesn't start a
>> >> >>> JVM?
>> >> >>>
>> >> >>> Cheers
>> >> >>> Andrew
>> >> >>>
>> >> >>> >
>> >> >>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo
>> >> >>> > 
>> >> >>> > wrote:
>> >> >>> >>
>> >> >>> >> Hello,
>> >> >>> >>
>> >> >>> >> One pain point with various Jupyter extensions [1][2] that
>> >> >>> >> provide
>> >> >>> >> visual feedback about running spark processes is the lack of a
>> >> >>> >> public
>> >> >>> >> API to introspect the web URL. The notebook server needs to know
>> >> >>> >> the
>> >> >>> >> URL to find information about the current SparkContext.
>> >> >>> >>

Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hi Holden,

I'm agnostic to the approach (though it seems cleaner to have an
explicit API for it). If you would like, I can take that JIRA and
implement it (should be a 3-line function).

Cheers
Andrew

On Mon, Aug 27, 2018 at 2:14 PM, Holden Karau  wrote:
> Seems reasonable. We should probably add `getActiveSession` to the PySpark
> API (filed a starter JIRA https://issues.apache.org/jira/browse/SPARK-25255
> )
>
> On Mon, Aug 27, 2018 at 12:09 PM Andrew Melo  wrote:
>>
>> Hello Sean, others -
>>
>> Just to confirm, is it OK for client applications to access
>> SparkContext._active_spark_context, if it wraps the accesses in `with
>> SparkContext._lock:`?
>>
>> If that's acceptable to Spark, I'll implement the modifications in the
>> Jupyter extensions.
>>
>> thanks!
>> Andrew
>>
>> On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo  wrote:
>> > Hi Sean,
>> >
>> > On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> >> Ah, python.  How about SparkContext._active_spark_context then?
>> >
>> > Ah yes, that looks like the right member, but I'm a bit wary about
>> > depending on functionality of objects with leading underscores. I
>> > assumed that was "private" and subject to change. Is that something I
>> > should be unconcerned about?
>> >
>> > The other thought is that the accesses with SparkContext are protected
>> > by "SparkContext._lock" -- should I also use that lock?
>> >
>> > Thanks for your help!
>> > Andrew
>> >
>> >>
>> >> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo 
>> >> wrote:
>> >>>
>> >>> Hi Sean,
>> >>>
>> >>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> >>> > Is SparkSession.getActiveSession what you're looking for?
>> >>>
>> >>> Perhaps -- though there's not a corresponding python function, and I'm
>> >>> not exactly sure how to call the scala getActiveSession without first
>> >>> instantiating the python version and causing a JVM to start.
>> >>>
>> >>> Is there an easy way to call getActiveSession that doesn't start a
>> >>> JVM?
>> >>>
>> >>> Cheers
>> >>> Andrew
>> >>>
>> >>> >
>> >>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>> >>> > wrote:
>> >>> >>
>> >>> >> Hello,
>> >>> >>
>> >>> >> One pain point with various Jupyter extensions [1][2] that provide
>> >>> >> visual feedback about running spark processes is the lack of a
>> >>> >> public
>> >>> >> API to introspect the web URL. The notebook server needs to know
>> >>> >> the
>> >>> >> URL to find information about the current SparkContext.
>> >>> >>
>> >>> >> Simply looking for "localhost:4040" works most of the time, but
>> >>> >> fails
>> >>> >> if multiple spark notebooks are being run on the same host -- spark
>> >>> >> increments the port for each new context, leading to confusion when
>> >>> >> the notebooks are trying to probe the web interface for
>> >>> >> information.
>> >>> >>
>> >>> >> I'd like to implement an analog to SparkContext.getOrCreate(),
>> >>> >> perhaps
>> >>> >> called "getIfExists()" that returns the current singleton if it
>> >>> >> exists, or None otherwise. The Jupyter code would then be able to
>> >>> >> use
>> >>> >> this entrypoint to query Spark for an active Spark context, which
>> >>> >> it
>> >>> >> could use to probe the web URL.
>> >>> >>
>> >>> >> It's a minor change, but this would be my first contribution to
>> >>> >> Spark,
>> >>> >> and I want to make sure my plan was kosher before I implemented it.
>> >>> >>
>> >>> >> Thanks!
>> >>> >> Andrew
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>> >>> >>
>> >>> >> [2] https://github.com/mozilla/jupyter-spark
>> >>> >>
>> >>> >>
>> >>> >> -
>> >>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>> >>
>> >>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hello Sean, others -

Just to confirm, is it OK for client applications to access
SparkContext._active_spark_context, if it wraps the accesses in `with
SparkContext._lock:`?

If that's acceptable to Spark, I'll implement the modifications in the
Jupyter extensions.

thanks!
Andrew

On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo  wrote:
> Hi Sean,
>
> On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> Ah, python.  How about SparkContext._active_spark_context then?
>
> Ah yes, that looks like the right member, but I'm a bit wary about
> depending on functionality of objects with leading underscores. I
> assumed that was "private" and subject to change. Is that something I
> should be unconcerned about?
>
> The other thought is that the accesses with SparkContext are protected
> by "SparkContext._lock" -- should I also use that lock?
>
> Thanks for your help!
> Andrew
>
>>
>> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo  wrote:
>>>
>>> Hi Sean,
>>>
>>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>>> > Is SparkSession.getActiveSession what you're looking for?
>>>
>>> Perhaps -- though there's not a corresponding python function, and I'm
>>> not exactly sure how to call the scala getActiveSession without first
>>> instantiating the python version and causing a JVM to start.
>>>
>>> Is there an easy way to call getActiveSession that doesn't start a JVM?
>>>
>>> Cheers
>>> Andrew
>>>
>>> >
>>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>>> > wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> One pain point with various Jupyter extensions [1][2] that provide
>>> >> visual feedback about running spark processes is the lack of a public
>>> >> API to introspect the web URL. The notebook server needs to know the
>>> >> URL to find information about the current SparkContext.
>>> >>
>>> >> Simply looking for "localhost:4040" works most of the time, but fails
>>> >> if multiple spark notebooks are being run on the same host -- spark
>>> >> increments the port for each new context, leading to confusion when
>>> >> the notebooks are trying to probe the web interface for information.
>>> >>
>>> >> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>>> >> called "getIfExists()" that returns the current singleton if it
>>> >> exists, or None otherwise. The Jupyter code would then be able to use
>>> >> this entrypoint to query Spark for an active Spark context, which it
>>> >> could use to probe the web URL.
>>> >>
>>> >> It's a minor change, but this would be my first contribution to Spark,
>>> >> and I want to make sure my plan was kosher before I implemented it.
>>> >>
>>> >> Thanks!
>>> >> Andrew
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>>> >>
>>> >> [2] https://github.com/mozilla/jupyter-spark
>>> >>
>>> >> -
>>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>> >>
>>> >

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Spark data quality bug when reading parquet files from hive metastore

2018-08-22 Thread Long, Andrew
Hello Friends,

I’ve encountered a bug where spark silently corrupts data when reading from a 
parquet hive table where the table schema does not match the file schema.  I’d 
like to give a shot at adding some extra validations to the code to handle this 
corner case and I was wondering if anyone had any suggestions for where to 
start looking in the spark code.

Cheers Andrew


Re: SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hi Sean,

On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
> Ah, python.  How about SparkContext._active_spark_context then?

Ah yes, that looks like the right member, but I'm a bit wary about
depending on functionality of objects with leading underscores. I
assumed that was "private" and subject to change. Is that something I
should be unconcerned about?

The other thought is that the accesses with SparkContext are protected
by "SparkContext._lock" -- should I also use that lock?

Thanks for your help!
Andrew

>
> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo  wrote:
>>
>> Hi Sean,
>>
>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> > Is SparkSession.getActiveSession what you're looking for?
>>
>> Perhaps -- though there's not a corresponding python function, and I'm
>> not exactly sure how to call the scala getActiveSession without first
>> instantiating the python version and causing a JVM to start.
>>
>> Is there an easy way to call getActiveSession that doesn't start a JVM?
>>
>> Cheers
>> Andrew
>>
>> >
>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>> > wrote:
>> >>
>> >> Hello,
>> >>
>> >> One pain point with various Jupyter extensions [1][2] that provide
>> >> visual feedback about running spark processes is the lack of a public
>> >> API to introspect the web URL. The notebook server needs to know the
>> >> URL to find information about the current SparkContext.
>> >>
>> >> Simply looking for "localhost:4040" works most of the time, but fails
>> >> if multiple spark notebooks are being run on the same host -- spark
>> >> increments the port for each new context, leading to confusion when
>> >> the notebooks are trying to probe the web interface for information.
>> >>
>> >> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>> >> called "getIfExists()" that returns the current singleton if it
>> >> exists, or None otherwise. The Jupyter code would then be able to use
>> >> this entrypoint to query Spark for an active Spark context, which it
>> >> could use to probe the web URL.
>> >>
>> >> It's a minor change, but this would be my first contribution to Spark,
>> >> and I want to make sure my plan was kosher before I implemented it.
>> >>
>> >> Thanks!
>> >> Andrew
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>> >>
>> >> [2] https://github.com/mozilla/jupyter-spark
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hi Sean,

On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
> Is SparkSession.getActiveSession what you're looking for?

Perhaps -- though there's not a corresponding python function, and I'm
not exactly sure how to call the scala getActiveSession without first
instantiating the python version and causing a JVM to start.

Is there an easy way to call getActiveSession that doesn't start a JVM?

Cheers
Andrew

>
> On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo  wrote:
>>
>> Hello,
>>
>> One pain point with various Jupyter extensions [1][2] that provide
>> visual feedback about running spark processes is the lack of a public
>> API to introspect the web URL. The notebook server needs to know the
>> URL to find information about the current SparkContext.
>>
>> Simply looking for "localhost:4040" works most of the time, but fails
>> if multiple spark notebooks are being run on the same host -- spark
>> increments the port for each new context, leading to confusion when
>> the notebooks are trying to probe the web interface for information.
>>
>> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>> called "getIfExists()" that returns the current singleton if it
>> exists, or None otherwise. The Jupyter code would then be able to use
>> this entrypoint to query Spark for an active Spark context, which it
>> could use to probe the web URL.
>>
>> It's a minor change, but this would be my first contribution to Spark,
>> and I want to make sure my plan was kosher before I implemented it.
>>
>> Thanks!
>> Andrew
>>
>>
>>
>>
>>
>> [1] https://krishnan-r.github.io/sparkmonitor/
>>
>> [2] https://github.com/mozilla/jupyter-spark
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hello,

One pain point with various Jupyter extensions [1][2] that provide
visual feedback about running spark processes is the lack of a public
API to introspect the web URL. The notebook server needs to know the
URL to find information about the current SparkContext.

Simply looking for "localhost:4040" works most of the time, but fails
if multiple spark notebooks are being run on the same host -- spark
increments the port for each new context, leading to confusion when
the notebooks are trying to probe the web interface for information.

I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
called "getIfExists()" that returns the current singleton if it
exists, or None otherwise. The Jupyter code would then be able to use
this entrypoint to query Spark for an active Spark context, which it
could use to probe the web URL.
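
For illustration, the behaviour I have in mind looks roughly like the toy class below. This is not PySpark code: ToyContext, its _active and _lock members, and the ui_port attribute are stand-ins invented for this sketch, and the real change would live in SparkContext itself.

```python
import threading


class ToyContext:
    """Toy stand-in for a context singleton; not PySpark's SparkContext."""

    _lock = threading.RLock()
    _active = None

    def __init__(self, ui_port):
        self.ui_port = ui_port

    @classmethod
    def get_or_create(cls, ui_port=4040):
        """Return the active instance, creating one if none exists."""
        with cls._lock:
            if cls._active is None:
                cls._active = cls(ui_port)
            return cls._active

    @classmethod
    def get_if_exists(cls):
        """Return the active instance, or None; never creates one."""
        with cls._lock:
            return cls._active


before = ToyContext.get_if_exists()          # nothing created yet
created = ToyContext.get_or_create(ui_port=4041)
after = ToyContext.get_if_exists()           # now returns the singleton
print(before, after.ui_port)
```

The point of the second accessor is that a caller such as a notebook extension can ask "is there a context?" without the side effect of starting one, and can then read whatever attribute exposes the web UI location.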

It's a minor change, but this would be my first contribution to Spark,
and I want to make sure my plan was kosher before I implemented it.

Thanks!
Andrew





[1] https://krishnan-r.github.io/sparkmonitor/

[2] https://github.com/mozilla/jupyter-spark

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Feedback on first commit + jira issue I opened

2018-05-31 Thread Long, Andrew
Hello Friends,

I’m a new committer; I’ve submitted my first patch and have some questions 
about documentation standards.  In my patch (JIRA below) I’ve added a config 
parameter to adjust the number of records shown when a user calls .show() on a 
dataframe.  I was hoping someone could double-check my small diff to make sure 
I wasn’t making any rookie mistakes before I submit a pull request.

https://issues.apache.org/jira/browse/SPARK-24442

Cheers Andrew


Re: [VOTE] Spark 2.3.0 (RC2)

2018-02-01 Thread Andrew Ash
I'd like to nominate SPARK-23290
<https://issues.apache.org/jira/browse/SPARK-23290> as a potential blocker
for the 2.3.0 release.  It's a regression from 2.2.0 in that user pyspark
code that works in 2.2.0 now fails in the 2.3.0 RCs: the return type
of date columns changed from object to datetime64[ns].  My understanding of
the Spark Versioning Policy <http://spark.apache.org/versioning-policy.html> is
that user code should continue to run in future versions of Spark with the
same major version number.

Thanks!

On Thu, Feb 1, 2018 at 9:50 AM, Tom Graves <tgraves...@yahoo.com.invalid>
wrote:

>
> Testing with spark 2.3 and I see a difference in the sql coalesce talking
> to hive vs spark 2.2. It seems spark 2.3 ignores the coalesce.
>
> Query:
> spark.sql("SELECT COUNT(DISTINCT(something)) FROM sometable WHERE dt >=
> '20170301' AND dt <= '20170331' AND something IS NOT
> NULL").coalesce(16).show()
>
> in spark 2.2 the coalesce works here, but in spark 2.3, it doesn't.
>  Anyone know about this issue or are there some weird config changes,
> otherwise I'll file a jira?
>
> Note I also see a performance difference when reading cached data. Spark
> 2.3. Small query on 19GB cached data, spark 2.3 is 30% worse.  This is only
> 13 seconds on spark 2.2 vs 17 seconds on spark 2.3.  Straight up reading
> from hive (orc) seems better though.
>
> Tom
>
>
>
> On Thursday, February 1, 2018, 11:23:45 AM CST, Michael Heuer <
> heue...@gmail.com> wrote:
>
>
> We found two classes new to Spark 2.3.0 that must be registered in Kryo
> for our tests to pass on RC2
>
> org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
> org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
>
> https://github.com/bigdatagenomics/adam/pull/1897
>
> Perhaps a mention in release notes?
>
>michael
>
>
> On Thu, Feb 1, 2018 at 3:29 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> All MLlib QA JIRAs resolved. Looks like SparkR too, so from the ML side
> that should be everything outstanding.
>
>
> On Thu, 1 Feb 2018 at 06:21 Yin Huai <yh...@databricks.com> wrote:
>
> seems we are not running tests related to pandas in pyspark tests (see my
> email "python tests related to pandas are skipped in jenkins"). I think we
> should fix this test issue and make sure all tests are good before cutting
> RC3.
>
> On Wed, Jan 31, 2018 at 10:12 AM, Sameer Agarwal <samee...@apache.org>
> wrote:
>
> Just a quick status update on RC3 -- SPARK-23274
> <https://issues.apache.org/jira/browse/SPARK-23274> was resolved
> yesterday and tests have been quite healthy throughout this week and the
> last. I'll cut the new RC as soon as the remaining blocker (SPARK-23202
> <https://issues.apache.org/jira/browse/SPARK-23202>) is resolved.
>
>
> On 30 January 2018 at 10:12, Andrew Ash <and...@andrewash.com> wrote:
>
> I'd like to nominate SPARK-23274
> <https://issues.apache.org/jira/browse/SPARK-23274> as a potential
> blocker for the 2.3.0 release as well, due to being a regression from
> 2.2.0.  The ticket has a simple repro included, showing a query that works
> in prior releases but now fails with an exception in the catalyst optimizer.
>
> On Fri, Jan 26, 2018 at 10:41 AM, Sameer Agarwal <sameer.a...@gmail.com>
> wrote:
>
> This vote has failed due to a number of aforementioned blockers. I'll
> follow up with RC3 as soon as the 2 remaining (non-QA) blockers are
> resolved: https://s.apache.org/oXKi
>
>
> On 25 January 2018 at 12:59, Sameer Agarwal <sameer.a...@gmail.com> wrote:
>
>
> Most tests pass on RC2, except I'm still seeing the timeout caused by
> https://issues.apache.org/jira/browse/SPARK-23055 ; the tests never
> finish. I followed the thread a bit further and wasn't clear whether it was
> subsequently re-fixed for 2.3.0 or not. It says it's resolved along with
> https://issues.apache.org/jira/browse/SPARK-22908 for 2.3.0 though I
> am still seeing these tests fail or hang:
>
> - subscribing topic by name from earliest offsets (failOnDataLoss: false)
> - subscribing topic by name from earliest offsets (failOnDataLoss: true)
>
>
> Sean, while some of these tests were timing out on RC1, we're not aware of
> any known issues in RC2. Both maven (
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-maven-hadoop-2.6/146/testReport/org.apache.spark.sql.kafka010/history/
> <https://amplab.cs.ber

Re: [VOTE] Spark 2.3.0 (RC2)

2018-01-30 Thread Andrew Ash
I'd like to nominate SPARK-23274 as a potential blocker for the 2.3.0
release as well, due to being a regression from 2.2.0.  The ticket has a
simple repro included, showing a query that works in prior releases but now
fails with an exception in the catalyst optimizer.

On Fri, Jan 26, 2018 at 10:41 AM, Sameer Agarwal 
wrote:

> This vote has failed due to a number of aforementioned blockers. I'll
> follow up with RC3 as soon as the 2 remaining (non-QA) blockers are
> resolved: https://s.apache.org/oXKi
>
>
> On 25 January 2018 at 12:59, Sameer Agarwal  wrote:
>
>>
>> Most tests pass on RC2, except I'm still seeing the timeout caused by
>>> https://issues.apache.org/jira/browse/SPARK-23055 ; the tests never
>>> finish. I followed the thread a bit further and wasn't clear whether it was
>>> subsequently re-fixed for 2.3.0 or not. It says it's resolved along with
>>> https://issues.apache.org/jira/browse/SPARK-22908 for 2.3.0 though I am
>>> still seeing these tests fail or hang:
>>>
>>> - subscribing topic by name from earliest offsets (failOnDataLoss: false)
>>> - subscribing topic by name from earliest offsets (failOnDataLoss: true)
>>>
>>
>> Sean, while some of these tests were timing out on RC1, we're not aware
>> of any known issues in RC2. Both maven
>> (https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-maven-hadoop-2.6/146/testReport/org.apache.spark.sql.kafka010/history/)
>> and sbt
>> (https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.6/123/testReport/org.apache.spark.sql.kafka010/history/)
>> historical builds on jenkins
>> for org.apache.spark.sql.kafka010 look fairly healthy. If you're still
>> seeing timeouts in RC2, can you create a JIRA with any applicable build/env
>> info?
>>
>>
>>
>>> On Tue, Jan 23, 2018 at 9:01 AM Sean Owen  wrote:
>>>
 I'm not seeing that same problem on OS X and /usr/bin/tar. I tried
 unpacking it with 'xvzf' and also unzipping it first, and it untarred
 without warnings in either case.

 I am encountering errors while running the tests, different ones each
 time, so am still figuring out whether there is a real problem or just
 flaky tests.

 These issues look like blockers, as they are inherently to be completed
 before the 2.3 release. They are mostly not done. I suppose I'd -1 on
 behalf of those who say this needs to be done first, though, we can keep
 testing.

 SPARK-23105 Spark MLlib, GraphX 2.3 QA umbrella
 SPARK-23114 Spark R 2.3 QA umbrella

 Here are the remaining items targeted for 2.3:

 SPARK-15689 Data source API v2
 SPARK-20928 SPIP: Continuous Processing Mode for Structured Streaming
 SPARK-21646 Add new type coercion rules to compatible with Hive
 SPARK-22386 Data Source V2 improvements
 SPARK-22731 Add a test for ROWID type to OracleIntegrationSuite
 SPARK-22735 Add VectorSizeHint to ML features documentation
 SPARK-22739 Additional Expression Support for Objects
 SPARK-22809 pyspark is sensitive to imports with dots
 SPARK-22820 Spark 2.3 SQL API audit


 On Mon, Jan 22, 2018 at 7:09 PM Marcelo Vanzin 
 wrote:

> +0
>
> Signatures check out. Code compiles, although I see the errors in [1]
> when untarring the source archive; perhaps we should add "use GNU tar"
> to the RM checklist?
>
> Also ran our internal tests and they seem happy.
>
> My concern is the list of open bugs targeted at 2.3.0 (ignoring the
> documentation ones). It is not long, but it seems some of those need
> to be looked at. It would be nice for the committers who are involved
> in those bugs to take a look.
>
> [1] https://superuser.com/questions/318809/linux-os-x-tar-incompatibility-tarballs-created-on-os-x-give-errors-when-unt
>
>
> On Mon, Jan 22, 2018 at 1:36 PM, Sameer Agarwal 
> wrote:
> > Please vote on releasing the following candidate as Apache Spark
> version
> > 2.3.0. The vote is open until Friday January 26, 2018 at 8:00:00 am
> UTC and
> > passes if a majority of at least 3 PMC +1 votes are cast.
> >
> >
> > [ ] +1 Release this package as Apache Spark 2.3.0
> >
> > [ ] -1 Do not release this package because ...
> >
> >
> > To learn more about Apache Spark, please see
> https://spark.apache.org/
> >
> > The tag to be voted on is v2.3.0-rc2:
> > https://github.com/apache/spark/tree/v2.3.0-rc2
> > (489ecb0ef23e5d9b705e5e5bae4fa3d871bdac91)
> >
> > List of JIRA tickets resolved in this release can be found here:
> > https://issues.apache.org/jira/projects/SPARK/versions/12339551
> >
> > The 

Re: Kubernetes: why use init containers?

2018-01-12 Thread Andrew Ash
+1 on the first release being marked experimental.  Many major features
coming into Spark in the past have gone through a stabilization process.

On Fri, Jan 12, 2018 at 1:18 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> BTW I most probably will not have time to get back to this at any time
> soon, so if anyone is interested in doing some clean up, I'll leave my
> branch up.
>
> I'm seriously thinking about proposing that we document the k8s
> backend as experimental in 2.3; it seems there's still a lot to be
> cleaned up in terms of user interface (as in extensibility and
> customizability), documentation, and mainly testing, and we're pretty
> far into the 2.3 cycle for all of those to be sorted out.
>
> On Thu, Jan 11, 2018 at 8:19 AM, Anirudh Ramanathan
> <ramanath...@google.com> wrote:
> > If we can separate those concerns out, that might make sense in the short
> > term IMO.
> > There are several benefits to reusing spark-submit and spark-class as you
> > pointed out previously,
> > so, we should be looking to leverage those irrespective of how we do
> > dependency management -
> > in the interest of conformance with the other cluster managers.
> >
> > I like the idea of passing arguments through in a way that it doesn't
> > trigger the dependency management code for now.
> > In the interest of time for 2.3, if we could target just that (and
> > revisit the init containers afterwards),
> > there should be enough time to make the change, test and release with
> > confidence.
> >
> > On Wed, Jan 10, 2018 at 3:45 PM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
> >>
> >> On Wed, Jan 10, 2018 at 3:00 PM, Anirudh Ramanathan
> >> <ramanath...@google.com> wrote:
> >> > We can start by getting a PR going perhaps, and start augmenting the
> >> > integration testing to ensure that there are no surprises -
> with/without
> >> > credentials, accessing GCS, S3 etc as well.
> >> > When we get enough confidence and test coverage, let's merge this in.
> >> > Does that sound like a reasonable path forward?
> >>
> >> I think it's beneficial to separate this into two separate things as
> >> far as discussion goes:
> >>
> >> - using spark-submit: the code should definitely be starting the
> >> driver using spark-submit, and potentially the executor using
> >> spark-class.
> >>
> >> - separately, we can decide on whether to keep or remove init
> containers.
> >>
> >> Unfortunately, code-wise, those are not separate. If you get rid of
> >> init containers, my current p.o.c. has most of the needed changes
> >> (only lightly tested).
> >>
> >> But if you keep init containers, you'll need to mess with the
> >> configuration so that spark-submit never sees spark.jars /
> >> spark.files, so it doesn't trigger its dependency download code. (YARN
> >> does something similar, btw.) That will surely mean different changes
> >> in the current k8s code (which I wanted to double check anyway because
> >> I remember seeing some oddities related to those configs in the logs).
> >>
> >> To comment on one point made by Andrew:
> >> > there's almost a parallel here with spark.yarn.archive, where that
> >> > configures the cluster (YARN) to do distribution pre-runtime
> >>
> >> That's more of a parallel to the docker image; spark.yarn.archive
> >> points to a jar file with Spark jars in it so that YARN can make Spark
> >> available to the driver / executors running in the cluster.
> >>
> >> Like the docker image, you could include other stuff that is not
> >> really part of standard Spark in that archive too, or even not have
> >> Spark at all there, if you want things to just fail. :-)
> >>
> >> --
> >> Marcelo
> >
> >
> >
> >
> > --
> > Anirudh Ramanathan
>
>
>
> --
> Marcelo
>
>
>


Re: Kubernetes: why use init containers?

2018-01-10 Thread Andrew Ash
It seems we have two standard practices for resource distribution in place
here:

- the Spark way is that the application (Spark) distributes the resources
*during* app execution, and does this by exposing files/jars on an http
server on the driver (or pre-staged elsewhere), and executors downloading
from that location (driver or remote)
- the Kubernetes way is that the cluster manager (Kubernetes) distributes
the resources *before* app execution, and does this primarily via docker
images, and secondarily through init containers for non-image resources.
I'd imagine a motivation for this choice on k8s' part is immutability of
the application at runtime

When the Spark and K8s standard practices are in conflict (as they seem to
be here), which convention should be followed?

Looking at the Spark-on-YARN integration, there's almost a parallel here
with spark.yarn.archive, where that configures the cluster (YARN) to do
distribution pre-runtime instead of the application mid-runtime.

Based purely on the lines-of-code removal, right now I lean towards
eliminating init containers.  It doesn't seem like credential segregation
between init container and main pod container is that valuable right now,
and the retryability could/should be in all of Spark's cluster managers,
not just k8s.

So I support Anirudh's suggestion to move towards bringing the change
demonstrated in Marcelo's POC into master.

On Wed, Jan 10, 2018 at 3:00 PM, Anirudh Ramanathan <
ramanath...@google.com.invalid> wrote:

> Thanks for this discussion everyone. It has been very useful in getting an
> overall understanding here.
> I think in general, consensus is that this change doesn't introduce
> behavioral changes, and it's definitely an advantage to reuse the
> constructs that Spark provides to us.
>
> Moving on to a different question here - of pushing this through to Spark.
> The init containers have been tested over the past two Spark releases by
> external users and integration testing - and this would be a fundamental
> change to that behavior.
> We should work on getting enough test coverage and confidence here.
>
> We can start by getting a PR going perhaps, and start augmenting the
> integration testing to ensure that there are no surprises - with/without
> credentials, accessing GCS, S3 etc as well.
> When we get enough confidence and test coverage, let's merge this in.
> Does that sound like a reasonable path forward?
>
>
>
> On Wed, Jan 10, 2018 at 2:53 PM, Marcelo Vanzin 
> wrote:
>
>> On Wed, Jan 10, 2018 at 2:51 PM, Matt Cheah  wrote:
>> > those sidecars may perform side effects that are undesirable if the
>> main Spark application failed because dependencies weren’t available
>>
>> If the contract is that the Spark driver pod does not have an init
>> container, and the driver handles its own dependencies, then by
>> definition that situation cannot exist.
>>
>> --
>> Marcelo
>>
>
>
>
> --
> Anirudh Ramanathan
>


Re: Palantir replease under org.apache.spark?

2018-01-09 Thread Andrew Ash
That source repo is at https://github.com/palantir/spark/ with artifacts
published to Palantir's bintray at
https://palantir.bintray.com/releases/org/apache/spark/  If you're seeing
any of them in Maven Central please flag, as that's a mistake!

Andrew

On Tue, Jan 9, 2018 at 10:10 AM, Sean Owen <so...@cloudera.com> wrote:

> Just to follow up -- those are actually in a Palantir repo, not Central.
> Deploying to Central would be uncourteous, but this approach is legitimate
> and how it has to work for vendors to release distros of Spark etc.
>
>
> On Tue, Jan 9, 2018 at 11:43 AM Nan Zhu <zhunanmcg...@gmail.com> wrote:
>
>> Hi, all
>>
>> Out of curiosity, I just found a bunch of Palantir releases under
>> org.apache.spark in Maven Central
>> (https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11)?
>>
>> Is it on purpose?
>>
>> Best,
>>
>> Nan
>>
>>
>>


Re: Leveraging S3 select

2017-12-08 Thread Andrew Duffy
Hey Steve,

Happen to have a link to the TPC-DS benchmark data w/random S3 reads? I've done 
a decent amount of digging, but all I've found is a reference in a slide deck 
and some jira tickets.

From: Steve Loughran 
Date: Tuesday, December 5, 2017 at 09:44
To: "Lalwani, Jayesh" 
Cc: Apache Spark Dev 
Subject: Re: Leveraging S3 select




On 29 Nov 2017, at 21:45, Lalwani, Jayesh wrote:

AWS announced at re:Invent that they are launching S3 Select. This can allow 
Spark to push down predicates to S3, rather than read the entire file in 
memory. Are there any plans to update Spark to use S3 Select?


  1.  ORC and Parquet don't read the whole file in memory anyway, except in the 
special case that the file is gzipped
  2.  Hadoop's s3a <= 2.7 doesn't handle the aggressive seeks of those columnar 
formats that well, as it does a GET pos-EOF & has to abort the TCP connection 
if the seek is backwards
  3.  Hadoop 2.8+ with spark.hadoop.fs.s3a.experimental.fadvise=random switches
to random IO and only does smaller GET reads of the data requested (actually
min(min-read-length, buffer-size)). This delivers ~3x performance boost in
TPC-DS benchmarks
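
For reference, the setting in item 3 could be passed to a job roughly as follows. This is only a sketch: the property name is copied from the message above, while the helper `to_submit_args` and the application name `my_job.py` are made up for illustration.

```python
def to_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit `--conf` arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


# Property name as quoted in the message above (Hadoop 2.8+ S3A connector).
s3a_conf = {"spark.hadoop.fs.s3a.experimental.fadvise": "random"}

# `my_job.py` is a placeholder application name.
command = ["spark-submit"] + to_submit_args(s3a_conf) + ["my_job.py"]
print(" ".join(command))
```

The same key/value pair can also be set on the session builder or in spark-defaults.conf; the command-line form is shown because it needs no code changes in the job.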


I don't yet know how much more efficient the new mechanism will be against 
columnar data, given those facts. You'd need to do experiments

The place to implement this would be through predicate pushdown from the file
format to the FS. ORC & Parquet support predicate pushdown, so they'd need to
recognise when the underlying store could do some of the work for them, open
the store input stream differently, and use a whole new (undefined?) API to the
queries. Most likely: s3a would add a way to specify a predicate to select on
in open(), as well as the expected file type. This would need the underlying
mechanism to also support those formats though, which the announcement doesn't.

Someone could do something more immediately through some modified CSV data
source which did the pushdown. However, if you are using CSV for your datasets,
there's something fundamental w.r.t your data storage policy you need to look
at. It works sometimes as an exchange format, though I prefer Avro there due to
its schemas and support for more complex structures.  As a format you run
queries over? No.


Faster and Lower memory implementation toPandas

2017-11-16 Thread Andrew Andrade
Hello devs,

I know a lot of great work has been done recently with pandas to spark
dataframes and vice versa using Apache Arrow, but I faced a specific pain
point on a low memory setup without Arrow.

Specifically I was finding a driver OOM running a toPandas on a small
dataset (<100 MB compressed).  There was discussion about toPandas being
slow
<http://apache-spark-developers-list.1001551.n3.nabble.com/toPandas-very-slow-td16794.html>
in March 2016 due to a self.collect().  A solution was found to create Pandas
DataFrames or Numpy Arrays using MapPartitions for each partition
<https://gist.github.com/joshlk/871d58e01417478176e7>, but it was never
implemented back into dataframe.py

I understand that using Apache Arrow will solve this, but in a setup
without Arrow (like the one where I faced the pain point), I investigated
memory usage of toPandas and to_pandas (dataframe per partition) and played
with the number of partitions.  The findings are here
<https://gist.github.com/mrandrewandrade/7f5ff26c5275376d3cd5e427ca74d50f>.

The summary of the findings is that on a 147MB dataset, toPandas memory
usage was about 784MB, while doing it partition by partition (with 100
partitions) had an overhead of 76.30 MB and took almost half of the time to
run.  I realize that Arrow solves this but the modification is quite small
and would greatly assist anyone who isn't able to use Arrow.

Would a PR [1] from me to address this issue be welcome?

Thanks,

Andrew

[1] From Josh's Gist

import pandas as pd

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    # Build one pandas DataFrame per partition from its iterator of rows.
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy
    fashion. The DataFrame is repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    # Convert each partition on the executors, then collect the pieces.
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand


Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

2017-09-07 Thread Andrew Ash
+0 (non-binding)

I think there are benefits to unifying all the Spark-internal datasources
into a common public API for sure.  It will serve as a forcing function to
ensure that those internal datasources aren't advantaged vs datasources
developed externally as plugins to Spark, and that all Spark features are
available to all datasources.

But I also think this read-path proposal avoids the more difficult
questions around how to continue pushing datasource performance forwards.
James Baker (my colleague) had a number of questions about advanced
pushdowns (combined sorting and filtering), and Reynold also noted that
pushdown of aggregates and joins are desirable on longer timeframes as
well.  The Spark community saw similar requests, for aggregate pushdown in
SPARK-12686, join pushdown in SPARK-20259, and arbitrary plan pushdown
in SPARK-12449.  Clearly a number of people are interested in this kind of
performance work for datasources.

To leave enough space for datasource developers to continue experimenting
with advanced interactions between Spark and their datasources, I'd propose
we leave some sort of escape valve that enables these datasources to keep
pushing the boundaries without forking Spark.  Possibly that looks like an
additional unsupported/unstable interface that pushes down an entire
(unstable API) logical plan, which is expected to break API on every
release.   (Spark attempts this full-plan pushdown, and if that fails Spark
ignores it and continues on with the rest of the V2 API for
compatibility).  Or maybe it looks like something else that we don't know
of yet.  Possibly this falls outside of the desired goals for the V2 API
and instead should be a separate SPIP.
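
To make the "try full-plan pushdown, otherwise fall back" flow concrete, here is a rough sketch in plain Python. Every name in it (`scan`, `push_down_plan`, `PushdownNotSupported`, the two source classes) is hypothetical and not part of any Spark API; plans are plain strings to keep the example short.

```python
class PushdownNotSupported(Exception):
    """Raised by a source that cannot handle the plan it was given."""


def scan(source, plan):
    """Try full-plan pushdown first, then fall back to the stable path."""
    push = getattr(source, "push_down_plan", None)  # optional, unstable hook
    if push is not None:
        try:
            return push(plan)
        except PushdownNotSupported:
            pass  # ignore the failure and continue with the stable API
    return source.read(plan)


class BasicSource:
    def read(self, plan):
        return f"stable read of {plan}"


class AdvancedSource(BasicSource):
    def push_down_plan(self, plan):
        if "join" in plan:
            raise PushdownNotSupported(plan)
        return f"pushed down {plan}"


results = [
    scan(BasicSource(), "filter"),
    scan(AdvancedSource(), "filter+sort"),
    scan(AdvancedSource(), "join"),
]
```

Sources that never implement the optional hook are unaffected, which is the compatibility property the paragraph above asks for.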

If we had a plan for this kind of escape valve for advanced datasource
developers I'd be an unequivocal +1.  Right now it feels like this SPIP is
focused more on getting the basics right for what many datasources are
already doing in API V1 combined with other private APIs, vs pushing
forward state of the art for performance.

Andrew

On Wed, Sep 6, 2017 at 10:56 PM, Suresh Thalamati <
suresh.thalam...@gmail.com> wrote:

> +1 (non-binding)
>
>
> On Sep 6, 2017, at 7:29 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>
> Hi all,
>
> In the previous discussion, we decided to split the read and write path of
> data source v2 into 2 SPIPs, and I'm sending this email to call a vote for
> Data Source V2 read path only.
>
> The full document of the Data Source API V2 is:
> https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-
> Z8qU5Frf6WMQZ6jJVM/edit
>
> The ready-for-review PR that implements the basic infrastructure for the
> read path is:
> https://github.com/apache/spark/pull/19136
>
> The vote will be up for the next 72 hours. Please reply with your vote:
>
> +1: Yeah, let's go forward and implement the SPIP.
> +0: Don't really care.
> -1: I don't think this is a good idea because of the following technical
> reasons.
>
> Thanks!
>
>
>


Re: SPIP: Spark on Kubernetes

2017-08-15 Thread Andrew Ash
+1 (non-binding)

We're moving large amounts of infrastructure from a combination of open
source and homegrown cluster management systems to unify on Kubernetes and
want to bring Spark workloads along with us.

On Tue, Aug 15, 2017 at 2:29 PM, liyinan926  wrote:

> +1 (non-binding)
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/SPIP-Spark-on-
> Kubernetes-tp22147p22164.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Re: Use Apache ORC in Apache Spark 2.3

2017-08-10 Thread Andrew Ash
@Reynold no I don't use the HiveCatalog -- I'm using a custom
implementation of ExternalCatalog instead.

On Thu, Aug 10, 2017 at 3:34 PM, Dong Joon Hyun <dh...@hortonworks.com>
wrote:

> Thank you, Andrew and Reynold.
>
>
>
> Yes, it will reduce the old Hive dependency eventually, at least, ORC
> codes.
>
>
>
> And, Spark without `-Phive` can use ORC like Parquet.
>
>
>
> This is one milestone for `Feature parity for ORC with Parquet
> (SPARK-20901)`.
>
>
>
> Bests,
>
> Dongjoon
>
>
>
> *From: *Reynold Xin <r...@databricks.com>
> *Date: *Thursday, August 10, 2017 at 3:23 PM
> *To: *Andrew Ash <and...@andrewash.com>
> *Cc: *Dong Joon Hyun <dh...@hortonworks.com>, "dev@spark.apache.org" <
> dev@spark.apache.org>, Apache Spark PMC <priv...@spark.apache.org>
> *Subject: *Re: Use Apache ORC in Apache Spark 2.3
>
>
>
> Do you not use the catalog?
>
>
>
>
>
> On Thu, Aug 10, 2017 at 3:22 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> I would support moving ORC from sql/hive -> sql/core because it brings me
> one step closer to eliminating Hive from my Spark distribution by removing
> -Phive at build time.
>
>
>
> On Thu, Aug 10, 2017 at 9:48 AM, Dong Joon Hyun <dh...@hortonworks.com>
> wrote:
>
> Thank you again for coming and reviewing this PR.
>
>
>
> So far, we discussed the following.
>
>
>
> 1. `Why are we adding this to core? Why not just the hive module?` (@rxin)
>
>- `sql/core` module gives more benefit than `sql/hive`.
>
>- Apache ORC library (`no-hive` version) is a general and reasonably
> small library designed for non-hive apps.
>
>
>
> 2. `Can we add smaller amount of new code to use this, too?` (@kiszk)
>
>- The previous #17980 , #17924, and #17943 are the complete examples
> containing this PR.
>
>- This PR is focusing on dependency only.
>
>
>
> 3. `Why don't we then create a separate orc module? Just copy a few of the
> files over?` (@rxin)
>
>-  Apache ORC library is the same as most other data sources (CSV,
> JDBC, JSON, PARQUET, TEXT) which live inside `sql/core`
>
>- It's better to use as a library instead of copying ORC files because
> Apache ORC shaded jar has many files. We had better depend on Apache ORC
> community's effort until an unavoidable reason for copying occurs.
>
>
>
> 4. `I do worry in the future whether ORC would bring in a lot more jars`
> (@rxin)
>
>- The ORC core library's dependency tree is aggressively kept as small
> as possible. I've gone through and excluded unnecessary jars from our
> dependencies. I also kick back pull requests that add unnecessary new
> dependencies. (@omalley)
>
>
>
> 5. `In the long term, Spark should move to using only the vectorized
> reader in ORC's core` (@omalley)
>
> - Of course.
>
>
>
> I’ve been waiting for new comments and discussion since last week.
>
> Apparently, there are no further comments except the last comment (5) from
> Owen this week.
>
>
>
> Please give your opinion if you think we need some change on the current
> PR (as-is).
>
> FYI, there is one LGTM on the PR (as-is) and no -1 so far.
>
>
>
> Thank you again for supporting new ORC improvement in Apache Spark.
>
>
>
> Bests,
>
> Dongjoon.
>
>
>
>
>
> *From: *Dong Joon Hyun <dh...@hortonworks.com>
> *Date: *Friday, August 4, 2017 at 8:05 AM
> *To: *"dev@spark.apache.org" <dev@spark.apache.org>
> *Cc: *Apache Spark PMC <priv...@spark.apache.org>
> *Subject: *Use Apache ORC in Apache Spark 2.3
>
>
>
> Hi, All.
>
>
>
> Apache Spark always has been a fast and general engine, and
>
> supports Apache ORC inside `sql/hive` module with Hive dependency since
> Spark 1.4.X (SPARK-2883).
>
> However, there are many open issues about `Feature parity for ORC with
> Parquet (SPARK-20901)` as of today.
>
>
>
> With new Apache ORC 1.4 (released 8th May), Apache Spark is able to get
> the following benefits.
>
>
>
> - Usability:
>
> * Users can use `ORC` data sources without hive module (-Phive)
> like `Parquet` format.
>
>
>
> - Stability & Maintainability:
>
> * ORC 1.4 already has many fixes.
>
> * In the future, Spark can upgrade ORC library independently from
> Hive
>(similar to Parquet library, too)
>
> * Eventually, reduce the dependency on old Hive 1.2.1.
>
>
>
> - Speed:
>
> * Last but not least, Spark can use both Spark `ColumnarBatch` and
> ORC `RowBatch` toget

Re: Use Apache ORC in Apache Spark 2.3

2017-08-10 Thread Andrew Ash
I would support moving ORC from sql/hive -> sql/core because it brings me
one step closer to eliminating Hive from my Spark distribution by removing
-Phive at build time.

On Thu, Aug 10, 2017 at 9:48 AM, Dong Joon Hyun 
wrote:

> Thank you again for coming and reviewing this PR.
>
>
>
> So far, we discussed the following.
>
>
>
> 1. `Why are we adding this to core? Why not just the hive module?` (@rxin)
>
>- `sql/core` module gives more benefit than `sql/hive`.
>
>- Apache ORC library (`no-hive` version) is a general and reasonably
> small library designed for non-hive apps.
>
>
>
> 2. `Can we add smaller amount of new code to use this, too?` (@kiszk)
>
>- The previous #17980 , #17924, and #17943 are the complete examples
> containing this PR.
>
>- This PR is focusing on dependency only.
>
>
>
> 3. `Why don't we then create a separate orc module? Just copy a few of the
> files over?` (@rxin)
>
>-  Apache ORC library is the same as most other data sources (CSV,
> JDBC, JSON, PARQUET, TEXT) which live inside `sql/core`
>
>- It's better to use as a library instead of copying ORC files because
> Apache ORC shaded jar has many files. We had better depend on Apache ORC
> community's effort until an unavoidable reason for copying occurs.
>
>
>
> 4. `I do worry in the future whether ORC would bring in a lot more jars`
> (@rxin)
>
>- The ORC core library's dependency tree is aggressively kept as small
> as possible. I've gone through and excluded unnecessary jars from our
> dependencies. I also kick back pull requests that add unnecessary new
> dependencies. (@omalley)
>
>
>
> 5. `In the long term, Spark should move to using only the vectorized
> reader in ORC's core` (@omalley)
>
> - Of course.
>
>
>
> I’ve been waiting for new comments and discussion since last week.
>
> Apparently, there are no further comments except the last comment (5) from
> Owen this week.
>
>
>
> Please give your opinion if you think we need some change on the current
> PR (as-is).
>
> FYI, there is one LGTM on the PR (as-is) and no -1 so far.
>
>
>
> Thank you again for supporting new ORC improvement in Apache Spark.
>
>
>
> Bests,
>
> Dongjoon.
>
>
>
>
>
> *From: *Dong Joon Hyun 
> *Date: *Friday, August 4, 2017 at 8:05 AM
> *To: *"dev@spark.apache.org" 
> *Cc: *Apache Spark PMC 
> *Subject: *Use Apache ORC in Apache Spark 2.3
>
>
>
> Hi, All.
>
>
>
> Apache Spark always has been a fast and general engine, and
>
> supports Apache ORC inside `sql/hive` module with Hive dependency since
> Spark 1.4.X (SPARK-2883).
>
> However, there are many open issues about `Feature parity for ORC with
> Parquet (SPARK-20901)` as of today.
>
>
>
> With new Apache ORC 1.4 (released 8th May), Apache Spark is able to get
> the following benefits.
>
>
>
> - Usability:
>
> * Users can use `ORC` data sources without hive module (-Phive)
> like `Parquet` format.
>
>
>
> - Stability & Maintainability:
>
> * ORC 1.4 already has many fixes.
>
> * In the future, Spark can upgrade ORC library independently from
> Hive
>(similar to Parquet library, too)
>
> * Eventually, reduce the dependency on old Hive 1.2.1.
>
>
>
> - Speed:
>
> * Last but not least, Spark can use both Spark `ColumnarBatch` and
> ORC `RowBatch` together
>
>   which means full vectorization support.
>
>
>
> First of all, I'd love to improve Apache Spark in the following steps in
> the time frame of Spark 2.3.
>
>
>
> - SPARK-21422: Depend on Apache ORC 1.4.0
>
> - SPARK-20682: Add a new faster ORC data source based on Apache ORC
>
> - SPARK-20728: Make ORCFileFormat configurable between sql/hive and
> sql/core
>
> - SPARK-16060: Vectorized Orc Reader
>
>
>
> I’ve made the above PRs since 9th May, the day after the Apache ORC 1.4 release,
>
> but the PRs seem to need more attention from the PMC since this is an important
> change.
>
> Since the discussion on the Apache Spark 2.3 cadence already started this
> week,
>
> I thought it’s the best time to ask you about this.
>
>
>
> Could any of you help me proceed with the ORC improvement in the Apache Spark
> community?
>
>
>
> Please visit the minimal PR and JIRA issue as a starter.
>
>
>
>- https://github.com/apache/spark/pull/18640
>- https://issues.apache.org/jira/browse/SPARK-21422
>
>
>
> Thank you in advance.
>
>
>
> Bests,
>
> Dongjoon Hyun.
>


Re: [VOTE] Apache Spark 2.2.0 (RC1)

2017-04-28 Thread Andrew Ash
-1 due to regression from 2.1.1

In 2.2.0-rc1 we bumped the Parquet version from 1.8.1 to 1.8.2 in commit
26a4cba3ff <https://github.com/apache/spark/commit/26a4cba3ff>.  Parquet
1.8.2 includes a backport from 1.9.0: PARQUET-389
<https://issues.apache.org/jira/browse/PARQUET-389> in commit 2282c22c
<https://github.com/apache/parquet-mr/commit/2282c22c>

This backport caused a regression in Spark, where filtering on columns
containing dots in the column name pushes the filter down into Parquet
where Parquet incorrectly handles the predicate.  Spark pushes the String
"col.dots" as the column name, but Parquet interprets this as
"struct.field" where the predicate is on a field of a struct.  The ultimate
result is that the predicate always returns zero results, causing a data
correctness issue.
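
To illustrate the ambiguity, here is a minimal Java sketch. It is not the
Spark or Parquet code: the class ColumnPathDemo and its methods are
hypothetical names, and the flat schema map is an assumed stand-in for a file
with a single top-level column literally named "col.dots".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the Spark or Parquet implementation.
final class ColumnPathDemo {
    // Reads the whole string as one top-level column name, which is what the caller meant.
    static List<String> asSingleColumn(String name) {
        return Collections.singletonList(name);
    }

    // Splits on dots, reading the string as a path into a nested struct.
    static List<String> asStructPath(String name) {
        return Arrays.asList(name.split("\\."));
    }

    // A path resolves against a flat schema only if it is a single segment naming a known column.
    static boolean resolves(Map<String, String> flatSchema, List<String> path) {
        return path.size() == 1 && flatSchema.containsKey(path.get(0));
    }

    public static void main(String[] args) {
        Map<String, String> schema = new HashMap<>();
        schema.put("col.dots", "string");
        System.out.println(resolves(schema, asSingleColumn("col.dots")));
        System.out.println(resolves(schema, asStructPath("col.dots")));
    }
}
```

In this sketch the single-column reading finds the column while the
struct-path reading does not, which mirrors how a pushed-down predicate can
end up matching nothing.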

This issue is filed in Spark as SPARK-20364
<https://issues.apache.org/jira/browse/SPARK-20364> and has a PR fix up at PR
#17680 <https://github.com/apache/spark/pull/17680>.

I nominate SPARK-20364 <https://issues.apache.org/jira/browse/SPARK-20364> as
a release blocker due to the data correctness regression.

Thanks!
Andrew

On Thu, Apr 27, 2017 at 6:49 PM, Sean Owen <so...@cloudera.com> wrote:

> By the way the RC looks good. Sigs and license are OK, tests pass with
> -Phive -Pyarn -Phadoop-2.7. +1 from me.
>
> On Thu, Apr 27, 2017 at 7:31 PM Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.2.0. The vote is open until Tues, May 2nd, 2017 at 12:00 PST and
>> passes if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 2.2.0
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is v2.2.0-rc1
>> <https://github.com/apache/spark/tree/v2.2.0-rc1> (8ccb4a57c82146c
>> 1a8f8966c7e64010cf5632cb6)
>>
>> List of JIRA tickets resolved can be found with this filter
>> <https://issues.apache.org/jira/browse/SPARK-20134?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.1>
>> .
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://home.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1235/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc1-docs/
>>
>>
>> *FAQ*
>>
>> *How can I help test this release?*
>>
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> *What should happen to JIRA tickets still targeting 2.2.0?*
>>
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.3.0 or 2.2.1.
>>
>> *But my bug isn't fixed!??!*
>>
>> In order to make timely releases, we will typically not hold the release
>> unless the bug in question is a regression from 2.1.1.
>>
>


Re: Scala left join with multiple columns Join condition is missing or trivial. Use the CROSS JOIN syntax to allow cartesian products between these relations.

2017-04-03 Thread Andrew Ray
You probably don't want null safe equals (<=>) with a left join.
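
For context, <=> treats two nulls as equal, whereas plain equality yields
null (unknown) when either side is null, so null keys never match. A minimal
Java sketch of the two semantics follows; the helper names are hypothetical
and a boxed Boolean stands in for SQL's three-valued result. It is not Spark
code.

```java
import java.util.Objects;

// Hypothetical helpers illustrating SQL equality semantics; not Spark code.
final class SqlEquality {
    // Plain SQL equality: the result is unknown (null here) if either operand is null.
    static Boolean plainEquals(Integer a, Integer b) {
        if (a == null || b == null) return null;
        return a.equals(b);
    }

    // Null-safe equality (<=>): never unknown; two nulls compare as equal.
    static boolean nullSafeEquals(Integer a, Integer b) {
        return Objects.equals(a, b);
    }

    public static void main(String[] args) {
        System.out.println(plainEquals(null, null));
        System.out.println(nullSafeEquals(null, null));
        System.out.println(nullSafeEquals(1, null));
    }
}
```

With a left join, null-safe equality pairs rows whose key is null on the left
with rows whose key is null on the right, which is rarely the intent.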

On Mon, Apr 3, 2017 at 5:46 PM gjohnson35 
wrote:

> The join condition with && is throwing an exception:
>
>  val df = baseDF.join(mccDF, mccDF("medical_claim_id") <=>
> baseDF("medical_claim_id")
>   && mccDF("medical_claim_detail_id") <=>
> baseDF("medical_claim_detail_id"), "left")
>   .join(revCdDF, revCdDF("revenue_code_padded_str") <=>
> mccDF("mcc_code"), "left")
>   .select(baseDF("medical_claim_id"),
> baseDF("medical_claim_detail_id"),
> baseDF("revenue_code"), baseDF("rev_code_distinct_count"),
> baseDF("rtos_1_1_count"), baseDF("rtos_1_0_count"),
> baseDF("er_visit_flag"), baseDF("observation_stay_flag"),
> revCdDF("rtos_2_code"), revCdDF("rtos_2_hierarchy"))
>   .where(revCdDF("rtos_2_code").between(8, 27).isNotNull)
>   .groupBy(
> baseDF("medical_claim_id"),
> baseDF("medical_claim_detail_id")
>   )
>   .agg(min(revCdDF("rtos_2_code").alias("min_rtos_2_8_thru_27")),
> min(revCdDF("rtos_2_hierarchy").alias("min_rtos_2_8_thru_27_hier")))
>
>
> This query runs fine:
>
> val df = baseDF.join(mccDF, mccDF("medical_claim_id") <=>
> baseDF("medical_claim_id"), "left")
> .join(mccDF, mccDF("medical_claim_detail_id") <=>
> baseDF("medical_claim_detail_id"), "left")
>   .join(revCdDF, revCdDF("revenue_code_padded_str") <=>
> mccDF("mcc_code"), "left")
>   .select(baseDF("medical_claim_id"),
> baseDF("medical_claim_detail_id"),
> baseDF("revenue_code"), baseDF("rev_code_distinct_count"),
> baseDF("rtos_1_1_count"), baseDF("rtos_1_0_count"),
> baseDF("er_visit_flag"), baseDF("observation_stay_flag"),
> revCdDF("rtos_2_code"), revCdDF("rtos_2_hierarchy"))
>   .where(revCdDF("rtos_2_code").between(8, 27).isNotNull)
>   .groupBy(
> baseDF("medical_claim_id"),
> baseDF("medical_claim_detail_id")
>   )
>   .agg(min(revCdDF("rtos_2_code").alias("min_rtos_2_8_thru_27")),
> min(revCdDF("rtos_2_hierarchy").alias("min_rtos_2_8_thru_27_hier")))
>
> If I remove the multiple Columns in the join and create a join statement
> for
> each one then the exception goes away.  Is there a better way to join
> multiple columns?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Scala-left-join-with-multiple-columns-Join-condition-is-missing-or-trivial-Use-the-CROSS-JOIN-syntax-tp21297.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Broadcast big dataset

2016-09-28 Thread Andrew Duffy
Have you tried upping executor memory? There's a separate spark conf for that: 
spark.executor.memory
In general driver configurations don't automatically apply to executors.





On Wed, Sep 28, 2016 at 7:03 AM -0700, "WangJianfei" wrote:

Hi Devs
 In my application, I just broadcast a dataset (about 500M) to the
executors (100+), and I got a Java heap error:
Jmartad-7219.hadoop.jd.local:53591 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:48 INFO BlockManagerInfo: Added broadcast_9_piece19 in memory
on BJHC-Jmartad-9012.hadoop.jd.local:53197 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:49 INFO BlockManagerInfo: Added broadcast_9_piece8 in memory
on BJHC-Jmartad-84101.hadoop.jd.local:52044 (size: 4.0 MB, free: 3.3 GB)
16/09/28 15:56:58 INFO BlockManagerInfo: Removed broadcast_8_piece0 on
172.22.176.114:37438 in memory (size: 2.7 KB, free: 3.1 GB)
16/09/28 15:56:58 WARN TaskSetManager: Lost task 125.0 in stage 7.0 (TID
130, BJHC-Jmartad-9376.hadoop.jd.local): java.lang.OutOfMemoryError: Java
heap space
at 
java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3465)
at
java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3271)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)

My configuration is 4G memory in driver.  Any advice is appreciated.
Thank you!



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Broadcast-big-dataset-tp19127.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: What's the use of RangePartitioner.hashCode

2016-09-21 Thread Andrew Duffy
Pedantic note about hashCode and equals: the implication doesn't need to be
bidirectional. You just need to ensure that a.hashCode == b.hashCode whenever
a.equals(b); the converse is usually harder to satisfy because of possible
hash collisions.

Good info: 
http://www.programcreek.com/2011/07/java-equals-and-hashcode-contract/
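
A minimal Java sketch of the one-directional contract, assuming a
hypothetical value-compared class BoundsPartitioner (a stand-in for
illustration, not Spark's RangePartitioner): equal objects share a hash code,
while unequal objects are still allowed to collide.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a partitioner compared by value; not Spark's RangePartitioner.
final class BoundsPartitioner {
    private final int numPartitions;
    private final boolean ascending;

    BoundsPartitioner(int numPartitions, boolean ascending) {
        this.numPartitions = numPartitions;
        this.ascending = ascending;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof BoundsPartitioner)) return false;
        BoundsPartitioner that = (BoundsPartitioner) other;
        return numPartitions == that.numPartitions && ascending == that.ascending;
    }

    // Derived from the same fields as equals, so equal objects always share a hash code.
    @Override
    public int hashCode() {
        return Objects.hash(numPartitions, ascending);
    }
}

final class HashContractDemo {
    // Returns how many elements a HashSet keeps after adding two equal partitioners.
    static int distinctCount() {
        Set<BoundsPartitioner> seen = new HashSet<>();
        seen.add(new BoundsPartitioner(8, true));
        seen.add(new BoundsPartitioner(8, true));
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount());
        // The converse is not required: "Aa" and "BB" are unequal strings with the same hash code.
        System.out.println("Aa".hashCode() == "BB".hashCode());
    }
}
```

The HashSet keeps a single element because the two instances are equal and
hash alike; the string pair shows that a matching hash code alone says nothing
about equality.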
_
From: Jakob Odersky 
Sent: Wednesday, September 21, 2016 15:12
Subject: Re: What's the use of RangePartitioner.hashCode
To: WangJianfei 
Cc: dev 


Hi,
It is used jointly with a custom implementation of the `equals`
method. In Scala, you can override the `equals` method to change the
behaviour of `==` comparison. One example of this would be to compare
classes based on their parameter values (i.e. what case classes do).
Partitioners aren't case classes however it makes sense to have a
value comparison between them (see RDD.subtract for an example) and
hence they redefine the equals method.
When redefining an equals method, it is good practice to also redefine
the hashCode method so that `a == b` iff `a.hashCode == b.hashCode`
(e.g. this is useful when your objects will be stored in a hash map).
You can learn more about redefining the equals method and hashcodes
here 
https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch04s16.html


regards,
--Jakob

On Thu, Sep 15, 2016 at 6:17 PM, WangJianfei
 wrote:
> who can give me an example of the use of RangePartitioner.hashCode, thank
> you!
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/What-s-the-use-of-RangePartitioner-hashCode-tp18953.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>


master snapshots not publishing?

2016-07-21 Thread Andrew Duffy
I’m trying to use a Snapshot build off of master, and after looking through
Jenkins it appears that the last commit where the snapshot was built is
back on 757dc2c09d23400dacac22e51f52062bbe471136, 22 days ago:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/



Looking at the Jenkins page it says that the master-maven build is
disabled, is this purposeful?



-Andrew


Re: [DISCUSS] Removing or changing maintainer process

2016-05-19 Thread Andrew Or
+1, some maintainers are hard to find

2016-05-19 9:03 GMT-07:00 Imran Rashid :

> +1 (binding) on removal of maintainers
>
> I don't have a strong opinion yet on how to have a system for finding the
> right reviewers.  I agree it would be nice to have something to help you
> find reviewers, though I'm a little skeptical of anything automatic.
>
> On Thu, May 19, 2016 at 10:34 AM, Matei Zaharia 
> wrote:
>
>> Hi folks,
>>
>> Around 1.5 years ago, Spark added a maintainer process for reviewing API
>> and architectural changes (
>> https://cwiki.apache.org/confluence/display/SPARK/Committers#Committers-ReviewProcessandMaintainers)
>> to make sure these are seen by people who spent a lot of time on that
>> component. At the time, the worry was that changes might go unnoticed as
>> the project grows, but there were also concerns that this approach makes
>> the project harder to contribute to and less welcoming. Since implementing
>> the model, I think that a good number of developers concluded it doesn't
>> make a huge difference, so because of these concerns, it may be useful to
>> remove it. I've also heard that we should try to keep some other
>> instructions for contributors to find the "right" reviewers, so it would be
>> great to see suggestions on that. For my part, I'd personally prefer
>> something "automatic", such as easily tracking who reviewed each patch and
>> having people look at the commit history of the module they want to work
>> on, instead of a list that needs to be maintained separately.
>>
>> Matei
>>
>>
>


Re: HDFS as Shuffle Service

2016-04-28 Thread Andrew Ray
Yes, HDFS has serious problems with creating lots of files. But we can
always just create a single merged file on HDFS per task.
On Apr 28, 2016 11:17 AM, "Reynold Xin"  wrote:

Hm while this is an attractive idea in theory, in practice I think you are
substantially overestimating HDFS' ability to handle a lot of small,
ephemeral files. It has never really been optimized for that use case.

On Thu, Apr 28, 2016 at 11:15 AM, Michael Gummelt 
wrote:

> > if after a work-load burst your cluster dynamically changes from 1
> workers to 1000, will the typical HDFS replication factor be sufficient to
> retain access to the shuffle files in HDFS
>
> HDFS isn't resizing.  Spark is.  HDFS files should be HA and durable.
>
> On Thu, Apr 28, 2016 at 11:08 AM, Mark Hamstra 
> wrote:
>
>> Yes, replicated and distributed shuffle materializations are a key
>> requirement to maintain performance in a fully elastic cluster where
>> Executors aren't just reallocated across an essentially fixed number of
>> Worker nodes, but rather the number of Workers itself is dynamic.
>> Retaining the file interface to those shuffle materializations while also
>> using HDFS for the spark.local.dirs has a certain amount of attraction, but
>> I also wonder whether a typical HDFS deployment is really sufficient to
>> handle this kind of elastic cluster scaling.  For instance and assuming
>> HDFS co-located on worker nodes, if after a work-load burst your cluster
>> dynamically changes from 1 workers to 1000, will the typical HDFS
>> replication factor be sufficient to retain access to the shuffle files in
>> HDFS, or will we instead be seeing numerous FetchFailure exceptions, Tasks
>> recomputed or Stages aborted, etc. so that the net effect is not all that
>> much different than if the shuffle files had not been relocated to HDFS and
>> the Executors or ShuffleService instances had just disappeared along with
>> the worker nodes?
>>
>> On Thu, Apr 28, 2016 at 10:46 AM, Michael Gummelt wrote:
>>
>>> > Why would you run the shuffle service on 10K nodes but Spark executors
>>> on just 100 nodes? wouldn't you also run that service just on the 100
>>> nodes?
>>>
>>> We have to start the service beforehand, out of band, and we don't know
>>> a priori where the Spark executors will land.  Those 100 executors could
>>> land on any of the 10K nodes.
>>>
>>> > What does plumbing it through HDFS buy you in comparison?
>>>
>>> It drops the shuffle service requirement, which is HUGE.  It means Spark
>>> can completely vacate the machine when it's not in use, which is crucial
>>> for a large, multi-tenant cluster.  ShuffledRDDs can now read the map files
>>> from HDFS, rather than the ancestor executors, which means we can shut
>>> executors down immediately after the shuffle files are written.
>>>
>>> > There's some additional overhead and if anything you lose some control
>>> over locality, in a context where I presume HDFS itself is storing data on
>>> much more than the 100 Spark nodes.
>>>
>>> Write locality would be sacrificed, but the descendent executors were
>>> already doing a remote read (they have to read from multiple ancestor
>>> executors), so there's no additional cost in read locality.  In fact, if we
>>> take advantage of HDFS's favored node feature, we could make it likely that
>>> all map files for a given partition land on the same node, so the
>>> descendent executor would never have to do a remote read!  We'd effectively
>>> shift the remote IO from read side to write side, for theoretically no
>>> change in performance.
>>>
>>> In summary:
>>>
>>> Advantages:
>>> - No shuffle service dependency (increased utilization, decreased
>>> management cost)
>>> - Shut executors down immediately after shuffle files are written,
>>> rather than waiting for a timeout (increased utilization)
>>> - HDFS is HA, so shuffle files survive a node failure, which isn't true
>>> for the shuffle service (decreased latency during failures)
>>> - Potential ability to parallelize shuffle file reads if we write a new
>>> shuffle iterator (decreased latency)
>>>
>>> Disadvantages
>>> - Increased write latency (but potentially not if we implement it
>>> efficiently.  See above).
>>> - Would need some sort of GC on HDFS shuffle files
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Apr 28, 2016 at 1:36 AM, Sean Owen  wrote:
>>>
>>>> Why would you run the shuffle service on 10K nodes but Spark executors
>>>> on just 100 nodes? wouldn't you also run that service just on the 100
>>>> nodes?
>>>>
>>>> What does plumbing it through HDFS buy you in comparison? There's some
>>>> additional overhead and if anything you lose some control over
>>>> locality, in a context where I presume HDFS itself is storing data on
>>>> much more than the 100 Spark nodes.
>>>>
>>>> On Thu, Apr 28, 2016 at 1:34 AM, Michael Gummelt <
>>>> mgumm...@mesosphere.io> wrote:
 >> 

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

2016-04-18 Thread Andrew Ray
While you can't automatically push the limit *through* the join, we could
push it *into* the join (stop processing after generating 10 records). I
believe that is what Rajesh is suggesting.
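
A minimal Java sketch of what pushing the limit *into* the join could look
like, under stated assumptions: LimitedHashJoin, joinWithLimit and probedRows
are hypothetical names, keys are plain integers, and this is not how
BroadcastHashJoin is implemented. The build side is hashed once and the probe
loop exits as soon as enough rows have been produced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a limit-aware hash join; not Spark's BroadcastHashJoin.
final class LimitedHashJoin {
    // Probe-side rows examined by the most recent call; kept only to show the early exit.
    static int probedRows = 0;

    // Inner join on a single integer key that stops once `limit` rows are produced.
    static List<Integer> joinWithLimit(List<Integer> probeKeys, List<Integer> buildKeys, int limit) {
        // Build side (the broadcast relation): key -> number of occurrences.
        Map<Integer, Integer> buildCounts = new HashMap<>();
        for (Integer key : buildKeys) {
            buildCounts.merge(key, 1, Integer::sum);
        }
        List<Integer> out = new ArrayList<>();
        probedRows = 0;
        for (Integer key : probeKeys) {
            if (out.size() >= limit) break; // limit reached: skip the rest of the probe side
            probedRows++;
            int matches = buildCounts.getOrDefault(key, 0);
            for (int i = 0; i < matches && out.size() < limit; i++) {
                out.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> probe = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<Integer> build = Arrays.asList(2, 4, 6);
        System.out.println(joinWithLimit(probe, build, 2));
        System.out.println(probedRows);
    }
}
```

Because the limit is checked on the join's output rather than its input, keys
missing from either side cannot make the result fall short while matching
rows remain, which is the concern with pushing a limit through the join.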

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
hvanhov...@questtec.nl> wrote:

> I am not sure if you can push a limit through a join. This becomes
> problematic if not all keys are present on both sides; in such a case a
> limit can produce fewer rows than the set limit.
>
> This might be a rare case in which whole stage codegen is slower, due to
> the fact that we need to buffer the result of such a stage. You could try
> to disable it by setting "spark.sql.codegen.wholeStage" to false.
>
> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan :
>
>> Hi,
>>
>> I ran the following query in spark (latest master codebase) and it took a
>> lot of time to complete even though it was a broadcast hash join.
>>
>> It appears that limit computation is done only after computing complete
>> join condition.  Shouldn't the limit condition be pushed to
>> BroadcastHashJoin (wherein it would have to stop processing after
>> generating 10 rows?).  Please let me know if my understanding on this is
>> wrong.
>>
>>
>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit
>> 10;
>>
>> 
>> | == Physical Plan ==
>> CollectLimit 10
>> +- WholeStageCodegen
>>:  +- Project [l_partkey#893]
>>: +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner,
>> BuildRight, None
>>::- Project [l_partkey#893]
>>::  +- Filter isnotnull(l_partkey#893)
>>:: +- Scan HadoopFiles[l_partkey#893] Format: ORC,
>> PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct
>>:+- INPUT
>>+- BroadcastExchange
>> HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as
>> bigint)),List(ps_partkey#908))
>>   +- WholeStageCodegen
>>  :  +- Project [ps_partkey#908]
>>  : +- Filter isnotnull(ps_partkey#908)
>>  :+- Scan HadoopFiles[ps_partkey#908] Format: ORC,
>> PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct
>>  |
>> 
>>
>>
>>
>>
>> --
>> ~Rajesh.B
>>
>
>


Re: [discuss] ending support for Java 7 in Spark 2.0

2016-03-25 Thread Andrew Ray
+1 on removing Java 7 and Scala 2.10 support.

It looks to be entirely possible to support Java 8 containers in a YARN
cluster otherwise running Java 7 (example code for alt JAVA_HOME
https://issues.apache.org/jira/secure/attachment/12671739/YARN-1964.patch)
so really there should be no big problem. Even if that somehow doesn't work
I'm still +1 as the benefits are so large.

I'd also like to point out that it is completely trivial to have multiple
versions of Spark running concurrently on a YARN cluster. At my previous
(extremely large) employer we had almost every release since 1.0 installed,
with the latest being default and production apps pinned to a specific
version. So if you want to keep using some Scala 2.10 only library or just
don't want to migrate to Java 8, feel free to continue using Spark 1.x for
those applications.

IMHO we need to move on from EOL stuff to make room for the future (Java 9,
Scala 2.12) and Spark 2.0 is the only chance we are going to have to do so
for a long time.

--Andrew

On Thu, Mar 24, 2016 at 10:55 PM, Mridul Muralidharan <mri...@gmail.com>
wrote:

>
> I do agree w.r.t scala 2.10 as well; similar arguments apply (though there
> is a nuanced diff - source compatibility for scala vs binary compatibility
> wrt Java)
> Was there a proposal which did not go through ? Not sure if I missed it.
>
> Regards
> Mridul
>
>
> On Thursday, March 24, 2016, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i think that logic is reasonable, but then the same should also apply to
>> scala 2.10, which is also unmaintained/unsupported at this point (basically
>> has been since march 2015 except for one hotfix due to a license
>> incompatibility)
>>
>> who wants to support scala 2.10 three years after they did the last
>> maintenance release?
>>
>>
>> On Thu, Mar 24, 2016 at 9:59 PM, Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>>> Removing compatibility (with jdk, etc) can be done with a major release-
>>> given that 7 has been EOLed a while back and is now unsupported, we have to
>>> decide if we drop support for it in 2.0 or 3.0 (2+ years from now).
>>>
>>> Given the functionality & performance benefits of going to jdk8, future
>>> enhancements relevant in 2.x timeframe ( scala, dependencies) which
>>> requires it, and simplicity wrt code, test & support it looks like a good
>>> checkpoint to drop jdk7 support.
>>>
>>> As already mentioned in the thread, existing yarn clusters are
>>> unaffected if they want to continue running jdk7 and yet use
>>> spark2 (install jdk8 on all nodes and use it via JAVA_HOME, or worst case
>>> distribute jdk8 as archive - suboptimal).
>>> I am unsure about mesos (standalone might be easier upgrade I guess ?).
>>>
>>>
>>> Proposal is for 1.6x line to continue to be supported with critical
>>> fixes; newer features will require 2.x and so jdk8
>>>
>>> Regards
>>> Mridul
>>>
>>>
>>> On Thursday, March 24, 2016, Marcelo Vanzin <van...@cloudera.com> wrote:
>>>
>>>> On Thu, Mar 24, 2016 at 4:50 PM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>> > If you want to go down that route, you should also ask somebody who
>>>> has had
>>>> > experience managing a large organization's applications and try to
>>>> update
>>>> > Scala version.
>>>>
>>>> I understand both sides. But if you look at what I've been asking
>>>> since the beginning, it's all about the cost and benefits of dropping
>>>> support for java 1.7.
>>>>
>>>> The biggest argument in your original e-mail is about testing. And the
>>>> testing cost is much bigger for supporting scala 2.10 than it is for
>>>> supporting java 1.7. If you read one of my earlier replies, it should
>>>> be even possible to just do everything in a single job - compile for
>>>> java 7 and still be able to test things in 1.8, including lambdas,
>>>> which seems to be the main thing you were worried about.
>>>>
>>>>
>>>> > On Thu, Mar 24, 2016 at 4:48 PM, Marcelo Vanzin <van...@cloudera.com>
>>>> wrote:
>>>> >>
>>>> >> On Thu, Mar 24, 2016 at 4:46 PM, Reynold Xin <r...@databricks.com>
>>>> wrote:
>>>> >> > Actually it's *way* harder to upgrade Scala from 2.10 to 2.11, than
>>>> >> > upgrading the JVM runtime from 7 to 8, because Scala 2.10 and 2.11
>>>> are
>>>> >> > not
>>>> >> > binary compatible, whereas JVM 7 and 8 are binary compatible except
>>>> >> > certain
>>>> >> > esoteric cases.
>>>> >>
>>>> >> True, but ask anyone who manages a large cluster how long it would
>>>> >> take them to upgrade the jdk across their cluster and validate all
>>>> >> their applications and everything... binary compatibility is a tiny
>>>> >> drop in that bucket.
>>>> >>
>>>> >> --
>>>> >> Marcelo
>>>> >
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Marcelo
>>>>
>>>>
>>>>
>>


Re: [discuss] ending support for Java 7 in Spark 2.0

2016-03-24 Thread Andrew Ash
Spark 2.x has to be the time for Java 8.

I'd rather increase JVM major version on a Spark major version than on a
Spark minor version, and I'd rather Spark do that upgrade for the 2.x
series than the 3.x series (~2yr from now based on the lifetime of Spark
1.x).  If we wait until the next opportunity for a breaking change to Spark
(3.x) we might be upgrading to Java 9 at that point rather than Java 8.

If Spark users need Java 7 they are free to continue using the 1.x series,
the same way that folks who need Java 6 are free to continue using 1.4.

On Thu, Mar 24, 2016 at 11:46 AM, Stephen Boesch  wrote:

> +1 for java8 only. +1 for 2.11+ only. At this point scala libraries
> supporting only 2.10 are typically less active and/or poorly maintained.
> That trend will only continue when considering the lifespan of spark 2.X.
>
> 2016-03-24 11:32 GMT-07:00 Steve Loughran :
>
>>
>> On 24 Mar 2016, at 15:27, Koert Kuipers  wrote:
>>
>> i think the arguments are convincing, but it also makes me wonder if i
>> live in some kind of alternate universe... we deploy on customers clusters,
>> where the OS, python version, java version and hadoop distro are not chosen
>> by us. so think centos 6, cdh5 or hdp 2.3, java 7 and python 2.6. we simply
>> have access to a single proxy machine and launch through yarn. asking them
>> to upgrade java is pretty much out of the question or a 6+ month ordeal. of
>> the 10 client clusters i can think of off the top of my head all of them are
>> on java 7, none are on java 8. so by doing this you would make spark 2
>> basically unusable for us (unless most of them have plans of upgrading in
>> near term to java 8, i will ask around and report back...).
>>
>>
>>
>> It's not actually mandatory for the process executing in the Yarn cluster
>> to run with the same JVM as the rest of the Hadoop stack; all that is
>> needed is for the environment variables to set up the JAVA_HOME and PATH.
>> Switching JVMs is not something which YARN makes easy to do, but it may be
>> possible, especially if Spark itself provides some hooks, so you don't have
>> to manually play with setting things up. That may be something which could
>> significantly ease adoption of Spark 2 in YARN clusters. Same for Python.
>>
>> This is something I could probably help others to address
>>
>>
>


Re: java.lang.OutOfMemoryError: Unable to acquire bytes of memory

2016-03-21 Thread Andrew Or
@Nezih, can you try again after setting `spark.memory.useLegacyMode` to
true? Can you still reproduce the OOM that way?

2016-03-21 10:29 GMT-07:00 Nezih Yigitbasi :

> Hi Spark devs,
> I am using 1.6.0 with dynamic allocation on yarn. I am trying to run a
> relatively big application with 10s of jobs and 100K+ tasks and my app
> fails with the exception below. The closest jira issue I could find is
> SPARK-11293, which is a critical bug that has been open for a long time.
> There are other similar jira issues (all fixed): SPARK-10474, SPARK-10733,
> SPARK-10309, SPARK-10379.
>
> Any workarounds to this issue or any plans to fix it?
>
> Thanks a lot,
> Nezih
>
> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: Memory used in task 46870
> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: Acquired by org.apache.spark.shuffle.sort.ShuffleExternalSorter@1c36f801: 32.0 KB
> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512915599 bytes of memory were used by task 46870 but are not associated with specific consumers
> 16/03/19 05:12:09 INFO memory.TaskMemoryManager: 1512948367 bytes of memory are used for execution and 156978343 bytes of memory are used for storage
> 16/03/19 05:12:09 ERROR executor.Executor: Managed memory leak detected; size = 1512915599 bytes, TID = 46870
> 16/03/19 05:12:09 ERROR executor.Executor: Exception in task 77.0 in stage 273.0 (TID 46870)
> java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/03/19 05:12:09 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
> java.lang.OutOfMemoryError: Unable to acquire 128 bytes of memory, got 0
> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:354)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:375)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)16/03/19 05:12:10 INFO 
> storage.DiskBlockManager: Shutdown hook called16/03/19 05:12:10 INFO 
> util.ShutdownHookManager: Shutdown hook called
>


Re: [VOTE] Release Apache Spark 1.6.1 (RC1)

2016-03-08 Thread Andrew Or
+1

2016-03-08 10:59 GMT-08:00 Yin Huai :

> +1
>
> On Mon, Mar 7, 2016 at 12:39 PM, Reynold Xin  wrote:
>
>> +1 (binding)
>>
>>
>> On Sun, Mar 6, 2016 at 12:08 PM, Egor Pahomov 
>> wrote:
>>
>>> +1
>>>
>>> Spark ODBC server is fine, SQL is fine.
>>>
>>> 2016-03-03 12:09 GMT-08:00 Yin Yang :
>>>
 Skipping docker tests, the rest are green:

 [INFO] Spark Project External Kafka ... SUCCESS [01:28 min]
 [INFO] Spark Project Examples . SUCCESS [02:59 min]
 [INFO] Spark Project External Kafka Assembly .. SUCCESS [ 11.680 s]
 [INFO]
 [INFO] BUILD SUCCESS
 [INFO]
 [INFO] Total time: 02:16 h
 [INFO] Finished at: 2016-03-03T11:17:07-08:00
 [INFO] Final Memory: 152M/4062M

 On Thu, Mar 3, 2016 at 8:55 AM, Yin Yang  wrote:

> When I ran test suite using the following command:
>
> build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6
> -Dhadoop.version=2.7.0 package
>
> I got failure in Spark Project Docker Integration Tests :
>
> 16/03/02 17:36:46 INFO RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
> *** RUN ABORTED ***
>   com.spotify.docker.client.DockerException: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.io.IOException: No such file or directory
>   at com.spotify.docker.client.DefaultDockerClient.propagate(DefaultDockerClient.java:1141)
>   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1082)
>   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
>   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
>   at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1492)
>   at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1528)
>   ...
>   Cause: java.util.concurrent.ExecutionException: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.io.IOException: No such file or directory
>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1080)
>   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
>   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
>   ...
>   Cause: com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException: java.io.IOException: No such file or directory
>   at org.glassfish.jersey.apache.connector.ApacheConnector.apply(ApacheConnector.java:481)
>   at org.glassfish.jersey.apache.connector.ApacheConnector$1.run(ApacheConnector.java:491)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at
> 

Re: Welcoming two new committers

2016-02-08 Thread Andrew Or
Welcome!

2016-02-08 10:55 GMT-08:00 Bhupendra Mishra :

> Congratulations to both. and welcome to group.
>
> On Mon, Feb 8, 2016 at 10:45 PM, Matei Zaharia 
> wrote:
>
>> Hi all,
>>
>> The PMC has recently added two new Spark committers -- Herman van Hovell
>> and Wenchen Fan. Both have been heavily involved in Spark SQL and Tungsten,
>> adding new features, optimizations and APIs. Please join me in welcoming
>> Herman and Wenchen.
>>
>> Matei
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>


Spark 1.6: Why Including hive-jdbc in assembly when -Phive-provided is set?

2016-02-03 Thread Andrew Lee
Hi All,


I have a question regarding the hive-jdbc library that is being included in the 
assembly JAR.


Build command.

mvn -U -X -Phadoop-2.6 -Phadoop-provided -Phive-provided -Pyarn 
-Phive-thriftserver -Psparkr -DskipTests install


In the pom.xml file, the scope for the hive JARs is set to 'compile'. However, 
there is one entry

https://github.com/apache/spark/blob/branch-1.6/pom.xml#L1414


that includes it again.


The assembly JAR shows the following content with 'jar tf'.


org/apache/hive/
org/apache/hive/jdbc/
org/apache/hive/jdbc/HiveDatabaseMetaData.class
org/apache/hive/jdbc/ZooKeeperHiveClientHelper.class
org/apache/hive/jdbc/ZooKeeperHiveClientHelper$DummyWatcher.class
org/apache/hive/jdbc/HiveQueryResultSet$Builder.class
org/apache/hive/jdbc/HiveResultSetMetaData.class
org/apache/hive/jdbc/HivePreparedStatement.class
org/apache/hive/jdbc/HiveStatement$1.class
org/apache/hive/jdbc/JdbcUriParseException.class
org/apache/hive/jdbc/HiveDataSource.class
org/apache/hive/jdbc/HttpBasicAuthInterceptor.class
org/apache/hive/jdbc/JdbcColumn.class
org/apache/hive/jdbc/Utils$JdbcConnectionParams.class
org/apache/hive/jdbc/HiveMetaDataResultSet.class
org/apache/hive/jdbc/HiveDriver.class
org/apache/hive/jdbc/JdbcTable.class
org/apache/hive/jdbc/HiveBaseResultSet.class
org/apache/hive/jdbc/HiveDatabaseMetaData$GetTablesComparator.class
org/apache/hive/jdbc/HiveDatabaseMetaData$1.class
org/apache/hive/jdbc/HiveStatement.class
org/apache/hive/jdbc/ZooKeeperHiveClientException.class
org/apache/hive/jdbc/HiveQueryResultSet$1.class
org/apache/hive/jdbc/Utils.class
org/apache/hive/jdbc/HiveConnection$1.class
org/apache/hive/jdbc/JdbcColumn$1.class
org/apache/hive/jdbc/HiveBaseResultSet$1.class
org/apache/hive/jdbc/HttpKerberosRequestInterceptor.class
org/apache/hive/jdbc/JdbcColumnAttributes.class
org/apache/hive/jdbc/HiveCallableStatement.class
org/apache/hive/jdbc/HiveDatabaseMetaData$GetColumnsComparator.class
org/apache/hive/jdbc/ClosedOrCancelledStatementException.class
org/apache/hive/jdbc/HiveQueryResultSet.class
org/apache/hive/jdbc/HttpRequestInterceptorBase.class
org/apache/hive/jdbc/HiveConnection.class
org/apache/hive/service/
org/apache/hive/service/server/
org/apache/hive/service/server/HiveServerServerOptionsProcessor.class


I would like to know why this is there, and whether we can remove it and link 
hive-jdbc at runtime instead.






Re: [VOTE] Release Apache Spark 1.6.0 (RC4)

2015-12-22 Thread Andrew Or
+1

2015-12-22 12:43 GMT-08:00 Reynold Xin :

> +1
>
>
> On Tue, Dec 22, 2015 at 12:29 PM, Michael Armbrust wrote:
>
>> I'll kick the voting off with a +1.
>>
>> On Tue, Dec 22, 2015 at 12:10 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Please vote on releasing the following candidate as Apache Spark version
>>> 1.6.0!
>>>
>>> The vote is open until Friday, December 25, 2015 at 18:00 UTC and
>>> passes if a majority of at least 3 +1 PMC votes are cast.
>>>
>>> [ ] +1 Release this package as Apache Spark 1.6.0
>>> [ ] -1 Do not release this package because ...
>>>
>>> To learn more about Apache Spark, please see http://spark.apache.org/
>>>
>>> The tag to be voted on is *v1.6.0-rc4
>>> (4062cda3087ae42c6c3cb24508fc1d3a931accdf)
>>> *
>>>
>>> The release files, including signatures, digests, etc. can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-1.6.0-rc4-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://people.apache.org/keys/committer/pwendell.asc
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1176/
>>>
>>> The test repository (versioned as v1.6.0-rc4) for this release can be
>>> found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1175/
>>>
>>> The documentation corresponding to this release can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-1.6.0-rc4-docs/
>>>
>>> ===
>>> == How can I help test this release? ==
>>> ===
>>> If you are a Spark user, you can help us test this release by taking an
>>> existing Spark workload and running on this release candidate, then
>>> reporting any regressions.
>>>
>>> 
>>> == What justifies a -1 vote for this release? ==
>>> 
>>> This vote is happening towards the end of the 1.6 QA period, so -1 votes
>>> should only occur for significant regressions from 1.5. Bugs already
>>> present in 1.5, minor regressions, or bugs related to new features will not
>>> block this release.
>>>
>>> ===
>>> == What should happen to JIRA tickets still targeting 1.6.0? ==
>>> ===
>>> 1. It is OK for documentation patches to target 1.6.0 and still go into
>>> branch-1.6, since documentations will be published separately from the
>>> release.
>>> 2. New features for non-alpha-modules should target 1.7+.
>>> 3. Non-blocker bug fixes should target 1.6.1 or 1.7.0, or drop the
>>> target version.
>>>
>>>
>>> ==
>>> == Major changes to help you focus your testing ==
>>> ==
>>>
>>> Notable changes since 1.6 RC3
>>>
>>>   - SPARK-12404 - Fix serialization error for Datasets with
>>> Timestamps/Arrays/Decimal
>>>   - SPARK-12218 - Fix incorrect pushdown of filters to parquet
>>>   - SPARK-12395 - Fix join columns of outer join for DataFrame using
>>>   - SPARK-12413 - Fix mesos HA
>>>
>>> Notable changes since 1.6 RC2
>>> - SPARK_VERSION has been set correctly
>>> - SPARK-12199 ML Docs are publishing correctly
>>> - SPARK-12345 Mesos cluster mode has been fixed
>>>
>>> Notable changes since 1.6 RC1
>>> Spark Streaming
>>>
>>>- SPARK-2629  
>>>trackStateByKey has been renamed to mapWithState
>>>
>>> Spark SQL
>>>
>>>- SPARK-12165 
>>>SPARK-12189  Fix
>>>bugs in eviction of storage memory by execution.
>>>- SPARK-12258  correct
>>>passing null into ScalaUDF
>>>
>>> Notable Features Since 1.5
>>>
>>> Spark SQL
>>>
>>>- SPARK-11787  Parquet
>>>Performance - Improve Parquet scan performance when using flat
>>>schemas.
>>>- SPARK-10810 
>>>Session Management - Isolated default database (i.e. USE mydb) even
>>>on shared clusters.
>>>- SPARK-   Dataset
>>>API - A type-safe API (similar to RDDs) that performs many
>>>operations on serialized binary data and code generation (i.e. Project
>>>Tungsten).
>>>- SPARK-1  Unified
>>>Memory Management - Shared memory for execution and caching instead
>>>of exclusive division of the regions.
>>>- SPARK-11197  SQL
>>>Queries on Files - Concise syntax 

Re: [VOTE] Release Apache Spark 1.6.0 (RC2)

2015-12-14 Thread Andrew Or
+1

Ran PageRank on standalone mode with 4 nodes and noticed a speedup after
the specific commits that were in RC2 but not RC1:

c247b6a Dec 10 [SPARK-12155][SPARK-12253] Fix executor OOM in unified
memory management
05e441e Dec 9 [SPARK-12165][SPARK-12189] Fix bugs in eviction of storage
memory by execution

Also jobs that triggered these issues now run successfully.


2015-12-14 10:45 GMT-08:00 Reynold Xin :

> +1
>
> Tested some dataframe operations on my Mac.
>
>
> On Saturday, December 12, 2015, Michael Armbrust 
> wrote:
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 1.6.0!
>>
>> The vote is open until Tuesday, December 15, 2015 at 6:00 UTC and passes
>> if a majority of at least 3 +1 PMC votes are cast.
>>
>> [ ] +1 Release this package as Apache Spark 1.6.0
>> [ ] -1 Do not release this package because ...
>>
>> To learn more about Apache Spark, please see http://spark.apache.org/
>>
>> The tag to be voted on is *v1.6.0-rc2
>> (23f8dfd45187cb8f2216328ab907ddb5fbdffd0b)
>> *
>>
>> The release files, including signatures, digests, etc. can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-1.6.0-rc2-bin/
>>
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1169/
>>
>> The test repository (versioned as v1.6.0-rc2) for this release can be
>> found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1168/
>>
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-1.6.0-rc2-docs/
>>
>> ===
>> == How can I help test this release? ==
>> ===
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> 
>> == What justifies a -1 vote for this release? ==
>> 
>> This vote is happening towards the end of the 1.6 QA period, so -1 votes
>> should only occur for significant regressions from 1.5. Bugs already
>> present in 1.5, minor regressions, or bugs related to new features will not
>> block this release.
>>
>> ===
>> == What should happen to JIRA tickets still targeting 1.6.0? ==
>> ===
>> 1. It is OK for documentation patches to target 1.6.0 and still go into
>> branch-1.6, since documentations will be published separately from the
>> release.
>> 2. New features for non-alpha-modules should target 1.7+.
>> 3. Non-blocker bug fixes should target 1.6.1 or 1.7.0, or drop the target
>> version.
>>
>>
>> ==
>> == Major changes to help you focus your testing ==
>> ==
>>
>> Spark 1.6.0 Preview
>>
>> Notable changes since 1.6 RC1
>>
>> Spark Streaming
>>
>>- SPARK-2629  
>>trackStateByKey has been renamed to mapWithState
>>
>> Spark SQL
>>
>>- SPARK-12165 
>>SPARK-12189  Fix
>>bugs in eviction of storage memory by execution.
>>- SPARK-12258  correct
>>passing null into ScalaUDF
>>
>> Notable Features Since 1.5
>>
>> Spark SQL
>>
>>- SPARK-11787  Parquet
>>Performance - Improve Parquet scan performance when using flat
>>schemas.
>>- SPARK-10810 
>>Session Management - Isolated default database (i.e. USE mydb) even on
>>shared clusters.
>>- SPARK-   Dataset
>>API - A type-safe API (similar to RDDs) that performs many operations
>>on serialized binary data and code generation (i.e. Project Tungsten).
>>- SPARK-1  Unified
>>Memory Management - Shared memory for execution and caching instead
>>of exclusive division of the regions.
>>- SPARK-11197  SQL
>>Queries on Files - Concise syntax for running SQL queries over files
>>of any supported format without registering a table.
>>- SPARK-11745  Reading
>>non-standard JSON files - Added options to read non-standard JSON
>>files (e.g. single-quotes, unquoted attributes)

Re: Removing the Mesos fine-grained mode

2015-11-23 Thread Andrew Or
@Jerry Lam

> Can someone confirm if it is true that dynamic allocation on mesos "is
> designed to run one executor per slave with the configured amount of
> resources." I copied this sentence from the documentation. Does this mean
> there is at most 1 executor per node? Therefore,  if you have a big
> machine, you need to allocate a fat executor on this machine in order to
> fully utilize it?


Mesos inherently does not support multiple executors per slave currently.
This is actually not related to dynamic allocation. There is, however, an
outstanding patch to add support for multiple executors per slave. When
that feature is merged, it will work well with dynamic allocation.


2015-11-23 9:27 GMT-08:00 Adam McElwee :

>
>
> On Mon, Nov 23, 2015 at 7:36 AM, Iulian Dragoș wrote:
>
>>
>>
>> On Sat, Nov 21, 2015 at 3:37 AM, Adam McElwee  wrote:
>>
>>> I've used fine-grained mode on our mesos spark clusters until this week,
>>> mostly because it was the default. I started trying coarse-grained because
>>> of the recent chatter on the mailing list about wanting to move the mesos
>>> execution path to coarse-grained only. The odd thing is, coarse-grained vs
>>> fine-grained seems to yield drastically different cluster utilization
>>> metrics for any of our jobs that I've tried out this week.
>>>
>>> If this is best as a new thread, please let me know, and I'll try not to
>>> derail this conversation. Otherwise, details below:
>>>
>>
>> I think it's ok to discuss it here.
>>
>>
>>> We monitor our spark clusters with ganglia, and historically, we
>>> maintain at least 90% cpu utilization across the cluster. Making a single
>>> configuration change to use coarse-grained execution instead of
>>> fine-grained consistently yields a cpu utilization pattern that starts
>>> around 90% at the beginning of the job, and then it slowly decreases over
>>> the next 1-1.5 hours to level out around 65% cpu utilization on the
>>> cluster. Does anyone have a clue why I'd be seeing such a negative effect
>>> of switching to coarse-grained mode? GC activity is comparable in both
>>> cases. I've tried 1.5.2, as well as the 1.6.0 preview tag that's on github.
>>>
>>
>> I'm not very familiar with Ganglia, and how it computes utilization. But
>> one thing comes to mind: did you enable dynamic allocation on
>> coarse-grained mode?
>>
>
> Dynamic allocation is definitely not enabled. The only delta between runs
> is adding --conf "spark.mesos.coarse=true" to the job submission. Ganglia is
> just pulling stats from the procfs, and I've never seen it report bad
> results. If I sample any of the 100-200 nodes in the cluster, dstat
> reflects the same average cpu that I'm seeing reflected in ganglia.
>
>>
>> iulian
>>
>
>


Re: Spark 1.4.2 release and votes conversation?

2015-11-16 Thread Andrew Lee
I did, and it passes all of our test cases, so I'm wondering what I missed. I 
know there is the memory leak spill JIRA SPARK-11293, but I'm not sure if that 
will go in 1.4.2 or 1.4.3, etc.




From: Reynold Xin <r...@databricks.com>
Sent: Friday, November 13, 2015 1:31 PM
To: Andrew Lee
Cc: dev@spark.apache.org
Subject: Re: Spark 1.4.2 release and votes conversation?

In the interim, you can just build it off branch-1.4 if you want.


On Fri, Nov 13, 2015 at 1:30 PM, Reynold Xin <r...@databricks.com> wrote:
I actually tried to build a binary for 1.4.2 and wanted to start voting, but 
there was an issue with the release script that failed the jenkins job. Would 
be great to kick off a 1.4.2 release.


On Fri, Nov 13, 2015 at 1:00 PM, Andrew Lee <alee...@hotmail.com> wrote:

Hi All,


I'm wondering whether Spark 1.4.2 has been voted on, or whether I have 
overlooked something and we are now targeting 1.4.3?


By looking at the JIRA

https://issues.apache.org/jira/browse/SPARK/fixforversion/12332833/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-summary-panel


All issues are resolved and there are no blockers. Does anyone know what 
happened to this release?


Or was there a recommendation to skip it and ask users to use Spark 1.5.2 
instead?




Spark 1.4.2 release and votes conversation?

2015-11-13 Thread Andrew Lee
Hi All,


I'm wondering whether Spark 1.4.2 has been voted on, or whether I have 
overlooked something and we are now targeting 1.4.3?


By looking at the JIRA

https://issues.apache.org/jira/browse/SPARK/fixforversion/12332833/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-summary-panel


All issues are resolved and there are no blockers. Does anyone know what 
happened to this release?


Or was there a recommendation to skip it and ask users to use Spark 1.5.2 
instead?


Re: Support for local disk columnar storage for DataFrames

2015-11-12 Thread Andrew Duffy
Relevant link:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files

On Wed, Nov 11, 2015 at 7:31 PM, Reynold Xin  wrote:

> Thanks for the email. Can you explain what the difference is between this
> and existing formats such as Parquet/ORC?
>
>
> On Wed, Nov 11, 2015 at 4:59 AM, Cristian O <
> cristian.b.op...@googlemail.com> wrote:
>
>> Hi,
>>
>> I was wondering if there's any planned support for local disk columnar
>> storage.
>>
>> This could be an extension of the in-memory columnar store, or possibly
>> something similar to the recently added local checkpointing for RDDs
>>
>> This could also have the added benefit of enabling iterative usage for
>> DataFrames by pruning the query plan through local checkpoints.
>>
>> A further enhancement would be to add update support to the columnar
>> format (in the immutable copy-on-write sense of course), by maintaining
>> references to unchanged row blocks and only copying and mutating the ones
>> that have changed.
>>
>> A use case here is streaming and merging updates in a large dataset that
>> can be efficiently stored internally in a columnar format, rather than
>> accessing a more inefficient external  data store like HDFS or Cassandra.
>>
>> Thanks,
>> Cristian
>>
>
>


Re: Concurrency issue in SQLExecution.withNewExecutionId

2015-09-10 Thread Andrew Or
@Olivier, did you use scala's parallel collections by any chance? If not,
what form of concurrency were you using?

2015-09-10 13:01 GMT-07:00 Andrew Or <and...@databricks.com>:

> Thanks for reporting this, I have filed
> https://issues.apache.org/jira/browse/SPARK-10548.
>
> 2015-09-10 9:09 GMT-07:00 Olivier Toupin <olivier.tou...@gmail.com>:
>
>> Look at this code:
>>
>>
>> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L42
>>
>> and
>>
>>
>> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L87
>>
>> This exception is there to prevent "nested `withNewExecutionId`", but what
>> if there are two concurrent commands that happen to run on the same thread?
>> Then the thread-local getLocalProperty will return an execution id,
>> triggering that exception.
>>
>> This is not hypothetical: one of our Spark jobs crashes randomly with the
>> following stack trace (using Spark 1.5; it ran without problems in Spark
>> 1.4.1):
>>
>> java.lang.IllegalArgumentException: spark.sql.execution.id is already set
>> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
>> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
>> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
>>
>>
>> Also imagine the following:
>>
>> future { df1.count() }
>> future { df2.count() }
>>
>> Could we double-check whether this is an issue?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Concurrency-issue-in-SQLExecution-withNewExecutionId-tp14035.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>
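The scenario described in the quoted report can be sketched outside Spark. The following is a minimal, self-contained Java illustration, not Spark's actual code: it assumes that the per-thread properties behave like an InheritableThreadLocal, so that a pool worker created while the parent thread is inside an execution keeps a copy of the parent's execution id. The class name, LOCAL_PROPS, and this withNewExecutionId are hypothetical stand-ins; only the property key and the error message come from the stack trace above.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutionIdInheritanceSketch {
    static final String EXECUTION_ID_KEY = "spark.sql.execution.id";

    // Hypothetical stand-in for per-thread "local properties" (an assumption):
    // a new thread receives a copy of its parent's properties when it is created.
    static final InheritableThreadLocal<Properties> LOCAL_PROPS =
        new InheritableThreadLocal<Properties>() {
            @Override
            protected Properties initialValue() {
                return new Properties();
            }

            @Override
            protected Properties childValue(Properties parent) {
                Properties copy = new Properties();
                copy.putAll(parent);
                return copy;
            }
        };

    // Hypothetical guard modelled on the "is already set" check in the trace.
    static void withNewExecutionId(String id, Runnable body) {
        Properties props = LOCAL_PROPS.get();
        if (props.getProperty(EXECUTION_ID_KEY) != null) {
            throw new IllegalArgumentException(EXECUTION_ID_KEY + " is already set");
        }
        props.setProperty(EXECUTION_ID_KEY, id);
        try {
            body.run();
        } finally {
            props.remove(EXECUTION_ID_KEY);
        }
    }

    // Returns "ok" if the later query runs, or the guard's error message.
    static String runScenario() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The pool creates its worker lazily, on the first submit. Here that
            // happens while the parent is inside execution "0", so the worker
            // inherits a copy of the properties with the id still set.
            final Future<?>[] warmUp = new Future<?>[1];
            withNewExecutionId("0", () -> warmUp[0] = pool.submit(() -> { }));
            warmUp[0].get();

            // The parent has cleared its own id, but the worker's copy is stale,
            // so an unrelated query on that worker trips the guard.
            Future<String> later = pool.submit(() -> {
                try {
                    withNewExecutionId("1", () -> { });
                    return "ok";
                } catch (IllegalArgumentException e) {
                    return e.getMessage();
                }
            });
            return later.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints "spark.sql.execution.id is already set"
    }
}
```

In this sketch the failure does not require two queries to overlap in time; it only requires that a worker thread be created while its parent holds an execution id. Whether this is the mechanism behind the reported crash is for SPARK-10548 to establish.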


Re: Concurrency issue in SQLExecution.withNewExecutionId

2015-09-10 Thread Andrew Or
Thanks for reporting this, I have filed
https://issues.apache.org/jira/browse/SPARK-10548.

2015-09-10 9:09 GMT-07:00 Olivier Toupin :

> Look at this code:
>
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L42
>
> and
>
>
> https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala#L87
>
> This exception is there to prevent "nested `withNewExecutionId`", but what
> if there are two concurrent commands that happen to run on the same thread?
> Then the thread-local getLocalProperty will return an execution id,
> triggering that exception.
>
> This is not hypothetical: one of our Spark jobs crashes randomly with the
> following stack trace (using Spark 1.5; it ran without problems in Spark
> 1.4.1):
>
> java.lang.IllegalArgumentException: spark.sql.execution.id is already set
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
>
>
> Also imagine the following:
>
> future { df1.count() }
> future { df2.count() }
>
> Could we double-check whether this is an issue?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Concurrency-issue-in-SQLExecution-withNewExecutionId-tp14035.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


Re: Flaky test in DAGSchedulerSuite?

2015-09-04 Thread Andrew Or
(merge into master, thanks for the quick fix Pete).

2015-09-04 15:58 GMT-07:00 Cheolsoo Park :

> Thank you Pete!
>
> On Fri, Sep 4, 2015 at 1:40 PM, Pete Robbins  wrote:
>
>> raised https://issues.apache.org/jira/browse/SPARK-10454 and PR
>>
>> On 4 September 2015 at 21:24, Pete Robbins  wrote:
>>
>>> I've also just hit this and was about to raise a JIRA for this if there
>>> isn't one already. I have a simple fix.
>>>
>>> On 4 September 2015 at 19:09, Cheolsoo Park 
>>> wrote:
>>>
 Hi devs,

 I noticed this test case fails intermittently in Jenkins.

 For eg, see the following builds-

 https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41991/

 https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41999/

 The test failed in different PRs, and the failure looks unrelated to
 changes in the PRs. Looks like the test was added by the following commit-

 commit 80e2568b25780a7094199239da8ad6cfb6efc9f7
 Author: Imran Rashid 
 Date:   Mon Jul 20 10:28:32 2015 -0700
 [SPARK-8103][core] DAGScheduler should not submit multiple
 concurrent attempts for a stag

 Thanks!
 Cheolsoo

>>>
>>>
>>
>


Re: What is the difference between SlowSparkPullRequestBuilder and SparkPullRequestBuilder?

2015-07-22 Thread Andrew Or
Hi Yu,

As it stands today, they are identical except for the trigger mechanism. When
you say "test this please" or push a commit, SparkPullRequestBuilder is the
one that's running the tests. SlowSparkPullRequestBuilder, however, is not
used by default, but only triggered when you say "slow test please".
Functionally there is currently no difference; the latter came about
recently in an ongoing experiment to make unit tests run faster.

-Andrew

2015-07-21 22:47 GMT-07:00 Yu Ishikawa yuu.ishikawa+sp...@gmail.com:

 Hi all,

 When we send a PR, it seems that two requests to run tests are thrown to
 the
 Jenkins sometimes.
 What is the difference between SparkPullRequestBuilder and
 SlowSparkPullRequestBuilder?

 Thanks,
 Yu



 -
 -- Yu Ishikawa
 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/What-is-the-difference-between-SlowSparkPullRequestBuilder-and-SparkPullRequestBuilder-tp13377.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org



