Re: Integrating ML/DL frameworks with Spark

2018-05-23 Thread Xiangrui Meng
Hi all,

Thanks for your feedback! I uploaded a SPIP doc for the barrier scheduling
feature at https://issues.apache.org/jira/browse/SPARK-24374. Please take a
look and leave your comments there. I had some offline discussion with +Xingbo
Jiang <xingbo.ji...@databricks.com> to help me design the APIs. He is quite
familiar with the Spark job scheduler and will share some design ideas on
the JIRA.
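
For context, here is a minimal PySpark sketch of what user code against such a
barrier-style stage might look like; all API names below (barrier(),
BarrierTaskContext, getTaskInfos()) are assumptions for illustration, pending the
actual design on the JIRA:

    from pyspark import SparkContext

    def train_partition(iterator):
        # Assumed API: in a barrier stage, every task in the stage is guaranteed
        # to be running concurrently, so an MPI-style framework can rendezvous
        # across partitions.
        from pyspark import BarrierTaskContext  # hypothetical class name
        ctx = BarrierTaskContext.get()
        ctx.barrier()                 # wait until all tasks reach this point
        peers = ctx.getTaskInfos()    # discover peers to set up all-reduce / MPI
        # ... hand the partition data to the ML/DL framework here ...
        yield len(peers)

    sc = SparkContext.getOrCreate()
    data = sc.parallelize(range(1000), numSlices=50)
    # Hypothetical: barrier() asks the scheduler to launch all 50 tasks in one wave.
    data.barrier().mapPartitions(train_partition).collect()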

I will work on SPIPs for the other two proposals: 1) fast data exchange, 2)
accelerator-aware scheduling. I definitely need some help with the second
one because I'm not familiar with YARN/Mesos/k8s.

Best,
Xiangrui

On Sun, May 20, 2018 at 8:19 PM Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Very cool. We would be very interested in this.
>
> What is the plan forward to make progress in each of the three areas?
>
>
> --
> *From:* Bryan Cutler <cutl...@gmail.com>
> *Sent:* Monday, May 14, 2018 11:37:20 PM
> *To:* Xiangrui Meng
> *Cc:* Reynold Xin; dev
>
> *Subject:* Re: Integrating ML/DL frameworks with Spark
> Thanks for starting this discussion, I'd also like to see some
> improvements in this area and glad to hear that the Pandas UDFs / Arrow
> functionality might be useful.  I'm wondering if from your initial
> investigations you found anything lacking from the Arrow format or possible
> improvements that would simplify the data representation?  Also, while data
> could be handed off in a UDF, would it make sense to also discuss a more
> formal way to externalize the data in a way that would also work for the
> Scala API?
>
> Thanks,
> Bryan
>
> On Wed, May 9, 2018 at 4:31 PM, Xiangrui Meng <m...@databricks.com> wrote:
>
>> Shivaram: Yes, we can call it "gang scheduling" or "barrier
>> synchronization". Spark doesn't support it now. The proposal is to have a
>> proper support in Spark's job scheduler, so we can integrate well with
>> MPI-like frameworks.
>>
>>
>> On Tue, May 8, 2018 at 11:17 AM Nan Zhu <zhunanmcg...@gmail.com> wrote:
>>
>>> .how I skipped the last part
>>>
>>> On Tue, May 8, 2018 at 11:16 AM, Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>>> Yes, Nan, totally agree. To be on the same page, that's exactly what I
>>>> wrote wasn't it?
>>>>
>>>> On Tue, May 8, 2018 at 11:14 AM Nan Zhu <zhunanmcg...@gmail.com> wrote:
>>>>
>>>>> besides that, one of the things which is needed by multiple frameworks
>>>>> is to schedule tasks in a single wave
>>>>>
>>>>> i.e.
>>>>>
>>>>> if some frameworks like xgboost/mxnet requires 50 parallel workers,
>>>>> Spark is desired to provide a capability to ensure that either we run 50
>>>>> tasks at once, or we should quit the complete application/job after some
>>>>> timeout period
>>>>>
>>>>> Best,
>>>>>
>>>>> Nan
>>>>>
>>>>> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin <r...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> I think that's what Xiangrui was referring to. Instead of retrying a
>>>>>> single task, retry the entire stage, and the entire stage of tasks need 
>>>>>> to
>>>>>> be scheduled all at once.
>>>>>>
>>>>>>
>>>>>> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>>>>>> shiva...@eecs.berkeley.edu> wrote:
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>>- Fault tolerance and execution model: Spark assumes
>>>>>>>>>fine-grained task recovery, i.e. if something fails, only that 
>>>>>>>>> task is
>>>>>>>>>rerun. This doesn’t match the execution model of distributed ML/DL
>>>>>>>>>frameworks that are typically MPI-based, and rerunning a single 
>>>>>>>>> task would
>>>>>>>>>lead to the entire system hanging. A whole stage needs to be 
>>>>>>>>> re-run.
>>>>>>>>>
>>>>>>>>> This is not only useful for integrating with 3rd-party frameworks,
>>>>>>>> but also useful for scaling MLlib algorithms. One of my earliest 
>>>>>>>> attempts
>>>>>>>> in Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-1485>). But we ended
>>>>>>>> up with some compromised solutions. With the new execution model, we 
>>>>>>>> can
>>>>>>>> set up a hybrid cluster and do all-reduce properly.
>>>>>>>>
>>>>>>>>
>>>>>>> Is there a particular new execution model you are referring to or do
>>>>>>> we plan to investigate a new execution model ?  For the MPI-like model, 
>>>>>>> we
>>>>>>> also need gang scheduling (i.e. schedule all tasks at once or none of 
>>>>>>> them)
>>>>>>> and I dont think we have support for that in the scheduler right now.
>>>>>>>
>>>>>>>>
>>>>>>>>> --
>>>>>>>>
>>>>>>>> Xiangrui Meng
>>>>>>>>
>>>>>>>> Software Engineer
>>>>>>>>
>>>>>>>> Databricks Inc. [image: http://databricks.com]
>>>>>>>> <http://databricks.com/>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com] <http://databricks.com/>
>>
>
> --

Xiangrui Meng

Software Engineer

Databricks Inc. [image: http://databricks.com] <http://databricks.com/>


Re: Integrating ML/DL frameworks with Spark

2018-05-20 Thread Felix Cheung
Very cool. We would be very interested in this.

What is the plan forward to make progress in each of the three areas?



From: Bryan Cutler <cutl...@gmail.com>
Sent: Monday, May 14, 2018 11:37:20 PM
To: Xiangrui Meng
Cc: Reynold Xin; dev
Subject: Re: Integrating ML/DL frameworks with Spark

Thanks for starting this discussion, I'd also like to see some improvements in 
this area and glad to hear that the Pandas UDFs / Arrow functionality might be 
useful.  I'm wondering if from your initial investigations you found anything 
lacking from the Arrow format or possible improvements that would simplify the 
data representation?  Also, while data could be handed off in a UDF, would it 
make sense to also discuss a more formal way to externalize the data in a way 
that would also work for the Scala API?

Thanks,
Bryan

On Wed, May 9, 2018 at 4:31 PM, Xiangrui Meng 
<m...@databricks.com<mailto:m...@databricks.com>> wrote:
Shivaram: Yes, we can call it "gang scheduling" or "barrier synchronization". 
Spark doesn't support it now. The proposal is to have a proper support in 
Spark's job scheduler, so we can integrate well with MPI-like frameworks.


On Tue, May 8, 2018 at 11:17 AM Nan Zhu 
<zhunanmcg...@gmail.com<mailto:zhunanmcg...@gmail.com>> wrote:
.how I skipped the last part

On Tue, May 8, 2018 at 11:16 AM, Reynold Xin 
<r...@databricks.com<mailto:r...@databricks.com>> wrote:
Yes, Nan, totally agree. To be on the same page, that's exactly what I wrote 
wasn't it?

On Tue, May 8, 2018 at 11:14 AM Nan Zhu 
<zhunanmcg...@gmail.com<mailto:zhunanmcg...@gmail.com>> wrote:
besides that, one of the things which is needed by multiple frameworks is to 
schedule tasks in a single wave

i.e.

if some frameworks like xgboost/mxnet requires 50 parallel workers, Spark is 
desired to provide a capability to ensure that either we run 50 tasks at once, 
or we should quit the complete application/job after some timeout period

Best,

Nan

On Tue, May 8, 2018 at 11:10 AM, Reynold Xin 
<r...@databricks.com<mailto:r...@databricks.com>> wrote:
I think that's what Xiangrui was referring to. Instead of retrying a single 
task, retry the entire stage, and the entire stage of tasks need to be 
scheduled all at once.


On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman 
<shiva...@eecs.berkeley.edu<mailto:shiva...@eecs.berkeley.edu>> wrote:


  *   Fault tolerance and execution model: Spark assumes fine-grained task 
recovery, i.e. if something fails, only that task is rerun. This doesn’t match 
the execution model of distributed ML/DL frameworks that are typically 
MPI-based, and rerunning a single task would lead to the entire system hanging. 
A whole stage needs to be re-run.

This is not only useful for integrating with 3rd-party frameworks, but also 
useful for scaling MLlib algorithms. One of my earliest attempts in Spark MLlib 
was to implement All-Reduce primitive 
(SPARK-1485<https://issues.apache.org/jira/browse/SPARK-1485>). But we ended up 
with some compromised solutions. With the new execution model, we can set up a 
hybrid cluster and do all-reduce properly.

Is there a particular new execution model you are referring to or do we plan to 
investigate a new execution model ?  For the MPI-like model, we also need gang 
scheduling (i.e. schedule all tasks at once or none of them) and I dont think 
we have support for that in the scheduler right now.

--

Xiangrui Meng

Software Engineer

Databricks Inc. [http://databricks.com] <http://databricks.com/>



--

Xiangrui Meng

Software Engineer

Databricks Inc. [http://databricks.com] <http://databricks.com/>



Re: Integrating ML/DL frameworks with Spark

2018-05-17 Thread Daniel Galvez
Hi all,

Paul Ogilvie pointed this thread out to me; we overlapped a little at LinkedIn. 
It’s good to see that this kind of discussion is going on!

I have some thoughts regarding the discussion going on:

- Practically speaking, one of the lowest-hanging fruits is the ability for 
Spark to request GPUs (and, in general, devices). I would be happy to implement 
this myself if I were given the go-ahead. I'm familiar only with YARN, not the 
Mesos or Kubernetes resource schedulers, though. It would be best to be 
forward-looking and think about how to request arbitrary Linux devices rather 
than just GPUs.
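
To make that concrete, a sketch of what a device request could look like from the
application side; the configuration keys below are purely hypothetical here and each
cluster manager (YARN/Mesos/k8s) would need its own mapping:

    from pyspark import SparkConf, SparkContext

    # Hypothetical keys, shown only to illustrate the likely granularity
    # (per-executor and per-task device counts).
    conf = (SparkConf()
            .setAppName("gpu-request-sketch")
            .set("spark.executor.resource.gpu.amount", "2")   # assumed: GPUs per executor
            .set("spark.task.resource.gpu.amount", "1"))      # assumed: GPUs per task

    sc = SparkContext(conf=conf)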

- The discussion here regarding ML/DL seems to focus on DL in particular, and 
the DL discussion seems to focus vaguely on data-parallel deep learning 
training. This is probably a fine starting point.

- It is generally challenging to utilize a GPU fully in each kernel call, but 
there are solutions like CUDA MPS that virtualize a physical GPU as many smaller 
GPUs. However, each physical GPU is still represented as a single character 
device, e.g., /dev/nvidia0. This does not mesh well with YARN's GPU isolation, 
which puts each executor in its own cgroup with only specific *physical* 
character devices whitelisted. Alas. Supporting CUDA MPS would be good to keep 
in mind for inference workloads. I could elaborate if desired.

- For things like all-reduce to work well, you need to keep in mind your I/O 
bandwidth. This means that you need to keep in mind the "topology" of your 
compute devices (be they CPUs, GPUs, FPGAs, IPUs, or whatever). I'm not sure if 
Spark is already aware of this at the Ethernet level, forgive me. But I am 
certain that it is not aware of this at the PCIe level. Ring all-reduce does 
this automatically for you in some sense when it creates its "ring", but only 
if you give it control of your full topology, which is the traditional MPI 
style (i.e., you're normally not sharing a node with other jobs with MPI). 
Secondly, InfiniBand connections exist for GPUs to talk directly to one another 
via what is called "GPUDirect", effectively bypassing the CPU and running at 
the highest bandwidth possible today. This is a very popular approach, and not 
something that Spark would seemingly be able to touch. So I question Spark's 
ability to have a hand in large-scale distributed training of deep learning 
models.

- I would want to know more about the claims of UDFs being slow. For perspective, 
PCI Express Gen 3 (Gen 4 is not out yet…) has about 12 GB/s of effective bandwidth. 
Split among 4 GPUs, that leaves 3 GB/s each. In high-performance computing, this is 
always considered the bottleneck.
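
As a rough back-of-the-envelope on that arithmetic (the 12 GB/s figure and the batch
size below are assumed numbers, just to make the bottleneck concrete):

    # Effective host-to-device bandwidth per GPU when four GPUs share one PCIe
    # Gen 3 link, and the time to move one hypothetical training batch.
    pcie_gen3_effective_gb_per_s = 12.0
    gpus_sharing_link = 4
    per_gpu_gb_per_s = pcie_gen3_effective_gb_per_s / gpus_sharing_link   # 3.0 GB/s

    batch_gb = 0.5   # hypothetical 512 MB of input per step, per GPU
    seconds_per_batch = batch_gb / per_gpu_gb_per_s                       # ~0.17 s
    print(per_gpu_gb_per_s, seconds_per_batch)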

Anyway, this is something I’m particularly interested in. Feel free to poke me 
if you want me to answer a specific question.

Sincerely,
Daniel

On 2018/05/09 23:31:10, Xiangrui Meng  wrote: 
> Shivaram: Yes, we can call it "gang scheduling" or "barrier
> synchronization". Spark doesn't support it now. The proposal is to have a
> proper support in Spark's job scheduler, so we can integrate well with
> MPI-like frameworks.
> 
> On Tue, May 8, 2018 at 11:17 AM Nan Zhu  wrote:
> 
> > .how I skipped the last part
> >
> > On Tue, May 8, 2018 at 11:16 AM, Reynold Xin  wrote:
> >
> >> Yes, Nan, totally agree. To be on the same page, that's exactly what I
> >> wrote wasn't it?
> >>
> >> On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:
> >>
> >>> besides that, one of the things which is needed by multiple frameworks
> >>> is to schedule tasks in a single wave
> >>>
> >>> i.e.
> >>>
> >>> if some frameworks like xgboost/mxnet requires 50 parallel workers,
> >>> Spark is desired to provide a capability to ensure that either we run 50
> >>> tasks at once, or we should quit the complete application/job after some
> >>> timeout period
> >>>
> >>> Best,
> >>>
> >>> Nan
> >>>
> >>> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin 
> >>> wrote:
> >>>
>  I think that's what Xiangrui was referring to. Instead of retrying a
>  single task, retry the entire stage, and the entire stage of tasks need 
>  to
>  be scheduled all at once.
> 
> 
>  On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>  shiva...@eecs.berkeley.edu> wrote:
> 
> >
> >>
> >>>- Fault tolerance and execution model: Spark assumes
> >>>fine-grained task recovery, i.e. if something fails, only that 
> >>> task is
> >>>rerun. This doesn’t match the execution model of distributed ML/DL
> >>>frameworks that are typically MPI-based, and rerunning a single 
> >>> task would
> >>>lead to the entire system hanging. A whole stage needs to be 
> >>> re-run.
> >>>
> >>> This is not only useful for integrating with 3rd-party frameworks,
> >> but also useful for scaling MLlib algorithms. One of my earliest 
> >> attempts
> >> in Spark MLlib was to 

Re: Integrating ML/DL frameworks with Spark

2018-05-15 Thread Bryan Cutler
Thanks for starting this discussion. I'd also like to see some improvements
in this area, and I'm glad to hear that the Pandas UDFs / Arrow functionality
might be useful.  I'm wondering whether, from your initial investigations, you
found anything lacking in the Arrow format, or possible improvements that
would simplify the data representation?  Also, while data could be handed
off in a UDF, would it make sense to also discuss a more formal way to
externalize the data in a way that would also work for the Scala API?
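
For reference, the Pandas UDF path being discussed looks roughly like the sketch
below; the doubling is just a stand-in for handing the Arrow-backed batch to an
ML/DL framework, not tied to any particular one:

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1 << 20).toDF("x")

    # Scalar Pandas UDF: each batch arrives in the Python worker over Arrow as a
    # pandas.Series, so a framework sees columnar data without per-row serialization.
    @pandas_udf("double", PandasUDFType.SCALAR)
    def predict(x):
        return (x * 2.0)  # placeholder for framework inference on the batch

    df.select(predict("x").alias("y")).show(5)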

Thanks,
Bryan

On Wed, May 9, 2018 at 4:31 PM, Xiangrui Meng  wrote:

> Shivaram: Yes, we can call it "gang scheduling" or "barrier
> synchronization". Spark doesn't support it now. The proposal is to have a
> proper support in Spark's job scheduler, so we can integrate well with
> MPI-like frameworks.
>
>
> On Tue, May 8, 2018 at 11:17 AM Nan Zhu  wrote:
>
>> .how I skipped the last part
>>
>> On Tue, May 8, 2018 at 11:16 AM, Reynold Xin  wrote:
>>
>>> Yes, Nan, totally agree. To be on the same page, that's exactly what I
>>> wrote wasn't it?
>>>
>>> On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:
>>>
 besides that, one of the things which is needed by multiple frameworks
 is to schedule tasks in a single wave

 i.e.

 if some frameworks like xgboost/mxnet requires 50 parallel workers,
 Spark is desired to provide a capability to ensure that either we run 50
 tasks at once, or we should quit the complete application/job after some
 timeout period

 Best,

 Nan

 On Tue, May 8, 2018 at 11:10 AM, Reynold Xin 
 wrote:

> I think that's what Xiangrui was referring to. Instead of retrying a
> single task, retry the entire stage, and the entire stage of tasks need to
> be scheduled all at once.
>
>
> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>>
>>>
- Fault tolerance and execution model: Spark assumes
fine-grained task recovery, i.e. if something fails, only that task 
 is
rerun. This doesn’t match the execution model of distributed ML/DL
frameworks that are typically MPI-based, and rerunning a single 
 task would
lead to the entire system hanging. A whole stage needs to be re-run.

 This is not only useful for integrating with 3rd-party frameworks,
>>> but also useful for scaling MLlib algorithms. One of my earliest 
>>> attempts
>>> in Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>>> ). But we ended
>>> up with some compromised solutions. With the new execution model, we can
>>> set up a hybrid cluster and do all-reduce properly.
>>>
>>>
>> Is there a particular new execution model you are referring to or do
>> we plan to investigate a new execution model ?  For the MPI-like model, 
>> we
>> also need gang scheduling (i.e. schedule all tasks at once or none of 
>> them)
>> and I dont think we have support for that in the scheduler right now.
>>
>>>
 --
>>>
>>> Xiangrui Meng
>>>
>>> Software Engineer
>>>
>>> Databricks Inc. [image: http://databricks.com]
>>> 
>>>
>>
>>

>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com] 
>


Re: Integrating ML/DL frameworks with Spark

2018-05-09 Thread Xiangrui Meng
Shivaram: Yes, we can call it "gang scheduling" or "barrier
synchronization". Spark doesn't support it now. The proposal is to add
proper support in Spark's job scheduler, so we can integrate well with
MPI-like frameworks.

On Tue, May 8, 2018 at 11:17 AM Nan Zhu  wrote:

> .how I skipped the last part
>
> On Tue, May 8, 2018 at 11:16 AM, Reynold Xin  wrote:
>
>> Yes, Nan, totally agree. To be on the same page, that's exactly what I
>> wrote wasn't it?
>>
>> On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:
>>
>>> besides that, one of the things which is needed by multiple frameworks
>>> is to schedule tasks in a single wave
>>>
>>> i.e.
>>>
>>> if some frameworks like xgboost/mxnet requires 50 parallel workers,
>>> Spark is desired to provide a capability to ensure that either we run 50
>>> tasks at once, or we should quit the complete application/job after some
>>> timeout period
>>>
>>> Best,
>>>
>>> Nan
>>>
>>> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin 
>>> wrote:
>>>
 I think that's what Xiangrui was referring to. Instead of retrying a
 single task, retry the entire stage, and the entire stage of tasks need to
 be scheduled all at once.


 On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
 shiva...@eecs.berkeley.edu> wrote:

>
>>
>>>- Fault tolerance and execution model: Spark assumes
>>>fine-grained task recovery, i.e. if something fails, only that task 
>>> is
>>>rerun. This doesn’t match the execution model of distributed ML/DL
>>>frameworks that are typically MPI-based, and rerunning a single task 
>>> would
>>>lead to the entire system hanging. A whole stage needs to be re-run.
>>>
>>> This is not only useful for integrating with 3rd-party frameworks,
>> but also useful for scaling MLlib algorithms. One of my earliest attempts
>> in Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>> ). But we ended up
>> with some compromised solutions. With the new execution model, we can set
>> up a hybrid cluster and do all-reduce properly.
>>
>>
> Is there a particular new execution model you are referring to or do
> we plan to investigate a new execution model ?  For the MPI-like model, we
> also need gang scheduling (i.e. schedule all tasks at once or none of 
> them)
> and I dont think we have support for that in the scheduler right now.
>
>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com]
>> 
>>
>
>
>>>
> --

Xiangrui Meng

Software Engineer

Databricks Inc. [image: http://databricks.com] 


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Nan Zhu
.how I skipped the last part

On Tue, May 8, 2018 at 11:16 AM, Reynold Xin  wrote:

> Yes, Nan, totally agree. To be on the same page, that's exactly what I
> wrote wasn't it?
>
> On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:
>
>> besides that, one of the things which is needed by multiple frameworks is
>> to schedule tasks in a single wave
>>
>> i.e.
>>
>> if some frameworks like xgboost/mxnet requires 50 parallel workers, Spark
>> is desired to provide a capability to ensure that either we run 50 tasks at
>> once, or we should quit the complete application/job after some timeout
>> period
>>
>> Best,
>>
>> Nan
>>
>> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:
>>
>>> I think that's what Xiangrui was referring to. Instead of retrying a
>>> single task, retry the entire stage, and the entire stage of tasks need to
>>> be scheduled all at once.
>>>
>>>
>>> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>

>
>>- Fault tolerance and execution model: Spark assumes fine-grained
>>task recovery, i.e. if something fails, only that task is rerun. This
>>doesn’t match the execution model of distributed ML/DL frameworks 
>> that are
>>typically MPI-based, and rerunning a single task would lead to the 
>> entire
>>system hanging. A whole stage needs to be re-run.
>>
>> This is not only useful for integrating with 3rd-party frameworks,
> but also useful for scaling MLlib algorithms. One of my earliest attempts
> in Spark MLlib was to implement All-Reduce primitive (SPARK-1485
> ). But we ended up
> with some compromised solutions. With the new execution model, we can set
> up a hybrid cluster and do all-reduce properly.
>
>
 Is there a particular new execution model you are referring to or do we
 plan to investigate a new execution model ?  For the MPI-like model, we
 also need gang scheduling (i.e. schedule all tasks at once or none of them)
 and I dont think we have support for that in the scheduler right now.

>
>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com]
> 
>


>>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Reynold Xin
Yes, Nan, totally agree. To be on the same page, that's exactly what I
wrote, wasn't it?

On Tue, May 8, 2018 at 11:14 AM Nan Zhu  wrote:

> besides that, one of the things which is needed by multiple frameworks is
> to schedule tasks in a single wave
>
> i.e.
>
> if some frameworks like xgboost/mxnet requires 50 parallel workers, Spark
> is desired to provide a capability to ensure that either we run 50 tasks at
> once, or we should quit the complete application/job after some timeout
> period
>
> Best,
>
> Nan
>
> On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:
>
>> I think that's what Xiangrui was referring to. Instead of retrying a
>> single task, retry the entire stage, and the entire stage of tasks need to
>> be scheduled all at once.
>>
>>
>> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>>

>- Fault tolerance and execution model: Spark assumes fine-grained
>task recovery, i.e. if something fails, only that task is rerun. This
>doesn’t match the execution model of distributed ML/DL frameworks that 
> are
>typically MPI-based, and rerunning a single task would lead to the 
> entire
>system hanging. A whole stage needs to be re-run.
>
> This is not only useful for integrating with 3rd-party frameworks, but
 also useful for scaling MLlib algorithms. One of my earliest attempts in
 Spark MLlib was to implement All-Reduce primitive (SPARK-1485
 ). But we ended up
 with some compromised solutions. With the new execution model, we can set
 up a hybrid cluster and do all-reduce properly.


>>> Is there a particular new execution model you are referring to or do we
>>> plan to investigate a new execution model ?  For the MPI-like model, we
>>> also need gang scheduling (i.e. schedule all tasks at once or none of them)
>>> and I dont think we have support for that in the scheduler right now.
>>>

> --

 Xiangrui Meng

 Software Engineer

 Databricks Inc. [image: http://databricks.com] 

>>>
>>>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Nan Zhu
Besides that, one of the things needed by multiple frameworks is
to schedule tasks in a single wave.

i.e.

If a framework like xgboost/mxnet requires 50 parallel workers, Spark
should provide the capability to ensure that either all 50 tasks run at
once, or the complete application/job is aborted after some timeout
period.
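
As a rough illustration of that all-or-nothing semantics, a crude driver-side
workaround sketch (not the scheduler support being asked for; whether
defaultParallelism reflects the concurrently available slots depends on the
cluster manager and on dynamic allocation):

    import time
    from pyspark import SparkContext

    REQUIRED_WORKERS = 50
    TIMEOUT_S = 300

    sc = SparkContext.getOrCreate()
    deadline = time.time() + TIMEOUT_S
    # Wait until the cluster can run all tasks in one wave, otherwise give up,
    # instead of letting an MPI-style job hang on partially scheduled tasks.
    while sc.defaultParallelism < REQUIRED_WORKERS:
        if time.time() > deadline:
            sc.stop()
            raise RuntimeError("could not get %d concurrent task slots" % REQUIRED_WORKERS)
        time.sleep(5)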

Best,

Nan

On Tue, May 8, 2018 at 11:10 AM, Reynold Xin  wrote:

> I think that's what Xiangrui was referring to. Instead of retrying a
> single task, retry the entire stage, and the entire stage of tasks need to
> be scheduled all at once.
>
>
> On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>>
>>>
- Fault tolerance and execution model: Spark assumes fine-grained
task recovery, i.e. if something fails, only that task is rerun. This
doesn’t match the execution model of distributed ML/DL frameworks that 
 are
typically MPI-based, and rerunning a single task would lead to the 
 entire
system hanging. A whole stage needs to be re-run.

 This is not only useful for integrating with 3rd-party frameworks, but
>>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>>> ). But we ended up
>>> with some compromised solutions. With the new execution model, we can set
>>> up a hybrid cluster and do all-reduce properly.
>>>
>>>
>> Is there a particular new execution model you are referring to or do we
>> plan to investigate a new execution model ?  For the MPI-like model, we
>> also need gang scheduling (i.e. schedule all tasks at once or none of them)
>> and I dont think we have support for that in the scheduler right now.
>>
>>>
 --
>>>
>>> Xiangrui Meng
>>>
>>> Software Engineer
>>>
>>> Databricks Inc. [image: http://databricks.com] 
>>>
>>
>>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Reynold Xin
I think that's what Xiangrui was referring to. Instead of retrying a single
task, retry the entire stage, and the entire stage of tasks needs to be
scheduled all at once.


On Tue, May 8, 2018 at 8:53 AM Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

>
>>
>>>- Fault tolerance and execution model: Spark assumes fine-grained
>>>task recovery, i.e. if something fails, only that task is rerun. This
>>>doesn’t match the execution model of distributed ML/DL frameworks that 
>>> are
>>>typically MPI-based, and rerunning a single task would lead to the entire
>>>system hanging. A whole stage needs to be re-run.
>>>
>>> This is not only useful for integrating with 3rd-party frameworks, but
>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>> ). But we ended up
>> with some compromised solutions. With the new execution model, we can set
>> up a hybrid cluster and do all-reduce properly.
>>
>>
> Is there a particular new execution model you are referring to or do we
> plan to investigate a new execution model ?  For the MPI-like model, we
> also need gang scheduling (i.e. schedule all tasks at once or none of them)
> and I dont think we have support for that in the scheduler right now.
>
>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com] 
>>
>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Naveen Swamy
I am a committer on the MXNet project and very interested in working on
integrating with Spark.
I am wondering how training would proceed in the case of:
1) training done on one host with multiple GPUs -- I don't know if
Spark's capabilities can be leveraged here
2) distributed training with data parallelism -- how can we leverage
Spark's map-reduce model to fit distributed training? The model of execution
here is more iterative in nature.

Please let me know.

Thanks, Naveen



On Tue, May 8, 2018 at 8:53 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

>
>>
>>>- Fault tolerance and execution model: Spark assumes fine-grained
>>>task recovery, i.e. if something fails, only that task is rerun. This
>>>doesn’t match the execution model of distributed ML/DL frameworks that 
>>> are
>>>typically MPI-based, and rerunning a single task would lead to the entire
>>>system hanging. A whole stage needs to be re-run.
>>>
>>> This is not only useful for integrating with 3rd-party frameworks, but
>> also useful for scaling MLlib algorithms. One of my earliest attempts in
>> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
>> ). But we ended up
>> with some compromised solutions. With the new execution model, we can set
>> up a hybrid cluster and do all-reduce properly.
>>
>>
> Is there a particular new execution model you are referring to or do we
> plan to investigate a new execution model ?  For the MPI-like model, we
> also need gang scheduling (i.e. schedule all tasks at once or none of them)
> and I dont think we have support for that in the scheduler right now.
>
>>
>>> --
>>
>> Xiangrui Meng
>>
>> Software Engineer
>>
>> Databricks Inc. [image: http://databricks.com] 
>>
>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Shivaram Venkataraman
>
>
>
>>- Fault tolerance and execution model: Spark assumes fine-grained
>>task recovery, i.e. if something fails, only that task is rerun. This
>>doesn’t match the execution model of distributed ML/DL frameworks that are
>>typically MPI-based, and rerunning a single task would lead to the entire
>>system hanging. A whole stage needs to be re-run.
>>
>> This is not only useful for integrating with 3rd-party frameworks, but
> also useful for scaling MLlib algorithms. One of my earliest attempts in
> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
> ). But we ended up with
> some compromised solutions. With the new execution model, we can set up a
> hybrid cluster and do all-reduce properly.
>
>
Is there a particular new execution model you are referring to, or do we
plan to investigate a new execution model?  For the MPI-like model, we
also need gang scheduling (i.e. schedule all tasks at once or none of them)
and I don't think we have support for that in the scheduler right now.

>
>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com] 
>


Re: Integrating ML/DL frameworks with Spark

2018-05-08 Thread Jörn Franke
Hi,

You misunderstood me. I meant exactly that Spark should be aware of 
them, so I agree with you. The point is to also have the YARN GPU/FPGA 
scheduling as an option alongside a potential Spark GPU/FPGA scheduler.

For the other proposal - yes, the interfaces are slow, but one has to think about 
which part needs to be improved for optimal performance: the ML framework, Spark, 
or both. My gut feeling is both.

Best regards

> On 8. May 2018, at 07:11, Reynold Xin  wrote:
> 
> I don't think it's sufficient to have them in YARN (or any other services) 
> without Spark aware of them. If Spark is not aware of them, then there is no 
> way to really efficiently utilize these accelerators when you run anything 
> that require non-accelerators (which is almost 100% of the cases in real 
> world workloads).
> 
> For the other two, the point is not to implement all the ML/DL algorithms in 
> Spark, but make Spark integrate well with ML/DL frameworks. Otherwise you 
> will have the problems I described (super low performance when exchanging 
> data between Spark and ML/DL frameworks, and hanging issues with MPI-based 
> programs).
> 
> 
>> On Mon, May 7, 2018 at 10:05 PM Jörn Franke  wrote:
>> Hadoop / Yarn 3.1 added GPU scheduling. 3.2 is planned to add FPGA 
>> scheduling, so it might be worth to have the last point generic that not 
>> only the Spark scheduler, but all supported schedulers can use GPU.
>> 
>> For the other 2 points I just wonder if it makes sense to address this in 
>> the ml frameworks themselves or in Spark.
>> 
>>> On 8. May 2018, at 06:59, Xiangrui Meng  wrote:
>>> 
>>> Thanks Reynold for summarizing the offline discussion! I added a few 
>>> comments inline. -Xiangrui
>>> 
 On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:
 Hi all,
 
 Xiangrui and I were discussing with a heavy Apache Spark user last week on 
 their experiences integrating machine learning (and deep learning) 
 frameworks with Spark and some of their pain points. Couple things were 
 obvious and I wanted to share our learnings with the list.
 
 (1) Most organizations already use Spark for data plumbing and want to be 
 able to run their ML part of the stack on Spark as well (not necessarily 
 re-implementing all the algorithms but by integrating various frameworks 
 like tensorflow, mxnet with Spark).
 
 (2) The integration is however painful, from the systems perspective:
 
 Performance: data exchange between Spark and other frameworks are slow, 
 because UDFs across process boundaries (with native code) are slow. This 
 works much better now with Pandas UDFs (given a lot of the ML/DL 
 frameworks are in Python). However, there might be some low hanging fruit 
 gaps here.
>>> The Arrow support behind Pands UDFs can be reused to exchange data with 
>>> other frameworks. And one possibly performance improvement is to support 
>>> pipelining when supplying data to other frameworks. For example, while 
>>> Spark is pumping data from external sources into TensorFlow, TensorFlow 
>>> starts the computation on GPUs. This would significant improve speed and 
>>> resource utilization.
 Fault tolerance and execution model: Spark assumes fine-grained task 
 recovery, i.e. if something fails, only that task is rerun. This doesn’t 
 match the execution model of distributed ML/DL frameworks that are 
 typically MPI-based, and rerunning a single task would lead to the entire 
 system hanging. A whole stage needs to be re-run.
>>> This is not only useful for integrating with 3rd-party frameworks, but also 
>>> useful for scaling MLlib algorithms. One of my earliest attempts in Spark 
>>> MLlib was to implement All-Reduce primitive (SPARK-1485). But we ended up 
>>> with some compromised solutions. With the new execution model, we can set 
>>> up a hybrid cluster and do all-reduce properly.
>>>  
 Accelerator-aware scheduling: The DL frameworks leverage GPUs and 
 sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t 
 aware of those resources, leading to either over-utilizing the 
 accelerators or under-utilizing the CPUs.
 
 The good thing is that none of these seem very difficult to address (and 
 we have already made progress on one of them). Xiangrui has graciously 
 accepted the challenge to come up with solutions and SPIP to these.
 
>>> 
>>> I will do more home work, exploring existing JIRAs or creating new JIRAs 
>>> for the proposal. We'd like to hear your feedback and past efforts along 
>>> those directions if they were not fully captured by our JIRA.
>>>  
 Xiangrui - please also chime in if I didn’t capture everything. 
 
 
>>> -- 
>>> Xiangrui Meng
>>> Software Engineer
>>> Databricks Inc. 


Re: Integrating ML/DL frameworks with Spark

2018-05-07 Thread Reynold Xin
I don't think it's sufficient to have them in YARN (or any other services)
without Spark being aware of them. If Spark is not aware of them, then there is
no way to really efficiently utilize these accelerators when you run
anything that requires non-accelerators (which is almost 100% of the cases
in real-world workloads).

For the other two, the point is not to implement all the ML/DL algorithms
in Spark, but make Spark integrate well with ML/DL frameworks. Otherwise
you will have the problems I described (super low performance when
exchanging data between Spark and ML/DL frameworks, and hanging issues with
MPI-based programs).


On Mon, May 7, 2018 at 10:05 PM Jörn Franke  wrote:

> Hadoop / Yarn 3.1 added GPU scheduling. 3.2 is planned to add FPGA
> scheduling, so it might be worth to have the last point generic that not
> only the Spark scheduler, but all supported schedulers can use GPU.
>
> For the other 2 points I just wonder if it makes sense to address this in
> the ml frameworks themselves or in Spark.
>
> On 8. May 2018, at 06:59, Xiangrui Meng  wrote:
>
> Thanks Reynold for summarizing the offline discussion! I added a few
> comments inline. -Xiangrui
>
> On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:
>
>> Hi all,
>>
>> Xiangrui and I were discussing with a heavy Apache Spark user last week
>> on their experiences integrating machine learning (and deep learning)
>> frameworks with Spark and some of their pain points. Couple things were
>> obvious and I wanted to share our learnings with the list.
>>
>> (1) Most organizations already use Spark for data plumbing and want to be
>> able to run their ML part of the stack on Spark as well (not necessarily
>> re-implementing all the algorithms but by integrating various frameworks
>> like tensorflow, mxnet with Spark).
>>
>> (2) The integration is however painful, from the systems perspective:
>>
>>
>>- Performance: data exchange between Spark and other frameworks are
>>slow, because UDFs across process boundaries (with native code) are slow.
>>This works much better now with Pandas UDFs (given a lot of the ML/DL
>>frameworks are in Python). However, there might be some low hanging fruit
>>gaps here.
>>
>> The Arrow support behind Pands UDFs can be reused to exchange data with
> other frameworks. And one possibly performance improvement is to support
> pipelining when supplying data to other frameworks. For example, while
> Spark is pumping data from external sources into TensorFlow, TensorFlow
> starts the computation on GPUs. This would significant improve speed and
> resource utilization.
>
>>
>>- Fault tolerance and execution model: Spark assumes fine-grained
>>task recovery, i.e. if something fails, only that task is rerun. This
>>doesn’t match the execution model of distributed ML/DL frameworks that are
>>typically MPI-based, and rerunning a single task would lead to the entire
>>system hanging. A whole stage needs to be re-run.
>>
>> This is not only useful for integrating with 3rd-party frameworks, but
> also useful for scaling MLlib algorithms. One of my earliest attempts in
> Spark MLlib was to implement All-Reduce primitive (SPARK-1485
> ). But we ended up with
> some compromised solutions. With the new execution model, we can set up a
> hybrid cluster and do all-reduce properly.
>
>
>>
>>- Accelerator-aware scheduling: The DL frameworks leverage GPUs and
>>sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t
>>aware of those resources, leading to either over-utilizing the 
>> accelerators
>>or under-utilizing the CPUs.
>>
>>
>> The good thing is that none of these seem very difficult to address (and
>> we have already made progress on one of them). Xiangrui has graciously
>> accepted the challenge to come up with solutions and SPIP to these.
>>
>>
> I will do more home work, exploring existing JIRAs or creating new JIRAs
> for the proposal. We'd like to hear your feedback and past efforts along
> those directions if they were not fully captured by our JIRA.
>
>
>> Xiangrui - please also chime in if I didn’t capture everything.
>>
>>
>> --
>
> Xiangrui Meng
>
> Software Engineer
>
> Databricks Inc. [image: http://databricks.com] 
>
>


Re: Integrating ML/DL frameworks with Spark

2018-05-07 Thread Jörn Franke
Hadoop / YARN 3.1 added GPU scheduling, and 3.2 is planned to add FPGA scheduling, 
so it might be worth making the last point generic, so that not only the Spark 
scheduler, but all supported schedulers, can use GPUs.

For the other 2 points I just wonder if it makes sense to address this in the 
ML frameworks themselves or in Spark.

> On 8. May 2018, at 06:59, Xiangrui Meng  wrote:
> 
> Thanks Reynold for summarizing the offline discussion! I added a few comments 
> inline. -Xiangrui
> 
>> On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:
>> Hi all,
>> 
>> Xiangrui and I were discussing with a heavy Apache Spark user last week on 
>> their experiences integrating machine learning (and deep learning) 
>> frameworks with Spark and some of their pain points. Couple things were 
>> obvious and I wanted to share our learnings with the list.
>> 
>> (1) Most organizations already use Spark for data plumbing and want to be 
>> able to run their ML part of the stack on Spark as well (not necessarily 
>> re-implementing all the algorithms but by integrating various frameworks 
>> like tensorflow, mxnet with Spark).
>> 
>> (2) The integration is however painful, from the systems perspective:
>> 
>> Performance: data exchange between Spark and other frameworks are slow, 
>> because UDFs across process boundaries (with native code) are slow. This 
>> works much better now with Pandas UDFs (given a lot of the ML/DL frameworks 
>> are in Python). However, there might be some low hanging fruit gaps here.
> The Arrow support behind Pands UDFs can be reused to exchange data with other 
> frameworks. And one possibly performance improvement is to support pipelining 
> when supplying data to other frameworks. For example, while Spark is pumping 
> data from external sources into TensorFlow, TensorFlow starts the computation 
> on GPUs. This would significant improve speed and resource utilization.
>> Fault tolerance and execution model: Spark assumes fine-grained task 
>> recovery, i.e. if something fails, only that task is rerun. This doesn’t 
>> match the execution model of distributed ML/DL frameworks that are typically 
>> MPI-based, and rerunning a single task would lead to the entire system 
>> hanging. A whole stage needs to be re-run.
> This is not only useful for integrating with 3rd-party frameworks, but also 
> useful for scaling MLlib algorithms. One of my earliest attempts in Spark 
> MLlib was to implement All-Reduce primitive (SPARK-1485). But we ended up 
> with some compromised solutions. With the new execution model, we can set up 
> a hybrid cluster and do all-reduce properly.
>  
>> Accelerator-aware scheduling: The DL frameworks leverage GPUs and sometimes 
>> FPGAs as accelerators for speedup, and Spark’s scheduler isn’t aware of 
>> those resources, leading to either over-utilizing the accelerators or 
>> under-utilizing the CPUs.
>> 
>> The good thing is that none of these seem very difficult to address (and we 
>> have already made progress on one of them). Xiangrui has graciously accepted 
>> the challenge to come up with solutions and SPIP to these.
>> 
> 
> I will do more home work, exploring existing JIRAs or creating new JIRAs for 
> the proposal. We'd like to hear your feedback and past efforts along those 
> directions if they were not fully captured by our JIRA.
>  
>> Xiangrui - please also chime in if I didn’t capture everything. 
>> 
>> 
> -- 
> Xiangrui Meng
> Software Engineer
> Databricks Inc. 


Re: Integrating ML/DL frameworks with Spark

2018-05-07 Thread Xiangrui Meng
Thanks Reynold for summarizing the offline discussion! I added a few
comments inline. -Xiangrui

On Mon, May 7, 2018 at 5:37 PM Reynold Xin  wrote:

> Hi all,
>
> Xiangrui and I were discussing with a heavy Apache Spark user last week on
> their experiences integrating machine learning (and deep learning)
> frameworks with Spark and some of their pain points. Couple things were
> obvious and I wanted to share our learnings with the list.
>
> (1) Most organizations already use Spark for data plumbing and want to be
> able to run their ML part of the stack on Spark as well (not necessarily
> re-implementing all the algorithms but by integrating various frameworks
> like tensorflow, mxnet with Spark).
>
> (2) The integration is however painful, from the systems perspective:
>
>
>- Performance: data exchange between Spark and other frameworks are
>slow, because UDFs across process boundaries (with native code) are slow.
>This works much better now with Pandas UDFs (given a lot of the ML/DL
>frameworks are in Python). However, there might be some low hanging fruit
>gaps here.
>
> The Arrow support behind Pandas UDFs can be reused to exchange data with
other frameworks. And one possible performance improvement is to support
pipelining when supplying data to other frameworks. For example, while
Spark is pumping data from external sources into TensorFlow, TensorFlow
starts the computation on GPUs. This would significantly improve speed and
resource utilization.
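
To illustrate the pipelining idea concretely (independent of Spark; the batch
source and train_step below are trivial placeholders, not Spark or TensorFlow APIs):

    import queue
    import threading

    # One thread keeps supplying batches (think Arrow record batches coming out
    # of Spark) into a bounded queue while the consumer (standing in for
    # TensorFlow on the GPU) computes on the previous batch, so data transfer
    # and compute overlap.
    batch_source = iter([list(range(i, i + 1000)) for i in range(0, 10000, 1000)])
    batches = queue.Queue(maxsize=4)   # bounded: producer blocks if the consumer lags

    def producer():
        for batch in batch_source:
            batches.put(batch)
        batches.put(None)              # sentinel: no more data

    def train_step(batch):
        return sum(batch)              # placeholder for device compute

    threading.Thread(target=producer, daemon=True).start()
    while True:
        batch = batches.get()
        if batch is None:
            break
        train_step(batch)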

>
>- Fault tolerance and execution model: Spark assumes fine-grained task
>recovery, i.e. if something fails, only that task is rerun. This doesn’t
>match the execution model of distributed ML/DL frameworks that are
>typically MPI-based, and rerunning a single task would lead to the entire
>system hanging. A whole stage needs to be re-run.
>
> This is not only useful for integrating with 3rd-party frameworks, but
also useful for scaling MLlib algorithms. One of my earliest attempts in
Spark MLlib was to implement an All-Reduce primitive (SPARK-1485
<https://issues.apache.org/jira/browse/SPARK-1485>). But we ended up with
some compromised solutions. With the new execution model, we can set up a
hybrid cluster and do all-reduce properly.
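
As a toy illustration of the all-reduce primitive (simulated in a single process
with NumPy; in the proposed execution model each "worker" would be a task in a
gang-scheduled stage exchanging data directly):

    import numpy as np

    def allreduce(partials):
        # Every worker contributes a partial vector (e.g. a gradient) and every
        # worker ends up with the element-wise sum.
        total = np.sum(partials, axis=0)           # reduce
        return [total.copy() for _ in partials]    # broadcast back to all workers

    workers = [np.random.rand(4) for _ in range(8)]    # 8 workers, 4-dim gradients
    reduced = allreduce(workers)
    assert all(np.allclose(r, np.sum(workers, axis=0)) for r in reduced)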


>
>- Accelerator-aware scheduling: The DL frameworks leverage GPUs and
>sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t
>aware of those resources, leading to either over-utilizing the accelerators
>or under-utilizing the CPUs.
>
>
> The good thing is that none of these seem very difficult to address (and
> we have already made progress on one of them). Xiangrui has graciously
> accepted the challenge to come up with solutions and SPIP to these.
>
>
I will do more homework, exploring existing JIRAs or creating new JIRAs
for the proposal. We'd like to hear your feedback and past efforts along
those directions if they were not fully captured by our JIRA.


> Xiangrui - please also chime in if I didn’t capture everything.
>
>
> --

Xiangrui Meng

Software Engineer

Databricks Inc. [image: http://databricks.com]