[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195287202
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
--- End diff --

Is it possible to merge `UploadStream` and `RpcRequest` into a single class?





[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195284967
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, 
RpcResponseCallback callback) {
 handler.addRpcRequest(requestId, callback);
 
 channel.writeAndFlush(new RpcRequest(requestId, new 
NioManagedBuffer(message)))
-.addListener(future -> {
-  if (future.isSuccess()) {
-long timeTaken = System.currentTimeMillis() - startTime;
-if (logger.isTraceEnabled()) {
-  logger.trace("Sending request {} to {} took {} ms", 
requestId,
-getRemoteAddress(channel), timeTaken);
-}
-  } else {
-String errorMsg = String.format("Failed to send RPC %s to %s: 
%s", requestId,
-  getRemoteAddress(channel), future.cause());
-logger.error(errorMsg, future.cause());
-handler.removeRpcRequest(requestId);
-channel.close();
-try {
-  callback.onFailure(new IOException(errorMsg, 
future.cause()));
-} catch (Exception e) {
-  logger.error("Uncaught exception in RPC response callback 
handler!", e);
-}
-  }
-});
+  .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+return requestId;
+  }
+
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() 
in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read 
completely on the
+   * receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for 
transferring large amounts
+   * of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called 
when both message and data
+   * are received successfully.
+   */
+  public long uploadStream(
+  ManagedBuffer meta,
+  ManagedBuffer data,
+  RpcResponseCallback callback) {
+long startTime = System.currentTimeMillis();
+if (logger.isTraceEnabled()) {
+  logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+}
+
+long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
--- End diff --

This `Math.abs(UUID.randomUUID().getLeastSignificantBits())` expression is repeated twice. Move it into a separate method.
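
A minimal sketch of the suggested extraction (the helper name and placement are hypothetical, not part of the PR):

```java
import java.util.UUID;

// Sketch only: a single helper that both sendRpc() and uploadStream() could
// call instead of repeating the expression inline.
final class RequestIdSketch {
  static long requestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }

  public static void main(String[] args) {
    System.out.println(requestId());  // a non-negative pseudo-random request id
  }
}
```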





[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-31 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r192279111
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
+   * If stream data is not null, you *must* call 
streamData.registerStreamCallback
+   * before this method returns.
+   *
* @param client A channel client which enables the handler to make 
requests back to the sender
*   of this RPC. This will always be the exact same object 
for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be 
read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon 
success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

What about incorporating the `message` parameter into the `streamData` parameter?





[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r191628993
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
+   * If stream data is not null, you *must* call 
streamData.registerStreamCallback
+   * before this method returns.
+   *
* @param client A channel client which enables the handler to make 
requests back to the sender
*   of this RPC. This will always be the exact same object 
for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be 
read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon 
success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

It's not necessary to add a parameter. Just change the `message` parameter to an `InputStream`.





[GitHub] spark issue #14658: [WIP][SPARK-5928][SPARK-6238] Remote Shuffle Blocks cann...

2017-12-26 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/14658
  
Spark 2.2 has fixed this issue.





[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...

2017-07-26 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17882





[GitHub] spark issue #17882: [WIP][SPARK-20079][yarn] Re registration of AM hangs spa...

2017-07-26 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
I'm very sorry, I haven't taken the time to update this code recently.
@vanzin, thank you for your work. I'll close this PR.





[GitHub] spark issue #14995: [Test Only][SPARK-6235][CORE]Address various 2G limits

2017-07-01 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/14995
  
I did not do much testing, but I think it can be used in a production environment.
The branch is here:
https://github.com/witgo/spark/tree/SPARK-6235_Address_various_2G_limits





[GitHub] spark issue #17139: [SPARK-19486][CORE](try 3) Investigate using multiple th...

2017-06-21 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
@jiangxb1987, yes. Do you have any questions?





[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...

2017-06-14 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r121972913
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend(
   }
 
   /**
-   * Reset the state of SchedulerBackend to the initial state. This is 
happened when AM is failed
-   * and re-registered itself to driver after a failure. The stale state 
in driver should be
-   * cleaned.
-   */
-  override protected def reset(): Unit = {
--- End diff --

I think `ExecutorAllocationManager#reset` is still necessary, but the following lines should be removed from it:
```scala
initializing = true
numExecutorsTarget = initialNumExecutors
numExecutorsToAdd = 1
```





[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...

2017-06-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r120652302
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -68,6 +68,8 @@ private[spark] abstract class YarnSchedulerBackend(
   // Flag to specify whether this schedulerBackend should be reset.
   private var shouldResetOnAmRegister = false
 
+  private var lastRequestExecutors = RequestExecutors(-1, -1, Map.empty, 
Set.empty)
--- End diff --

Done.





[GitHub] spark issue #17882: [SPARK-20079][yarn] Re registration of AM hangs spark cl...

2017-06-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
@vanzin Done.





[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...

2017-06-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r120644588
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend(
   }
 
   /**
-   * Reset the state of SchedulerBackend to the initial state. This is 
happened when AM is failed
-   * and re-registered itself to driver after a failure. The stale state 
in driver should be
-   * cleaned.
-   */
-  override protected def reset(): Unit = {
--- End diff --

Judging from the current code, the way we reset the state of `ExecutorAllocationManager` is not correct. It causes the `RetrieveLastRequestExecutors` message to not work properly.





[GitHub] spark issue #17882: [WIP][SPARK-20079][try 2][yarn] Re registration of AM ha...

2017-06-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
@jerryshao @vanzin 
Would you take some time to review this PR?





[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...

2017-05-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/18008
  
@JoshRosen I see, thank you.





[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...

2017-05-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/18008
  
@JoshRosen, what's the tool in your screenshot?





[GitHub] spark pull request #17882: [WIP][SPARK-20079][try 2][yarn] Re registration o...

2017-05-06 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17882

[WIP][SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster 
in yarn-client mode.

See #17480 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-20079_try2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17882


commit fd4c486725ae49d068e67088a185fb4cc229e21f
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-03-30T14:17:49Z

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.

commit fa27d7f6da49659c8aa6efe55a3e457e883eb17a
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-04T04:33:45Z

review commits

commit 6341d31c2961a08db2f36339f5ebe8c814eeb4c7
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-07T10:32:29Z

review commits

commit e992df93e7222d5d2bd66d9a2c19984c9b241fd5
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-23T04:56:58Z

Delete "initializing = true" in ExecutorAllocationManager.reset

commit 917cf43ffaaeb20347df3c7e480cb75ae87dca83
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-05-06T13:28:28Z

Add msg: RetrieveLastAllocatedExecutorNumber







[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-22 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r112825043
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
--- End diff --

@jerryshao  @vanzin
I think that deleting `initializing = true` is a good idea.





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-04-19 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
OK, I will do the work over the weekend.





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-04-18 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
@vanzin 
Sorry, I do not understand what you mean. Will you submit a new PR with your own ideas? If so, I will close this PR.





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r53390
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
+/**
+ * When some tasks need to be scheduled and initial executor = 0, 
resetting the initializing
+ * field may cause it to not be set to false in yarn.
+ * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+ */
+if (maxNumExecutorsNeeded() == 0) {
+  initializing = true
--- End diff --

OK. I've got it, thx.





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110804557
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
+/**
+ * When some tasks need to be scheduled and initial executor = 0, 
resetting the initializing
+ * field may cause it to not be set to false in yarn.
+ * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+ */
+if (maxNumExecutorsNeeded() == 0) {
+  initializing = true
--- End diff --

Wouldn't the following code have a similar effect?

```scala
numExecutorsTarget = initialNumExecutors // The default value is 0
numExecutorsToAdd = 1
```
The incoming parameters of the `client.requestTotalExecutors` method are then 1, 2, 4, 8, 16...
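
A minimal sketch (an assumption about the ramp-up being described, not Spark's actual allocation code):

```java
// Sketch only: with numExecutorsToAdd starting at 1 and doubling after each
// round, the successive increments form the sequence 1, 2, 4, 8, 16, ...
public class RampUpSketch {
  public static void main(String[] args) {
    int numExecutorsToAdd = 1;
    for (int round = 1; round <= 5; round++) {
      System.out.println("round " + round + ": add " + numExecutorsToAdd + " executor(s)");
      numExecutorsToAdd *= 2;  // doubled after each round
    }
  }
}
```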





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110796578
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
+/**
+ * When some tasks need to be scheduled and initial executor = 0, 
resetting the initializing
+ * field may cause it to not be set to false in yarn.
+ * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+ */
+if (maxNumExecutorsNeeded() == 0) {
+  initializing = true
--- End diff --

@jerryshao Can you explain the following comment? I do not understand it.
```scala
 if (initializing) {
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
} else if (maxNeeded < numExecutorsTarget) {
```





[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...

2017-04-08 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17567
  
 OK, I see.





[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...

2017-04-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17567
  
LGTM.  
Are there any performance test reports?





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110361779
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
+/**
+ * When some tasks need to be scheduled, resetting the initializing 
field may cause
+ * it to not be set to false in yarn.
--- End diff --

Currently this method will only be called in yarn-client mode when AM 
re-registers after a failure.





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r109575470
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager(
* yarn-client mode when AM re-registers after a failure.
*/
   def reset(): Unit = synchronized {
-initializing = true
+if (maxNumExecutorsNeeded() == 0) {
--- End diff --

Done.





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-03-31 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
@jerryshao  Yes.





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-03-31 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
The ExecutorAllocationManager.reset method is called when the AM re-registers, and it sets the ExecutorAllocationManager.initializing field to true. While this field is true, the driver does not request new executors from the AM. The following two events set the field back to false:

1. An executor is idle for some time.
2. A new stage is submitted.

If the AM is killed and restarted after the stage has been submitted, neither event can occur:
1. When the AM is killed, YARN kills all running containers, so all executors are lost and no executor will be idle.
2. With no surviving executors, the current stage can never complete, so the DAG scheduler will not submit a new stage.
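
A simplified, hypothetical sketch of the gating described above (the field names mirror the real ones, but this is not Spark's actual ExecutorAllocationManager):

```java
// Sketch only: while `initializing` is true, no executors are requested, and
// after an AM restart nothing remains that can flip it back to false.
public class AllocationHangSketch {
  static boolean initializing = true;   // reset() sets this back to true
  static int numExecutorsTarget = 0;

  static int executorsToRequest(int maxNeeded) {
    if (initializing) {
      return 0;                         // never ask the AM for new executors
    }
    return Math.max(maxNeeded - numExecutorsTarget, 0);
  }

  public static void main(String[] args) {
    int pendingTasks = 8;               // tasks are queued, but every executor died with the AM
    System.out.println(executorsToRequest(pendingTasks));  // prints 0 -> the job hangs
  }
}
```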





[GitHub] spark pull request #17480: SPARK-20079: Re registration of AM hangs spark cl...

2017-03-30 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17480

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.

When there are tasks that need to be scheduled, `ExecutorAllocationManager` instances should not reset the `initializing` field.

## How was this patch tested?
Unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-20079

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17480


commit b91dfeb4fea445727f6b5430aa947f35a287d56d
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-03-30T14:17:49Z

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.







[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-30 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17329





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-25 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108049460
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Sorry, I didn't get your idea. Can you write some code?





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-25 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108032378
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Suppose there are E executors in the cluster and a shuffle has M map tasks and R reduce tasks. The master branch will create:

1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 * M * R NoSuchElementException instances

With this PR, it will create:

1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 NoSuchElementException instances per executor (ExternalShuffleBlockResolver and IndexShuffleBlockResolver are created once when the executor starts, and they call the new constructor to create FileSegmentManagedBuffer instances)
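
For example (illustrative numbers only, not measurements from this PR): with M = 1,000 map tasks and R = 500 reduce tasks, the master branch can create up to 500,000 FileSegmentManagedBuffer instances and 1,000,000 NoSuchElementException instances for a single shuffle, while with this change the exception count stays at a small constant per executor.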





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-24 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108027203
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

This branch, [SPARK-19991_try2](https://github.com/witgo/spark/commits/SPARK-19991_try2), took `244.45` s in my test.





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-24 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107841105
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Yes, but the above code does not improve performance; the `FileSegmentManagedBuffer.convertToNetty` method is also called only once in the master branch code.





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-23 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107706851
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Like the following code?

```java
  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    this.lazyFileDescriptor = conf.lazyFileDescriptor();
    this.memoryMapBytes = conf.memoryMapBytes();
    this.file = file;
    this.offset = offset;
    this.length = length;
  }
```

The code `conf.lazyFileDescriptor()` or `conf.memoryMapBytes()` creates a NoSuchElementException instance on every call.
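
A simplified sketch (an assumption about the pattern, not Spark's exact TransportConf/ConfigProvider code) of why the exception is constructed even though a default value is ultimately used:

```java
import java.util.NoSuchElementException;

// Sketch only: get() throws when the key is unset, and the typed accessor
// catches the exception and falls back to the default, so every call still
// pays the cost of constructing a NoSuchElementException.
public class ConfigProviderSketch {
  String get(String name) {
    // pretend the key is never configured
    throw new NoSuchElementException(name);
  }

  boolean getBoolean(String name, boolean defaultValue) {
    try {
      return Boolean.parseBoolean(get(name));
    } catch (NoSuchElementException e) {
      return defaultValue;  // the default is used, but the exception object was still created
    }
  }

  public static void main(String[] args) {
    System.out.println(new ConfigProviderSketch().getBoolean("spark.shuffle.io.lazyFD", true));
  }
}
```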





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-22 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107572297
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

That will change a lot of code, right?





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-18 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r106781598
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;
 
   public FileSegmentManagedBuffer(TransportConf conf, File file, long 
offset, long length) {
-this.conf = conf;
+this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, 
length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Oh, do you have a better idea?





[GitHub] spark issue #17329: [SPARK-19991]FileSegmentManagedBuffer performance improv...

2017-03-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17329
  
```java
public class HadoopConfigProvider extends ConfigProvider {
  private final Configuration conf;

  public HadoopConfigProvider(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public String get(String name) {
    String value = conf.get(name);
    // When spark.storage.memoryMapThreshold or spark.shuffle.io.lazyFD is not set,
    // `value` is null and a NoSuchElementException is constructed and thrown here.
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return conf;
  }
}
```







[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-16 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17329

[SPARK-19991]FileSegmentManagedBuffer performance improvement

FileSegmentManagedBuffer performance improvement.


## What changes were proposed in this pull request?

When we do not set the values of the configuration items `spark.storage.memoryMapThreshold` and `spark.shuffle.io.lazyFD`, each call to the FileSegmentManagedBuffer.nioByteBuffer or FileSegmentManagedBuffer.createInputStream method creates a NoSuchElementException instance. This is a relatively time-consuming operation.

In this use case, this PR improves performance by about 3.5%.

The test code:

``` scala

(1 to 10).foreach { i =>
  val numPartition = 1
  val rdd = sc.parallelize(0 until numPartition).repartition(numPartition).flatMap { t =>
    (0 until numPartition).map(r => r * numPartition + t)
  }.repartition(numPartition)
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}


```

and `spark-defaults.conf` file:

```
spark.master  yarn-client
spark.executor.instances  20
spark.driver.memory   64g
spark.executor.memory 30g
spark.executor.cores  5
spark.default.parallelism 100 
spark.sql.shuffle.partitions  100
spark.serializer                          org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize                0
spark.ui.enabled                          false
spark.driver.extraJavaOptions             -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions           -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking  true
spark.cleaner.referenceTracking.blocking.shuffle  true

```

The test results are as follows

| [SPARK-19991](https://github.com/witgo/spark/tree/SPARK-19991) | https://github.com/apache/spark/commit/68ea290b3aa89b2a539d13ea2c18bdb5a651b2bf |
| --- | --- |
| 226.09 s | 235.21 s |

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-19991

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17329


commit abcfc79991ecd1d5cef2cd1e275b872695ba19d9
Author: Guoqiang Li <liguoqia...@huawei.com>
Date:   2017-03-17T03:19:37Z

FileSegmentManagedBuffer performance improvement







[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-08 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
@kayousterhout The test report has been updated.





[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  

Added the multi-threaded code for serializing `TaskDescription`.





[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
ping @kayousterhout  @squito





[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...

2017-03-04 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17116





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  

[SPARK-18890_20170303](https://github.com/witgo/spark/commits/SPARK-18890_20170303)'s code is older, but the test case running time is 5.2 s.





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
Yes, maybe multithreaded task serialization code can achieve better performance. Let me close this PR.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-02 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/15505





[GitHub] spark pull request #17139: [WIP][SPARK-18890][CORE](try 3) Move task seriali...

2017-03-02 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17139

[WIP][SPARK-18890][CORE](try 3) Move task serialization from the 
TaskSetManager to the CoarseGrainedSchedulerBackend

## What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-18890

In the case where a stage has a lot of tasks, this PR can improve the scheduling performance by ~~15%~~.

The test code:

``` scala

val rdd = sc.parallelize(0 until 100).repartition(10)
rdd.localCheckpoint().count()
rdd.sum()
(1 to 10).foreach{ i=>
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}

```

and `spark-defaults.conf` file:

```
spark.master  yarn-client
spark.executor.instances  20
spark.driver.memory   64g
spark.executor.memory 30g
spark.executor.cores  5
spark.default.parallelism 100 
spark.sql.shuffle.partitions  100
spark.serializer                          org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize                0
spark.ui.enabled                          false
spark.driver.extraJavaOptions             -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions           -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking  true
spark.cleaner.referenceTracking.blocking.shuffle  true

```

The test results are as follows

**The table is out of date, to be updated**

| [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) |
| --- | --- |
| 17.116 s | 21.764 s |
## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-18890-multi-threading

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17139


commit bfa285b1bd677e0c0b8a57ceda4433bb8ae072e9
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-03-02T14:50:54Z

Move task serialization from the TaskSetManager to the 
CoarseGrainedSchedulerBackend







[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
I don't know which PR caused the run time of this test case to drop from 21.764 s to 9.566 s.





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout 
Test results have been updated:

| [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [db0ddce](https://github.com/apache/spark/commit/db0ddce523bb823cba996e92ef36ceca31492d2c) |
| --- | --- |
| 9.427 s | 9.566 s |





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-01 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout  It takes some time to update the test report.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103636619
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
--- End diff --

@kayousterhout 
@squito 
Refactoring code in https://github.com/apache/spark/pull/17116





[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...

2017-03-01 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17116

[SPARK-18890][CORE](try 2) Move task serialization from the TaskSetManager 
to the CoarseGrainedSchedulerBackend

## What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-18890

When a stage has a lot of tasks, this PR can improve scheduling performance by ~~15%~~.

The test code:

``` scala

val rdd = sc.parallelize(0 until 100).repartition(10)
rdd.localCheckpoint().count()
rdd.sum()
(1 to 10).foreach{ i=>
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}

```

and `spark-defaults.conf` file:

```
spark.master  yarn-client
spark.executor.instances  20
spark.driver.memory   64g
spark.executor.memory 30g
spark.executor.cores  5
spark.default.parallelism 100 
spark.sql.shuffle.partitions  100
spark.serializer                  org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize        0
spark.ui.enabled                  false
spark.driver.extraJavaOptions     -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions   -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking  true
spark.cleaner.referenceTracking.blocking.shuffle  true

```

The test results are as follows

**The table is out of date, to be updated**

| [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) |
| --- | --- |
| 17.116 s | 21.764 s |

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-18890

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17116


commit 84550a73b3f786b2ac569500a95ac38dc6f44657
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-08T11:18:59Z

Move task serialization from the TaskSetManager to the 
CoarseGrainedSchedulerBackend

commit 39aa22e8b95a25cd250eeafeb5ec800dfa794896
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-11T06:05:53Z

review commits

commit d562727dfd554c78ea17e590841a1e74b9b4f9aa
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-13T02:10:03Z

add test "Scheduler aborts stages that have unserializable partition"

commit fc6789e027e8ea935ad392cfca90dd318a6d9e57
Author: Imran Rashid <iras...@cloudera.com>
Date:   2017-01-13T21:42:44Z

refactor

commit 1edcf2a7e65e7c9373782824a71ec87909e88097
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-16T01:10:44Z

create all the serialized tasks to make sure they all work

commit 79dda74ab27eb9a2630921816305e489aef4f72e
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-22T03:28:04Z

review commits

commit 0b20da4c0f79d89b0689e19c6b5e3fcdf8b360fb
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-25T01:09:20Z

add lock on the scheduler object

commit b5de21f510697d42a9c7f7f255d20a41641a5122
Author: Kay Ousterhout <kayousterh...@gmail.com>
Date:   2017-02-07T00:38:48Z

Consolidate TaskDescrition constructors.

This commit also does all task serializion in the encode() method,
so now the encode() method just takes the TaskDescription as an
input parameter.

commit 819a88cb74a41c446a442ff91fb14f1093025f77
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-02-07T15:21:11Z

Refactor the taskDesc serialization code

commit 900884b82f67d2f51f73aff49b671b2dcb450264
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-02-09T16:58:15Z

Add ut: serialization task errors do not affect each other

commit 0812fc9067b9a0652de64e5c0539eaed9d8f243d
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-02-25T17:17:03Z

askWithRetry => askSync

commit 0dae93a1edd2644cb63656ae56d41adaa59cf5e3
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-02-27T10:19:13Z

fix the import ordering in TaskDescription.scala

commit 9dab121247cf8b30912f016c404279acd0

[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631341
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -164,17 +164,18 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
   index = 0,
   addedFiles = Map[String, Long](),
   addedJars = Map[String, Long](),
-  properties = new Properties,
-  serializedTask)
+  properties = new Properties)
   }
 
-  private def runTaskAndGetFailReason(taskDescription: TaskDescription): 
TaskFailedReason = {
+  private def runTaskAndGetFailReason(
+taskDescription: TaskDescription,
+serializedTask: ByteBuffer): TaskFailedReason = {
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631404
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -130,7 +152,7 @@ private[spark] object TaskDescription {
 // Create a sub-buffer for the serialized task into its own buffer (to 
be deserialized later).
 val serializedTask = byteBuffer.slice()
 
-new TaskDescription(taskId, attemptNumber, executorId, name, index, 
taskFiles, taskJars,
-  properties, serializedTask)
+(new TaskDescription(taskId, attemptNumber, executorId, name, index, 
taskFiles, taskJars,
+  properties), serializedTask)
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631322
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,8 +55,26 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
-
+// Task object corresponding to the TaskDescription. This is only 
defined on the master; on
+// the worker, the Task object is handled separately from the 
TaskDescription so that it can
+// deserialized after the TaskDescription is deserialized.
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631360
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
+scheduler: TaskSchedulerImpl,
+task: TaskDescription,
+abortSet: HashSet[TaskSetManager],
+maxRpcMessageSize: Long): ByteBuffer = {
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631278
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631373
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -195,6 +197,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   makeOffers()
 }
 
+  // Only be used for testing.
+  case ReviveOffers =>
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631354
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 ---
@@ -82,9 +88,15 @@ private[spark] class LocalEndpoint(
 
   def reviveOffers() {
 val offers = IndexedSeq(new WorkerOffer(localExecutorId, 
localExecutorHostname, freeCores))
+val abortTaskSet = new HashSet[TaskSetManager]()
 for (task <- scheduler.resourceOffers(offers).flatten) {
-  freeCores -= scheduler.CPUS_PER_TASK
-  executor.launchTask(executorBackend, task)
+  val buffer = prepareSerializedTask(scheduler, task,
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631300
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
+scheduler: TaskSchedulerImpl,
+task: TaskDescription,
+abortSet: HashSet[TaskSetManager],
+maxRpcMessageSize: Long): ByteBuffer = {
+var serializedTask: ByteBuffer = null
+if (abortSet.isEmpty || !getTaskSetManager(scheduler, 
task.taskId).exists(_.isZombie)) {
+  try {
+serializedTask = TaskDescription.encode(task)
+  } catch {
+case NonFatal(e) =>
+  abortTaskSetManager(scheduler, task.taskId,
+s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
+  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => 
abortSet.add(t))
+  }
+}
+
+if (serializedTask != null && serializedTask.limit >= 
maxRpcMessageSize) {
+  val msg = "Serialized task %s:%d was %d bytes, which exceeds max 
allowed: " +
+"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+"spark.rpc.message.maxSize or using broadcast variables for large 
values."
+  abortTaskSetManager(scheduler, task.taskId,
+msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+  getTaskSetManager(scheduler, task.taskId).foreach(t => 
abortSet.add(t))
+  serializedTask = null
+} else if (serializedTask != null) {
+  emittedTaskSizeWarning(scheduler, serializedTask, task.taskId)
+}
+serializedTask
+  }
+
+  private def emittedTaskSizeWarning(
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631294
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
+scheduler: TaskSchedulerImpl,
+task: TaskDescription,
+abortSet: HashSet[TaskSetManager],
+maxRpcMessageSize: Long): ByteBuffer = {
+var serializedTask: ByteBuffer = null
+if (abortSet.isEmpty || !getTaskSetManager(scheduler, 
task.taskId).exists(_.isZombie)) {
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631260
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631249
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,8 +55,26 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
-
+// Task object corresponding to the TaskDescription. This is only 
defined on the master; on
+// the worker, the Task object is handled separately from the 
TaskDescription so that it can
+// deserialized after the TaskDescription is deserialized.
+@transient private val task: Task[_] = null) extends Logging {
+
+  /**
+   * Serializes the task for this TaskDescription and returns the 
serialized task.
+   *
+   * This method should only be used on the master (to serialize a task to 
send to a worker).
--- End diff --

Done







[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-01 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103631254
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
--- End diff --

Done





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-28 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103627873
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
--- End diff --

I don't know why the old code ignored the exception; I just copied it here. :)





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-28 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103627459
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
--- End diff --

OK I will think over your suggestion. 





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-28 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103622591
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -621,6 +615,80 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
+  } catch {
+case e: Exception => logError("Exception in error callback", e)
+  }
+}
+  }
+
+  private def isZombieTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Unit = scheduler.synchronized {
+!scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie)
+  }
+
+  private def getTaskSetManager(
+scheduler: TaskSchedulerImpl,
+taskId: Long): Option[TaskSetManager] = scheduler.synchronized {
+scheduler.taskIdToTaskSetManager.get(taskId)
+  }
+
+  private[scheduler] def prepareSerializedTask(
+scheduler: TaskSchedulerImpl,
+task: TaskDescription,
+abortSet: HashSet[TaskSetManager],
+maxRpcMessageSize: Long): ByteBuffer = {
+var serializedTask: ByteBuffer = null
+if (abortSet.isEmpty || !getTaskSetManager(scheduler, 
task.taskId).exists(_.isZombie)) {
--- End diff --

This is an optimization: in most cases abortSet is empty, so `!getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)` is never called. I changed it into more readable code; see
https://github.com/apache/spark/pull/15505/commits/2f4b3f955cad0fe7546de54737c19500b90ad67d
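
As an illustration of the short-circuit behaviour being relied on here, a standalone sketch with stubbed-out types (not the PR's code; `Manager` and `lookup` are made up for the example):

```scala
object ShortCircuitGuard {
  // Stub standing in for the real TaskSetManager; only the zombie flag matters here.
  final case class Manager(isZombie: Boolean)

  def lookup(taskId: Long): Option[Manager] = {
    println(s"lookup for task $taskId")  // side effect makes evaluation visible
    Some(Manager(isZombie = false))
  }

  def shouldSerialize(abortSetEmpty: Boolean, taskId: Long): Boolean =
    // With ||, the right-hand side (and its lookup) is skipped whenever
    // abortSet is empty, which is the common case.
    abortSetEmpty || !lookup(taskId).exists(_.isZombie)

  def main(args: Array[String]): Unit = {
    println(shouldSerialize(abortSetEmpty = true, 1L))   // no "lookup" line printed
    println(shouldSerialize(abortSetEmpty = false, 2L))  // lookup happens
  }
}
```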






[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-28 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103622493
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -454,33 +452,15 @@ private[spark] class TaskSetManager(
   currentLocalityIndex = getLocalityIndex(taskLocality)
   lastLaunchTime = curTime
 }
-// Serialize and return the task
-val serializedTask: ByteBuffer = try {
-  ser.serialize(task)
-} catch {
-  // If the task cannot be serialized, then there's no point to 
re-attempt the task,
-  // as it will always fail. So just abort the whole task-set.
-  case NonFatal(e) =>
-val msg = s"Failed to serialize task $taskId, not attempting 
to retry it."
-logError(msg, e)
-abort(s"$msg Exception during serialization: $e")
-throw new TaskNotSerializableException(e)
-}
-if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024 &&
-  !emittedTaskSizeWarning) {
-  emittedTaskSizeWarning = true
-  logWarning(s"Stage ${task.stageId} contains a task of very large 
size " +
-s"(${serializedTask.limit / 1024} KB). The maximum recommended 
task size is " +
-s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-}
+
 addRunningTask(taskId)
 
 // We used to log the time it takes to serialize the task, but 
task size is already
 // a good proxy to task serialization time.
 // val timeTaken = clock.getTime() - startTime
 val taskName = s"task ${info.id} in stage ${taskSet.id}"
 logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
-  s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit} bytes)")
--- End diff --

taskLocality only exists here. How about changing CoarseGrainedSchedulerBackend (line 273) to:
``` scala
logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} 
hostname: " +
  s"${executorData.executorHost}, serializedTask: ${serializedTask.limit} 
bytes.")
```





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-02-27 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
Jenkins, retest this please.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-27 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r103174824
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -23,7 +23,10 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, Map}
+import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.TaskNotSerializableException
--- End diff --

I'm sorry, I forgot that. It's done.





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-02-25 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout I think the latest code is ready to merge into the master 
branch.





[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistices to ...

2017-02-23 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r102891969
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala ---
@@ -39,16 +40,18 @@ private[spark] sealed trait MapStatus {
* necessary for correctness, since block fetchers are allowed to skip 
zero-size blocks.
*/
   def getSizeForBlock(reduceId: Int): Long
+
+  def numberOfOutput: Int
--- End diff --

Could the number of outputs be greater than 2G (i.e., more than an Int can hold)?
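
To make the concern concrete, a tiny standalone example (illustrative only, not Spark code) of what happens when a count above roughly 2.1 billion is forced into an Int:

```scala
object OutputCountOverflow {
  def main(args: Array[String]): Unit = {
    val outputs: Long = 3000000000L   // 3 billion outputs, greater than Int.MaxValue
    val asInt: Int = outputs.toInt    // silently wraps around
    println(s"Int.MaxValue = ${Int.MaxValue}")
    println(s"as Long: $outputs, as Int: $asInt")  // the Int value is negative
  }
}
```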





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-02-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
Okay, this may take some time.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99848516
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
-  for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
+  val serializedTasks = tasks.flatten.map { task =>
+var serializedTask: ByteBuffer = null
+try {
+  serializedTask = TaskDescription.encode(task, 
task.serializedTask)
+  if (serializedTask.limit >= maxRpcMessageSize) {
+val msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
+  "spark.rpc.message.maxSize (%d bytes). Consider increasing " 
+
+  "spark.rpc.message.maxSize or using broadcast variables for 
large values."
+abortTaskSetManager(scheduler, task.taskId,
+  msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+serializedTask = null
+  } else if (serializedTask.limit > 
TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+
scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning).
+  foreach { taskSetMgr =>
+taskSetMgr.emittedTaskSizeWarning = true
+val stageId = taskSetMgr.taskSet.stageId
+logWarning(s"Stage $stageId contains a task of very large 
size " +
+  s"(${serializedTask.limit / 1024} KB). The maximum 
recommended task size is " +
+  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+  }
   }
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
 }
-else {
+(task, serializedTask)
+  }
+
+  if (!serializedTasks.exists(b => b._2 eq null)) {
--- End diff --

OK





[GitHub] spark pull request #16806: [WIP][SPARK-18890][CORE] Move task serialization ...

2017-02-07 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/16806





[GitHub] spark pull request #16806: [WIP][SPARK-18890][CORE] Move task serialization ...

2017-02-04 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/16806

[WIP][SPARK-18890][CORE] Move task serialization from the TaskSetManager to 
the CoarseGrainedSchedulerBackend

## What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-18890

When a stage has a lot of tasks, this PR can improve scheduling performance by ~~15%~~.

The test code:

``` scala

val rdd = sc.parallelize(0 until 100).repartition(10)
rdd.localCheckpoint().count()
rdd.sum()
(1 to 10).foreach{ i=>
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}

```

and `spark-defaults.conf` file:

```
spark.master  yarn-client
spark.executor.instances  20
spark.driver.memory   64g
spark.executor.memory 30g
spark.executor.cores  5
spark.default.parallelism 100 
spark.sql.shuffle.partitions  100
spark.serializer                  org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize        0
spark.ui.enabled                  false
spark.driver.extraJavaOptions     -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions   -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking  true
spark.cleaner.referenceTracking.blocking.shuffle  true

```

The test results are as follows

**The table is out of date, to be updated**

| [SPARK-17931](https://github.com/witgo/spark/tree/SPARK-17931) | [941b3f9](https://github.com/apache/spark/commit/941b3f9aca59e62c078508a934f8c2221ced96ce) |
| --- | --- |
| 17.116 s | 21.764 s |

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-18890-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16806


commit ab5a763375e5b9308e55acbedfb1e7bf2cb739de
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-08T11:18:59Z

Move task serialization from the TaskSetManager to the 
CoarseGrainedSchedulerBackend

commit 292a8bcf09fce3826b658c18c5d923379346fe52
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-11T06:05:53Z

review commits

commit 469586efd4abf47a5f891a6a4b72bba83e608aaf
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-01-13T02:10:03Z

add test "Scheduler aborts stages that have unserializable partition"

commit 8f7edc6c16c25aae6fae4f6dc6fa76eca8f06fd6
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-02-04T14:07:51Z

Refactor the serialization TaskDescription code







[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-04 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99463382
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
-  }
+val serializedTask = try {
+  TaskDescription.encode(task)
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
--- End diff --

If we do not deal with problem (2), it should only lead to performance degradation when a serialization error occurs, right?
Or we could add the following code:

```scala

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  val abortSet = new mutable.HashSet[TaskSetManager]()
  for (task <- tasks.flatten) {
    var serializedTask: ByteBuffer = null
    if (abortSet.isEmpty ||
        !scheduler.taskIdToTaskSetManager.get(task.taskId).exists(_.isZombie)) {
      try {
        serializedTask = TaskDescription.encode(task)
      } catch {
        case NonFatal(e) =>
          abortTaskSetManager(scheduler, task.taskId,
            s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e))
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t))
      }
    }

    if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
      val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
        "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
        "spark.rpc.message.maxSize or using broadcast variables for large values."
      abortTaskSetManager(scheduler, task.taskId,
        msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize))
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t))
    } else if (serializedTask != null) {
      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
        scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning).
          foreach { taskSetMgr =>
            taskSetMgr.emittedTaskSizeWarning = true
            val stageId = taskSetMgr.taskSet.stageId
            logWarning(s"Stage $stageId contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
      }
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
        s" hostname: ${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }

  }
}
```






[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-04 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99462808
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
-  for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
+  val serializedTasks = tasks.flatten.map { task =>
+var serializedTask: ByteBuffer = null
+try {
+  serializedTask = TaskDescription.encode(task, 
task.serializedTask)
+  if (serializedTask.limit >= maxRpcMessageSize) {
+val msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
+  "spark.rpc.message.maxSize (%d bytes). Consider increasing " 
+
+  "spark.rpc.message.maxSize or using broadcast variables for 
large values."
+abortTaskSetManager(scheduler, task.taskId,
+  msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+serializedTask = null
+  } else if (serializedTask.limit > 
TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+
scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning).
+  foreach { taskSetMgr =>
+taskSetMgr.emittedTaskSizeWarning = true
+val stageId = taskSetMgr.taskSet.stageId
+logWarning(s"Stage $stageId contains a task of very large 
size " +
+  s"(${serializedTask.limit / 1024} KB). The maximum 
recommended task size is " +
+  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+  }
   }
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
 }
-else {
+(task, serializedTask)
+  }
+
+  if (!serializedTasks.exists(b => b._2 eq null)) {
--- End diff --

I will reset the relevant code. The current code has the potential to affect the scheduling of other task sets.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-04 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99462431
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -66,7 +100,8 @@ private[spark] object TaskDescription {
 }
   }
 
-  def encode(taskDescription: TaskDescription): ByteBuffer = {
+  @throws[TaskNotSerializableException]
+  def encode(taskDescription: TaskDescription, serializedTask: 
ByteBuffer): ByteBuffer = {
--- End diff --

Oh, now I'm confused: should I remove the commit
https://github.com/apache/spark/pull/15505/commits/5e03e2cf9786af6d3ea555f6f61b6a60b23f1c2c?





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-04 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99462387
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
-  for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
+  val serializedTasks = tasks.flatten.map { task =>
+var serializedTask: ByteBuffer = null
+try {
+  serializedTask = TaskDescription.encode(task, 
task.serializedTask)
+  if (serializedTask.limit >= maxRpcMessageSize) {
+val msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
+  "spark.rpc.message.maxSize (%d bytes). Consider increasing " 
+
+  "spark.rpc.message.maxSize or using broadcast variables for 
large values."
+abortTaskSetManager(scheduler, task.taskId,
+  msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+serializedTask = null
+  } else if (serializedTask.limit > 
TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+
scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning).
+  foreach { taskSetMgr =>
+taskSetMgr.emittedTaskSizeWarning = true
+val stageId = taskSetMgr.taskSet.stageId
+logWarning(s"Stage $stageId contains a task of very large 
size " +
+  s"(${serializedTask.limit / 1024} KB). The maximum 
recommended task size is " +
+  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+  }
   }
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
 }
-else {
+(task, serializedTask)
+  }
+
+  if (!serializedTasks.exists(b => b._2 eq null)) {
--- End diff --

@kayousterhout 
I made a mistake: task 2 will be re-scheduled when checkSpeculatableTasks is called.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99458252
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -51,8 +54,39 @@ private[spark] class TaskDescription(
 val index: Int,// Index within this task's TaskSet
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
-val properties: Properties,
-val serializedTask: ByteBuffer) {
+val properties: Properties) extends Logging {
+
+  def this(
+  taskId: Long,
+  attemptNumber: Int,
+  executorId: String,
+  name: String,
+  index: Int, // Index within this task's TaskSet
+  addedFiles: Map[String, Long],
+  addedJars: Map[String, Long],
+  properties: Properties,
+  task: Task[_]) {
+  this(taskId, attemptNumber, executorId, name, index,
+addedFiles, addedJars, properties)
+  task_ = task
+  }
+
+   def serializedTask: ByteBuffer = {
--- End diff --

OK.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99458165
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -51,8 +54,39 @@ private[spark] class TaskDescription(
 val index: Int,// Index within this task's TaskSet
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
-val properties: Properties,
-val serializedTask: ByteBuffer) {
+val properties: Properties) extends Logging {
+
+  def this(
+  taskId: Long,
+  attemptNumber: Int,
+  executorId: String,
+  name: String,
+  index: Int, // Index within this task's TaskSet
+  addedFiles: Map[String, Long],
+  addedJars: Map[String, Long],
+  properties: Properties,
+  task: Task[_]) {
+  this(taskId, attemptNumber, executorId, name, index,
+addedFiles, addedJars, properties)
+  task_ = task
--- End diff --

I think TaskDescription is more readable with two constructors, and we can add a comment noting that the constructor with the `task: Task[_]` parameter is only called on the driver.
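
Roughly what I have in mind, as a sketch pieced together from the diff above (`Task`, `Logging`, and `Properties` are the classes TaskDescription.scala already imports; the comment wording is only a suggestion):

```scala
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    val addedFiles: Map[String, Long],
    val addedJars: Map[String, Long],
    val properties: Properties) extends Logging {

  // Driver-only constructor: keeps the live Task so that serialization can be
  // deferred until encode(); the executor-side decode path never calls this.
  def this(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int,
      addedFiles: Map[String, Long],
      addedJars: Map[String, Long],
      properties: Properties,
      task: Task[_]) {
    this(taskId, attemptNumber, executorId, name, index, addedFiles, addedJars, properties)
    task_ = task
  }

  private var task_ : Task[_] = null
}
```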





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99458022
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -66,7 +100,8 @@ private[spark] object TaskDescription {
 }
   }
 
-  def encode(taskDescription: TaskDescription): ByteBuffer = {
+  @throws[TaskNotSerializableException]
+  def encode(taskDescription: TaskDescription, serializedTask: 
ByteBuffer): ByteBuffer = {
--- End diff --

The previous version had only one parameter; the two-parameter version here is more readable.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-02-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r99457658
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -243,27 +245,42 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
-  for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
+  val serializedTasks = tasks.flatten.map { task =>
+var serializedTask: ByteBuffer = null
+try {
+  serializedTask = TaskDescription.encode(task, 
task.serializedTask)
+  if (serializedTask.limit >= maxRpcMessageSize) {
+val msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
+  "spark.rpc.message.maxSize (%d bytes). Consider increasing " 
+
+  "spark.rpc.message.maxSize or using broadcast variables for 
large values."
+abortTaskSetManager(scheduler, task.taskId,
+  msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+serializedTask = null
+  } else if (serializedTask.limit > 
TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+
scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning).
+  foreach { taskSetMgr =>
+taskSetMgr.emittedTaskSizeWarning = true
+val stageId = taskSetMgr.taskSet.stageId
+logWarning(s"Stage $stageId contains a task of very large 
size " +
+  s"(${serializedTask.limit / 1024} KB). The maximum 
recommended task size is " +
+  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+  }
   }
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
 }
-else {
+(task, serializedTask)
+  }
+
+  if (!serializedTasks.exists(b => b._2 eq null)) {
--- End diff --

Yes, task 2 can only wait until the next time makeOffers is called.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-24 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r97695455
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -602,6 +619,21 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 Future.successful(false)
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+
+  // abort TaskSetManager without exception
+  private[scheduler] def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = {
+scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
--- End diff --

You pointed out a real problem, but we need to make sure this PR cannot introduce it. Although I only extracted some code into a method and did not change the order in which it executes, to be safe I will still add a lock on the scheduler object.
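
Concretely, something like this (only a sketch; whether synchronizing on the `scheduler` instance itself is the right granularity is still open):

```scala
// Abort the TaskSetManager without throwing; take the scheduler lock so the abort
// cannot interleave with other scheduling work on the same TaskSchedulerImpl.
private[scheduler] def abortTaskSetManager(
    scheduler: TaskSchedulerImpl,
    taskId: Long,
    msg: => String,
    exception: Option[Throwable] = None): Unit = scheduler.synchronized {
  scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
    try {
      taskSetMgr.abort(msg, exception)
    } catch {
      case e: Exception => logError("Exception in error callback", e)
    }
  }
}
```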





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-01-21 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@squito
My understanding is that the TaskSchedulerImpl class contains many synchronized blocks (its methods are synchronized). If one synchronized block takes a very long time to execute, it blocks the other synchronized blocks, which reduces the performance of the TaskSchedulerImpl instance.
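
A toy illustration of the contention pattern I mean (plain Scala, not Spark code; the sleep stands in for an expensive operation, such as serializing a large batch of tasks, running inside a synchronized method):

```scala
object SynchronizedContentionDemo {
  private val lock = new Object

  // Stands in for a long-running synchronized method on TaskSchedulerImpl.
  def slowOperation(): Unit = lock.synchronized {
    Thread.sleep(1000)
  }

  // Stands in for any other synchronized method on the same instance; it cannot
  // start until slowOperation() has released the lock.
  def quickOperation(): Unit = lock.synchronized {
    println(s"quickOperation got the lock at ${System.currentTimeMillis()}")
  }

  def main(args: Array[String]): Unit = {
    val t1 = new Thread(() => slowOperation())
    val t2 = new Thread(() => quickOperation())
    t1.start()
    Thread.sleep(50) // make sure t1 grabs the lock first
    t2.start()       // t2 is now blocked for roughly a second
    t1.join()
    t2.join()
  }
}
```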





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r97211797
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -602,6 +619,20 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 Future.successful(false)
 }
 
-private[spark] object CoarseGrainedSchedulerBackend {
+private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"
+  // abort TaskSetManager without exception
+  def abortTaskSetManager(
+  scheduler: TaskSchedulerImpl,
+  taskId: Long,
+  msg: => String,
+  exception: Option[Throwable] = None): Unit = {
+  scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+  try {
+taskSetMgr.abort(msg, exception)
--- End diff --

`taskSetMgr.abort` is thread-safe; it looks fine from the calling code.






[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96108161
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
-  }
+val serializedTask = try {
+  TaskDescription.encode(task)
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
--- End diff --

Yes, it's a big issue.
We can first measure how much performance is lost by having to create all the serialized tasks up front to make sure they all serialize successfully. There may turn out to be no performance degradation at all.
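
For the verification, I am thinking of something along these lines inside launchTasks (a hypothetical sketch only, reusing the names from the diff above):

```scala
// Time how long it takes to serialize the whole batch of tasks up front,
// so we can tell whether the extra pass adds measurable overhead.
val start = System.nanoTime()
val serializedTasks = tasks.flatten.map { task =>
  (task, TaskDescription.encode(task))
}
val elapsedMs = (System.nanoTime() - start) / 1e6
logDebug(s"Serialized ${serializedTasks.size} tasks up front in $elapsedMs ms")
```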





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96106896
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 // Launch tasks returned by a set of resource offers
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
-val serializedTask = TaskDescription.encode(task)
-if (serializedTask.limit >= maxRpcMessageSize) {
-  scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { 
taskSetMgr =>
-try {
-  var msg = "Serialized task %s:%d was %d bytes, which exceeds 
max allowed: " +
-"spark.rpc.message.maxSize (%d bytes). Consider increasing 
" +
-"spark.rpc.message.maxSize or using broadcast variables 
for large values."
-  msg = msg.format(task.taskId, task.index, 
serializedTask.limit, maxRpcMessageSize)
-  taskSetMgr.abort(msg)
-} catch {
-  case e: Exception => logError("Exception in error callback", 
e)
-}
-  }
+val serializedTask = try {
+  TaskDescription.encode(task)
+} catch {
+  case NonFatal(e) =>
+abortTaskSetManager(scheduler, task.taskId,
+  s"Failed to serialize task ${task.taskId}, not attempting to 
retry it.", Some(e))
+null
 }
-else {
+
+if (serializedTask != null && serializedTask.limit >= 
maxRpcMessageSize) {
+  val msg = "Serialized task %s:%d was %d bytes, which exceeds max 
allowed: " +
+"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+"spark.rpc.message.maxSize or using broadcast variables for 
large values."
+  abortTaskSetManager(scheduler, task.taskId,
+msg.format(task.taskId, task.index, serializedTask.limit, 
maxRpcMessageSize))
+} else if (serializedTask != null) {
+  if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024) {
--- End diff --

There are two ways this nested if could be combined into the else if.

1. The following code would appear twice:

``` scala
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
  s" hostname: ${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
```
2. Use the result of the if expression to avoid duplicating that code:

```scala
val launchTask = if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
  false
} else if (serializedTask != null &&
    serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
  true
} else {
  true
}
if (launchTask) {
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK

  logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
    s" hostname: ${executorData.executorHost}.")

  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}

```
The existing code is more concise than either of the above.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96104686
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer) extends  Logging {
--- End diff --

@squito 
squito@389fec5 looks appropriate to me. Can I merge your commit (squito@389fec5) into this PR?





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95933009
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer) extends  Logging {
--- End diff --

Another implementation:

https://github.com/witgo/spark/commit/4fbf30a568ed61982e17757f9df3c35cb9d64871





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95753220
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer) extends  Logging {
--- End diff --

How about this?

``` scala
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    val addedFiles: Map[String, Long],
    val addedJars: Map[String, Long],
    val properties: Properties,
    private var serializedTask_ : ByteBuffer) extends Logging {

  def this(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int, // Index within this task's TaskSet
      addedFiles: Map[String, Long],
      addedJars: Map[String, Long],
      properties: Properties,
      task: Task[_]) {
    this(taskId, attemptNumber, executorId, name, index,
      addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer])
    task_ = task
  }

  private var task_ : Task[_] = null

  private def serializedTask: ByteBuffer = {
    if (serializedTask_ == null) {
      // This is where we serialize the task on the driver before sending it to the executor.
      // This is not done when creating the TaskDescription so we can postpone this serialization
      // to later in the scheduling process -- particularly, so it can happen in another thread
      // by the CoarseGrainedSchedulerBackend.
      // On the executors, this will already be populated by decode.
      serializedTask_ = try {
        ByteBuffer.wrap(Utils.serialize(task_))
      } catch {
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          throw new TaskNotSerializableException(e)
      }
    }
    serializedTask_
  }

  def getTask(loader: ClassLoader): Task[_] = {
    if (task_ == null) {
      task_ = Utils.deserialize(serializedTask, loader).asInstanceOf[Task[_]]
    }
    task_
  }

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
```
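
Usage would then look roughly like this (a sketch under the assumption that encode/decode keep writing and populating serializedTask_ as the comment above describes):

```scala
// Driver side (sketch): hand the live Task to the new constructor; serialization is
// deferred until encode() touches serializedTask for the first time.
val description = new TaskDescription(taskId, attemptNumber, executorId, name, index,
  addedFiles, addedJars, properties, task)

// Executor side (sketch): decode(...) populates serializedTask_, and the Task object
// itself is only materialized when getTask is first called.
val decoded = TaskDescription.decode(byteBuffer)
val runnableTask = decoded.getTask(Thread.currentThread.getContextClassLoader)
```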





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-11 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95732717
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -245,6 +245,16 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
   for (task <- tasks.flatten) {
 val serializedTask = TaskDescription.encode(task)
+if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024) {
--- End diff --

If the size of serializedTask exceeds `maxRpcMessageSize`, RPC cannot send it to the executor, but a serializedTask whose size exceeds `TaskSetManager.TASK_SIZE_TO_WARN_KB` does not cause that problem.
This code is added to stay consistent with the previous code.
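
In other words, the two thresholds behave differently; restating the check from the diff with both cases annotated:

```scala
if (serializedTask.limit >= maxRpcMessageSize) {
  // Hard limit: the RPC layer cannot deliver a message this large, so the
  // corresponding TaskSetManager has to be aborted.
  abortTaskSetManager(scheduler, task.taskId,
    msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize))
} else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
  // Soft limit: the task is still launched; we only emit a one-time warning per
  // TaskSetManager, which is what the previous code did as well.
  logWarning(s"Stage $stageId contains a task of very large size " +
    s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
    s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
```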







[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-11 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95731120
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("unserializable partition") {
+val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner {
+  override def numPartitions = 1
+
+  override def getPartition(key: Any) = 1
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+throw new NotSerializableException()
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {}
+})
+
+// Submit a map stage by itself
+submitMapStage(shuffleDep)
+assert(failure.getMessage.startsWith(
+  "Job aborted due to stage failure: Task not serializable"))
+sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+assert(sparkListener.failedStages.contains(0))
+assert(sparkListener.failedStages.size === 1)
+assertDataStructuresEmpty()
--- End diff --

As above





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-11 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95731056
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("unserializable partition") {
--- End diff --

The test "unserializable task" at line 507 only verifies the task itself. That test should be retained; I will add a new test in CoarseGrainedSchedulerBackendSuite.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-11 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95723533
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
--- End diff --

@kayousterhout  
I need some time to write the code





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95492638
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
 val addedFiles: Map[String, Long],
 val addedJars: Map[String, Long],
 val properties: Properties,
-val serializedTask: ByteBuffer) {
+private var serializedTask_ : ByteBuffer,
+private var task_ : Task[_] = null) extends  Logging {
+
+  def this(
+  taskId: Long,
+  attemptNumber: Int,
+  executorId: String,
+  name: String,
+  index: Int, // Index within this task's TaskSet
+  addedFiles: Map[String, Long],
+  addedJars: Map[String, Long],
+  properties: Properties,
+  task: Task[_]) {
+this(taskId, attemptNumber, executorId, name, index,
+  addedFiles, addedJars, properties, null, task)
+  }
+
+  lazy val serializedTask: ByteBuffer = {
+if (serializedTask_ == null) {
+  serializedTask_ = try {
+ByteBuffer.wrap(Utils.serialize(task_))
+  } catch {
+case NonFatal(e) =>
+  val msg = s"Failed to serialize task $taskId, not attempting to 
retry it."
+  logError(msg, e)
+  throw new TaskNotSerializableException(e)
+  }
+}
+serializedTask_
--- End diff --

OK, I agree with you; I will modify the code.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95305125
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index 
=== 1)
   }
 
-  test("do not emit warning when serialized task is small") {
--- End diff --

Ok, I will add this test case back.





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-01-09 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@squito In local mode, performance is relatively less important; we only need to guarantee that there is no performance degradation there.




