Revisiting Python / pandas UDF (continues)

2019-12-04 Thread Hyukjin Kwon
Hi all,

I would like to finish the Pandas UDF redesign in Spark 3.0.
If you don't have concerns in general (see
https://issues.apache.org/jira/browse/SPARK-28264),
I would like to start soon after addressing the existing comments.

Please take a look and comment on the design docs.

Thanks!


Re: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread bo yang
Thanks guys for the discussion in the email and also this afternoon!

From our experience, we do not need to change the Spark DAG scheduler to
implement a remote shuffle service. The current Spark shuffle manager
interfaces are pretty good and easy to implement. But we do feel the need
to modify MapStatus to make it more generic.

The current limitation of MapStatus is that it assumes *a map output only
exists on a single executor* (see below). One easy update could be
making MapStatus support the scenario where *a map output could be on
multiple remote servers*.

private[spark] sealed trait MapStatus {
  def location: BlockManagerId
}

class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int)

Also, MapStatus is a sealed trait, so our ShuffleManager plugin cannot
extend it with its own implementation. How about *making MapStatus a public,
non-sealed trait*, so that different ShuffleManager plugins can implement
their own MapStatus classes?
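
To make that concrete, here is a rough sketch (hypothetical names, not an existing Spark API) of what a public, non-sealed MapStatus-style trait supporting multiple locations could look like:

import org.apache.spark.storage.BlockManagerId

// Hypothetical sketch only: a public, non-sealed variant of MapStatus that a
// ShuffleManager plugin could implement; not part of Spark today.
trait PluggableMapStatus {
  // A map output may be replicated across several remote shuffle servers.
  def locations: Seq[BlockManagerId]

  // Estimated size of the output for a given reduce partition, mirroring the
  // existing MapStatus.getSizeForBlock contract.
  def getSizeForBlock(reduceId: Int): Long
}

// A plugin-side implementation backed by remote shuffle servers could then be:
class RemoteMapStatus(
    override val locations: Seq[BlockManagerId],
    blockSizes: Array[Long]) extends PluggableMapStatus {
  override def getSizeForBlock(reduceId: Int): Long = blockSizes(reduceId)
}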

Best,
Bo

On Wed, Dec 4, 2019 at 3:27 PM Ben Sidhom  wrote:

> Hey Imran (and everybody who made it to the sync today):
>
> Thanks for the comments. Responses below:
>
> Scheduling and re-executing tasks
>>> Allow coordination between the service and the Spark DAG scheduler as to
>>> whether a given block/partition needs to be recomputed when a task fails or
>>> when shuffle block data cannot be read. Having such coordination is
>>> important, e.g., for suppressing recomputation after aborted executors or
>>> for forcing late recomputation if the service internally acts as a cache.
>>> One catchall solution is to have the shuffle manager provide an indication
>>> of whether shuffle data is external to executors (or nodes). Another
>>> option: allow the shuffle manager (likely on the driver) to be queried for
>>> the existence of shuffle data for a given executor ID (or perhaps map task,
>>> reduce task, etc). Note that this is at the level of data the scheduler is
>>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>>> internal details for some shuffle managers.
>>
>>
>> sounds reasonable, and I think @Matt Cheah  mentioned something like this
>> has come up with their work on SPARK-25299 and was going to be added even
>> for that work.  (of course, need to look at the actual proposal closely and
>> how it impacts the scheduler.)
>
>
> While this is something that was discussed before, it is not something
> that is *currently* in the scope of SPARK-25299. Given the number of
> parties who are doing async data pushes (either as a backup, as in the case
> of the proposal in SPARK-25299, or as the sole mechanism of data
> distribution), I expect this to be an issue at the forefront for many
> people. I have not yet written a specific proposal for how this should be
> done. Rather, I wanted to gauge how many others see this as an important
> issue and figure out the most reasonable solutions for the community as a
> whole. It sounds like people have been getting around this using hacks so far.
> I would be curious to hear what does and does not work well and which
> solutions we would be OK with in Spark upstream.
>
>
> ShuffleManager API
>>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>>> service knows that data is still active. This is one way to enable
>>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>>> on robust communication with Spark and in general has a distinct lifecycle
>>> from the Spark deployment(s) it talks to. This would likely take the form
>>> of a callback on ShuffleManager itself, but there are other approaches.
>>
>>
>
> I believe this can already be done, but maybe its much uglier than it
>> needs to be (though I don't recall the details off the top of my head).
>
>
> As far as I'm aware, this would need to be added out-of-band, e.g., by the
> ShuffleManager itself firing off its own heartbeat thread(s) (on the
> driver, executors, or both). While obviously this is possible, it's also
> prone to leaks and puts more burden on shuffle implementations. In fact, I
> don't have a robust understanding of the lifecycle of the ShuffleManager
> object itself. IIRC (from some ad-hoc tests I did a while back), a new one
> is spawned on each executor itself (as opposed to being instantiated once
> on the driver and deserialized onto executors). If executor
> (ShuffleManager) instances do not receive shutdown hooks, shuffle
> implementations may be prone to resource leaks. Worse, if the behavior of
> ShuffleManager instantiation is not stable between Spark releases, there
> may be correctness issues due to initializers/constructors running in
> unexpected ways. Then you have the ShuffleManager instance used for
> registration. As far as I can tell, this runs on the driver, but might this
> be migrated between machines (either now or in future Spark releases),
> e.g., in cluster mode?
>
> If this were taken care of by the Spark 

RE: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread Jia, Ke A
Hi Ben and Felix,
This is Jia Ke from the Intel Big Data Team, and I'm also interested in this. Would 
you please add me to the invite, thanks a lot.

Best regards,
Jia Ke
From: Qi,He 
Sent: Thursday, December 05, 2019 11:12 AM
To: Saisai Shao 
Cc: Liu,Linhong ; Aniket Mokashi ; 
Felix Cheung ; Ben Sidhom 
; John Zhuge ; bo yang 
; Amogh Margoor ; Ryan Blue 
; Spark Dev List ; Christopher Crosbie 
; Griselda Cuevas ; Holden Karau 
; Mayank Ahuja ; Kalyan Sivakumar 
; alfo...@fb.com; Felix Cheung ; Matt 
Cheah ; Yifei Huang (PD) 
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix

This is Qi He from Baidu, same team with Linhong. I'm also interested in this. 
Would you please add me to the invite, thanks a lot.

Thanks
Qi, He

From: Saisai Shao <sai.sai.s...@gmail.com>
Date: Wednesday, December 4, 2019, 5:57 PM
To: Greg Lee <lihao...@gmail.com>
Cc: "Liu,Linhong" <liulinh...@baidu.com>, Aniket Mokashi <aniket...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, Ben Sidhom <sid...@google.com.invalid>, John Zhuge <jzh...@apache.org>, bo yang <bobyan...@gmail.com>, Amogh Margoor <amo...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <dev@spark.apache.org>, Christopher Crosbie <crosb...@google.com>, Griselda Cuevas <g...@google.com>, Holden Karau <hol...@pigscanfly.ca>, Mayank Ahuja <mah...@qubole.com>, Kalyan Sivakumar <kaly...@qubole.com>, "alfo...@fb.com" <alfo...@fb.com>, Felix Cheung <fel...@uber.com>, Matt Cheah <mch...@palantir.com>, "Yifei Huang (PD)" <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix, I'm also interested in this. Would you please add me to the 
invite, thanks a lot.

Best regards,
Saisai

Greg Lee <lihao...@gmail.com> wrote on Monday, December 2, 2019 at 11:34 PM:
Hi Felix & Ben,

This is Li Hao from Baidu, same team with Linhong.

As mentioned in Linhong’s email, an independent disaggregated shuffle service is 
also our solution and a direction we continue to explore for improving the stability 
of Hadoop MR and Spark in our production environment. We would love to hear 
about this topic from the community and share our experience.

Please add me to this event, thanks.

Best Regards
Li Hao

Liu,Linhong <liulinh...@baidu.com> wrote on Friday, November 29, 2019 at 5:09 PM:
Hi Felix & Ben,
This is Linhong from Baidu based in Beijing, and we are internally using a 
disaggregated shuffle service (we call it DCE) as well. We launched this in 
production 3 years ago for Hadoop shuffle. Last year we migrated spark shuffle 
to the same DCE shuffle service and stability improved a lot (we can handle 
more than 100T shuffle now).
It would be nice if there were a Spark shuffle API supporting fully disaggregated 
shuffle, and my team and I would be very glad to share our experience and help on 
this topic.
So, if it’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <aniket...@gmail.com>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <felixcheun...@hotmail.com>
Cc: Ben Sidhom <sid...@google.com.invalid>, John Zhuge <jzh...@apache.org>, bo yang <bobyan...@gmail.com>, Amogh Margoor <amo...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <dev@spark.apache.org>, Christopher Crosbie <crosb...@google.com>, Griselda Cuevas <g...@google.com>, Holden Karau <hol...@pigscanfly.ca>, Mayank Ahuja <mah...@qubole.com>, Kalyan Sivakumar <kaly...@qubole.com>, "alfo...@fb.com" <alfo...@fb.com>, Felix Cheung <fel...@uber.com>, Matt Cheah <mch...@palantir.com>, "Yifei Huang (PD)" <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments.

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <felixcheun...@hotmail.com> wrote:
Great!

Due to a number of constraints I won’t be sending the link directly here, but please r 
me and I will add you.



From: Ben Sidhom <sid...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jzh...@apache.org>
Cc: bo yang <bobyan...@gmail.com>; Amogh Margoor <amo...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <sid...@google.com.invalid>; Spark Dev List <dev@spark.apache.org>; Christopher Crosbie <crosb...@google.com>; Griselda Cuevas <g...@google.com>; Holden Karau <hol...@pigscanfly.ca>; Mayank Ahuja <mah...@qubole.com>; Kalyan Sivakumar <kaly...@qubole.com>; alfo...@fb.com <alfo...@fb.com>; Felix Cheung <fel...@uber.com>; Matt Cheah

Re: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread Qi,He
Hi Ben and Felix

This is Qi He from Baidu, same team with Linhong. I'm also interested in this. 
Would you please add me to the invite, thanks a lot.

Thanks
Qi, He

From: Saisai Shao <sai.sai.s...@gmail.com>
Date: Wednesday, December 4, 2019, 5:57 PM
To: Greg Lee <lihao...@gmail.com>
Cc: "Liu,Linhong" <liulinh...@baidu.com>, Aniket Mokashi <aniket...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, Ben Sidhom <sid...@google.com.invalid>, John Zhuge <jzh...@apache.org>, bo yang <bobyan...@gmail.com>, Amogh Margoor <amo...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <dev@spark.apache.org>, Christopher Crosbie <crosb...@google.com>, Griselda Cuevas <g...@google.com>, Holden Karau <hol...@pigscanfly.ca>, Mayank Ahuja <mah...@qubole.com>, Kalyan Sivakumar <kaly...@qubole.com>, "alfo...@fb.com" <alfo...@fb.com>, Felix Cheung <fel...@uber.com>, Matt Cheah <mch...@palantir.com>, "Yifei Huang (PD)" <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Hi Ben and Felix, I'm also interested in this. Would you please add me to the 
invite, thanks a lot.

Best regards,
Saisai

Greg Lee <lihao...@gmail.com> wrote on Monday, December 2, 2019 at 11:34 PM:
Hi Felix & Ben,

This is Li Hao from Baidu, same team with Linhong.

As mentioned in Linhong’s email, an independent disaggregated shuffle service is 
also our solution and a direction we continue to explore for improving the stability 
of Hadoop MR and Spark in our production environment. We would love to hear 
about this topic from the community and share our experience.

Please add me to this event, thanks.

Best Regards
Li Hao

Liu,Linhong <liulinh...@baidu.com> wrote on Friday, November 29, 2019 at 5:09 PM:
Hi Felix & Ben,
This is Linhong from Baidu based in Beijing, and we are internally using a 
disaggregated shuffle service (we call it DCE) as well. We launched this in 
production 3 years ago for Hadoop shuffle. Last year we migrated spark shuffle 
to the same DCE shuffle service and stability improved a lot (we can handle 
more than 100T shuffle now).
It would be nice if there were a Spark shuffle API supporting fully disaggregated 
shuffle, and my team and I would be very glad to share our experience and help on 
this topic.
So, if it’s possible, please add me to this event.

Thanks,
Liu, Linhong

From: Aniket Mokashi <aniket...@gmail.com>
Date: Thursday, November 21, 2019 at 2:12 PM
To: Felix Cheung <felixcheun...@hotmail.com>
Cc: Ben Sidhom <sid...@google.com.invalid>, John Zhuge <jzh...@apache.org>, bo yang <bobyan...@gmail.com>, Amogh Margoor <amo...@qubole.com>, Ryan Blue <rb...@netflix.com>, Spark Dev List <dev@spark.apache.org>, Christopher Crosbie <crosb...@google.com>, Griselda Cuevas <g...@google.com>, Holden Karau <hol...@pigscanfly.ca>, Mayank Ahuja <mah...@qubole.com>, Kalyan Sivakumar <kaly...@qubole.com>, "alfo...@fb.com" <alfo...@fb.com>, Felix Cheung <fel...@uber.com>, Matt Cheah <mch...@palantir.com>, "Yifei Huang (PD)" <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

Felix - please add me to this event.

Ben - should we move this proposal to a doc and open it up for edits/comments.

On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <felixcheun...@hotmail.com> wrote:
Great!

Due to a number of constraints I won’t be sending the link directly here, but please r 
me and I will add you.



From: Ben Sidhom <sid...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jzh...@apache.org>
Cc: bo yang <bobyan...@gmail.com>; Amogh Margoor <amo...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <sid...@google.com.invalid>; Spark Dev List <dev@spark.apache.org>; Christopher Crosbie <crosb...@google.com>; Griselda Cuevas <g...@google.com>; Holden Karau <hol...@pigscanfly.ca>; Mayank Ahuja <mah...@qubole.com>; Kalyan Sivakumar <kaly...@qubole.com>; alfo...@fb.com <alfo...@fb.com>; Felix Cheung <fel...@uber.com>; Matt Cheah <mch...@palantir.com>; Yifei Huang (PD) <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jzh...@apache.org> wrote:
That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyan...@gmail.com> wrote:
Cool, thanks Ryan, John, Amogh for the reply! Great to see you interested! 
Felix will have a Spark Scalability & Reliability Sync meeting on Dec 4 1pm 
PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh 

Re: [DISCUSS] Consistent relation resolution behavior in SparkSQL

2019-12-04 Thread Wenchen Fan
+1, I think it's good for both end-users and Spark developers:
* for end-users, when they lookup a table, they don't need to care which
command triggers it, as the behavior is consistent in all the places.
* for Spark developers, we may simplify the code quite a bit. For now we
have two code paths to look up tables: one for SELECT/INSERT and one for
other commands (a small illustrative example follows below).
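
A minimal sketch of the current behavior, assuming a temp view and a table that share the name t. This is illustrative only and follows the two behaviors as described in SPARK-29900; in particular, treating DROP TABLE as one of the commands using the second behavior is an assumption here:

// Illustrative sketch only; names and the exact command coverage are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("relation-resolution-demo").getOrCreate()

spark.sql("CREATE TABLE t (id INT) USING parquet")       // persistent table t
spark.sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")   // temp view t

// First behavior (SELECT/INSERT and a few commands): the temp view is looked
// up first, so this reads the temp view rather than the table.
spark.sql("SELECT * FROM t").show()

// Second behavior (most other commands, assumed here to include DROP TABLE):
// resolution goes straight to the table/persistent view, so this targets the
// persistent table even though a temp view with the same name exists.
spark.sql("DROP TABLE t")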

Thanks,
Wenchen

On Mon, Dec 2, 2019 at 9:12 AM Terry Kim  wrote:

> Hi all,
>
> As discussed in SPARK-29900, Spark currently has two different relation
> resolution behaviors:
>
>1. Look up temp view first, then table/persistent view
>2. Look up table/persistent view
>
> The first behavior is used in SELECT, INSERT and a few commands that
> support temp views such as DESCRIBE TABLE, etc. The second behavior is used
> in most commands. Thus, it is hard to predict which relation resolution
> rule is being applied for a given command.
>
> I want to propose a consistent relation resolution behavior in which temp
> views are always looked up first before table/persistent view, as
> described more in detail in this doc: consistent relation resolution
> proposal
> 
> .
>
> Note that this proposal is a breaking change, but the impact should be
> minimal since this applies only when there are temp views and tables with
> the same name.
>
> Any feedback will be appreciated.
>
> I also want to thank Wenchen Fan, Ryan Blue, Burak Yavuz, and Dongjoon
> Hyun for guidance and suggestion.
>
> Regards,
> Terry
>
>
> 
>


Re: [DISCUSS] PostgreSQL dialect

2019-12-04 Thread Yuanjian Li
Thanks all of you for joining the discussion.
The PR is given in https://github.com/apache/spark/pull/26763; all the
PostgreSQL dialect related PRs are linked in the description.
I hope the authors can help with reviewing.

Best,
Yuanjian

Driesprong, Fokko wrote on Sunday, December 1, 2019 at 7:24 PM:

> +1 (non-binding)
>
> Cheers, Fokko
>
> On Thu, Nov 28, 2019 at 03:47, Dongjoon Hyun wrote:
>
>> +1
>>
>> Bests,
>> Dongjoon.
>>
>> On Tue, Nov 26, 2019 at 3:52 PM Takeshi Yamamuro 
>> wrote:
>>
>>> Yea, +1, that looks pretty reasonable to me.
>>> > Here I'm proposing to hold off the PostgreSQL dialect. Let's remove it
>>> from the codebase before it's too late. Curently we only have 3 features
>>> under PostgreSQL dialect:
>>> I personally think we could at least stop work about the Dialect until
>>> 3.0 released.
>>>
>>>
>>> On Wed, Nov 27, 2019 at 2:41 AM Gengliang Wang <
>>> gengliang.w...@databricks.com> wrote:
>>>
 +1 with the practical proposal.
 To me, the major concern is that the code base becomes complicated,
 while the PostgreSQL dialect has very limited features. I tried introducing
 one big flag `spark.sql.dialect` and isolating related code in #25697
, but it seems hard to keep clean.
 Furthermore, the PostgreSQL dialect configuration overlaps with the
 ANSI mode, which can be confusing sometimes.

 Gengliang

 On Tue, Nov 26, 2019 at 8:57 AM Xiao Li  wrote:

> +1
>
>
>> One particular negative effect has been that new postgresql tests add
>> well over an hour to tests,
>
>
> Adding PostgreSQL tests is for improving the test coverage of Spark
> SQL. We should continue to do this by importing more test cases. The
> quality of Spark highly depends on the test coverage. We can further
> parallelize the test execution to reduce the test time.
>
> Migrating PostgreSQL workloads to Spark SQL
>
>
> This should not be our current focus. In the near future, it is
> impossible to be fully compatible with PostgreSQL. We should focus on
> adding features that are useful to Spark community. PostgreSQL is a good
> reference, but we do not need to blindly follow it. We already closed
> multiple related JIRAs that try to add some PostgreSQL features that are
> not commonly used.
>
> Cheers,
>
> Xiao
>
>
> On Tue, Nov 26, 2019 at 8:30 AM Maciej Szymkiewicz <
> mszymkiew...@gmail.com> wrote:
>
>> I think it is important to distinguish between two different concepts:
>>
>>- Adherence to standards and their well established
>>implementations.
>>- Enabling migrations from some product X to Spark.
>>
>> While these two problems are related, there are independent and one
>> can be achieved without the other.
>>
>>- The former approach doesn't imply that all features of the SQL
>>standard (or its specific implementation) are provided. It is sufficient
>>that the commonly used features that are implemented are standard compliant.
>>Therefore, if an end user applies some well-known pattern, things will work as
>>expected.
>>
>>In my personal opinion that's something that is worth the
>>required development resources, and in general should happen within the
>>project.
>>
>>
>>- The latter one is more complicated. First of all, the premise
>>that one can "migrate PostgreSQL workloads to Spark" seems to be flawed.
>>While both Spark and PostgreSQL evolve, and probably have more in common
>>today than a few years ago, they're not even close enough to pretend that
>>one can be a replacement for the other. In contrast, existing compatibility
>>layers between major vendors make sense, because feature disparity
>>(at least when it comes to core functionality) is usually
>>minimal. And that doesn't even touch the problem that PostgreSQL provides
>>extensively used extension points that enable a broad and evolving ecosystem
>>(what should we do about continuous queries? Should Structured Streaming
>>provide some compatibility layer as well?).
>>
>>More realistically, Spark could provide a compatibility layer with
>>some analytical tools that themselves provide some PostgreSQL compatibility,
>>but these are not always fully compatible with upstream PostgreSQL, nor do they
>>necessarily follow the latest PostgreSQL development.
>>
>>Furthermore, a compatibility layer can be, within certain limits
>>(i.e. availability of required primitives), maintained as a separate
>>project, without putting more strain on existing resources. Effectively
>>what we care about here is if we can

Re: SQL test failures in PR builder?

2019-12-04 Thread Shane Knapp
++yin huai for more insight in to the NewSparkPullRequestBuilder job...

tbh, i never really understood (and still don't) the exact use for that job,
except that it's triggered by https://spark-prs.appspot.com/

shane


On Wed, Dec 4, 2019 at 3:34 PM Sean Owen  wrote:
>
> BTW does anyone know why there are two PR builder jobs? I'm confused
> about why different ones would execute.
>
> Yes I see NewSparkPullRequestBuilder failing on a variety of PRs.
> I don't think it has anything to do with Hive; these PRs touch
> different parts of code but are all unrelated to this failure.
>
> On Wed, Dec 4, 2019 at 12:40 PM Dongjoon Hyun  wrote:
> >
> > Hi, Sean.
> >
> > It seems that there is no failure on your other SQL PR.
> >
> > https://github.com/apache/spark/pull/26748
> >
> > Does the sequential failure happen only at `NewSparkPullRequestBuilder`?
> > Since `NewSparkPullRequestBuilder` is not the same as 
> > `SparkPullRequestBuilder`,
> > there might be a root cause inside it if it happens only at 
> > `NewSparkPullRequestBuilder`.
> >
> > For `org.apache.hive.service.ServiceException: Failed to Start HiveServer2`,
> > I've observed them before, but the root cause might be different from this 
> > one.
> >
> > BTW, to reduce the scope of investigation, could you try with `[hive-1.2]` 
> > tag in your PR?
> >
> > Bests,
> > Dongjoon.
> >
> >
> > On Wed, Dec 4, 2019 at 6:29 AM Sean Owen  wrote:
> >>
> >> I'm seeing consistent failures in the PR builder when touching SQL code:
> >>
> >> https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4960/testReport/
> >>
> >>  
> >> org.apache.spark.sql.hive.thriftserver.SparkMetadataOperationSuite.Spark's 
> >> own GetSchemasOperation(SparkGetSchemasOperation)14 ms2
> >>  
> >> org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.(It
> >>  is not a test it is a sbt.testing.SuiteSelector)
> >>
> >> Looks like this has failed about 6 builds in the past few days. Has anyone 
> >> seen this / has a clue what's causing it? errors are like ...
> >>
> >> java.sql.SQLException: No suitable driver found for 
> >> jdbc:hive2://localhost:13694/?a=avalue;b=bvalue#c=cvalue;d=dvalue
> >>
> >>
> >> Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: class 
> >> org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not 
> >> org.apache.hadoop.hive.metastore.MetaStoreFilterHook



--
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu




Re: SQL test failures in PR builder?

2019-12-04 Thread Sean Owen
BTW does anyone know why there are two PR builder jobs? I'm confused
about why different ones would execute.

Yes I see NewSparkPullRequestBuilder failing on a variety of PRs.
I don't think it has anything to do with Hive; these PRs touch
different parts of code but are all unrelated to this failure.

On Wed, Dec 4, 2019 at 12:40 PM Dongjoon Hyun  wrote:
>
> Hi, Sean.
>
> It seems that there is no failure on your other SQL PR.
>
> https://github.com/apache/spark/pull/26748
>
> Does the sequential failure happen only at `NewSparkPullRequestBuilder`?
> Since `NewSparkPullRequestBuilder` is not the same as 
> `SparkPullRequestBuilder`,
> there might be a root cause inside it if it happens only at 
> `NewSparkPullRequestBuilder`.
>
> For `org.apache.hive.service.ServiceException: Failed to Start HiveServer2`,
> I've observed them before, but the root cause might be different from this 
> one.
>
> BTW, to reduce the scope of investigation, could you try with `[hive-1.2]` 
> tag in your PR?
>
> Bests,
> Dongjoon.
>
>
> On Wed, Dec 4, 2019 at 6:29 AM Sean Owen  wrote:
>>
>> I'm seeing consistent failures in the PR builder when touching SQL code:
>>
>> https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4960/testReport/
>>
>>  org.apache.spark.sql.hive.thriftserver.SparkMetadataOperationSuite.Spark's 
>> own GetSchemasOperation(SparkGetSchemasOperation)14 ms2
>>  
>> org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.(It 
>> is not a test it is a sbt.testing.SuiteSelector)
>>
>> Looks like this has failed about 6 builds in the past few days. Has anyone 
>> seen this / has a clue what's causing it? errors are like ...
>>
>> java.sql.SQLException: No suitable driver found for 
>> jdbc:hive2://localhost:13694/?a=avalue;b=bvalue#c=cvalue;d=dvalue
>>
>>
>> Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: class 
>> org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not 
>> org.apache.hadoop.hive.metastore.MetaStoreFilterHook




Re: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread Ben Sidhom
Hey Imran (and everybody who made it to the sync today):

Thanks for the comments. Responses below:

Scheduling and re-executing tasks
>> Allow coordination between the service and the Spark DAG scheduler as to
>> whether a given block/partition needs to be recomputed when a task fails or
>> when shuffle block data cannot be read. Having such coordination is
>> important, e.g., for suppressing recomputation after aborted executors or
>> for forcing late recomputation if the service internally acts as a cache.
>> One catchall solution is to have the shuffle manager provide an indication
>> of whether shuffle data is external to executors (or nodes). Another
>> option: allow the shuffle manager (likely on the driver) to be queried for
>> the existence of shuffle data for a given executor ID (or perhaps map task,
>> reduce task, etc). Note that this is at the level of data the scheduler is
>> aware of (i.e., map/reduce partitions) rather than block IDs, which are
>> internal details for some shuffle managers.
>
>
> sounds reasonable, and I think @Matt Cheah  mentioned something like this
> has come up with their work on SPARK-25299 and was going to be added even
> for that work.  (of course, need to look at the actual proposal closely and
> how it impacts the scheduler.)


While this is something that was discussed before, it is not something that
is *currently* in the scope of SPARK-25299. Given the number of parties who
are doing async data pushes (either as a backup, as in the case of the
proposal in SPARK-25299, or as the sole mechanism of data distribution), I
expect this to be an issue at the forefront for many people. I have not yet
written a specific proposal for how this should be done. Rather, I wanted
to gauge how many others see this as an important issue and figure out the
most reasonable solutions for the community as a whole. It sounds like
people have been getting around this using hacks so far. I would be curious to
hear what does and does not work well and which solutions we would be OK
with in Spark upstream.
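
To make the "query the shuffle manager from the driver" option above more concrete, a rough sketch (purely hypothetical; nothing like this exists in Spark today) could look like:

// Hypothetical driver-side hook a ShuffleManager could implement so the DAG
// scheduler can decide whether map output really needs to be recomputed.
trait ShuffleOutputTracker {
  // True if shuffle data lives outside executors/nodes, so losing an executor
  // does not by itself invalidate the map outputs it produced.
  def shuffleDataIsExternal: Boolean

  // Whether the output for a given map partition of a shuffle is still
  // readable; the scheduler could consult this before rerunning a map stage.
  def isMapOutputAvailable(shuffleId: Int, mapPartitionId: Int): Boolean
}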


ShuffleManager API
>> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
>> service knows that data is still active. This is one way to enable
>> time-/job-scoped data because a disaggregated shuffle service cannot rely
>> on robust communication with Spark and in general has a distinct lifecycle
>> from the Spark deployment(s) it talks to. This would likely take the form
>> of a callback on ShuffleManager itself, but there are other approaches.
>
>

I believe this can already be done, but maybe its much uglier than it needs
> to be (though I don't recall the details off the top of my head).


As far as I'm aware, this would need to be added out-of-band, e.g., by the
ShuffleManager itself firing off its own heartbeat thread(s) (on the
driver, executors, or both). While obviously this is possible, it's also
prone to leaks and puts more burden on shuffle implementations. In fact, I
don't have a robust understanding of the lifecycle of the ShuffleManager
object itself. IIRC (from some ad-hoc tests I did a while back), a new one
is spawned on each executor itself (as opposed to being instantiated once
on the driver and deserialized onto executors). If executor
(ShuffleManager) instances do not receive shutdown hooks, shuffle
implementations may be prone to resource leaks. Worse, if the behavior of
ShuffleManager instantiation is not stable between Spark releases, there
may be correctness issues due to initializers/constructors running in
unexpected ways. Then you have the ShuffleManager instance used for
registration. As far as I can tell, this runs on the driver, but might this
be migrated between machines (either now or in future Spark releases),
e.g., in cluster mode?

If this were taken care of by the Spark scheduler rather than the shuffle
manager itself, we could avoid an entire class of subtle issues. My
off-the-cuff suggestion above was to expose a callback on the
ShuffleManager that allows implementations to define their own heartbeat
logic. That could then be invoked by the scheduler when and where
appropriate (along with any other lifecycle callbacks we might add).
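
Roughly, such a callback might look like the following (hypothetical names; only intended to make the suggestion concrete):

// Hypothetical heartbeat/lifecycle hooks on the ShuffleManager, invoked by
// Spark itself rather than by ad-hoc threads inside each plugin. Sketch only.
trait ShuffleLifecycleCallbacks {
  // Called periodically by Spark (on the driver and/or executors) so an
  // external shuffle service can treat registered shuffle data as still active.
  def onHeartbeat(activeShuffleIds: Set[Int]): Unit

  // Called when Spark shuts the component down, giving the plugin a reliable
  // place to release connections, caches, and other resources.
  def onShutdown(): Unit
}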

Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
>> connections/streams/file handles as well as provide commit semantics).
>> SPARK-25299 adds commit semantics to the internal data storage layer, but
>> this is applicable to all shuffle managers at a higher level and should
>> apply equally to the ShuffleWriter.
>
>
> ShuffleWriter has a
>
>> def stop(success: Boolean): Option[MapStatus]
>
>  I would need more info about why that isn't enough.  (But if there is a
> need for it, yes this makes sense.)


That's probably fine for most purposes. However, that stop hook only exists
on shuffle writers. What about on readers? In any case, each
reader/writer instance appears to only be invoked once for reading or
writing. If ShuffleManagers can assume that behavior is 

Re: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread Imran Rashid
Hi Ben,

in general everything you're proposing sounds reasonable.  For me, at
least, I'd need more details on most of the points before I fully
understand them, but I'm definitely in favor of the general goal for making
spark support fully disaggregated shuffle.  Of course, I also want to make
sure it can be done in a way that involves the least risky changes to spark
itself and that we can continue to support.

One very-high level point which I think is worth keeping in mind for the
wider community following this -- the key difference between what you are
proposing and SPARK-25299, is that SPARK-25299 still uses spark's existing
shuffle implementation, which leverages local disk.  Your goal is to better
support shuffling all data via some external service, which avoids shuffle
data hitting executors local disks entirely.  This was already possible, to
some extent, even before SPARK-25299 with the ShuffleManager api; but as
you note, there are shortcomings which need to be addressed.  (Historical
note: that api wasn't designed with totally distributed shuffle services in
mind, it was to support hash- vs. sort-based shuffle, all still on spark's
executors.)

One thing that I thought you would have needed, but you didn't mention
here, is changes to the scheduler to add an extra step between the
shuffle-write & shuffle-read stages, in case the service needs to do any work
to reorganize data. I think I have heard this come up in prior discussions.

A couple of inline comments below:

On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom 
wrote:

> Proposal
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.
>

sounds reasonable, and I think @Matt Cheah   mentioned
something like this has come up with their work on SPARK-25299 and was
going to be added even for that work.  (of course, need to look at the
actual proposal closely and how it impacts the scheduler.)

> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.
>

I believe this can already be done, but maybe its much uglier than it needs
to be (though I don't recall the details off the top of my head).


> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.
>

ShuffleWriter has a

def stop(success: Boolean): Option[MapStatus]

 I would need more info about why that isn't enough.  (But if there is a
need for it, yes this makes sense.)
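
(For comparison, the reader side currently exposes only read(); a reader-side analogue of that stop hook, shown purely as a sketch with hypothetical names, might be:)

// The existing reader API (roughly, as of Spark 2.x/3.0) has no stop()/close():
private[spark] trait ShuffleReader[K, C] {
  def read(): Iterator[Product2[K, C]]
}

// Hypothetical reader-side lifecycle hook mirroring ShuffleWriter.stop;
// sketch only, not an existing Spark API.
trait ShuffleReaderLifecycle {
  // Called once the reduce task is done with this reader, successfully or not,
  // so connections/streams/file handles can be closed or recycled.
  def close(success: Boolean): Unit
}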

> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)
>

I don't really understand how this is different from the existing
SerializationStream -- probably a small example would clarify.
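
As a small illustration of the distinction (the first part paraphrases the existing Serializer API; the extension below it is purely hypothetical):

// Existing API: either the serializer owns a whole stream...
//   val s = serializerInstance.serializeStream(out)
//   s.writeObject(rec1)
//   s.writeObject(rec2)
//   s.close()
// ...or each object is serialized to its own ByteBuffer via
//   serializerInstance.serialize(rec)
// Neither lets the caller cheaply interleave its own framing/metadata bytes
// between individual records on a shared OutputStream.

// Hypothetical extension (sketch only, not an existing Spark API): write one
// record at a time into a caller-owned stream, so per-record framing or
// metadata can be wrapped around each object.
import java.io.OutputStream
import scala.reflect.ClassTag

trait RecordSerializer {
  def writeRecord[T: ClassTag](record: T, out: OutputStream): Unit
}

// A shuffle writer could then, per record, emit its own framing (a hypothetical
// writeRecordMetadata(out, meta)) followed by recordSerializer.writeRecord(rec, out).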


> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all 

Re: SQL test failures in PR builder?

2019-12-04 Thread Dongjoon Hyun
Hi, Sean.

It seems that there is no failure on your other SQL PR.

https://github.com/apache/spark/pull/26748

Does the sequential failure happen only at `NewSparkPullRequestBuilder`?
Since `NewSparkPullRequestBuilder` is not the same as
`SparkPullRequestBuilder`,
there might be a root cause inside it if it happens only at
`NewSparkPullRequestBuilder`.

For `org.apache.hive.service.ServiceException: Failed to Start HiveServer2`,
I've observed them before, but the root cause might be different from this
one.

BTW, to reduce the scope of investigation, could you try with `[hive-1.2]`
tag in your PR?

Bests,
Dongjoon.


On Wed, Dec 4, 2019 at 6:29 AM Sean Owen  wrote:

> I'm seeing consistent failures in the PR builder when touching SQL code:
>
>
> https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4960/testReport/
>
>  org.apache.spark.sql.hive.thriftserver.SparkMetadataOperationSuite.Spark's
> own GetSchemasOperation(SparkGetSchemasOperation)14 ms2
>  org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.(It
> is not a test it is a sbt.testing.SuiteSelector)
>
> Looks like this has failed about 6 builds in the past few days. Has anyone
> seen this / has a clue what's causing it? errors are like ...
>
> java.sql.SQLException: No suitable driver found for 
> jdbc:hive2://localhost:13694/?a=avalue;b=bvalue#c=cvalue;d=dvalue
>
>
> Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: class 
> org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not 
> org.apache.hadoop.hive.metastore.MetaStoreFilterHook
>
>


SQL test failures in PR builder?

2019-12-04 Thread Sean Owen
I'm seeing consistent failures in the PR builder when touching SQL code:

https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4960/testReport/

 org.apache.spark.sql.hive.thriftserver.SparkMetadataOperationSuite.Spark's
own GetSchemasOperation(SparkGetSchemasOperation)14 ms2
 org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite.(It
is not a test it is a sbt.testing.SuiteSelector)

Looks like this has failed about 6 builds in the past few days. Has anyone
seen this / has a clue what's causing it? errors are like ...

java.sql.SQLException: No suitable driver found for
jdbc:hive2://localhost:13694/?a=avalue;b=bvalue#c=cvalue;d=dvalue


Caused by: sbt.ForkMain$ForkError: java.lang.RuntimeException: class
org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl not
org.apache.hadoop.hive.metastore.MetaStoreFilterHook


Re: Enabling fully disaggregated shuffle on Spark

2019-12-04 Thread Saisai Shao
Hi Ben and Felix, I'm also interested in this. Would you please add me to
the invite, thanks a lot.

Best regards,
Saisai

Greg Lee wrote on Monday, December 2, 2019 at 11:34 PM:

> Hi Felix & Ben,
>
> This is Li Hao from Baidu, same team with Linhong.
>
> As mentioned in Linhong’s email, an independent disaggregated shuffle service
> is also our solution and a direction we continue to explore for improving the
> stability of Hadoop MR and Spark in our production environment. We would
> love to hear about this topic from the community and share our experience.
>
> Please add me to this event, thanks.
>
> Best Regards
> Li Hao
>
> Liu,Linhong wrote on Friday, November 29, 2019 at 5:09 PM:
>
>> Hi Felix & Ben,
>>
>> This is Linhong from Baidu based in Beijing, and we are internally using
>> a disaggregated shuffle service (we call it DCE) as well. We launched this
>> in production 3 years ago for Hadoop shuffle. Last year we migrated spark
>> shuffle to the same DCE shuffle service and stability improved a lot (we
>> can handle more than 100T shuffle now).
>>
>> It would be nice if there were a Spark shuffle API supporting fully
>> disaggregated shuffle, and my team and I would be very glad to share our
>> experience and help on this topic.
>>
>> So, if it’s possible, please add me to this event.
>>
>>
>>
>> Thanks,
>>
>> Liu, Linhong
>>
>>
>>
>> *From: *Aniket Mokashi 
>> *Date: *Thursday, November 21, 2019 at 2:12 PM
>> *To: *Felix Cheung 
>> *Cc: *Ben Sidhom , John Zhuge <
>> jzh...@apache.org>, bo yang , Amogh Margoor <
>> amo...@qubole.com>, Ryan Blue , Spark Dev List <
>> dev@spark.apache.org>, Christopher Crosbie ,
>> Griselda Cuevas , Holden Karau ,
>> Mayank Ahuja , Kalyan Sivakumar ,
>> "alfo...@fb.com" , Felix Cheung , Matt
>> Cheah , "Yifei Huang (PD)" 
>> *Subject: *Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> Felix - please add me to this event.
>>
>>
>>
>> Ben - should we move this proposal to a doc and open it up for
>> edits/comments.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung 
>> wrote:
>>
>> Great!
>>
>>
>>
>> Due to a number of constraints I won’t be sending the link directly here, but
>> please r me and I will add you.
>>
>>
>>
>>
>> --
>>
>> *From:* Ben Sidhom 
>> *Sent:* Wednesday, November 20, 2019 9:10:01 AM
>> *To:* John Zhuge 
>> *Cc:* bo yang ; Amogh Margoor ;
>> Ryan Blue ; Ben Sidhom ;
>> Spark Dev List ; Christopher Crosbie <
>> crosb...@google.com>; Griselda Cuevas ; Holden Karau <
>> hol...@pigscanfly.ca>; Mayank Ahuja ; Kalyan
>> Sivakumar ; alfo...@fb.com ; Felix
>> Cheung ; Matt Cheah ; Yifei Huang
>> (PD) 
>> *Subject:* Re: Enabling fully disaggregated shuffle on Spark
>>
>>
>>
>> That sounds great!
>>
>>
>>
>> On Wed, Nov 20, 2019 at 9:02 AM John Zhuge  wrote:
>>
>> That will be great. Please send us the invite.
>>
>>
>>
>> On Wed, Nov 20, 2019 at 8:56 AM bo yang  wrote:
>>
>> Cool, thanks Ryan, John, Amogh for the reply! Great to see you
>> interested! Felix will have a Spark Scalability & Reliability Sync
>> meeting on Dec 4 1pm PST. We could discuss more details there. Do you want
>> to join?
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor  wrote:
>>
>> We at Qubole are also looking at disaggregating shuffle on Spark. Would
>> love to collaborate and share learnings.
>>
>>
>>
>> Regards,
>>
>> Amogh
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:09 PM John Zhuge  wrote:
>>
>> Great work, Bo! Would love to hear the details.
>>
>>
>>
>>
>>
>> On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue 
>> wrote:
>>
>> I'm interested in remote shuffle services as well. I'd love to hear about
>> what you're using in production!
>>
>>
>>
>> rb
>>
>>
>>
>> On Tue, Nov 19, 2019 at 2:43 PM bo yang  wrote:
>>
>> Hi Ben,
>>
>>
>>
>> Thanks for the writing up! This is Bo from Uber. I am in Felix's team in
>> Seattle, and working on disaggregated shuffle (we called it remote shuffle
>> service, RSS, internally). We have put RSS into production for a while, and
>> learned a lot during the work (tried quite a few techniques to improve the
>> remote shuffle performance). We could share our learning with the
>> community, and also would like to hear feedback/suggestions on how to
>> further improve remote shuffle performance. We could chat more details if
>> you or other people are interested.
>>
>>
>>
>> Best,
>>
>> Bo
>>
>>
>>
>> On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom 
>> wrote:
>>
>> I would like to start a conversation about extending the Spark shuffle
>> manager surface to support fully disaggregated shuffle implementations.
>> This is closely related to the work in SPARK-25299
>> , which is focused on
>> refactoring the shuffle manager API (and in particular, SortShuffleManager)
>> to use a pluggable storage backend. The motivation for that SPIP is further
>> enabling Spark on Kubernetes.
>>
>>
>>
>> The motivation for this proposal is enabling full externalized
>> (disaggregated) shuffle service implementations. (Facebook’s Cosco
>> shuffle
>>