Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

2019-06-05 Thread Matt Cheah
Hi everyone,

 

I wanted to pick this back up again. The discussion has quieted down both on 
this thread and on the document.

 

We made a few revisions to the document to hopefully make it easier to read and 
to clarify our criteria for success in the project. Some of the APIs have also 
been adjusted based on further discussion and things we’ve learned.

 

I was hoping to discuss what our next steps could be here. Specifically:
- Would any PMC member be willing to become the shepherd for this SPIP?
- Is there any more feedback regarding this proposal?
- What would we need to do to move this to a voting phase and to begin proposing 
our work against upstream Spark?
 

Thanks,

 

-Matt Cheah

 

From: "Yifei Huang (PD)" 
Date: Monday, May 13, 2019 at 1:04 PM
To: Mridul Muralidharan 
Cc: Bo Yang , Ilan Filonenko , Imran Rashid 
, Justin Uang , Liang Tang 
, Marcelo Vanzin , Matei Zaharia 
, Matt Cheah , Min Shen 
, Reynold Xin , Ryan Blue 
, Vinoo Ganesh , Will Manning 
, "b...@fb.com" , "dev@spark.apache.org" 
, "fel...@uber.com" , 
"f...@linkedin.com" , "tgraves...@gmail.com" 
, "yez...@linkedin.com" , 
"yue...@memverge.com" 
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

 

Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the 
points that you mentioned:

 

The API is meant to work with the existing SortShuffleManager algorithm. There 
aren't strict requirements on how other ShuffleManager implementations must 
behave, so it seems impractical to design an API that could also satisfy those 
unknown requirements. However, we do believe the API is fairly generic: it uses 
OutputStreams for writes and InputStreams for reads, and indexes the data by a 
shuffleId-mapId-reduceId combination, so if other shuffle algorithms chunk their 
data the same way and want an interface for storage, they can also use this API 
from within their implementations.
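
To make that concrete, here is a rough sketch of what such a storage interface 
could look like. The names and signatures below are illustrative only, not the 
actual SPIP API:

    import java.io.{InputStream, OutputStream}

    // Illustrative sketch of a stream-based shuffle storage interface keyed by
    // (shuffleId, mapId, reduceId); the real SPIP defines its own types.
    trait ShuffleStorage {
      // Stream used by a map task to persist the bytes of one output partition.
      def openWriteStream(shuffleId: Int, mapId: Int, reduceId: Int): OutputStream

      // Stream used by a reducer to read back a previously committed partition.
      def openReadStream(shuffleId: Int, mapId: Int, reduceId: Int): InputStream

      // Remove all data belonging to a shuffle once it is no longer referenced.
      def removeShuffle(shuffleId: Int): Unit
    }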

 

Regarding speculative execution, we originally assumed that each shuffle task is 
deterministic, so even if a later attempt overwrote a previously committed 
attempt's output, the contents would be the same. Having searched some tickets 
and read https://github.com/apache/spark/pull/22112/files more carefully, I think 
our original assumption breaks down if the writer writes all attempts of a task 
to the same location. For example, if the writer implementation writes each 
partition to the remote host as a sequence of chunks, a reducer might read data 
half written by the original task and half written by the still-running 
speculative task, which will not be correct if the mapper output is unordered. 
Therefore, writes by a single mapper might have to be transactional, which is 
not clear from the API and seems rather complex to reason about, so we shouldn't 
expect this from implementers.

 

However, this doesn't affect the fundamentals of the API, since we only need to 
add an attemptId to the storage data index (which can be stored within the 
MapStatus) to solve the problem of concurrent writes. This would also make it 
clearer that the writer should use the attempt ID as part of the index so that 
writes from speculative tasks don't interfere with one another (we can add that 
to the API docs as well).
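
As a rough illustration (hypothetical names, not the SPIP's actual types), the 
storage key could simply carry the attempt:

    // Hypothetical sketch: index shuffle data by attempt so speculative tasks
    // never overwrite each other. The driver records the winning attempt (e.g.
    // in the MapStatus), and reducers only read under that attempt's key.
    case class ShuffleBlockKey(shuffleId: Int, mapId: Int, reduceId: Int, attemptId: Long)

    trait AttemptAwareShuffleStorage {
      def openWriteStream(key: ShuffleBlockKey): java.io.OutputStream
      def openReadStream(key: ShuffleBlockKey): java.io.InputStream
    }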

 

From: Mridul Muralidharan 
Date: Wednesday, May 8, 2019 at 8:18 PM
To: "Yifei Huang (PD)" 
Cc: Bo Yang , Ilan Filonenko , Imran Rashid 
, Justin Uang , Liang Tang 
, Marcelo Vanzin , Matei Zaharia 
, Matt Cheah , Min Shen 
, Reynold Xin , Ryan Blue 
, Vinoo Ganesh , Will Manning 
, "b...@fb.com" , "dev@spark.apache.org" 
, "fel...@uber.com" , 
"f...@linkedin.com" , "tgraves...@gmail.com" 
, "yez...@linkedin.com" , 
"yue...@memverge.com" 
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Unfortunately I do not have bandwidth to do a detailed review, but a few things 
come to mind after a quick read:

 

- While it might be tactically beneficial to align with the existing 
implementation, a clean design that does not tie into the existing shuffle 
implementation would be preferable (if it can be done without over-engineering). 
The shuffle implementation can change, and there are custom implementations and 
experiments that differ quite a bit from what comes with Apache Spark.

- Please keep speculative execution in mind while designing the interfaces: in 
Spark, implicitly due to task scheduler logic, you won't have conflicts at an 
executor for a (shuffleId, mapId) or (shuffleId, mapId, reducerId) tuple.

When you externalize the storage, there can be conflicts: passing a way to 
distinguish different task attempts for the same partition would be necessary 
for non-trivial implementations.

This would be a welcome and much-needed enhancement to Spark; looking forward 
to its progress!

Regards,

Mridul

On Wed, May 8, 2019 at 11:24 AM Yifei Huang (PD)  wrote:

Hi everyone,

For the past several months, we have been working on an API 

Re: Closing a SparkSession stops the SparkContext

2019-06-05 Thread Vinoo Ganesh
Picking up this email thread again around point #1 below: filed 
https://issues.apache.org/jira/browse/SPARK-27958 and put up a PR (tests still 
to be written), https://github.com/apache/spark/pull/24807, just to begin the 
conversation.


From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Wednesday, April 3, 2019 at 13:14
To: Vinoo Ganesh 
Cc: Sean Owen , Arun Mahadevan , 
"dev@spark.apache.org" 
Subject: Re: Closing a SparkSession stops the SparkContext

For #1, do we agree on the behavior? I think that closing a SparkSession should 
not close the SparkContext unless it is the only session. Evidently, that's not 
what happens, and I consider the current behavior a bug.
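
A minimal sketch of the behavior in question, assuming a local master 
(illustrative code only, not from the original thread):

    import org.apache.spark.sql.SparkSession

    object SessionLifecycleDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("session-lifecycle-demo")
          .getOrCreate()

        // A second session that shares the same underlying SparkContext.
        val other = spark.newSession()

        // Stopping the first session stops the shared SparkContext...
        spark.stop()

        // ...so the surviving session can no longer run jobs;
        // other.range(10).count() would fail because the context is shut down.
        println(other.sparkContext.isStopped) // true
      }
    }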

For more context, we're working on the new catalog APIs and how to guarantee 
consistent operations. Self-joining a table, for example, should use the same 
version of the table for both scans, and that state should be specific to a 
session, not global. These plans assume that SparkSession represents a session 
of interactions, along with a reasonable life-cycle. If that life-cycle 
includes closing all sessions when you close any session, then we can't really 
use sessions for this.

rb

On Wed, Apr 3, 2019 at 9:35 AM Vinoo Ganesh <vgan...@palantir.com> wrote:

Yeah, so I think there are 2 separate issues here:

  1.  The coupling of the SparkSession + SparkContext in their current form 
seems unnatural.
  2.  The current memory leak, which I do believe is a case where the session 
is added onto the SparkContext but is only needed by the session (but I would 
appreciate a sanity check here). Meaning, it may make sense to investigate an 
API change.



Thoughts?



On 4/2/19, 15:13, "Sean Owen" <sro...@gmail.com> wrote:

> @Sean – To the point that Ryan made, it feels wrong that stopping a 
session force stops the global context. Building in the logic to only stop the 
context when the last session is stopped also feels like a solution, but the 
best way I can think about doing this involves storing the global list of every 
available SparkSession, which may be difficult.



I tend to agree it would be more natural for the SparkSession to have its own 
lifecycle 'stop' method that only stops/releases its own resources. But is that 
the source of the problem here? If the state you're trying to free is needed by 
the SparkContext, it won't help. If it happens to be in the SparkContext but is 
state only needed by one SparkSession, and there isn't any way to clean it up 
now, that's a compelling reason to change the API. Is that the situation? The 
only downside is making the user separately stop the SparkContext then.

From: Vinoo Ganesh <vgan...@palantir.com>
Date: Tuesday, April 2, 2019 at 13:24
To: Arun Mahadevan <ar...@apache.org>, Ryan Blue <rb...@netflix.com>
Cc: Sean Owen <sro...@gmail.com>, "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: Closing a SparkSession stops the SparkContext

// Merging threads

Thanks everyone for your thoughts. I’m very much in sync with Ryan here.

@Sean – To the point that Ryan made, it feels wrong that stopping a session 
force stops the global context. Building in the logic to only stop the context 
when the last session is stopped also feels like a solution, but the best way I 
can think about doing this involves storing the global list of every available 
SparkSession, which may be difficult.

@Arun – If the intention is not to be able to clear and create new sessions, 
then what specifically is the intended use case of Sessions? 
https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
 describes SparkSessions as time-bounded interactions, which implies that old 
ones should be clear-able and new ones create-able in lockstep without adverse 
effect?

From: Arun Mahadevan <ar...@apache.org>
Date: Tuesday, April 2, 2019 at 12:31
To: Ryan Blue <rb...@netflix.com>
Cc: Vinoo Ganesh <vgan...@palantir.com>, Sean Owen <sro...@gmail.com>, 
"dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: Closing a SparkSession stops the SparkContext

I am not sure how it would cause a leak though. When a SparkSession or the 
underlying context is stopped, it should clean up everything. getOrCreate is 
supposed to return the active thread-local or the global session. Maybe if you 
keep creating new sessions after explicitly clearing the default and the 
thread-local sessions, and keep leaking those sessions, it could happen, but I 
don't think
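
To make the lookup order concrete, a small sketch using the public SparkSession 
API (the leak scenario itself is hypothetical):

    import org.apache.spark.sql.SparkSession

    // getOrCreate returns the active thread-local session if set, otherwise the
    // default (global) session, otherwise it builds a new session on top of the
    // existing SparkContext.
    val first = SparkSession.builder().master("local[*]").getOrCreate()

    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    // With both cleared, getOrCreate builds another session object that still
    // shares the original SparkContext; repeatedly doing this while holding on
    // to the old sessions is the kind of accumulation being discussed.
    val second = SparkSession.builder().getOrCreate()
    assert(first ne second)
    assert(first.sparkContext eq second.sparkContext)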

CI build issue with MiMa tests

2019-06-05 Thread Martin Junghanns

Hi all,

While introducing new modules for Spark Graph 
(https://github.com/apache/spark/pull/24490),
builds started to turn red because of failing MiMa (binary 
compatibility) tests.


We had one successful build, however, all subsequent builds fail:

Successful run: 
https://github.com/apache/spark/pull/24490#issuecomment-498075761
First failed run: 
https://github.com/apache/spark/pull/24490#issuecomment-498104960


Both builds run on the same commit: 
https://github.com/apache/spark/commit/bf7db46d7dc548ffecb84d5696281951a46e5ae7


The build log contains:

[error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/mima 
-Phadoop-2.7 -Pkubernetes -Phive-thriftserver -Pkinesis-asl -Pyarn 
-Pspark-ganglia-lgpl -Phive -Pmesos ; received return code 1


I tried to reproduce this locally, but got return code 0 without any 
output when running


'dev/mima'

and

'dev/mima -Phadoop-2.7 -Pkubernetes -Phive-thriftserver -Pkinesis-asl 
-Pyarn -Pspark-ganglia-lgpl -Phive -Pmesos'


Any ideas what could cause CI to change its behaviour between builds? 
Are we missing something in the way we introduced the modules? Any help 
would be appreciated.
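
Not an answer to the CI question, but in case it helps anyone hitting MiMa 
failures: intentional API changes are normally silenced via MiMa ProblemFilters 
in project/MimaExcludes.scala, roughly like the sketch below (the excluded name 
is a placeholder, not a real Spark class):

    // Sketch of a MiMa exclusion entry in the style of project/MimaExcludes.scala.
    import com.typesafe.tools.mima.core._

    val exampleExcludes = Seq(
      // Suppress the binary-compatibility error for an intentionally changed/new API.
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.graph.api.PlaceholderClass")
    )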


Thanks,

Martin