Re: Utilizing YARN AM RPC port field

2016-06-15 Thread Mingyu Kim
FYI, I just filed https://issues.apache.org/jira/browse/SPARK-15974.

 

Mingyu

 

From: Mingyu Kim <m...@palantir.com>
Date: Tuesday, June 14, 2016 at 2:13 PM
To: Steve Loughran <ste...@hortonworks.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Matt Cheah 
<mch...@palantir.com>
Subject: Re: Utilizing YARN AM RPC port field

 

Thanks for the pointers, Steve!

 

The first option sounds like the most lightweight and non-disruptive option 
among them. So, we can add a configuration that enables socket initialization; 
if it is enabled, the Spark AM will create a ServerSocket and set it on the 
SparkContext.
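
To make that concrete, here is a minimal sketch of the AM-side piece, assuming 
a hypothetical config flag and leaving out how the socket gets handed to the 
SparkContext:

import java.net.ServerSocket

// Bind an ephemeral port (0 lets the OS pick a free one), so the AM can
// report a real port to the RM instead of the dummy 0.
val serverSocket = new ServerSocket(0)
val boundPort = serverSocket.getLocalPort
// boundPort would be reported in the AM's registration with the RM, and the
// ServerSocket handed to the application, e.g. through the SparkContext.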

 

If there are no objections, I can file a bug and find time to tackle it myself. 

 

Mingyu

 

From: Steve Loughran <ste...@hortonworks.com>
Date: Tuesday, June 14, 2016 at 4:55 AM
To: Mingyu Kim <m...@palantir.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Matt Cheah 
<mch...@palantir.com>
Subject: Re: Utilizing YARN AM RPC port field

 

 

On 14 Jun 2016, at 01:30, Mingyu Kim <m...@palantir.com> wrote:

 

Hi all,

 

YARN provides a way for the ApplicationMaster to register an RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark’s YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
This is useful for the long-running Spark application use cases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark job server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these use cases. The current workaround is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server for the client to communicate with the AM.

 

Utilizing the YARN AM RPC port allows the port number reporting to be done in a 
secure way (i.e., with the AM RPC port field and a Kerberized YARN cluster, you don’t 
need to re-invent a way to verify the authenticity of the port number 
reporting) and removes the callback from the YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals I have for utilizing the YARN AM RPC port are below. (Note that you 
cannot simply pre-configure the port number and pass it to the Spark AM via 
configuration because of potential port conflicts on the YARN node.)

 

· Start up an empty Jetty server during Spark AM initialization, set 
the port number when registering the AM with the RM, and pass a reference to the Jetty 
server into the Spark application (e.g. through the SparkContext) for the 
application to dynamically add servlets/resources to the Jetty server (see the 
sketch after this list).

· Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up an RPC server and returns the RPC 
port. The Spark AM can call this method, register the port number with the RM and 
continue on with invoking the main method. I don’t see this making a good API, 
though.
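
As a rough sketch of the first option, assuming a Jetty 9-style API (this is 
illustrative only, not how the Spark AM actually sets up its web endpoints):

import org.eclipse.jetty.server.{Server, ServerConnector}

// Start Jetty on an ephemeral port during AM initialization...
val server = new Server(0)
server.start()
// ...then read back the port that was actually bound, to register it with the RM.
val boundPort = server.getConnectors.head.asInstanceOf[ServerConnector].getLocalPort
// The Server reference would then be exposed to the application (e.g. via the
// SparkContext) so it can add servlets/handlers later.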

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

 

It's a recurrent irritation of mine that you can't ever change the HTTP/RPC 
ports of a YARN AM after launch; it creates a complex startup state where you 
can't register until your IPC endpoints are up.

 

Tactics

 

- Create a socket on an empty port, register it, hand off the port to the RPC 
setup code as the chosen port. Ideally, support a range to scan, so that 
systems which only open a specific range of ports, e.g. 6500-6800, can have 
only those ports scanned. We've done this in other projects.
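
A minimal sketch of that range scan, using plain java.net sockets (the 6500-6800 
range is just the example above, and error handling is elided):

import java.net.ServerSocket
import scala.util.{Success, Try}

// Try each port in the allowed range and keep the first one that binds.
def bindInRange(lo: Int, hi: Int): Option[ServerSocket] =
  (lo to hi).iterator
    .map(p => Try(new ServerSocket(p)))
    .collectFirst { case Success(s) => s }

val socket = bindInRange(6500, 6800)
val chosenPort = socket.map(_.getLocalPort)   // registered with the RM, then handed to the RPC setup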

 

- Serve up the port binding info via a REST API off the AM web UI; clients hit the 
RM proxy (HEAD/GET only), ask for the port, and work with it. Nonstandard; could be 
extensible with other binding information (TTL of port caching, ...).

 

- Use the YARN-913 ZK-based registry to register/look up bindings. This is used 
in various YARN apps to register service endpoints (RPC, REST); there's work 
ongoing for DNS support. This would allow you to use DNS against a specific DNS 
server to get the endpoints. Works really well with containerized deployments 
where the apps come up with per-container IP addresses and fixed ports.

Although you couldn't get the latter into the spark-yarn code itself (needs 
Hadoop 2.6+), you can plug in support via the extension point implemented in 
SPARK-11314. I've actually thought of doing that for a while...just been too 
busy.

 

- Just fix the bit of the YARN API that forces you to know your endpoints in 
advance. People will appreciate it, though it will take a while to trickle 
downstream.

 

 

 

 





Re: Utilizing YARN AM RPC port field

2016-06-14 Thread Mingyu Kim
Thanks for the pointers, Steve!

 

The first option sounds like the most lightweight and non-disruptive option 
among them. So, we can add a configuration that enables socket initialization; 
if it is enabled, the Spark AM will create a ServerSocket and set it on the 
SparkContext.

 

If there are no objections, I can file a bug and find time to tackle it myself. 

 

Mingyu

 

From: Steve Loughran <ste...@hortonworks.com>
Date: Tuesday, June 14, 2016 at 4:55 AM
To: Mingyu Kim <m...@palantir.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Matt Cheah 
<mch...@palantir.com>
Subject: Re: Utilizing YARN AM RPC port field

 

 

On 14 Jun 2016, at 01:30, Mingyu Kim <m...@palantir.com> wrote:

 

Hi all,

 

YARN provides a way for the ApplicationMaster to register an RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark’s YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
This is useful for the long-running Spark application use cases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark job server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these use cases. The current workaround is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server for the client to communicate with the AM.

 

Utilizing the YARN AM RPC port allows the port number reporting to be done in a 
secure way (i.e., with the AM RPC port field and a Kerberized YARN cluster, you don’t 
need to re-invent a way to verify the authenticity of the port number 
reporting) and removes the callback from the YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals I have for utilizing the YARN AM RPC port are below. (Note that you 
cannot simply pre-configure the port number and pass it to the Spark AM via 
configuration because of potential port conflicts on the YARN node.)

 

· Start up an empty Jetty server during Spark AM initialization, set 
the port number when registering the AM with the RM, and pass a reference to the Jetty 
server into the Spark application (e.g. through the SparkContext) for the 
application to dynamically add servlets/resources to the Jetty server.

· Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up an RPC server and returns the RPC 
port. The Spark AM can call this method, register the port number with the RM and 
continue on with invoking the main method. I don’t see this making a good API, 
though.

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

 

It's a recurrent irritation of mine that you can't ever change the HTTP/RPC 
ports of a YARN AM after launch; it creates a complex startup state where you 
can't register until your IPC endpoints are up.

 

Tactics

 

- Create a socket on an empty port, register it, hand off the port to the RPC 
setup code as the chosen port. Ideally, support a range to scan, so that 
systems which only open a specific range of ports, e.g. 6500-6800, can have 
only those ports scanned. We've done this in other projects.

 

- Serve up the port binding info via a REST API off the AM web UI; clients hit the 
RM proxy (HEAD/GET only), ask for the port, and work with it. Nonstandard; could be 
extensible with other binding information (TTL of port caching, ...).

 

- Use the YARN-913 ZK-based registry to register/look up bindings. This is used 
in various YARN apps to register service endpoints (RPC, REST); there's work 
ongoing for DNS support. This would allow you to use DNS against a specific DNS 
server to get the endpoints. Works really well with containerized deployments 
where the apps come up with per-container IP addresses and fixed ports.

Although you couldn't get the latter into the spark-yarn code itself (needs 
Hadoop 2.6+), you can plug in support via the extension point implemented in 
SPARK-11314. I've actually thought of doing that for a while...just been too 
busy.

 

- Just fix the bit of the YARN API that forces you to know your endpoints in 
advance. People will appreciate it, though it will take a while to trickle 
downstream.

 

 

 

 





Utilizing YARN AM RPC port field

2016-06-13 Thread Mingyu Kim
Hi all,

 

YARN provides a way for the ApplicationMaster to register an RPC port so that a 
client outside the YARN cluster can reach the application for any RPCs, but 
Spark’s YARN AMs simply register a dummy port number of 0. (See 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala#L74)
This is useful for the long-running Spark application use cases where jobs are 
submitted via a form of RPC to an already started Spark context running in YARN 
cluster mode. Spark job server 
(https://github.com/spark-jobserver/spark-jobserver) and Livy 
(https://github.com/cloudera/hue/tree/master/apps/spark/java) are good 
open-source examples of these use cases. The current workaround is to have the 
Spark AM make a call back to a configured URL with the port number of the RPC 
server for the client to communicate with the AM.

 

Utilizing the YARN AM RPC port allows the port number reporting to be done in a 
secure way (i.e., with the AM RPC port field and a Kerberized YARN cluster, you don’t 
need to re-invent a way to verify the authenticity of the port number 
reporting) and removes the callback from the YARN cluster back to a client, which 
means you can operate YARN in a low-trust environment and run other client 
applications behind a firewall.

 

A couple of proposals I have for utilizing the YARN AM RPC port are below. (Note that you 
cannot simply pre-configure the port number and pass it to the Spark AM via 
configuration because of potential port conflicts on the YARN node.)

 

· Start up an empty Jetty server during Spark AM initialization, set 
the port number when registering the AM with the RM, and pass a reference to the Jetty 
server into the Spark application (e.g. through the SparkContext) for the 
application to dynamically add servlets/resources to the Jetty server.

· Have an optional static method in the main class (e.g. 
initializeRpcPort()) which optionally sets up an RPC server and returns the RPC 
port. The Spark AM can call this method, register the port number with the RM and 
continue on with invoking the main method. I don’t see this making a good API, 
though.

 

I’m curious to hear what other people think. Would this be useful for anyone? 
What do you think about the proposals? Please feel free to suggest other ideas. 
Thanks!

 

Mingyu





Re: Spark 1.6.1

2016-02-02 Thread Mingyu Kim
Cool, thanks!

Mingyu

From:  Michael Armbrust <mich...@databricks.com>
Date:  Tuesday, February 2, 2016 at 10:48 AM
To:  Mingyu Kim <m...@palantir.com>
Cc:  Romi Kuntsman <r...@totango.com>, Hamel Kothari
<hamelkoth...@gmail.com>, Ted Yu <yuzhih...@gmail.com>,
"dev@spark.apache.org" <dev@spark.apache.org>, Punya Biswal
<pbis...@palantir.com>, Robert Kruszewski <robe...@palantir.com>
Subject:  Re: Spark 1.6.1

I'm waiting for a few last fixes to be merged.  Hoping to cut an RC in the
next few days.

On Tue, Feb 2, 2016 at 10:43 AM, Mingyu Kim <m...@palantir.com> wrote:
> Hi all,
> 
> Is there an estimated timeline for 1.6.1 release? Just wanted to check how the
> release is coming along. Thanks!
> 
> Mingyu
> 
> From: Romi Kuntsman <r...@totango.com>
> Date: Tuesday, February 2, 2016 at 3:16 AM
> To: Michael Armbrust <mich...@databricks.com>
> Cc: Hamel Kothari <hamelkoth...@gmail.com>, Ted Yu <yuzhih...@gmail.com>,
> "dev@spark.apache.org" <dev@spark.apache.org>
> Subject: Re: Spark 1.6.1
> 
> Hi Michael,
> What about the memory leak bug?
> https://issues.apache.org/jira/browse/SPARK-11293
> Even after the memory rewrite in 1.6.0, it still happens in some cases.
> Will it be fixed for 1.6.1?
> Thanks,
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com
> 
> On Mon, Feb 1, 2016 at 9:59 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>> We typically do not allow changes to the classpath in maintenance releases.
>> 
>> On Mon, Feb 1, 2016 at 8:16 AM, Hamel Kothari <hamelkoth...@gmail.com> wrote:
>>> I noticed that the Jackson dependency was bumped to 2.5 in master for
>>> something spark-streaming related. Is there any reason that this upgrade
>>> can't be included with 1.6.1?
>>> 
>>> According to later comments on this thread:
>>> https://issues.apache.org/jira/browse/SPARK-8332
>>> and my
>>> personal experience using Spark with Jackson 2.5 hasn't caused any
>>> issues but it does have some useful new features. It should be fully
>>> backwards compatible according to the Jackson folks.
>>> 
>>> On Mon, Feb 1, 2016 at 10:29 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>>> SPARK-12624 has been resolved.
>>>> According to Wenchen, SPARK-12783 is fixed in 1.6.0 release.
>>>> 
>>>> Are there other blockers for Spark 1.6.1 ?
>>>> 
>>>> Thanks
>>>> 
>>>> On Wed, Jan 13, 2016 at 5:39 PM, Michael Armbrust <mich...@databricks.com>
>>>> wrote:
>>>>> Hey All, 
>>>>> 
>>>>> While I'm not aware of any critical issues with 1.6.0, there are several
>>>>> corner cases that users are hitting with the Dataset API that are fixed in
>>>>> branch-1.6.  As such I'm considering a 1.6.1 release.
>>>>> 
>>>>> At the moment there are only two critical issues targeted for 1.6.1:
>>>>>  - SPARK-12624 - When schema is specified, we should treat undeclared
>>>>> fields as null (in Python)
>>>>>  - SPARK-12783 - Dataset map serialization error
>>>>> 
>>>>> When these are resolved I'll likely begin the release process.  If there
>>>>> are any other issues that we should wait for please contact me.
>>>>> 
>>>>> Michael
>>>> 
>> 
> 







Re: Spark 1.6.1

2016-02-02 Thread Mingyu Kim
Hi all,

Is there an estimated timeline for 1.6.1 release? Just wanted to check how
the release is coming along. Thanks!

Mingyu

From:  Romi Kuntsman 
Date:  Tuesday, February 2, 2016 at 3:16 AM
To:  Michael Armbrust 
Cc:  Hamel Kothari , Ted Yu ,
"dev@spark.apache.org" 
Subject:  Re: Spark 1.6.1

Hi Michael,
What about the memory leak bug?
https://issues.apache.org/jira/browse/SPARK-11293

Even after the memory rewrite in 1.6.0, it still happens in some cases.
Will it be fixed for 1.6.1?
Thanks,

Romi Kuntsman, Big Data Engineer
http://www.totango.com


On Mon, Feb 1, 2016 at 9:59 PM, Michael Armbrust 
wrote:
> We typically do not allow changes to the classpath in maintenance releases.
> 
> On Mon, Feb 1, 2016 at 8:16 AM, Hamel Kothari  wrote:
>> I noticed that the Jackson dependency was bumped to 2.5 in master for
>> something spark-streaming related. Is there any reason that this upgrade
>> can't be included with 1.6.1?
>> 
>> According to later comments on this thread:
>> https://issues.apache.org/jira/browse/SPARK-8332
>> and my
>> personal experience using Spark with Jackson 2.5 hasn't caused any
>> issues but it does have some useful new features. It should be fully
>> backwards compatible according to the Jackson folks.
>> 
>> On Mon, Feb 1, 2016 at 10:29 AM Ted Yu  wrote:
>>> SPARK-12624 has been resolved.
>>> According to Wenchen, SPARK-12783 is fixed in 1.6.0 release.
>>> 
>>> Are there other blockers for Spark 1.6.1 ?
>>> 
>>> Thanks
>>> 
>>> On Wed, Jan 13, 2016 at 5:39 PM, Michael Armbrust 
>>> wrote:
 Hey All, 
 
 While I'm not aware of any critical issues with 1.6.0, there are several
 corner cases that users are hitting with the Dataset API that are fixed in
 branch-1.6.  As such I'm considering a 1.6.1 release.
 
 At the moment there are only two critical issues targeted for 1.6.1:
  - SPARK-12624 - When schema is specified, we should treat undeclared
 fields as null (in Python)
  - SPARK-12783 - Dataset map serialization error
 
 When these are resolved I'll likely begin the release process.  If there
 are any other issues that we should wait for please contact me.
 
 Michael
>>> 
> 







Re: And.eval short circuiting

2015-09-18 Thread Mingyu Kim
That sounds good. I think the optimizer should not change the behavior of 
execution and reordering the filters can easily result in errors as exemplified 
below. I agree that the optimizer should not reorder the filters for 
correctness. Please correct me if I have an incorrect assumption about the 
guarantees of the optimizer.

Is there a bug filed that tracks the change you suggested below, btw? I’d like 
to follow the issue, if there’s one.

Thanks,
Mingyu

From:  Reynold Xin
Date:  Wednesday, September 16, 2015 at 1:17 PM
To:  Zack Sampson
Cc:  "dev@spark.apache.org", Mingyu Kim, Peter Faiman, Matt Cheah, Michael 
Armbrust
Subject:  Re: And.eval short circuiting

This is "expected" in the sense that DataFrame operations can get re-ordered 
under the hood by the optimizer. For example, if the optimizer deems it is 
cheaper to apply the 2nd filter first, it might re-arrange the filters. In 
reality, it doesn't do that. I think this is too confusing and violates 
principle of least astonishment, so we should fix it. 

I discussed more with Michael offline, and think we can add a rule for the 
physical filter operator to replace the general AND/OR/equality/etc with a 
special version that treats null as false. This rule needs to be carefully 
written because it should only apply to subtrees of AND/OR/equality/etc (e.g. 
it shouldn't rewrite children of isnull).


On Tue, Sep 15, 2015 at 1:09 PM, Zack Sampson <zsamp...@palantir.com> wrote:
I see. We're having problems with code like this (forgive my noob scala):
val df = Seq(("moose","ice"), (null,"fire")).toDF("animals", "elements")
df
  .filter($"animals".rlike(".*"))
  .filter(callUDF({(value: String) => value.length > 2}, BooleanType, 
$"animals"))
.collect()
This code throws an NPE because:
* Catalyst combines the filters with an AND
* the first filter returns null on the first input
* the second filter tries to read the length of that null

This feels weird. Reading that code, I wouldn't expect null to be passed to the 
second filter. Even weirder is that if you call collect() after the first 
filter you won't see nulls, and if you write the data to disk and reread it, 
the NPE won't happen.

It's bewildering! Is this the intended behavior?
From: Reynold Xin [r...@databricks.com]
Sent: Monday, September 14, 2015 10:14 PM
To: Zack Sampson
Cc: dev@spark.apache.org
Subject: Re: And.eval short circuiting

rxin=# select null and true;
 ?column? 
--
 
(1 row)

rxin=# select null and false;
 ?column? 
--
 f
(1 row)


null and false should return false.


On Mon, Sep 14, 2015 at 9:12 PM, Zack Sampson <zsamp...@palantir.com> wrote:
It seems like And.eval can avoid calculating right.eval if left.eval returns 
null. Is there a reason it's written like it is? 

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) {
    false
  } else {
    val r = right.eval(input)
    if (r == false) {
      false
    } else {
      if (l != null && r != null) {
        true
      } else {
        null
      }
    }
  }
}
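
For comparison, a sketch of the null-short-circuiting variant being asked about 
(illustrative only, not the actual Spark implementation):

// Sketch: skip right.eval entirely when the left side is null.
// Note the trade-off: this returns null for (null, false), whereas SQL
// three-valued logic (and Reynold's psql example above) says
// null AND false should be false -- which is why the real code still
// evaluates the right side.
override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) false
  else if (l == null) null          // short-circuit: right.eval is never called
  else {
    val r = right.eval(input)
    if (r == false) false
    else if (r == null) null
    else true
  }
}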







Re: And.eval short circuiting

2015-09-18 Thread Mingyu Kim
I filed SPARK-10703. Thanks!

Mingyu

From:  Reynold Xin
Date:  Thursday, September 17, 2015 at 11:22 PM
To:  Mingyu Kim
Cc:  Zack Sampson, "dev@spark.apache.org", Peter Faiman, Matt Cheah, Michael 
Armbrust
Subject:  Re: And.eval short circuiting

Please file a ticket and cc me. Thanks. 


On Thu, Sep 17, 2015 at 11:20 PM, Mingyu Kim <m...@palantir.com> wrote:
That sounds good. I think the optimizer should not change the behavior of 
execution and reordering the filters can easily result in errors as exemplified 
below. I agree that the optimizer should not reorder the filters for 
correctness. Please correct me if I have an incorrect assumption about the 
guarantees of the optimizer.

Is there a bug filed that tracks the change you suggested below, btw? I’d like 
to follow the issue, if there’s one.

Thanks,
Mingyu

From: Reynold Xin
Date: Wednesday, September 16, 2015 at 1:17 PM
To: Zack Sampson
Cc: "dev@spark.apache.org", Mingyu Kim, Peter Faiman, Matt Cheah, Michael 
Armbrust 

Subject: Re: And.eval short circuiting

This is "expected" in the sense that DataFrame operations can get re-ordered 
under the hood by the optimizer. For example, if the optimizer deems it is 
cheaper to apply the 2nd filter first, it might re-arrange the filters. In 
reality, it doesn't do that. I think this is too confusing and violates 
principle of least astonishment, so we should fix it. 

I discussed more with Michael offline, and think we can add a rule for the 
physical filter operator to replace the general AND/OR/equality/etc with a 
special version that treats null as false. This rule needs to be carefully 
written because it should only apply to subtrees of AND/OR/equality/etc (e.g. 
it shouldn't rewrite children of isnull).


On Tue, Sep 15, 2015 at 1:09 PM, Zack Sampson <zsamp...@palantir.com> wrote:
I see. We're having problems with code like this (forgive my noob scala):
val df = Seq(("moose","ice"), (null,"fire")).toDF("animals", "elements")
df
  .filter($"animals".rlike(".*"))
  .filter(callUDF({(value: String) => value.length > 2}, BooleanType, 
$"animals"))
.collect()
This code throws an NPE because:
* Catalyst combines the filters with an AND
* the first filter returns null on the first input
* the second filter tries to read the length of that null

This feels weird. Reading that code, I wouldn't expect null to be passed to the 
second filter. Even weirder is that if you call collect() after the first 
filter you won't see nulls, and if you write the data to disk and reread it, 
the NPE won't happen.

It's bewildering! Is this the intended behavior?
From: Reynold Xin [r...@databricks.com]
Sent: Monday, September 14, 2015 10:14 PM
To: Zack Sampson
Cc: dev@spark.apache.org
Subject: Re: And.eval short circuiting

rxin=# select null and true;
 ?column? 
--
 
(1 row)

rxin=# select null and false;
 ?column? 
--
 f
(1 row)


null and false should return false.


On Mon, Sep 14, 2015 at 9:12 PM, Zack Sampson <zsamp...@palantir.com> wrote:
It seems like And.eval can avoid calculating right.eval if left.eval returns 
null. Is there a reason it's written like it is? 

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) {
    false
  } else {
    val r = right.eval(input)
    if (r == false) {
      false
    } else {
      if (l != null && r != null) {
        true
      } else {
        null
      }
    }
  }
}








Re: [discuss] new Java friendly InputSource API

2015-04-24 Thread Mingyu Kim
I see. So, the difference is that the InputSource is instantiated on the driver 
side and gets sent to the executors, whereas Hadoop’s InputFormats are 
instantiated via reflection on the executors. That makes sense. Thanks for the 
clarification!

Mingyu

From: Reynold Xin <r...@databricks.com>
Date: Thursday, April 23, 2015 at 11:09 AM
To: Mingyu Kim <m...@palantir.com>
Cc: Soren Macbeth <so...@yieldbot.com>, Punyashloka Biswal <punya.bis...@gmail.com>, 
"dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: [discuss] new Java friendly InputSource API

In the ctor of InputSource (I'm also considering adding an explicit initialize 
call), the implementation of InputSource can execute arbitrary code. The state 
in it will also be serialized and passed onto the executors.

Yes - technically you can hijack getSplits in Hadoop InputFormat to do the same 
thing, and then put a reference of the state into every Split. But that's kind 
of awkward. Hadoop relies on the giant Configuration object to pass state over.



On Thu, Apr 23, 2015 at 11:02 AM, Mingyu Kim <m...@palantir.com> wrote:
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it’s not very clear to me how this is different from what
Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn’t that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, Soren Macbeth <so...@yieldbot.com> wrote:

I'm also super interested in this. Flambo (our clojure DSL) wraps the java
api and it would be great to have this.

On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin <r...@databricks.com> wrote:

 It can reuse. That's a good point and we should document it in the API
 contract.


  On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal <punya.bis...@gmail.com>
 wrote:

  Reynold, thanks for this! At Palantir we're heavy users of the Java
APIs
  and appreciate being able to stop hacking around with fake ClassTags
:)
 
  Regarding this specific proposal, is the contract of RecordReader#get
  intended to be that it returns a fresh object each time? Or is it
allowed
  to mutate a fixed object and return a pointer to it each time?
 
  Put another way, is a caller supposed to clone the output of get() if
 they
  want to use it later?
 
  Punya
 
  On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin <r...@databricks.com>
wrote:
 
  I created a pull request last night for a new InputSource API that is
  essentially a stripped down version of the RDD API for providing data
 into
  Spark. Would be great to hear the community's feedback.
 
  Spark currently has two de facto input source API:
  1. RDD
  2. Hadoop MapReduce InputFormat
 
  Neither of the above is ideal:
 
  1. RDD: It is hard for Java developers to implement RDD, given the
  implicit
  class tags. In addition, the RDD API depends on Scala's runtime
library,
  which does not preserve binary compatibility across Scala versions.
If a
  developer chooses Java to implement an input source, it would be
great
 if
  that input source can be binary compatible in years to come.
 
  2. Hadoop InputFormat: The Hadoop InputFormat API is overly
restrictive.
  For example, it forces key-value semantics, and does not support
running
  arbitrary code on the driver side (an example of why this is useful
is
  broadcast). In addition, it is somewhat awkward to tell developers
that
 in
  order to implement an input source for Spark, they should learn the
 Hadoop
  MapReduce API first.
 
 
  My patch creates a new InputSource interface, described by:
 
  - an array of InputPartition that specifies the data partitioning
  - a RecordReader that specifies how data on each partition can be
read
 
  This interface is similar to Hadoop's InputFormat, except that there
is
 no
  explicit key/value separation.
 
 
  JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
  Pull request: https://github.com/apache/spark/pull/5603
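
To make the shape of the proposal concrete, here is a rough sketch based only on 
the description above; the names and signatures are guesses, not the actual 
SPARK-7025 API:

// Hypothetical sketch of the described InputSource shape.
trait InputPartition extends Serializable

trait RecordReader[T] {
  def initialize(partition: InputPartition): Unit
  def hasNext: Boolean
  def get(): T            // may return a reused object, per the reuse discussion above
  def close(): Unit
}

trait InputSource[T] extends Serializable {
  def getPartitions(): Array[InputPartition]
  def createRecordReader(partition: InputPartition): RecordReader[T]
}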
 
 





Re: [discuss] new Java friendly InputSource API

2015-04-23 Thread Mingyu Kim
Hi Reynold,

You mentioned that the new API allows arbitrary code to be run on the
driver side, but it’s not very clear to me how this is different from what
Hadoop API provides. In your example of using broadcast, did you mean
broadcasting something in InputSource.getPartitions() and having
InputPartitions use the broadcast variables? Isn’t that already possible
with Hadoop's InputFormat.getSplits()?

Thanks,
Mingyu





On 4/21/15, 4:33 PM, Soren Macbeth so...@yieldbot.com wrote:

I'm also super interested in this. Flambo (our clojure DSL) wraps the java
api and it would be great to have this.

On Tue, Apr 21, 2015 at 4:10 PM, Reynold Xin r...@databricks.com wrote:

 It can reuse. That's a good point and we should document it in the API
 contract.


 On Tue, Apr 21, 2015 at 4:06 PM, Punyashloka Biswal 
 punya.bis...@gmail.com
 wrote:

  Reynold, thanks for this! At Palantir we're heavy users of the Java
APIs
  and appreciate being able to stop hacking around with fake ClassTags
:)
 
  Regarding this specific proposal, is the contract of RecordReader#get
  intended to be that it returns a fresh object each time? Or is it
allowed
  to mutate a fixed object and return a pointer to it each time?
 
  Put another way, is a caller supposed to clone the output of get() if
 they
  want to use it later?
 
  Punya
 
  On Tue, Apr 21, 2015 at 4:35 PM Reynold Xin r...@databricks.com
wrote:
 
  I created a pull request last night for a new InputSource API that is
  essentially a stripped down version of the RDD API for providing data
 into
  Spark. Would be great to hear the community's feedback.
 
  Spark currently has two de facto input source API:
  1. RDD
  2. Hadoop MapReduce InputFormat
 
  Neither of the above is ideal:
 
  1. RDD: It is hard for Java developers to implement RDD, given the
  implicit
  class tags. In addition, the RDD API depends on Scala's runtime
library,
  which does not preserve binary compatibility across Scala versions.
If a
  developer chooses Java to implement an input source, it would be
great
 if
  that input source can be binary compatible in years to come.
 
  2. Hadoop InputFormat: The Hadoop InputFormat API is overly
restrictive.
  For example, it forces key-value semantics, and does not support
running
  arbitrary code on the driver side (an example of why this is useful
is
  broadcast). In addition, it is somewhat awkward to tell developers
that
 in
  order to implement an input source for Spark, they should learn the
 Hadoop
  MapReduce API first.
 
 
  My patch creates a new InputSource interface, described by:
 
  - an array of InputPartition that specifies the data partitioning
  - a RecordReader that specifies how data on each partition can be
read
 
  This interface is similar to Hadoop's InputFormat, except that there
is
 no
  explicit key/value separation.
 
 
  JIRA ticket: https://issues.apache.org/jira/browse/SPARK-7025
  Pull request: https://github.com/apache/spark/pull/5603
 
 






Re: Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
The concern is really just the runtime overhead and memory footprint of
Java-serializing an already-serialized byte array again. We originally
noticed this when we were using RDD.toLocalIterator() which serializes the
entire 64MB partition. We worked around this issue by kryo-serializing and
snappy-compressing the partition on the executor side before returning it
back to the driver, but this operation just felt redundant.

Your explanation about reporting the time taken makes it clearer why it’s
designed this way. Since the byte array for the serialized task result
shouldn’t account for the majority of memory footprint anyways, I’m okay
with leaving it as is, then.

Thanks,
Mingyu





On 3/4/15, 5:07 PM, Patrick Wendell pwend...@gmail.com wrote:

Hey Mingyu,

I think it's broken out separately so we can record the time taken to
serialize the result. Once we serialize it once, the second
serialization should be really simple since it's just wrapping
something that has already been turned into a byte buffer. Do you see
a specific issue with serializing it twice?

I think you need to have two steps if you want to record the time
taken to serialize the result, since that needs to be sent back to the
driver when the task completes.

- Patrick

On Wed, Mar 4, 2015 at 4:01 PM, Mingyu Kim m...@palantir.com wrote:
 Hi all,

 It looks like the result of a task is serialized twice, once by the
serializer (i.e. Java/Kryo depending on configuration) and once again by the
closure serializer (i.e. Java). To link the actual code,

 The first one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
 The second one: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

 This serializes the value, which is the result of the task run, twice,
which affects things like collect(), takeSample(), and
toLocalIterator(). Would it make sense to simply serialize the
DirectTaskResult once using the regular serializer (as opposed to
closure serializer)? Would it cause problems when the Accumulator values
are not Kryo-serializable?

 Alternatively, if we can assume that Accumulator values are small, we can
closure-serialize those, put the serialized byte array in
DirectTaskResult with the raw task result value, and serialize
DirectTaskResult.

 What do people think?

 Thanks,
 Mingyu





Task result is serialized twice by serializer and closure serializer

2015-03-04 Thread Mingyu Kim
Hi all,

It looks like the result of a task is serialized twice, once by the serializer (i.e. 
Java/Kryo depending on configuration) and once again by the closure serializer 
(i.e. Java). To link the actual code,

The first one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L213
The second one: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L226

This serializes the “value”, which is the result of the task run, twice, which 
affects things like collect(), takeSample(), and toLocalIterator(). Would it 
make sense to simply serialize the DirectTaskResult once using the regular 
“serializer” (as opposed to the closure serializer)? Would it cause problems when 
the Accumulator values are not Kryo-serializable?
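
For illustration, here is a runnable toy version of the two-pass pattern being 
described; the wrapper class and the use of Java serialization for both passes 
are purely illustrative, not Spark's actual classes:

import java.io._

case class ResultWrapper(valueBytes: Array[Byte], accumUpdates: Map[Long, Any])

def javaSerialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

val value      = Seq.fill(1000)("row")                                // stand-in for a task's result
val valueBytes = javaSerialize(value)                                 // pass 1: the "serializer"
val wireBytes  = javaSerialize(ResultWrapper(valueBytes, Map.empty))  // pass 2: the "closure serializer"
// wireBytes is what would travel back to the driver; the value has now been
// serialized twice.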

Alternatively, if we can assume that Accumator values are small, we can 
closure-serialize those, put the serialized byte array in DirectTaskResult with 
the raw task result “value”, and serialize DirectTaskResult.

What do people think?

Thanks,
Mingyu


The default CDH4 build uses avro-mapred hadoop1

2015-02-20 Thread Mingyu Kim
Hi all,

Related to https://issues.apache.org/jira/browse/SPARK-3039, the default CDH4 
build, which is built with “mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests 
clean package”, pulls in avro-mapred hadoop1, as opposed to avro-mapred 
hadoop2. This ends up in the same error as mentioned in the linked bug (pasted 
below).

The right solution would be to create a hadoop-2.0 profile that sets 
avro.mapred.classifier to hadoop2, and to build the CDH4 build with the “-Phadoop-2.0” 
option.

What do people think?

Mingyu

——

java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
   at 
org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47)
   at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:56)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)



Re: The default CDH4 build uses avro-mapred hadoop1

2015-02-20 Thread Mingyu Kim
Thanks for the explanation.

To be clear, I meant to speak for any hadoop 2 releases before 2.2, which
have profiles in Spark. I referred to CDH4, since that’s the only Hadoop
2.0/2.1 version Spark ships a prebuilt package for.

I understand the hesitation of making a code change if Spark doesn’t plan
to support Hadoop 2.0/2.1 in general. (Please note, this is not specific
to CDH4.) If so, can I propose alternative options until Spark moves to
only support hadoop2?

- Build the CDH4 package with “-Davro.mapred.classifier=hadoop2”, and
update http://spark.apache.org/docs/latest/building-spark.html for all
“2.0.*” examples.
- Build the CDH4 package as is, but note known issues clearly in the
“download” page.
- Simply do not ship the CDH4 prebuilt package, and let people figure it out
themselves. Preferably, note in documentation that
“-Davro.mapred.classifier=hadoop2” should be used for all hadoop “2.0.*”
builds.

Please let me know what you think!

Mingyu





On 2/20/15, 2:34 AM, Sean Owen so...@cloudera.com wrote:

True, although a number of other little issues make me, personally,
not want to continue down this road:

- There are already a lot of build profiles to try to cover Hadoop
versions
- I don't think it's quite right to have vendor-specific builds in
Spark to begin with
- We should be moving to only support Hadoop 2 soon IMHO anyway
- CDH4 is EOL in a few months I think

On Fri, Feb 20, 2015 at 8:30 AM, Mingyu Kim m...@palantir.com wrote:
 Hi all,

 Related to https://issues.apache.org/jira/browse/SPARK-3039, the default CDH4
build, which is built with “mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests
clean package”, pulls in avro-mapred hadoop1, as opposed to avro-mapred hadoop2.
This ends up in the same error as mentioned in the linked bug (pasted below).

 The right solution would be to create a hadoop-2.0 profile that sets
avro.mapred.classifier to hadoop2, and to build the CDH4 build with the
“-Phadoop-2.0” option.

 What do people think?

 Mingyu

 ——

 java.lang.IncompatibleClassChangeError: Found interface
org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected
at 
org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyIn
putFormat.java:47)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at 
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at 
org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java
:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav
a:615)
at java.lang.Thread.run(Thread.java:745)






Re: Streaming partitions to driver for use in .toLocalIterator

2015-02-18 Thread Mingyu Kim
Another alternative would be to compress the partition in memory in a
streaming fashion instead of calling .toArray on the iterator. Would it be
an easier mitigation to the problem? Or, is it hard to compress the rows
one by one without materializing the full partition in memory using the
compression algo Spark uses currently?
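
A rough sketch of that streaming-compression idea, using Java serialization and 
gzip purely for illustration (Spark's actual serializer and compression codec 
would differ):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

// Compress a partition's rows one at a time, without materializing the
// whole partition as an array first.
def compressPartition[T](rows: Iterator[T]): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(new GZIPOutputStream(bos))
  rows.foreach(row => oos.writeObject(row))   // each row streams straight into the compressor
  oos.close()                                  // flushes and finishes the gzip stream
  bos.toByteArray
}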

Mingyu





On 2/18/15, 1:01 PM, Imran Rashid iras...@cloudera.com wrote:

This would be pretty tricky to do -- the issue is that right now
sparkContext.runJob has you pass in a function from a partition to *one*
result object that gets serialized and sent back: Iterator[T] => U, and
that idea is baked pretty deep into a lot of the internals, DAGScheduler,
Task, Executors, etc.

Maybe another possibility worth considering: should we make it easy to go
from N partitions to 2N partitions (or any other multiple obviously)
without requiring a shuffle?  For that matter, you should also be able to
go from 2N to N without a shuffle as well.  That change is also somewhat
involved, though.

Both are in theory possible, but I imagine they'd need really compelling
use cases.

An alternative would be to write your RDD to some other data store (eg,
hdfs) which has better support for reading data in a streaming fashion,
though you would probably be unhappy with the overhead.



On Wed, Feb 18, 2015 at 9:09 AM, Andrew Ash and...@andrewash.com wrote:

 Hi Spark devs,

 I'm creating a streaming export functionality for RDDs and am having
some
 trouble with large partitions.  The RDD.toLocalIterator() call pulls
over a
 partition at a time to the driver, and then streams the RDD out from
that
 partition before pulling in the next one.  When you have large
partitions
 though, you can OOM the driver, especially when multiple of these
exports
 are happening in the same SparkContext.

 One idea I had was to repartition the RDD so partitions are smaller, but
 it's hard to know a priori what the partition count should be, and I'd
like
 to avoid paying the shuffle cost if possible -- I think repartition to a
 higher partition count forces a shuffle.

 Is it feasible to rework this so the executor -> driver transfer in
 .toLocalIterator is a steady stream rather than a partition at a time?

 Thanks!
 Andrew






Spark master OOMs with exception stack trace stored in JobProgressListener (SPARK-4906)

2014-12-19 Thread Mingyu Kim
Hi,

I just filed a bug, 
SPARK-4906 (https://issues.apache.org/jira/browse/SPARK-4906), regarding Spark 
master OOMs. If I understand correctly, the UI states for all running 
applications are kept in memory, retained by JobProgressListener, and when there 
are a lot of exception stack traces, these UI states can take up a significant 
amount of heap. This seems very bad, especially for long-running applications.

Can you correct me if I’m misunderstanding anything? If my understanding is 
correct, is there any work being done to make sure the UI states don’t grow 
indefinitely over time? Would it make sense to spill some states to disk or 
work with what spark.eventLog is doing so Spark master doesn’t need to keep 
things in memory?

Thanks,
Mingyu


Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run against a 1.0.1 cluster

2014-08-15 Thread Mingyu Kim
Thanks for your response. I think I misinterpreted the
stability/compatibility guarantee with 1.0 release. It seems like the
compatibility is only at the API level.

This is interesting because it means any system/product that is built on top
of Spark and uses Spark with a long-running SparkContext connecting to the
cluster over network, will need to make sure it has the exact same version
of Spark jar as the cluster, even to the patch version. This would be
analogous to having to compile Spark against a very specific version of
Hadoop, as opposed to currently being able to use the Spark package with
CDH4 against most of the CDH4 Hadoop clusters.

Is it correct that Spark is focusing on and prioritizing the
spark-submit use cases over the aforementioned use cases? I just wanted to
better understand the future direction/prioritization of Spark.

Thanks,
Mingyu

From:  Patrick Wendell pwend...@gmail.com
Date:  Thursday, August 14, 2014 at 6:32 PM
To:  Gary Malouf malouf.g...@gmail.com
Cc:  Mingyu Kim m...@palantir.com, dev@spark.apache.org
dev@spark.apache.org
Subject:  Re: [SPARK-3050] Spark program running with 1.0.2 jar cannot run
against a 1.0.1 cluster

I commented on the bug. For driver mode, you'll need to get the
corresponding version of spark-submit for Spark 1.0.2.


On Thu, Aug 14, 2014 at 3:43 PM, Gary Malouf malouf.g...@gmail.com wrote:
 To be clear, is it 'compiled' against 1.0.2 or it packaged with it?
 
 
 On Thu, Aug 14, 2014 at 6:39 PM, Mingyu Kim m...@palantir.com wrote:
 
  I ran a really simple code that runs with Spark 1.0.2 jar and connects to
  a Spark 1.0.1 cluster, but it fails with java.io.InvalidClassException. I
  filed the bug at https://issues.apache.org/jira/browse/SPARK-3050.
 
  I assumed the minor and patch releases shouldn’t break compatibility. Is
  that correct?
 
  Thanks,
  Mingyu
 







[SPARK-3050] Spark program running with 1.0.2 jar cannot run against a 1.0.1 cluster

2014-08-14 Thread Mingyu Kim
I ran a really simple code that runs with Spark 1.0.2 jar and connects to a
Spark 1.0.1 cluster, but it fails with java.io.InvalidClassException. I
filed the bug at https://issues.apache.org/jira/browse/SPARK-3050.

I assumed the minor and patch releases shouldn’t break compatibility. Is
that correct?

Thanks,
Mingyu



