[jira] [Commented] (SPARK-21172) EOFException reached end of stream in UnsafeRowSerializer

2020-05-24 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17115634#comment-17115634
 ] 

liupengcheng commented on SPARK-21172:
--

[~fanyunbojerry] I think you can check your remote disks or the `dmesg` logs to 
see whether there are any disk errors. Have you tried the latest Spark version?

> EOFException reached end of stream in UnsafeRowSerializer
> -
>
> Key: SPARK-21172
> URL: https://issues.apache.org/jira/browse/SPARK-21172
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.0.1
>Reporter: liupengcheng
>Priority: Major
>  Labels: shuffle
>
> A Spark SQL job failed with the following exception, which looks like a bug in 
> the shuffle stage. 
> The shuffle read size for a single task is tens of GB.
> {code}
> org.apache.spark.SparkException: Task failed while writing rows
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:264)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: reached end of stream after reading 9034374 
> bytes; 1684891936 bytes expected
>   at 
> org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:255)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:253)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:253)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:259)
>   ... 8 more
> {code}






[jira] [Created] (SPARK-31202) Improve SizeEstimator for AppendOnlyMap

2020-03-20 Thread liupengcheng (Jira)
liupengcheng created SPARK-31202:


 Summary: Improve SizeEstimator for AppendOnlyMap
 Key: SPARK-31202
 URL: https://issues.apache.org/jira/browse/SPARK-31202
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.2, 3.0.0
Reporter: liupengcheng


Currently, Spark's memory management depends on size estimation for both 
execution and storage memory. 
In our production cluster, users frequently hit OOMs caused by the inaccurate 
size estimation for `AppendOnlyMap`. `AppendOnlyMap` stores keys and values in a 
single `Array[AnyRef]` for memory locality, and for transformations like 
cogroup/join/groupBy the value can be a `CompactBuffer[_]` or an 
`Array[CompactBuffer[_]]`, but the current `SizeEstimator` still treats this 
special array as a normal array. In many cases we therefore observed a large 
bias between the estimated size and the actual memory consumption (a minimal 
sketch of the bias follows below). 
We improved this at Xiaomi in two steps:
1. Improve the estimation for `AppendOnlyMap` when the value type is `CompactBuffer`.
2. Respect JVM GC stats when deciding whether to spill during sort/aggregation.

In this JIRA, I propose to address the first part: improving the estimation for 
`AppendOnlyMap` when the value type is `CompactBuffer`.
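
To make the bias concrete, here is a minimal, hypothetical sketch (not the 
actual patch; an `ArrayBuffer` stands in for the package-private 
`CompactBuffer`, and spark-core is assumed on the classpath). It mimics 
`AppendOnlyMap`'s alternating key/value layout and compares the estimate of the 
whole array, which `SizeEstimator` computes by sampling large object arrays, 
with a per-slot sum:

{code}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator

object AppendOnlyMapEstimateBias {
  def main(args: Array[String]): Unit = {
    // AppendOnlyMap keeps k0, v0, k1, v1, ... in one Array[AnyRef]; an ArrayBuffer
    // stands in here for Spark's package-private CompactBuffer.
    val pairs = 4096
    val data = new Array[AnyRef](2 * pairs)
    var i = 0
    while (i < pairs) {
      data(2 * i) = java.lang.Integer.valueOf(i)           // small key
      data(2 * i + 1) = ArrayBuffer.fill(256)(new Object)  // large buffer value
      i += 1
    }
    // Estimate of the whole array (SizeEstimator samples large object arrays)
    // versus the sum over every slot.
    val sampled = SizeEstimator.estimate(data)
    val perSlotSum = data.iterator.map(o => SizeEstimator.estimate(o)).sum
    println(s"sampled estimate = $sampled bytes, per-slot sum = $perSlotSum bytes")
  }
}
{code}

Because only a sample of slots is walked for the whole-array estimate, a layout 
where small keys alternate with much larger buffer values can drift from the 
real footprint, which is consistent with the bias described above.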






[jira] [Created] (SPARK-31107) Extend FairScheduler to support pool level resource isolation

2020-03-10 Thread liupengcheng (Jira)
liupengcheng created SPARK-31107:


 Summary: Extend FairScheduler to support pool level resource 
isolation
 Key: SPARK-31107
 URL: https://issues.apache.org/jira/browse/SPARK-31107
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: liupengcheng


Currently, Spark only provides two scheduling modes, FIFO and FAIR, and both 
expose drawbacks in high-concurrency SQL scenarios.

FIFO: it can easily cause congestion when a large SQL query occupies all the 
resources.

FAIR: the taskSets of one pool may occupy all the resources because there is no 
hard limit on the maximum usage of each pool. This case is frequently met under 
high workloads.

So we propose to add a maxShare argument to the FairScheduler to cap the maximum 
number of concurrently running tasks for each pool (see the sketch below).

One thing that needs attention is making sure the `ExecutorAllocationManager` 
can still release resources. For example, suppose we have 100 executors and a 
pool capped at 50 concurrent tasks: if its tasks are spread across all 
executors, no executor ever becomes idle, so none can be released.

One idea is to bind executors to pools, and only schedule a pool's tasks on the 
executors bound to it.
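
For context, a minimal sketch of how a pool is selected today with the existing 
public API; the per-pool maxShare cap described above is only a proposal and 
does not exist in Spark yet (the allocation file path below is a placeholder):

{code}
import org.apache.spark.sql.SparkSession

object PoolIsolationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pool-isolation-sketch")
      .master("local[4]")
      .config("spark.scheduler.mode", "FAIR")
      // .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
      .getOrCreate()

    // Jobs submitted from this thread run in the "etl" pool; under the proposal,
    // a maxShare configured for that pool would cap its concurrently running tasks.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl")
    spark.range(0, 1000000L).selectExpr("sum(id)").show()

    spark.stop()
  }
}
{code}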






[jira] [Created] (SPARK-31105) Respect sql execution id when scheduling taskSets

2020-03-10 Thread liupengcheng (Jira)
liupengcheng created SPARK-31105:


 Summary: Respect sql execution id when scheduling taskSets
 Key: SPARK-31105
 URL: https://issues.apache.org/jira/browse/SPARK-31105
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: liupengcheng


Currently, Spark sorts taskSets by jobId and stageId and schedules them in that 
order under the FIFO scheduling mode. In OLAP scenarios, especially under high 
concurrency, the taskSets usually come from different SQL queries, and with 
adaptive execution a single query can submit several jobs for execution at one 
time. But we currently order those taskSets without considering which query 
they belong to, which may delay a query unnecessarily.
So I propose to take the SQL execution id into account when scheduling taskSets, 
as sketched below.
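
A rough sketch of the proposed ordering (the names are illustrative, not Spark's 
internal classes): sort by the SQL execution id first, then fall back to the 
existing jobId/stageId FIFO keys.

{code}
// Illustrative only: TaskSetKey is a made-up stand-in for the fields that FIFO
// ordering uses today (jobId, stageId), extended with the SQL execution id.
case class TaskSetKey(executionId: Long, jobId: Int, stageId: Int)

object ExecutionAwareFifo {
  // Older query (smaller execution id) first, then the usual jobId/stageId order.
  val ordering: Ordering[TaskSetKey] =
    Ordering.by((k: TaskSetKey) => (k.executionId, k.jobId, k.stageId))

  def main(args: Array[String]): Unit = {
    val pending = Seq(
      TaskSetKey(executionId = 2, jobId = 5, stageId = 9),
      TaskSetKey(executionId = 1, jobId = 6, stageId = 10),
      TaskSetKey(executionId = 1, jobId = 4, stageId = 8))
    // Plain FIFO (jobId order) interleaves the two queries: jobs 4, 5, 6.
    // Execution-aware FIFO keeps query 1's taskSets together: 4, 6, then 5.
    pending.sorted(ordering).foreach(println)
  }
}
{code}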







[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Issue Type: Bug  (was: Improvement)

> Application failed due to failed to get MapStatuses broadcast
> -
>
> Key: SPARK-30849
> URL: https://issues.apache.org/jira/browse/SPARK-30849
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>
>
> We encountered an issue in Spark 2.1. The exception is as follows:
> {noformat}
>   Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
> most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
> 8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
> java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>   at 
> org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
>   at 
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> {noformat}
> I looked into the code and the logs; it seems the mapStatuses broadcast id is 
> sent to the executor but invalidated by the driver before the executor actually 
> fetches the broadcast.
> This can be described as follows:
> {noformat}
> Let's say we have an rdd1,
> rdd2 = rdd1.repartition(100) // stage 0
> rdd3 = rdd2.map(xxx)  // stage 1
> rdd4 = rdd2.map(xxx)  // stage 2
> // and then do some join and output result
> rdd3.join(rdd4).save
> {noformat}
> When a FetchFailedException happens in stage 1, stage 0 and stage 1 are 
> resubmitted and re-executed while stage 2 is still running; its tasks will 
> fetch mapStatuses from the driver, but the cached mapStatuses broadcast is 
> invalidated when the tasks of stage 0.1 complete and register their map output.
> I checked the master branch; it seems the correctness issues on `repartition` 
> have been fixed, but I think this issue may still exist.
> Some ScreenShot:
> !https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
> !https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!






[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Description: 
We encountered an issue in Spark 2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs; it seems the mapStatuses broadcast id is 
sent to the executor but invalidated by the driver before the executor actually 
fetches the broadcast.

This can be described as follows:

{noformat}
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save
{noformat}


When a FetchFailedException happens in stage 1, stage 0 and stage 1 are 
resubmitted and re-executed while stage 2 is still running; its tasks will fetch 
mapStatuses from the driver, but the cached mapStatuses broadcast is invalidated 
when the tasks of stage 0.1 complete and register their map output.

I checked the master branch; it seems the correctness issues on `repartition` 
have been fixed, but I think this issue may still exist.

Some ScreenShot:
!https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
!https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!







  was:
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at 

[jira] [Comment Edited] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038197#comment-17038197
 ] 

liupengcheng edited comment on SPARK-30849 at 2/17/20 9:42 AM:
---

I find an issue related to this 
[SPARK-5594|https://issues.apache.org/jira/browse/SPARK-5594]


was (Author: liupengcheng):
I find a issue related to this 
[SPARK-5594|https://issues.apache.org/jira/browse/SPARK-5594]

> Application failed due to failed to get MapStatuses broadcast
> -
>
> Key: SPARK-30849
> URL: https://issues.apache.org/jira/browse/SPARK-30849
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>
>
> Currently, we encountered a issue in Spark2.1. The exception is as follows:
> {noformat}
>   Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
> most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
> 8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
> java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>   at 
> org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
>   at 
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> {noformat}
> I looked into the code and the logs, it seems that it's caused by the 
> mapStatuses broadcast id is sent to executor, but was invalidated immediately 
> by the driver before the real fetching of the broadcast.
> This can be described as follows:
> {noformat}
> Let's say we have an rdd1,
> rdd2 = rdd1.repartition(100) // stage 0
> rdd3 = rdd2.map(xxx)  // stage 1
> rdd4 = rdd2.map(xxx)  // stage 2
> // and then do some join and output result
> rdd3.join(rdd4).save
> {noformat}
> When FetchFailedException happened in stage 1, then stage 0 and stage 1 will 
> be resubmitted and re-executed, but stage 2 is still running, it's task will 
> fetch mapStatuses from driver, but the mapStatuses cache will be invalidated 
> when tasks of stage 0.1 completes and registerMapOutput.
> I checked the master branch, seems that we are fixed correctness issues on 
> `repartition`, but I think this issue may still exist? 
> Some ScreenShot:
> !https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
> !https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!






[jira] [Commented] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038197#comment-17038197
 ] 

liupengcheng commented on SPARK-30849:
--

I find a issue related to this 
[SPARK-5594|https://issues.apache.org/jira/browse/SPARK-5594]

> Application failed due to failed to get MapStatuses broadcast
> -
>
> Key: SPARK-30849
> URL: https://issues.apache.org/jira/browse/SPARK-30849
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>
>
> Currently, we encountered a issue in Spark2.1. The exception is as follows:
> {noformat}
>   Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
> most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
> 8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
> java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>   at 
> org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
>   at 
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> {noformat}
> I looked into the code and the logs, it seems that it's caused by the 
> mapStatuses broadcast id is sent to executor, but was invalidated immediately 
> by the driver before the real fetching of the broadcast.
> This can be described as follows:
> {noformat}
> Let's say we have an rdd1,
> rdd2 = rdd1.repartition(100) // stage 0
> rdd3 = rdd2.map(xxx)  // stage 1
> rdd4 = rdd2.map(xxx)  // stage 2
> // and then do some join and output result
> rdd3.join(rdd4).save
> {noformat}
> When FetchFailedException happened in stage 1, then stage 0 and stage 1 will 
> be resubmitted and re-executed, but stage 2 is still running, it's task will 
> fetch mapStatuses from driver, but the mapStatuses cache will be invalidated 
> when tasks of stage 0.1 completes and registerMapOutput.
> I checked the master branch, seems that we are fixed correctness issues on 
> `repartition`, but I think this issue may still exist? 
> Some ScreenShot:
> !https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
> !https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!






[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Description: 
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs, it seems that it's caused by the 
mapStatuses broadcast id is sent to executor, but was invalidated immediately 
by the driver before the real fetching of the broadcast.

This can be described as follows:

{noformat}
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save
{noformat}


When FetchFailedException happened in stage 1, then stage 0 and stage 1 will be 
resubmitted and re-executed, but stage 2 is still running, it's task will fetch 
mapStatuses from driver, but the mapStatuses cache will be invalidated when 
tasks of stage 0.1 completes and registerMapOutput.

I checked the master branch, seems that we are fixed correctness issues on 
`repartition`, but I think this issue may still exist? 

Some ScreenShot:
!https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
!https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!







  was:
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at 

[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Description: 
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs, it seems that it's caused by the 
mapStatuses broadcast id is sent to executor, but was invalidated immediately 
by the driver before the real fetching of the broadcast.

This can be described as follows:
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save

When FetchFailedException happened in stage 1, then stage 0 and stage 1 will be 
resubmitted and re-executed, but stage 2 is still running, it's task will fetch 
mapStatuses from driver, but the mapStatuses cache will be invalidated when 
tasks of stage 0.1 completes and registerMapOutput.

I checked the master branch, seems that we are fixed correctness issues on 
`repartition`, but I think this issue may still exist? 

Some ScreenShot:
!https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
!https://issues.apache.org/jira/secure/attachment/12993651/image-2020-02-16-11-13-18-195.png!







  was:
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at 

[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Description: 
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs, it seems that it's caused by the 
mapStatuses broadcast id is sent to executor, but was invalidated immediately 
by the driver before the real fetching of the broadcast.

This can be described as follows:
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save

When FetchFailedException happened in stage 1, then stage 0 and stage 1 will be 
resubmitted and re-executed, but stage 2 is still running, it's task will fetch 
mapStatuses from driver, but the mapStatuses cache will be invalidated when 
tasks of stage 0.1 completes and registerMapOutput.

I checked the master branch, seems that we are fixed correctness issues on 
`repartition`, but I think this issue may still exist? 

Some ScreenShot:
!https://issues.apache.org/jira/secure/attachment/12993652/image-2020-02-16-11-17-32-103.png!
!!







  was:
Currently, we encountered a issue in Spark2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs, it seems that it's caused by 

[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Attachment: image-2020-02-16-11-17-32-103.png

> Application failed due to failed to get MapStatuses broadcast
> -
>
> Key: SPARK-30849
> URL: https://issues.apache.org/jira/browse/SPARK-30849
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>
>
> Currently, we encountered a issue in Spark2.1. The exception is as follows:
> {noformat}
>   Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
> most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
> 8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
> java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>   at 
> org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
>   at 
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> {noformat}
> I looked into the code and the logs, it seems that it's caused by the 
> mapStatuses broadcast id is sent to executor, but was invalidated immediately 
> by the driver before the real fetching of the broadcast.
> This can be described as follows:
> Let's say we have an rdd1,
> rdd2 = rdd1.repartition(100) // stage 0
> rdd3 = rdd2.map(xxx)  // stage 1
> rdd4 = rdd2.map(xxx)  // stage 2
> // and then do some join and output result
> rdd3.join(rdd4).save
> When FetchFailedException happened in stage 1, then stage 0 and stage 1 will 
> be resubmitted and re-executed, but stage 2 is still running, it's task will 
> fetch mapStatuses from driver, but the mapStatuses cache will be invalidated 
> when tasks of stage 0.1 completes and registerMapOutput.
> I checked the master branch, seems that we are fixed correctness issues on 
> `repartition`, but I think this issue may still exist? 
> Some ScreenShot:






[jira] [Updated] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30849:
-
Attachment: image-2020-02-16-11-13-18-195.png

> Application failed due to failed to get MapStatuses broadcast
> -
>
> Key: SPARK-30849
> URL: https://issues.apache.org/jira/browse/SPARK-30849
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: image-2020-02-16-11-13-18-195.png, 
> image-2020-02-16-11-17-32-103.png
>
>
> Currently, we encountered a issue in Spark2.1. The exception is as follows:
> {noformat}
>   Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
> most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
> 8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
> java.io.IOException: org.apache.spark.SparkException: Failed to get 
> broadcast_9_piece1 of broadcast_9
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
>   at 
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at 
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
>   at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>   at 
> org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
>   at 
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
>   at 
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
>   at 
> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
> {noformat}
> I looked into the code and the logs, it seems that it's caused by the 
> mapStatuses broadcast id is sent to executor, but was invalidated immediately 
> by the driver before the real fetching of the broadcast.
> This can be described as follows:
> Let's say we have an rdd1,
> rdd2 = rdd1.repartition(100) // stage 0
> rdd3 = rdd2.map(xxx)  // stage 1
> rdd4 = rdd2.map(xxx)  // stage 2
> // and then do some join and output result
> rdd3.join(rdd4).save
> When FetchFailedException happened in stage 1, then stage 0 and stage 1 will 
> be resubmitted and re-executed, but stage 2 is still running, it's task will 
> fetch mapStatuses from driver, but the mapStatuses cache will be invalidated 
> when tasks of stage 0.1 completes and registerMapOutput.
> I checked the master branch, seems that we are fixed correctness issues on 
> `repartition`, but I think this issue may still exist? 
> Some ScreenShot:






[jira] [Created] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

2020-02-17 Thread liupengcheng (Jira)
liupengcheng created SPARK-30849:


 Summary: Application failed due to failed to get MapStatuses 
broadcast
 Key: SPARK-30849
 URL: https://issues.apache.org/jira/browse/SPARK-30849
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: liupengcheng
 Attachments: image-2020-02-16-11-13-18-195.png, 
image-2020-02-16-11-17-32-103.png

We encountered an issue in Spark 2.1. The exception is as follows:


{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, 
most recent failure: Lost task 18.3 in stage 2.0 (TID 13819,  , executor 
8): java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at 
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at 
org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at 
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at 
org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs; it seems the mapStatuses broadcast id is 
sent to the executor but invalidated by the driver before the executor actually 
fetches the broadcast.

This can be described as follows:
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save

When a FetchFailedException happens in stage 1, stage 0 and stage 1 are 
resubmitted and re-executed while stage 2 is still running; its tasks will fetch 
mapStatuses from the driver, but the cached mapStatuses broadcast is invalidated 
when the tasks of stage 0.1 complete and register their map output.

I checked the master branch; it seems the correctness issues on `repartition` 
have been fixed, but I think this issue may still exist.

Some ScreenShot:













[jira] [Commented] (SPARK-30712) Estimate sizeInBytes from file metadata for parquet files

2020-02-06 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032076#comment-17032076
 ] 

liupengcheng commented on SPARK-30712:
--

[~hyukjin.kwon] SPARK-24914 seems to be closed already; I left some comments there.

I also created another related JIRA: 

[SPARK-30394|https://issues.apache.org/jira/browse/SPARK-30394]

> Estimate sizeInBytes from file metadata for parquet files
> -
>
> Key: SPARK-30712
> URL: https://issues.apache.org/jira/browse/SPARK-30712
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark uses a compressionFactor when calculating `sizeInBytes` for 
> `HadoopFsRelation`, but this is not accurate and it is hard to choose the best 
> `compressionFactor`. Sometimes this causes OOMs due to an improper 
> BroadcastHashJoin.
> So I propose to use the rowCount in the BlockMetadata to estimate the size in 
> memory, which can be more accurate.






[jira] [Updated] (SPARK-30394) Skip collecting stats in DetermineTableStats rule when hive table is convertible to datasource tables

2020-02-06 Thread liupengcheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-30394:
-
Description: 
Currently, if `spark.sql.statistics.fallBackToHdfs` is enabled, then spark will 
scan hdfs files to collect table stats in `DetermineTableStats` rule. But this 
can be expensive and not accurate(only file size on disk, not accounting 
compression factor), acutually we can skip this if this hive table can be 
converted to datasource table(parquet etc.), and do better estimation in 
`HadoopFsRelation`.

BeforeSPARK-28573, the implementation will update the CatalogTableStatistics, 
which will cause the improper stats(for parquet, this size is greatly smaller 
than real size in memory) be used in joinSelection when the hive table can be 
convert to datasource table.

In our production environment, user's highly compressed parquet table can cause 
OOMs when doing `broadcastHashJoin` due to this improper stats.

  was:
Currently, if `spark.sql.statistics.fallBackToHdfs` is enabled, then spark will 
scan hdfs files to collect table stats in `DetermineTableStats` rule. But this 
can be expensive in some cases, acutually we can skip this if this hive table 
can be converted to datasource table(parquet etc.).

Before[SPARK-28573|https://issues.apache.org/jira/browse/SPARK-28573], the 
implementaion will update the CatalogTableStatistics, which will cause the 
improper stats be used in joinSelection when the hive table can be convert to 
datasource table.

In our production environment, user's highly compressed parquet table can cause 
OOMs when doing `broadcastHashJoin` due to this improper stats.


> Skip collecting stats in DetermineTableStats rule when hive table is 
> convertible to  datasource tables
> --
>
> Key: SPARK-30394
> URL: https://issues.apache.org/jira/browse/SPARK-30394
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.2, 3.0.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, if `spark.sql.statistics.fallBackToHdfs` is enabled, Spark scans 
> HDFS files to collect table stats in the `DetermineTableStats` rule. But this 
> can be expensive and inaccurate (it only measures file size on disk, without 
> accounting for the compression factor); we can actually skip it when the Hive 
> table can be converted to a datasource table (Parquet, etc.) and do a better 
> estimation in `HadoopFsRelation`.
> Before SPARK-28573, the implementation updated the CatalogTableStatistics, 
> which caused improper stats (for Parquet, this size is much smaller than the 
> real size in memory) to be used in joinSelection when the Hive table can be 
> converted to a datasource table.
> In our production environment, users' highly compressed Parquet tables can 
> cause OOMs when doing `broadcastHashJoin` due to these improper stats.






[jira] [Commented] (SPARK-30712) Estimate sizeInBytes from file metadata for parquet files

2020-02-06 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032066#comment-17032066
 ] 

liupengcheng commented on SPARK-30712:
--

OK, thanks! [~hyukjin.kwon].

> Estimate sizeInBytes from file metadata for parquet files
> -
>
> Key: SPARK-30712
> URL: https://issues.apache.org/jira/browse/SPARK-30712
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark uses a compressionFactor when calculating `sizeInBytes` for 
> `HadoopFsRelation`, but this is not accurate and it is hard to choose the best 
> `compressionFactor`. Sometimes this causes OOMs due to an improper 
> BroadcastHashJoin.
> So I propose to use the rowCount in the BlockMetadata to estimate the size in 
> memory, which can be more accurate.






[jira] [Commented] (SPARK-30712) Estimate sizeInBytes from file metadata for parquet files

2020-02-06 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17031644#comment-17031644
 ] 

liupengcheng commented on SPARK-30712:
--

[~hyukjin.kwon] We use the rowCount info in the metadata together with the schema 
to infer the in-memory consumption of `UnsafeRow`s. It works fine. A rough sketch 
of the idea follows below.
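
For illustration only, a sketch of that idea under stated assumptions (this is 
not our actual patch; it uses Catalyst's public `StructType.defaultSize` as the 
per-row estimate, takes per-row-group row counts as plain input, and assumes 
spark-catalyst on the classpath):

{code}
// Hypothetical sketch: estimate in-memory size from parquet row counts and the
// schema's per-row default size, instead of on-disk bytes * compressionFactor.
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object RowCountSizeEstimate {
  def estimateSizeInBytes(schema: StructType, rowGroupRowCounts: Seq[Long]): Long = {
    // StructType.defaultSize is Catalyst's built-in per-row size estimate.
    val bytesPerRow = schema.defaultSize.toLong
    rowGroupRowCounts.map(_ * bytesPerRow).sum
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))
    // In practice the row counts would come from the parquet footer / block metadata.
    println(estimateSizeInBytes(schema, Seq(1000000L, 800000L)))
  }
}
{code}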

> Estimate sizeInBytes from file metadata for parquet files
> -
>
> Key: SPARK-30712
> URL: https://issues.apache.org/jira/browse/SPARK-30712
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark uses a compressionFactor when calculating `sizeInBytes` for 
> `HadoopFsRelation`, but this is not accurate and it is hard to choose the best 
> `compressionFactor`. Sometimes this causes OOMs due to an improper 
> BroadcastHashJoin.
> So I propose to use the rowCount in the BlockMetadata to estimate the size in 
> memory, which can be more accurate.






[jira] [Commented] (SPARK-30712) Estimate sizeInBytes from file metadata for parquet files

2020-02-06 Thread liupengcheng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17031636#comment-17031636
 ] 

liupengcheng commented on SPARK-30712:
--

[~hyukjin.kwon] Yes, in our customized Spark version we use Parquet metadata to 
compute this size; it is more accurate and works well for some tables.

I think we still scan files to get the file size in the `DetermineTableStats` 
rule when `fallBackToHdfs` is true. If that is a concern, we can simply add a 
config for this as well.

Also, in many cases we can make use of the summary metadata of Parquet files to 
speed up this estimation.

> Estimate sizeInBytes from file metadata for parquet files
> -
>
> Key: SPARK-30712
> URL: https://issues.apache.org/jira/browse/SPARK-30712
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark will use a compressionFactor when calculating `sizeInBytes` 
> for `HadoopFsRelation`, but this is not accurate and it's hard to choose the 
> best `compressionFactor`. Sometimes, this can cause OOMs due to an improper 
> BroadcastHashJoin.
> So I propose to use the rowCount in the BlockMetadata to estimate the size in 
> memory, which can be more accurate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30713) Respect mapOutputSize in memory in adaptive execution

2020-02-03 Thread liupengcheng (Jira)
liupengcheng created SPARK-30713:


 Summary: Respect mapOutputSize in memory in adaptive execution
 Key: SPARK-30713
 URL: https://issues.apache.org/jira/browse/SPARK-30713
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: liupengcheng


Currently, Spark adaptive execution uses the MapOutputStatistics information to 
adjust the plan dynamically, but this map output size does not respect the 
compression factor. So there are cases where the original SparkPlan is 
`SortMergeJoin`, but the plan after adaptive adjustment is changed to 
`BroadcastHashJoin`, and this `BroadcastHashJoin` might cause OOMs due to the 
inaccurate estimation.

Also, if the shuffle implementation is local shuffle (the Intel Spark-Adaptive 
execution impl), then in some cases it will cause a `Too large frame` exception.

So I propose to respect the compression factor in adaptive execution, or use 
the `dataSize` metric of `ShuffleExchangeExec` in adaptive execution.
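
A hedged sketch of the idea (the compression factor value and the helper names 
are illustrative assumptions, not actual Spark code):
{code:java}
// MapOutputStatistics.bytesByPartitionId reports compressed shuffle sizes.
// Before comparing against the broadcast threshold, scale them by an assumed
// compression factor, or prefer the uncompressed `dataSize` metric of
// ShuffleExchangeExec when it is available.
def estimateUncompressedSize(
    compressedBytesByPartition: Array[Long],
    compressionFactor: Double = 3.0): Long =
  (compressedBytesByPartition.sum * compressionFactor).toLong

def canBuildBroadcast(
    compressedBytesByPartition: Array[Long],
    broadcastThreshold: Long): Boolean =
  estimateUncompressedSize(compressedBytesByPartition) <= broadcastThreshold
{code}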



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30712) Estimate sizeInBytes from file metadata for parquet files

2020-02-03 Thread liupengcheng (Jira)
liupengcheng created SPARK-30712:


 Summary: Estimate sizeInBytes from file metadata for parquet files
 Key: SPARK-30712
 URL: https://issues.apache.org/jira/browse/SPARK-30712
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: liupengcheng


Currently, Spark will use a compressionFactor when calculating `sizeInBytes` 
for `HadoopFsRelation`, but this is not accurate and it's hard to choose the 
best `compressionFactor`. Sometimes, this can cause OOMs due to an improper 
BroadcastHashJoin.

So I propose to use the rowCount in the BlockMetadata to estimate the size in 
memory, which can be more accurate.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30470) Uncache table in tempViews if needed on session closed

2020-01-09 Thread liupengcheng (Jira)
liupengcheng created SPARK-30470:


 Summary: Uncache table in tempViews if needed on session closed
 Key: SPARK-30470
 URL: https://issues.apache.org/jira/browse/SPARK-30470
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.2
Reporter: liupengcheng


Currently, Spark will not clean up the cached tables behind temp views produced 
by SQL like the following:

`CACHE TABLE table1 as SELECT `

There is a risk that `UNCACHE TABLE` is never called because the session is 
closed unexpectedly or the user closes it manually. These temp views are then 
lost and cannot be visited from other sessions, but the cached plan still 
exists in the `CacheManager`.

Moreover, the leaks may cause subsequent queries to fail; one failure we 
encountered in our production environment is as below:
{code:java}
Caused by: java.io.FileNotFoundException: File does not exist: 
/user//xx/data__db60e76d_91b8_42f3_909d_5c68692ecdd4
Caused by: java.io.FileNotFoundException: File does not exist: 
/user//xx/data__db60e76d_91b8_42f3_909d_5c68692ecdd4
It is possible the underlying files have been updated. You can explicitly 
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
SQL or by recreating the Dataset/DataFrame involved. at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:131)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
 at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0.scan_nextBatch_0$(Unknown
 Source) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0.processNext(Unknown
 Source) at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
{code}
The above exception happens when the user updates the data of the table but 
Spark still uses the old cached plan.
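
A minimal sketch of the kind of cleanup being proposed, using only the public 
Catalog API (hooking it into session close is the assumption here, not 
something Spark currently does):
{code:java}
import org.apache.spark.sql.SparkSession

// On session close, drop the cache entries backing this session's temp views
// so that stale cached plans do not linger in the CacheManager.
def uncacheTempViews(spark: SparkSession): Unit = {
  spark.catalog.listTables().collect()
    .filter(_.isTemporary)
    .foreach { table =>
      if (spark.catalog.isCached(table.name)) {
        spark.catalog.uncacheTable(table.name)
      }
    }
}
{code}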



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30394) Skip collecting stats in DetermineTableStats rule when hive table is convertible to datasource tables

2019-12-30 Thread liupengcheng (Jira)
liupengcheng created SPARK-30394:


 Summary: Skip collecting stats in DetermineTableStats rule when 
hive table is convertible to  datasource tables
 Key: SPARK-30394
 URL: https://issues.apache.org/jira/browse/SPARK-30394
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.2, 3.0.0
Reporter: liupengcheng


Currently, if `spark.sql.statistics.fallBackToHdfs` is enabled, then Spark will 
scan HDFS files to collect table stats in the `DetermineTableStats` rule. But 
this can be expensive in some cases; actually, we can skip it if the hive table 
can be converted to a datasource table (parquet etc.).

Before [SPARK-28573|https://issues.apache.org/jira/browse/SPARK-28573], the 
implementation would update the CatalogTableStatistics, which caused improper 
stats to be used in joinSelection when the hive table can be converted to a 
datasource table.

In our production environment, users' highly compressed parquet tables can 
cause OOMs when doing `broadcastHashJoin` due to these improper stats.
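
To illustrate the proposed guard (a sketch only; the helper names and the 
serde-based convertibility check are assumptions, not the actual rule code):
{code:java}
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Skip the expensive HDFS listing in DetermineTableStats when the hive table
// will anyway be converted to a datasource relation, whose sizeInBytes is
// computed from its own file listing.
def isConvertibleToDatasource(
    table: CatalogTable,
    convertParquet: Boolean,
    convertOrc: Boolean): Boolean = {
  val serde = table.storage.serde.getOrElse("").toLowerCase
  (convertParquet && serde.contains("parquet")) ||
    (convertOrc && serde.contains("orc"))
}

def shouldFallBackToHdfs(
    table: CatalogTable,
    fallBackToHdfs: Boolean,
    convertParquet: Boolean,
    convertOrc: Boolean): Boolean =
  fallBackToHdfs && !isConvertibleToDatasource(table, convertParquet, convertOrc)
{code}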



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30346) Improve logging when events dropped

2019-12-24 Thread liupengcheng (Jira)
liupengcheng created SPARK-30346:


 Summary: Improve logging when events dropped
 Key: SPARK-30346
 URL: https://issues.apache.org/jira/browse/SPARK-30346
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.2, 2.1.0, 3.0.0
Reporter: liupengcheng


Currently, Spark logs the dropped-events count every 60s when events are 
dropped; however, we noticed that this is not working as expected in our 
production environment.

We looked into the code and found out that concurrent updates of 
`droppedEventsCounter` may cause the logging logic to be skipped and delayed 
for a very long time.

 

 
{code:java}
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }

  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet"
      // will fail and then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logWarning(s"Dropped $droppedCount events from $name since " +
          s"${if (prevLastReportTimestamp == 0) "the application started" else s"$previous"}.")
      }
    }
  }
}
{code}
What's more, I think we can improve this logic to also log the thread dump of 
the dispatcher thread, which can greatly help in debugging the performance 
issues that may cause the events to be dropped.
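
For illustration, a hedged sketch of one way to make the reporting race-free 
(it assumes `lastReportTimestamp` becomes an `AtomicLong` and does the CAS on 
the timestamp instead of the counter; this is a sketch, not necessarily the 
actual fix):
{code:java}
import java.util.concurrent.atomic.AtomicLong

val droppedEventsCounter = new AtomicLong(0L)
val lastReportTimestamp = new AtomicLong(0L)

def maybeReportDropped(queueName: String, logWarning: String => Unit): Unit = {
  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    val now = System.currentTimeMillis()
    val prev = lastReportTimestamp.get
    // CAS on the timestamp, so a concurrent increment of droppedEventsCounter
    // can no longer make the reporting branch be skipped for a long time.
    if (now - prev >= 60 * 1000 && lastReportTimestamp.compareAndSet(prev, now)) {
      droppedEventsCounter.addAndGet(-droppedCount)
      logWarning(s"Dropped $droppedCount events from $queueName since " +
        s"${if (prev == 0) "the application started" else new java.util.Date(prev)}.")
    }
  }
}
{code}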

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27802) SparkUI throws NoSuchElementException when inconsistency appears between `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s

2019-07-02 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16875945#comment-16875945
 ] 

liupengcheng edited comment on SPARK-27802 at 7/2/19 6:23 AM:
--

[~shahid] yes, but I checked the master branch and found that this logic was 
removed in 3.0.0 and replaced with some javascript, so I'm not sure whether we 
can fix it only in versions prior to 3.0.0? I haven't looked into the code of 
3.0.0, so I am not sure whether this issue still exists there; that's why I 
haven't opened a PR for it.

you can follow these steps to reproduce the issue:
 # set spark.ui.retainedDeadExecutors=0 and set spark.ui.retainedStages=1000
 # set spark.dynamicAllocation.enabled=true
 # run a spark app, wait for it to complete, and let the executors go idle.
 # check the stage UI.


was (Author: liupengcheng):
[~shahid] yes, but I checked master branch, I found that these logic was 
removed in 3.0.0, so I'am not sure whether we can fix it only in 2.3? that's 
why I haven't put an PR for it.

you can follow these steps to reproduce the issue:
 # set spark.ui.retainedDeadExecutors=0 and set spark.ui.retainedStages=1000
 # set spark.dynamicAllocation.enabled=true
 # run a spark app, and wait for complete, and let executors idle.
 # check the stage UI.

> SparkUI throws NoSuchElementException when inconsistency appears between 
> `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s
> -
>
> Key: SPARK-27802
> URL: https://issues.apache.org/jira/browse/SPARK-27802
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: liupengcheng
>Priority: Major
>
> Recently, we hit this issue when testing spark2.3. It reports the following 
> error messages when clicking on the stage UI link.
> We added more logs to print the executorId (here it is 10) to debug, and 
> finally found out that it's caused by the inconsistency between the list of 
> `ExecutorStageSummaryWrapper`s and the `ExecutorSummaryWrapper`s in the 
> KVStore. The number of deadExecutors may exceed the threshold, causing 
> executors to be removed from the list of `ExecutorSummaryWrapper`s; however, 
> they may still be kept in the list of `ExecutorStageSummaryWrapper`s in the 
> store.
> {code:java}
> HTTP ERROR 500
> Problem accessing /stages/stage/. Reason:
> Server Error
> Caused by:
> java.util.NoSuchElementException: 10
>   at 
> org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:83)
>   at 
> org.apache.spark.status.ElementTrackingStore.read(ElementTrackingStore.scala:95)
>   at 
> org.apache.spark.status.AppStatusStore.executorSummary(AppStatusStore.scala:70)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:99)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable.createExecutorTable(ExecutorTable.scala:92)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable.toNodeSeq(ExecutorTable.scala:75)
>   at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:478)
>   at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
>   at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
>   at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>   at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
>   at 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:166)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
>   at 

[jira] [Created] (SPARK-28220) join foldable condition not pushed down when parent filter is totally pushed down

2019-07-01 Thread liupengcheng (JIRA)
liupengcheng created SPARK-28220:


 Summary: join foldable condition not pushed down when parent 
filter is totally pushed down
 Key: SPARK-28220
 URL: https://issues.apache.org/jira/browse/SPARK-28220
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.2, 3.0.0
Reporter: liupengcheng


We encountered an issue where join conditions were not pushed down when running 
a spark app on spark2.3. After carefully looking into the code and debugging, 
we found that it's because there is a bug in the rule `PushPredicateThroughJoin`:

It will try to push the parent filter down through the join; however, when the 
parent filter is wholly pushed down through the join, the join becomes the top 
node, and then the `transform` method will skip applying the rule to the join. 

 

Suppose we have two tables: table1 and table2:

table1: (a: string, b: string, c: string)

table2: (d: string)

sql as:

 
{code:java}
select * from table1 left join (select d, 'w1' as r from table2) on a = d and r 
= 'w2' where b = 2{code}
 

let's focus on the following optimizer rules:

PushPredicateThroughJoin

FoldablePropagation

BooleanSimplification

PruneFilters

 

In the above case, on the first iteration of these rules:

PushPredicateThroughJoin -> 
{code:java}
select * from table1 where b=2 left join (select d, 'w1' as r from table2) on a 
= d and r = 'w2'
{code}
FoldablePropagation ->
{code:java}
select * from table1 where b=2 left join (select d, 'w1' as r from table2) on a 
= d and 'w1' = 'w2'{code}
BooleanSimplification ->
{code:java}
select * from table1 where b=2 left join (select d, 'w1' as r from table2) on 
false{code}
PruneFilters -> No effective

 

After several iterations of these rules, the join condition will still never be 
pushed to the right-hand side of the left join. Thus, in some cases (e.g. a 
large right table), the `BroadcastNestedLoopJoin` may be slow or OOM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27802) SparkUI throws NoSuchElementException when inconsistency appears between `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s

2019-06-30 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16875945#comment-16875945
 ] 

liupengcheng commented on SPARK-27802:
--

[~shahid] yes, but I checked the master branch and found that this logic was 
removed in 3.0.0, so I'm not sure whether we can fix it only in 2.3? That's 
why I haven't opened a PR for it.

you can follow these steps to reproduce the issue:
 # set spark.ui.retainedDeadExecutors=0 and set spark.ui.retainedStages=1000
 # set spark.dynamicAllocation.enabled=true
 # run a spark app, wait for it to complete, and let the executors go idle.
 # check the stage UI.

> SparkUI throws NoSuchElementException when inconsistency appears between 
> `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s
> -
>
> Key: SPARK-27802
> URL: https://issues.apache.org/jira/browse/SPARK-27802
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Affects Versions: 2.3.2
>Reporter: liupengcheng
>Priority: Major
>
> Recently, we hit this issue when testing spark2.3. It reports the following 
> error messages when clicking on the stage UI link.
> We added more logs to print the executorId (here it is 10) to debug, and 
> finally found out that it's caused by the inconsistency between the list of 
> `ExecutorStageSummaryWrapper`s and the `ExecutorSummaryWrapper`s in the 
> KVStore. The number of deadExecutors may exceed the threshold, causing 
> executors to be removed from the list of `ExecutorSummaryWrapper`s; however, 
> they may still be kept in the list of `ExecutorStageSummaryWrapper`s in the 
> store.
> {code:java}
> HTTP ERROR 500
> Problem accessing /stages/stage/. Reason:
> Server Error
> Caused by:
> java.util.NoSuchElementException: 10
>   at 
> org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:83)
>   at 
> org.apache.spark.status.ElementTrackingStore.read(ElementTrackingStore.scala:95)
>   at 
> org.apache.spark.status.AppStatusStore.executorSummary(AppStatusStore.scala:70)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:99)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:92)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable.createExecutorTable(ExecutorTable.scala:92)
>   at 
> org.apache.spark.ui.jobs.ExecutorTable.toNodeSeq(ExecutorTable.scala:75)
>   at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:478)
>   at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
>   at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
>   at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>   at 
> org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
>   at 
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:166)
>   at 
> org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
>   at 
> org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
>   at 
> org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
>   at 
> org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>   at 
> org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
>   at 
> org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
>   at 
> org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
>   at org.spark_project.jetty.server.Server.handle(Server.java:539)
>   at 
> org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
>   at 
> 

[jira] [Updated] (SPARK-28195) CheckAnalysis not working for Command and report misleading error message

2019-06-27 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-28195:
-
Description: 
Recently, we encountered an issue when executing 
`InsertIntoDataSourceDirCommand`: its query relied on a non-existent table or 
view, but we finally got a misleading error message:
{code:java}
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to dataType on unresolved object, tree: 'kr.objective_id
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:440)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:159)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:544)
at 
org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand.run(InsertIntoDataSourceDirCommand.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at 
org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:246)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3277)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3276)
at org.apache.spark.sql.Dataset.init(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:277)
... 11 more

{code}
After looking into the code, I found that it's because we have supported the 
`runSQLOnFiles` feature since 2.3: if the table does not exist and it's not a 
temporary table, then it will be treated as running directly on files.

The `ResolveSQLOnFile` rule will analyze it and return an `UnresolvedRelation` 
on resolve failure (it's actually not SQL on files, so it will fail when 
resolving). Because Command has empty children, `CheckAnalysis` will skip 
checking the `UnresolvedRelation`, and we finally got the above misleading 
error message when executing this command.

I think maybe we should run checkAnalysis on the command's query plan? Or is 
there any consideration for not checking analysis for commands?

Seems this issue still exists in the master branch. 

  was:
Currently, we encountered an issue when executing 
`InsertIntoDataSourceDirCommand`, and we found that it's query relied on 
non-exist table or view, but we finally got a misleading error message:
{code:java}
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to dataType on unresolved object, tree: 'kr.objective_id
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:440)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:159)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:544)
at 

[jira] [Created] (SPARK-28195) CheckAnalysis not working for Command and report misleading error message

2019-06-27 Thread liupengcheng (JIRA)
liupengcheng created SPARK-28195:


 Summary: CheckAnalysis not working for Command and report 
misleading error message
 Key: SPARK-28195
 URL: https://issues.apache.org/jira/browse/SPARK-28195
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.2
Reporter: liupengcheng


Recently, we encountered an issue when executing 
`InsertIntoDataSourceDirCommand`: its query relied on a non-existent table or 
view, but we finally got a misleading error message:
{code:java}
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to dataType on unresolved object, tree: 'kr.objective_id
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:440)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:440)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:159)
at 
org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:544)
at 
org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand.run(InsertIntoDataSourceDirCommand.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at 
org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:246)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3277)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3276)
at org.apache.spark.sql.Dataset.init(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:277)
... 11 more

{code}
After looking into the code, I found that it's because we have supported the 
`runSQLOnFiles` feature since 2.3: if the table does not exist and it's not a 
temporary table, then it will be treated as running directly on files.

The `ResolveSQLOnFile` rule will analyze it and return an `UnresolvedRelation` 
on resolve failure (it's actually not SQL on files, so it will fail when 
resolving). Because Command has empty children, `CheckAnalysis` will skip 
checking the `UnresolvedRelation`, and we finally got the above misleading 
error message when executing this command.

I think maybe we should run checkAnalysis on the command's query plan? Or is 
there any consideration for not checking analysis for commands?
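
As a hedged repro sketch (the output path and table name below are made up for 
illustration), the misleading message described above can be hit roughly like 
this:
{code:java}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The query references a non-existent table, so instead of a clean
// "Table or view not found" AnalysisException we end up with the
// UnresolvedException from the stack trace above when the command
// computes its schema.
spark.sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/spark_28195_out'
    |USING parquet
    |SELECT kr.objective_id FROM non_existent_table kr
    |""".stripMargin)
{code}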



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27802) SparkUI throws NoSuchElementException when inconsistency appears between `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s

2019-05-22 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27802:
-
Description: 
Recently, we hit this issue when testing spark2.3. It reports the following 
error messages when clicking on the stage UI link.

We added more logs to print the executorId (here it is 10) to debug, and 
finally found out that it's caused by the inconsistency between the list of 
`ExecutorStageSummaryWrapper`s and the `ExecutorSummaryWrapper`s in the 
KVStore. The number of deadExecutors may exceed the threshold, causing 
executors to be removed from the list of `ExecutorSummaryWrapper`s; however, 
they may still be kept in the list of `ExecutorStageSummaryWrapper`s in the 
store.
{code:java}
HTTP ERROR 500
Problem accessing /stages/stage/. Reason:

Server Error
Caused by:
java.util.NoSuchElementException: 10
at 
org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:83)
at 
org.apache.spark.status.ElementTrackingStore.read(ElementTrackingStore.scala:95)
at 
org.apache.spark.status.AppStatusStore.executorSummary(AppStatusStore.scala:70)
at 
org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:99)
at 
org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.ui.jobs.ExecutorTable.createExecutorTable(ExecutorTable.scala:92)
at 
org.apache.spark.ui.jobs.ExecutorTable.toNodeSeq(ExecutorTable.scala:75)
at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:478)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:166)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at 
org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at 
org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at 
org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at 
org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at 
org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at 
org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at 
org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:539)
at 
org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at 
org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at 
org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at 
org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at 
org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
{code}

  was:
Recently, we hit this issue when testing spark2.3. It report the following 
error messages when click 

[jira] [Created] (SPARK-27802) SparkUI throws NoSuchElementException when inconsistency appears between `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s

2019-05-22 Thread liupengcheng (JIRA)
liupengcheng created SPARK-27802:


 Summary: SparkUI throws NoSuchElementException when inconsistency 
appears between `ExecutorStageSummaryWrapper`s and `ExecutorSummaryWrapper`s
 Key: SPARK-27802
 URL: https://issues.apache.org/jira/browse/SPARK-27802
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.3.2
Reporter: liupengcheng


Recently, we hit this issue when testing spark2.3. It reports the following 
error messages when clicking on the stage UI link.

We added more logs to print the executorId (here it is 10) to debug, and 
finally found out that it's caused by the inconsistency between the list of 
`ExecutorStageSummaryWrapper`s and the `ExecutorSummaryWrapper`s in the 
KVStore. The number of deadExecutors may exceed the threshold, causing 
executors to be removed from the list of `ExecutorSummaryWrapper`s; however, 
they may still be kept in the list of `ExecutorStageSummaryWrapper`s in the 
store.
{code:java}
HTTP ERROR 500
Problem accessing /stages/stage/. Reason:

Server Error
Caused by:
java.util.NoSuchElementException: 10
at 
org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:83)
at 
org.apache.spark.status.ElementTrackingStore.read(ElementTrackingStore.scala:95)
at 
org.apache.spark.status.AppStatusStore.executorSummary(AppStatusStore.scala:70)
at 
org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:99)
at 
org.apache.spark.ui.jobs.ExecutorTable$$anonfun$createExecutorTable$2.apply(ExecutorTable.scala:92)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.spark.ui.jobs.ExecutorTable.createExecutorTable(ExecutorTable.scala:92)
at 
org.apache.spark.ui.jobs.ExecutorTable.toNodeSeq(ExecutorTable.scala:75)
at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:478)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:166)
at 
org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at 
org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at 
org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at 
org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at 
org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at 
org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at 
org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at 
org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:539)
at 
org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at 
org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at 
org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at 
org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at 
org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at 
org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at 
org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
 

[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-22 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27214:
-
Description: 
Currently, the Spark locality wait mechanism is not friendly for large jobs: 
when the number of tasks is large (e.g. 1+) and there is a large number of 
executors (e.g. 2000), executors may be launched on some nodes where the 
locality is not the best (not the same nodes that hold the HDFS blocks). There 
are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing 
within `spark.locality.wait` but coming at a low rate (e.g. a task finishes 
every `spark.locality.wait` seconds), so the locality level will not be 
upgraded and lots of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
may be removed by the Driver because they become idle, which finally slows 
down the job.

We encountered this issue in our production spark cluster; it caused lots of 
wasted resources and slowed down users' applications.

Actually, we can optimize this with the following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5
{code:java}
maxTolerableStarvingTime = numTasksCanRun * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule

totalStarvingTime = sum(starvingTimeByTasks)

if (totalStarvingTime > maxTolerableStarvingTime) {
  // upgrading locality level...
}
{code}
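
A hedged Scala sketch of that check (all the names come from the formula above 
and are illustrative; this is not an actual TaskSetManager patch):
{code:java}
// starvingTimeByTask: how long each pending task has been waiting at the
// current locality level; medianOfTaskExecutionTime comes from finished tasks.
def shouldUpgradeLocality(
    starvingTimeByTask: Seq[Long],
    numTasksCanRun: Int,
    medianOfTaskExecutionTime: Long,
    localityExecutionGainFactor: Double = 0.1,
    probabilityOfNextLocalitySchedule: Double = 0.5): Boolean = {
  val maxTolerableStarvingTime =
    numTasksCanRun * medianOfTaskExecutionTime *
      localityExecutionGainFactor * probabilityOfNextLocalitySchedule
  starvingTimeByTask.sum.toDouble > maxTolerableStarvingTime
}
{code}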
 

 

  was:
Currently, Spark locality wait mechanism is not friendly for large job, when 
number of tasks is large(e.g. 1+)and with a large number of executors(e.g. 
2000), executors may be launched on some nodes  where the locality is not the 
best(not the same nodes hold HDFS blocks). There are cases when 
`TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within 
`spark.locality.wait` but coming at low rate(e.g. every `spark.locality.wait` 
seconds a task is finished), so locality level would not be upgraded and lots 
of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
executors may be removed by Driver due to become idle and finally slow down the 
job.

We encountered this issue in our production spark cluster, it caused lots of 
resources wasting and slowed down user's application.

Actually, we can optimize this by following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5
{code:java}
maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule

totalStarvingTime = sum(starvingTimeByTasks)

if (totalStarvingTime > maxStarvingTimeForTasks)

{  upgrading locality level... }{code}
 

 


> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, the Spark locality wait mechanism is not friendly for large jobs: 
> when the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on some nodes where the 
> locality is not the best (not the same nodes that hold the HDFS blocks). 
> There are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks 
> finishing within `spark.locality.wait` but coming at a low rate (e.g. a task 
> finishes every `spark.locality.wait` seconds), so the locality level will 
> not be upgraded and lots of pending tasks will wait a long time. 
> In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
> may be removed by the Driver because they become idle, which finally slows 
> down the job.
> We encountered this issue in our production spark cluster; it caused lots of 
> wasted resources and slowed down users' applications.
> Actually, we can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> {code:java}
> maxTolerableStarvingTime = numTasksCanRun * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxTolerableStarvingTime)
> {  upgrading locality level... }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27214:
-
Description: 
Currently, the Spark locality wait mechanism is not friendly for large jobs: 
when the number of tasks is large (e.g. 1+) and there is a large number of 
executors (e.g. 2000), executors may be launched on some nodes where the 
locality is not the best (not the same nodes that hold the HDFS blocks). There 
are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing 
within `spark.locality.wait` but coming at a low rate (e.g. a task finishes 
every `spark.locality.wait` seconds), so the locality level will not be 
upgraded and lots of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
may be removed by the Driver because they become idle, which finally slows 
down the job.

We encountered this issue in our production spark cluster; it caused lots of 
wasted resources and slowed down users' applications.

Actually, we can optimize this with the following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5
{code:java}
maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule

totalStarvingTime = sum(starvingTimeByTasks)

if (totalStarvingTime > maxStarvingTimeForTasks)

{  upgrading locality level... }{code}
 

 

  was:
Currently, Spark locality wait mechanism is not friendly for large job, when 
number of tasks is large(e.g. 1+)and with a large number of executors(e.g. 
2000), executors may be launched on some nodes  where the locality is not the 
best(not the same nodes hold HDFS blocks). There are cases when 
`TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within 
`spark.locality.wait` but coming at low rate(e.g. every `spark.locality.wait` 
seconds a task is finished), so locality level would not be upgraded and lots 
of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
executors may be removed by Driver due to become idle and finally slow down the 
job.

Actually, we can optimize this by following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
`spark.locality.wait`

if (numStavingTasks > maxStarvingTasks)

{  upgrading locality level... }



```


> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, the Spark locality wait mechanism is not friendly for large jobs: 
> when the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on some nodes where the 
> locality is not the best (not the same nodes that hold the HDFS blocks). 
> There are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks 
> finishing within `spark.locality.wait` but coming at a low rate (e.g. a task 
> finishes every `spark.locality.wait` seconds), so the locality level will 
> not be upgraded and lots of pending tasks will wait a long time. 
> In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
> may be removed by the Driver because they become idle, which finally slows 
> down the job.
> We encountered this issue in our production spark cluster; it caused lots of 
> wasted resources and slowed down users' applications.
> Actually, we can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> {code:java}
> maxStarvingTimeForTasks = numPendingTasks * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule
> totalStarvingTime = sum(starvingTimeByTasks)
> if (totalStarvingTime > maxStarvingTimeForTasks)
> {  upgrading locality level... }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-27214:
-
Description: 
Currently, the Spark locality wait mechanism is not friendly for large jobs: 
when the number of tasks is large (e.g. 1+) and there is a large number of 
executors (e.g. 2000), executors may be launched on some nodes where the 
locality is not the best (not the same nodes that hold the HDFS blocks). There 
are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks finishing 
within `spark.locality.wait` but coming at a low rate (e.g. a task finishes 
every `spark.locality.wait` seconds), so the locality level will not be 
upgraded and lots of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
may be removed by the Driver because they become idle, which finally slows 
down the job.

Actually, we can optimize this with the following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
`spark.locality.wait`

if (numStavingTasks > maxStarvingTasks)

{  upgrading locality level... }



```

  was:
Currently, Spark locality wait mechanism is not friendly for large job, when 
tasks is large(e.g. 1+), there are cases when 
`TaskSetManager.lastLaunchTime` is refreshed due to finished tasks within 
`spark.locality.wait` but coming at low rate(e.g. every `spark.locality.wait` 
seconds a task is finished), so locality level would not be upgraded and lots 
of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, then lots of 
executors may be removed by Driver due to become idle and finally slow down the 
job.

Actually, we can optimize this by following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
`spark.locality.wait`

if (numStavingTasks > maxStarvingTasks) {

 upgrading locality level...

}



```


> Upgrading locality level when lots of pending tasks have been waiting more 
> than locality.wait
> -
>
> Key: SPARK-27214
> URL: https://issues.apache.org/jira/browse/SPARK-27214
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, the Spark locality wait mechanism is not friendly for large jobs: 
> when the number of tasks is large (e.g. 1+) and there is a large number of 
> executors (e.g. 2000), executors may be launched on some nodes where the 
> locality is not the best (not the same nodes that hold the HDFS blocks). 
> There are cases when `TaskSetManager.lastLaunchTime` is refreshed by tasks 
> finishing within `spark.locality.wait` but coming at a low rate (e.g. a task 
> finishes every `spark.locality.wait` seconds), so the locality level will 
> not be upgraded and lots of pending tasks will wait a long time. 
> In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
> may be removed by the Driver because they become idle, which finally slows 
> down the job.
> Actually, we can optimize this with the following formula:
> Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
> probabilityOfNextLocalitySchedule=0.5
> ```
> maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime * 
> localityExecutionGainFactor * probabilityOfNextLocalitySchedule / 
> `spark.locality.wait`
> if (numStavingTasks > maxStarvingTasks)
> {  upgrading locality level... }
> 
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27214) Upgrading locality level when lots of pending tasks have been waiting more than locality.wait

2019-03-20 Thread liupengcheng (JIRA)
liupengcheng created SPARK-27214:


 Summary: Upgrading locality level when lots of pending tasks have 
been waiting more than locality.wait
 Key: SPARK-27214
 URL: https://issues.apache.org/jira/browse/SPARK-27214
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, the Spark locality wait mechanism is not friendly for large jobs: 
when the number of tasks is large (e.g. 1+), there are cases when 
`TaskSetManager.lastLaunchTime` is refreshed by tasks finishing within 
`spark.locality.wait` but coming at a low rate (e.g. a task finishes every 
`spark.locality.wait` seconds), so the locality level will not be upgraded and 
lots of pending tasks will wait a long time. 

In this case, when `spark.dynamicAllocation.enabled=true`, lots of executors 
may be removed by the Driver because they become idle, which finally slows 
down the job.

Actually, we can optimize this with the following formula:

Suppose numPendingTasks=1, localityExecutionGainFactor=0.1, 
probabilityOfNextLocalitySchedule=0.5

```

maxStarvingTasks = numPendingTasks * medianOfTaskExecutionTime *
  localityExecutionGainFactor * probabilityOfNextLocalitySchedule /
  `spark.locality.wait`

if (numStarvingTasks > maxStarvingTasks) {
  // upgrading locality level...
}



```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-26 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16777836#comment-16777836
 ] 

liupengcheng commented on SPARK-26927:
--

[~Ngone51]

Let's say we have the following dynamic allocation settings:

min: 20, initial: 20, max: 50
 1. We finish 50 tasks on 50 executors and there are no more tasks to execute, 
so the 50 executors go idle and allocationManager tries to remove them. If 
everything goes well, the minimum-executors guard (20) makes allocationManager 
keep 20 executors alive.
 2. However, imagine the case where the `SparkListenerExecutorRemoved` event 
comes before the `SparkListenerTaskStart` event. This is possible because the 
`SparkListenerTaskStart` event is posted by the `DAGSchedulerEventLoop` thread, 
while the `SparkListenerExecutorRemoved` event is posted by `Netty` threads.

    In this case, we may end up with a wrong set of `executorIds` due to the 
following logic: 
[https://github.com/apache/spark/blob/bc03c8b3faacd23edf40b8e75ffd9abb5881c50c/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L718]

    Explanation: allocationManager.executorIds does not contain the executorId 
because it was already removed in the onExecutorRemoved callback, so the 
removed executorId is re-added to allocationManager.executorIds.

 3. allocationManager may now think it already has 21 or more executors. We 
then submit 20 tasks on 20 executors; they finish and the executors go idle. 
At this point allocationManager no longer keeps the minimum of 20 executors; 
it removes one or more of them.

 4. And so on, back and forth.

 5. Finally, there may be no alive executors left, but allocationManager still 
thinks it has kept more than the minimum number of executors. An extreme case 
is when the wrong count exceeds the maximum number of executors: 
allocationManager will then never schedule more executors and the application 
hangs forever.
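
To make the ordering problem concrete, below is a deliberately simplified 
sketch of the bookkeeping involved. It condenses the listener logic into a toy 
class and is not the actual ExecutorAllocationManager code:

{code:java}
import scala.collection.mutable

// Toy model of the executor bookkeeping; not the real ExecutorAllocationManager.
class SimplifiedAllocationBookkeeping {
  private val executorIds = mutable.Set[String]()

  // Fired from Netty threads when a container exits.
  def onExecutorRemoved(executorId: String): Unit = synchronized {
    executorIds -= executorId
  }

  // Fired from the DAGScheduler event loop; with the race it can arrive AFTER
  // the removal above even though the task really started earlier.
  def onTaskStart(executorId: String): Unit = synchronized {
    if (!executorIds.contains(executorId)) {
      // The already-removed executor is re-added here, so from now on the
      // manager believes it has one more live executor than actually exists.
      executorIds += executorId
    }
  }

  def trackedExecutorCount: Int = synchronized { executorIds.size }
}
{code}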

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg, Selection_046.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> even though the executor count equals `minNumExecutors`, because of the 
> incorrect computation of `newExecutorTotal` (which may be greater than 
> `minNumExecutors`), thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
> !Selection_042.jpg!
> !Selection_043.jpg!
> !Selection_044.jpg!
> !Selection_045.jpg!
> !Selection_046.jpg!  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> 

[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Issue Type: Bug  (was: Improvement)

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg, Selection_046.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> even though the executor count equals `minNumExecutors`, because of the 
> incorrect computation of `newExecutorTotal` (which may be greater than 
> `minNumExecutors`), thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
> !Selection_042.jpg!
> !Selection_043.jpg!
> !Selection_044.jpg!
> !Selection_045.jpg!
> !Selection_046.jpg!  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26941) incorrect computation of maxNumExecutorFailures in ApplicationMaster for streaming

2019-02-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26941:
-
Summary: incorrect computation of maxNumExecutorFailures in 
ApplicationMaster for streaming   (was: maxNumExecutorFailures should be 
computed with spark.streaming.dynamicAllocation.maxExecutors in 
ApplicationMaster for streaming )

> incorrect computation of maxNumExecutorFailures in ApplicationMaster for 
> streaming 
> ---
>
> Key: SPARK-26941
> URL: https://issues.apache.org/jira/browse/SPARK-26941
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, YARN
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, when streaming dynamic allocation is enabled for streaming 
> applications, maxNumExecutorFailures in the ApplicationMaster is still 
> computed with `spark.dynamicAllocation.maxExecutors`. 
> Actually, we should use `spark.streaming.dynamicAllocation.maxExecutors` 
> instead.
> Related code:
> {code:java}
> private val maxNumExecutorFailures = {
>   val effectiveNumExecutors =
> if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
>   sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
> } else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
>   sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
> } else {
>   sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
> }
>   // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation 
> is enabled. We need
>   // avoid the integer overflow here.
>   val defaultMaxNumExecutorFailures = math.max(3,
> if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * 
> effectiveNumExecutors))
>   
> sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26941) maxNumExecutorFailures should be computed with spark.streaming.dynamicAllocation.maxExecutors in ApplicationMaster for streaming

2019-02-20 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26941:
-
Component/s: YARN
Summary: maxNumExecutorFailures should be computed with 
spark.streaming.dynamicAllocation.maxExecutors in ApplicationMaster for 
streaming   (was: maxNumExecutorFailures should be computed with 
spark.streaming.dynamicAllocation.maxExecutors in streaming )

> maxNumExecutorFailures should be computed with 
> spark.streaming.dynamicAllocation.maxExecutors in ApplicationMaster for 
> streaming 
> -
>
> Key: SPARK-26941
> URL: https://issues.apache.org/jira/browse/SPARK-26941
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, YARN
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, when streaming dynamic allocation is enabled for streaming 
> applications, maxNumExecutorFailures in the ApplicationMaster is still 
> computed with `spark.dynamicAllocation.maxExecutors`. 
> Actually, we should use `spark.streaming.dynamicAllocation.maxExecutors` 
> instead.
> Related code:
> {code:java}
> private val maxNumExecutorFailures = {
>   val effectiveNumExecutors =
> if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
>   sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
> } else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
>   sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
> } else {
>   sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
> }
>   // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation 
> is enabled. We need
>   // avoid the integer overflow here.
>   val defaultMaxNumExecutorFailures = math.max(3,
> if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * 
> effectiveNumExecutors))
>   
> sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26941) maxNumExecutorFailures should be computed with spark.streaming.dynamicAllocation.maxExecutors in streaming

2019-02-20 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26941:


 Summary: maxNumExecutorFailures should be computed with 
spark.streaming.dynamicAllocation.maxExecutors in streaming 
 Key: SPARK-26941
 URL: https://issues.apache.org/jira/browse/SPARK-26941
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, when streaming dynamic allocation is enabled for streaming 
applications, maxNumExecutorFailures in the ApplicationMaster is still computed 
with `spark.dynamicAllocation.maxExecutors`.

Actually, we should use `spark.streaming.dynamicAllocation.maxExecutors` 
instead.

Related code:
{code:java}
private val maxNumExecutorFailures = {
  val effectiveNumExecutors =
if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
  sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
} else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
  sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
  sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
}
  // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is 
enabled. We need
  // avoid the integer overflow here.
  val defaultMaxNumExecutorFailures = math.max(3,
if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * 
effectiveNumExecutors))

  sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
{code}
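
For a sense of scale, here is a small worked example of the default computed 
above, using an arbitrary example value for the streaming max-executors 
setting (an illustration, not a recommended configuration):

{code:java}
// Arbitrary example: spark.streaming.dynamicAllocation.maxExecutors = 10
val effectiveNumExecutors = 10
val defaultMaxNumExecutorFailures = math.max(3,
  if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue
  else 2 * effectiveNumExecutors)
// => 20. With the batch dynamic allocation default (Int.MaxValue max executors)
// the cap becomes Int.MaxValue, so the executor-failure check effectively
// never triggers.
{code}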



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Description: 
Recently, we caught a bug that caused our production Spark Thriftserver to hang:

There is a race condition in the ExecutorAllocationManager where the 
`SparkListenerExecutorRemoved` event is posted before the 
`SparkListenerTaskStart` event, which leads to an incorrect set of 
`executorIds`. Then, when some executors go idle, real executors are removed 
even though the executor count equals `minNumExecutors`, because of the 
incorrect computation of `newExecutorTotal` (which may be greater than 
`minNumExecutors`), thus finally causing zero available executors while a 
wrong set of executorIds is kept in memory.

What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
`executorIds`, because later idle events for the fake executors cannot cause 
the real removal of these executors, as they are already removed and no longer 
exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`.

Logs:

!Selection_042.jpg!

!Selection_043.jpg!

!Selection_044.jpg!

!Selection_045.jpg!

!Selection_046.jpg!  

EventLogs(DisOrder of events):
{code:java}
{"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
ID":"131","Removed Reason":"Container 
container_e28_1547530852233_236191_02_000180 exited from explicit termination 
request."}

{"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt ID":0,"Task 
Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
Time":1549936032872,"Executor 
ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
"Speculative":false,"Getting Result Time":0,"Finish 
Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
 Faile d 
Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
 ue":39,"Internal":true,"Count Failed 
Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
 Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
 Failed Values":true},{"ID":12923962,"Na 
me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
 nt Failed 
Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
 Failed Values":true},{"ID":12921550,"Name":"number of output 
rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
rows","Update":"23","Value":"45","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total (min, 
med, max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"}]}}

{code}
 

  was:
Recently, we caught a bug that caused our production Spark Thriftserver to hang:

There is a race condition in the ExecutorAllocationManager where the 
`SparkListenerExecutorRemoved` event is posted before the 
`SparkListenerTaskStart` event, which leads to an incorrect set of 
`executorIds`. Then, when some executors go idle, real executors are removed 
due to the incorrect computation of `newExecutorTotal`, which may be greater 
than `minNumExecutors`, thus finally causing zero available executors while a 
wrong set of executorIds is kept in memory.

What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
`executorIds`, because later idle events for the fake executors cannot cause 
the real removal of these executors, as they are already removed and no longer 
exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`.

Logs:

!Selection_042.jpg!

!Selection_043.jpg!

!Selection_044.jpg!

!Selection_045.jpg!

!Selection_046.jpg!  

EventLogs(DisOrder of events):
{code:java}
{"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
ID":"131","Removed Reason":"Container 
container_e28_1547530852233_236191_02_000180 exited from explicit termination 
request."}


[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Attachment: Selection_046.jpg

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg, Selection_046.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> due to the incorrect computation of `newExecutorTotal`, which may be greater 
> than `minNumExecutors`, thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
>  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Description: 
Recently, we caught a bug that caused our production Spark Thriftserver to hang:

There is a race condition in the ExecutorAllocationManager where the 
`SparkListenerExecutorRemoved` event is posted before the 
`SparkListenerTaskStart` event, which leads to an incorrect set of 
`executorIds`. Then, when some executors go idle, real executors are removed 
due to the incorrect computation of `newExecutorTotal`, which may be greater 
than `minNumExecutors`, thus finally causing zero available executors while a 
wrong set of executorIds is kept in memory.

What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
`executorIds`, because later idle events for the fake executors cannot cause 
the real removal of these executors, as they are already removed and no longer 
exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`.

Logs:

!Selection_042.jpg!

!Selection_043.jpg!

!Selection_044.jpg!

!Selection_045.jpg!

!Selection_046.jpg!  

EventLogs(DisOrder of events):
{code:java}
{"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
ID":"131","Removed Reason":"Container 
container_e28_1547530852233_236191_02_000180 exited from explicit termination 
request."}

{"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt ID":0,"Task 
Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
Time":1549936032872,"Executor 
ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
"Speculative":false,"Getting Result Time":0,"Finish 
Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
 Faile d 
Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
 ue":39,"Internal":true,"Count Failed 
Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
 Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
 Failed Values":true},{"ID":12923962,"Na 
me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
 nt Failed 
Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
 Failed Values":true},{"ID":12921550,"Name":"number of output 
rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
rows","Update":"23","Value":"45","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total (min, 
med, max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"}]}}

{code}
 

  was:
Recently, we caught a bug that caused our production Spark Thriftserver to hang:

There is a race condition in the ExecutorAllocationManager where the 
`SparkListenerExecutorRemoved` event is posted before the 
`SparkListenerTaskStart` event, which leads to an incorrect set of 
`executorIds`. Then, when some executors go idle, real executors are removed 
due to the incorrect computation of `newExecutorTotal`, which may be greater 
than `minNumExecutors`, thus finally causing zero available executors while a 
wrong set of executorIds is kept in memory.

What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
`executorIds`, because later idle events for the fake executors cannot cause 
the real removal of these executors, as they are already removed and no longer 
exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`.

Logs:

 

EventLogs(DisOrder of events):
{code:java}
{"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
ID":"131","Removed Reason":"Container 
container_e28_1547530852233_236191_02_000180 exited from explicit termination 
request."}

{"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt ID":0,"Task 
Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
Time":1549936032872,"Executor 

[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Attachment: Selection_045.jpg

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> due to the incorrect computation of `newExecutorTotal`, which may be greater 
> than `minNumExecutors`, thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
>  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Attachment: Selection_043.jpg

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> due to the incorrect computation of `newExecutorTotal`, which may be greater 
> than `minNumExecutors`, thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
>  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Attachment: Selection_044.jpg

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg, Selection_043.jpg, Selection_044.jpg, 
> Selection_045.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> due to the incorrect computation of `newExecutorTotal`, which may be greater 
> than `minNumExecutors`, thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
>  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26927:
-
Attachment: Selection_042.jpg

> Race condition may cause dynamic allocation not working
> ---
>
> Key: SPARK-26927
> URL: https://issues.apache.org/jira/browse/SPARK-26927
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_042.jpg
>
>
> Recently, we caught a bug that caused our production Spark Thriftserver to hang:
> There is a race condition in the ExecutorAllocationManager where the 
> `SparkListenerExecutorRemoved` event is posted before the 
> `SparkListenerTaskStart` event, which leads to an incorrect set of 
> `executorIds`. Then, when some executors go idle, real executors are removed 
> due to the incorrect computation of `newExecutorTotal`, which may be greater 
> than `minNumExecutors`, thus finally causing zero available executors while a 
> wrong set of executorIds is kept in memory.
> What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
> `executorIds`, because later idle events for the fake executors cannot cause 
> the real removal of these executors, as they are already removed and no 
> longer exist in the `executorDataMap` of 
> `CoarseGrainedSchedulerBackend`.
> Logs:
>  
> EventLogs(DisOrder of events):
> {code:java}
> {"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
> ID":"131","Removed Reason":"Container 
> container_e28_1547530852233_236191_02_000180 exited from explicit termination 
> request."}
> {"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt 
> ID":0,"Task Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
> Time":1549936032872,"Executor 
> ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
> "Speculative":false,"Getting Result Time":0,"Finish 
> Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
>  Faile d 
> Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
>  ue":39,"Internal":true,"Count Failed 
> Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
>  Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
> ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
> Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
>  Failed Values":true},{"ID":12923962,"Na 
> me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
>  Failed 
> Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
>  nt Failed 
> Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
>  Failed Values":true},{"ID":12921550,"Name":"number of output 
> rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
> rows","Update":"23","Value":"45","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total 
> (min, med, 
> max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
> med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
> Values":true,"Metadata":"sql"}]}}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26927) Race condition may cause dynamic allocation not working

2019-02-19 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26927:


 Summary: Race condition may cause dynamic allocation not working
 Key: SPARK-26927
 URL: https://issues.apache.org/jira/browse/SPARK-26927
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Recently, we caught a bug that caused our production Spark Thriftserver to hang:

There is a race condition in the ExecutorAllocationManager where the 
`SparkListenerExecutorRemoved` event is posted before the 
`SparkListenerTaskStart` event, which leads to an incorrect set of 
`executorIds`. Then, when some executors go idle, real executors are removed 
due to the incorrect computation of `newExecutorTotal`, which may be greater 
than `minNumExecutors`, thus finally causing zero available executors while a 
wrong set of executorIds is kept in memory.

What's more, even the `SparkListenerTaskEnd` event cannot release the fake 
`executorIds`, because later idle events for the fake executors cannot cause 
the real removal of these executors, as they are already removed and no longer 
exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`.

Logs:

 

EventLogs(DisOrder of events):
{code:java}
{"Event":"SparkListenerExecutorRemoved","Timestamp":1549936077543,"Executor 
ID":"131","Removed Reason":"Container 
container_e28_1547530852233_236191_02_000180 exited from explicit termination 
request."}

{"Event":"SparkListenerTaskStart","Stage ID":136689,"Stage Attempt ID":0,"Task 
Info":{"Task ID":448048,"Index":2,"Attempt":0,"Launch 
Time":1549936032872,"Executor 
ID":"131","Host":"mb2-hadoop-prc-st474.awsind","Locality":"RACK_LOCAL", 
"Speculative":false,"Getting Result Time":0,"Finish 
Time":1549936032906,"Failed":false,"Killed":false,"Accumulables":[{"ID":12923945,"Name":"internal.metrics.executorDeserializeTime","Update":10,"Value":13,"Internal":true,"Count
 Faile d 
Values":true},{"ID":12923946,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2244016,"Value":4286494,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923947,"Name":"internal.metrics.executorRunTime","Update":20,"Val
 ue":39,"Internal":true,"Count Failed 
Values":true},{"ID":12923948,"Name":"internal.metrics.executorCpuTime","Update":13412614,"Value":26759061,"Internal":true,"Count
 Failed Values":true},{"ID":12923949,"Name":"internal.metrics.resultS 
ize","Update":3578,"Value":7156,"Internal":true,"Count Failed 
Values":true},{"ID":12923954,"Name":"internal.metrics.peakExecutionMemory","Update":33816576,"Value":67633152,"Internal":true,"Count
 Failed Values":true},{"ID":12923962,"Na 
me":"internal.metrics.shuffle.write.bytesWritten","Update":1367,"Value":2774,"Internal":true,"Count
 Failed 
Values":true},{"ID":12923963,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":23,"Value":45,"Internal":true,"Cou
 nt Failed 
Values":true},{"ID":12923964,"Name":"internal.metrics.shuffle.write.writeTime","Update":3259051,"Value":6858121,"Internal":true,"Count
 Failed Values":true},{"ID":12921550,"Name":"number of output 
rows","Update":"158","Value" :"289","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921546,"Name":"number of output 
rows","Update":"23","Value":"45","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921547,"Name":"peak memo ry total (min, 
med, max)","Update":"33816575","Value":"67633149","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"},{"ID":12921541,"Name":"data size total (min, 
med, max)","Update":"551","Value":"1077","Internal":true,"Count Failed 
Values":true,"Metadata":"sql"}]}}

{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26892) saveAsTextFile throws NullPointerException when null row present

2019-02-15 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26892:


 Summary: saveAsTextFile throws NullPointerException  when null row 
present 
 Key: SPARK-26892
 URL: https://issues.apache.org/jira/browse/SPARK-26892
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: liupengcheng


We encountered this problem in our production cluster; it can be reproduced 
with the following code:
{code:java}
scala> sc.parallelize(Seq(1,null),1).saveAsTextFile("/tmp/foobar.dat")
19/02/15 21:39:17 ERROR Utils: Aborting task
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$3(RDD.scala:1510)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at 
org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:129)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1352)
at 
org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:127)
at 
org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:425)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1318)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:428)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
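
Until the underlying NullPointerException is fixed, a possible workaround on 
the caller side is to drop or explicitly render null elements before writing. 
A minimal sketch (the output paths below are examples only):

{code:java}
val data = sc.parallelize(Seq[Any](1, null), 1)

// Option 1: drop null rows before writing.
data.filter(_ != null).saveAsTextFile("/tmp/foobar_nonnull.dat")

// Option 2: keep the rows but render nulls explicitly.
data.map(x => if (x == null) "null" else x.toString)
  .saveAsTextFile("/tmp/foobar_rendered.dat")
{code}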



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26877) Support user staging directory in yarn mode

2019-02-14 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26877:


 Summary: Support user staging directory in yarn mode
 Key: SPARK-26877
 URL: https://issues.apache.org/jira/browse/SPARK-26877
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 2.4.0
Reporter: liupengcheng


Currently, when running applications in yarn mode, the app staging directory 
is controlled by the `spark.yarn.stagingDir` config if specified, and this 
directory does not separate different users; sometimes this is inconvenient 
for file and quota management.

For example, users may have a different quota for their own app staging dir, 
and they may also not want to use the home directory as the staging dir, 
because it might be used mainly for private user files or data. The quota may 
also differ between private data and the app staging dir.

So I propose to add per-user sub-directories under this app staging dir.
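
As a rough sketch of the proposed layout, the staging root taken from 
`spark.yarn.stagingDir` could simply be extended with the submitting user's 
name. The helper below is illustrative only and not the actual yarn Client 
code:

{code:java}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

// Illustrative only: derive a per-user staging dir under the configured root.
def userStagingDir(configuredStagingDir: String): Path = {
  val user = UserGroupInformation.getCurrentUser.getShortUserName
  // e.g. /spark-staging -> /spark-staging/<user>
  new Path(configuredStagingDir, user)
}
{code}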



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26877) Support user-level app staging directory in yarn mode when spark.yarn.stagingDir specified

2019-02-13 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26877:
-
Summary: Support user-level app staging directory in yarn mode when 
spark.yarn.stagingDir specified  (was: Support user staging directory in yarn 
mode)

> Support user-level app staging directory in yarn mode when 
> spark.yarn.stagingDir specified
> --
>
> Key: SPARK-26877
> URL: https://issues.apache.org/jira/browse/SPARK-26877
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> Currently, when running applications in yarn mode, the app staging directory 
> is controlled by the `spark.yarn.stagingDir` config if specified, and this 
> directory does not separate different users; sometimes this is inconvenient 
> for file and quota management.
> For example, users may have a different quota for their own app staging dir, 
> and they may also not want to use the home directory as the staging dir, 
> because it might be used mainly for private user files or data. The quota may 
> also differ between private data and the app staging dir.
> So I propose to add per-user sub-directories under this app staging dir.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Single disk broken causing YarnShuffleSerivce not available

2019-02-02 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Summary: Single disk broken causing YarnShuffleSerivce not available  (was: 
Disk broken causing YarnShuffleSerivce not available)

> Single disk broken causing YarnShuffleSerivce not available
> ---
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery 
> is enabled; however, the recovery file lives under a single directory, which 
> may become unavailable if that disk breaks. So if an NM restart happens 
> (caused by a kill or some other reason), the shuffle service cannot start 
> even if there are executors on the node.
> This may finally cause job failures (if the node or the executors on it are 
> not blacklisted), or at least waste resources (shuffles from this node always 
> fail).
> For long-running Spark applications, this problem can be more serious.
> So I think we should support multiple directories (multiple disks) for this 
> recovery, and switch to a good directory when the current directory's disk is 
> broken.
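
A minimal sketch of the multi-directory idea: keep several candidate recovery 
directories (one per disk) and fall back to the first usable one. The paths 
and the health check below are assumptions for illustration only:

{code:java}
import java.io.File

// Pick the first recovery directory whose disk is still usable.
def chooseRecoveryDir(candidates: Seq[File]): Option[File] =
  candidates.find(dir => (dir.isDirectory || dir.mkdirs()) && dir.canWrite)

// Example candidates, one per local disk (paths are made up for illustration).
val recoveryDir = chooseRecoveryDir(Seq(
  new File("/data1/yarn/nm-recovery"),
  new File("/data2/yarn/nm-recovery"),
  new File("/data3/yarn/nm-recovery")))
{code}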



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26768:
-
Description: 
Recently, while reading the code of `BlockManager.getBlockData`, I found some 
code that can never be reached. The related code is shown below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice. However, looking at the call hierarchy of 
`BlockManager.getLocalBytes`, its only other callsite is 
`TorrentBroadcast.readBlocks`, where the blockId can never be a 
`ShuffleBlockId`, so the shuffle branch inside `getLocalBytes` is unreachable.

  !Selection_037.jpg!

So I think we should remove this useless code to make the method easier to read.

 

  was:
Recently, when I was reading some code of `BlockManager.getBlockData`, I found 
that there are useless code that would never reach. The related codes is as 
below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
the `blockId.isShuffle` is checked twice, but however it seems that in the 
method calling hierarchy of `BlockManager.getLocalBytes`, the another callsite 
of the `BlockManager.getLocalBytes` is at `TorrentBroadcast.readBlocks` where 
the blockId can never be a `ShuffleBlockId`.

 

So I think we should remove these useless code for easy reading.

 


> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, when I was 

[jira] [Updated] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26768:
-
Attachment: Selection_037.jpg

> Remove useless code in BlockManager
> ---
>
> Key: SPARK-26768
> URL: https://issues.apache.org/jira/browse/SPARK-26768
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
> Attachments: Selection_037.jpg
>
>
> Recently, while reading the code of `BlockManager.getBlockData`, I found some 
> code that can never be reached. The related code is shown below:
>  
> {code:java}
> override def getBlockData(blockId: BlockId): ManagedBuffer = {
>   if (blockId.isShuffle) {
> 
> shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
>   } else {
> getLocalBytes(blockId) match {
>   case Some(blockData) =>
> new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
> true)
>   case None =>
> // If this block manager receives a request for a block that it 
> doesn't have then it's
> // likely that the master has outdated block statuses for this block. 
> Therefore, we send
> // an RPC so that this block is marked as being unavailable from this 
> block manager.
> reportBlockStatus(blockId, BlockStatus.empty)
> throw new BlockNotFoundException(blockId.toString)
> }
>   }
> }
> {code}
> {code:java}
> def getLocalBytes(blockId: BlockId): Option[BlockData] = {
>   logDebug(s"Getting local block $blockId as bytes")
>   // As an optimization for map output fetches, if the block is for a 
> shuffle, return it
>   // without acquiring a lock; the disk store never deletes (recent) items so 
> this should work
>   if (blockId.isShuffle) {
> val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
> // TODO: This should gracefully handle case where local block is not 
> available. Currently
> // downstream code will throw an exception.
> val buf = new ChunkedByteBuffer(
>   
> shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
> Some(new ByteBufferBlockData(buf, true))
>   } else {
> blockInfoManager.lockForReading(blockId).map { info => 
> doGetLocalBytes(blockId, info) }
>   }
> }
> {code}
> `blockId.isShuffle` is checked twice. However, looking at the call hierarchy 
> of `BlockManager.getLocalBytes`, its only other callsite is 
> `TorrentBroadcast.readBlocks`, where the blockId can never be a 
> `ShuffleBlockId`, so the shuffle branch inside `getLocalBytes` is unreachable.
>  
> So I think we should remove this useless code to make the method easier to read.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26768) Remove useless code in BlockManager

2019-01-29 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26768:


 Summary: Remove useless code in BlockManager
 Key: SPARK-26768
 URL: https://issues.apache.org/jira/browse/SPARK-26768
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: liupengcheng


Recently, while reading the code of `BlockManager.getBlockData`, I found some 
code that can never be reached. The related code is shown below:

 
{code:java}
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {

shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
getLocalBytes(blockId) match {
  case Some(blockData) =>
new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, 
true)
  case None =>
// If this block manager receives a request for a block that it doesn't 
have then it's
// likely that the master has outdated block statuses for this block. 
Therefore, we send
// an RPC so that this block is marked as being unavailable from this 
block manager.
reportBlockStatus(blockId, BlockStatus.empty)
throw new BlockNotFoundException(blockId.toString)
}
  }
}
{code}
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // As an optimization for map output fetches, if the block is for a shuffle, 
return it
  // without acquiring a lock; the disk store never deletes (recent) items so 
this should work
  if (blockId.isShuffle) {
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not 
available. Currently
// downstream code will throw an exception.
val buf = new ChunkedByteBuffer(
  
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Some(new ByteBufferBlockData(buf, true))
  } else {
blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
  }
}
{code}
`blockId.isShuffle` is checked twice. However, looking at the call hierarchy of 
`BlockManager.getLocalBytes`, its only other callsite is 
`TorrentBroadcast.readBlocks`, where the blockId can never be a 
`ShuffleBlockId`, so the shuffle branch inside `getLocalBytes` is unreachable.

So I think we should remove this useless code to make the method easier to read.
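For illustration only, here is a minimal sketch of what the method body inside `BlockManager` might look like once the unreachable shuffle branch is dropped (assuming, as argued above, that no caller ever passes a `ShuffleBlockId`); this is not the actual patch:
{code:java}
def getLocalBytes(blockId: BlockId): Option[BlockData] = {
  logDebug(s"Getting local block $blockId as bytes")
  // No shuffle-specific fast path here any more: shuffle blocks are already
  // handled in getBlockData before this method is ever reached.
  blockInfoManager.lockForReading(blockId).map { info =>
    doGetLocalBytes(blockId, info)
  }
}
{code}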

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26750) Estimate memory overhead should taking multi-cores into account

2019-01-28 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26750:
-
Summary: Estimate memory overhead should taking multi-cores into account  
(was: Estimate memory overhead with multi-cores)

> Estimate memory overhead should taking multi-cores into account
> ---
>
> Key: SPARK-26750
> URL: https://issues.apache.org/jira/browse/SPARK-26750
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, spark estimates the memory overhead without taking the number of 
> cores into account; sometimes this causes direct-memory OOMs, or the executor 
> is killed by yarn for exceeding the requested physical memory.
> I think the memory overhead is related to the executor's core count (mainly 
> the spark direct memory and some related jvm native memory, for instance 
> thread stacks, GC data, etc.), so maybe we can improve this estimation by 
> taking the core count into account.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26750) Estimate memory overhead with multi-cores

2019-01-28 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26750:


 Summary: Estimate memory overhead with multi-cores
 Key: SPARK-26750
 URL: https://issues.apache.org/jira/browse/SPARK-26750
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 2.4.0
Reporter: liupengcheng


Currently, spark estimates the memory overhead without taking the number of 
cores into account; sometimes this causes direct-memory OOMs, or the executor 
is killed by yarn for exceeding the requested physical memory.

I think the memory overhead is related to the executor's core count (mainly the 
spark direct memory and some related jvm native memory, for instance thread 
stacks, GC data, etc.), so maybe we can improve this estimation by taking the 
core count into account.
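A back-of-the-envelope sketch of a core-aware estimate (the per-core term and its default value are purely illustrative assumptions, not an existing Spark formula; the 10% factor and 384 MB floor match the current overhead defaults):
{code:java}
// Hypothetical core-aware overhead estimate, in MB.
def estimateMemoryOverheadMB(
    executorMemoryMB: Long,
    executorCores: Int,
    overheadFactor: Double = 0.10,  // current default factor
    minOverheadMB: Long = 384,      // current default floor
    perCoreOverheadMB: Long = 64    // assumed per-core cost (thread stacks, direct buffers, GC data)
  ): Long = {
  // Today's estimate ignores the core count entirely.
  val base = math.max((overheadFactor * executorMemoryMB).toLong, minOverheadMB)
  // Proposed adjustment: grow the overhead with the number of cores.
  base + perCoreOverheadMB * executorCores
}

// Example: an 8 GB, 8-core executor => max(819, 384) + 8 * 64 = 1331 MB overhead.
{code}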



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26689) Single disk broken causing broadcast failure

2019-01-27 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26689:
-
Summary: Single disk broken causing broadcast failure  (was: Disk broken 
causing broadcast failure)

> Single disk broken causing broadcast failure
> 
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk; a single broken disk can make the whole application fail.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the job still fails. I think 
> it's because spark does not currently handle bad disks in `DiskBlockManager`. 
> In a multi-disk environment we could tolerate a bad disk and avoid the 
> application failure.
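A minimal sketch of the fallback idea, under the assumption that the block manager is given every configured local directory (the helper below is hypothetical and does not mirror `DiskBlockManager.getFile`'s real internals): try each disk in turn and only fail when none of them can host the sub-directory.
{code:java}
import java.io.{File, IOException}

// Hypothetical fallback: place the block file on the first healthy disk.
def getFileWithFallback(localDirs: Seq[File], subDirName: String, fileName: String): File = {
  val usableSubDir = localDirs.view
    .map(dir => new File(dir, subDirName))
    // A disk is considered healthy if the sub-directory exists or can be created.
    .find(subDir => subDir.isDirectory || subDir.mkdirs())
  usableSubDir match {
    case Some(subDir) => new File(subDir, fileName)
    case None =>
      // Only give up when every configured disk is broken.
      throw new IOException(s"Failed to create local dir for $fileName on any configured disk")
  }
}
{code}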



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26689) Disk broken causing broadcast failure

2019-01-26 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16752961#comment-16752961
 ] 

liupengcheng commented on SPARK-26689:
--

[~tgraves] In production environments, yarn.nodemanager.local-dirs is usually 
configured with multiple directories mounted on different disks. Since spark 
already uses this parameter, I think it should also take advantage of having 
multiple disks, and a single disk error should not be expected to fail the job.

The PR I put up can also reduce FetchFailures, and even job failures caused by 
FetchFailed when the blacklist is not enabled or the node is not blacklisted 
(tasks may be repeatedly scheduled onto the unhealthy node).

> Disk broken causing broadcast failure
> -
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk; a single broken disk can make the whole application fail.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the job still fails. I think 
> it's because spark does not currently handle bad disks in `DiskBlockManager`. 
> In a multi-disk environment we could tolerate a bad disk and avoid the 
> application failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26728) Make rdd.unpersist blocking configurable

2019-01-25 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26728:
-
Summary: Make rdd.unpersist blocking configurable  (was: Make rdd.unpersist 
and broadcast.unpersist blocking configurable)

> Make rdd.unpersist blocking configurable
> 
>
> Key: SPARK-26728
> URL: https://issues.apache.org/jira/browse/SPARK-26728
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, rdd.unpersist's blocking argument is set to true by default. 
> However, in real production clusters (especially large ones), node loss or 
> network issues can always happen.
> Users generally expect rdd.unpersist not to throw, so the blocking unpersist 
> can sometimes fail the user's job; this has happened many times in our 
> cluster.
> {code:java}
> 2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: 
> Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 2018-05-16,13:28:33,489 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
> User class threw exception: java.io.IOException: Failed to send RPC 
> 7571440800577648876 to c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
> java.io.IOException: Failed to send RPC 7571440800577648876 to 
> c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
> java.nio.channels.ClosedChannelException
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
>   at 
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>   at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
>   at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
>   at 
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
>   at 
> 

[jira] [Created] (SPARK-26728) Make rdd.unpersist and broadcast.unpersist blocking configurable

2019-01-25 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26728:


 Summary: Make rdd.unpersist and broadcast.unpersist blocking 
configurable
 Key: SPARK-26728
 URL: https://issues.apache.org/jira/browse/SPARK-26728
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, rdd.unpersist's blocking argument is set to true by default. 
However, in real production clusters (especially large ones), node loss or 
network issues can always happen.

Users generally expect rdd.unpersist not to throw, so the blocking unpersist 
can sometimes fail the user's job; this has happened many times in our cluster.
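As a minimal sketch of the proposal (the config key `spark.rdd.unpersist.blocking` is a hypothetical name, not an existing Spark setting), callers could keep calling unpersist while the blocking behaviour is driven by configuration; one of the failure logs from our cluster follows the sketch.
{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

// Hypothetical helper: let a config decide whether unpersist blocks.
def unpersistWithConf[T](rdd: RDD[T], conf: SparkConf): RDD[T] = {
  val blocking = conf.getBoolean("spark.rdd.unpersist.blocking", defaultValue = true)
  // With blocking = false, a lost node or closed channel during block removal
  // no longer propagates an exception into the user's job.
  rdd.unpersist(blocking)
}
{code}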
{code:java}
2018-05-16,13:28:33,489 WARN org.apache.spark.storage.BlockManagerMaster: 
Failed to remove RDD 15 - Failed to send RPC 7571440800577648876 to 
c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
java.nio.channels.ClosedChannelException
java.io.IOException: Failed to send RPC 7571440800577648876 to 
c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
java.nio.channels.ClosedChannelException
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
at 
io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
at 
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
at 
io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
at 
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
2018-05-16,13:28:33,489 ERROR org.apache.spark.deploy.yarn.ApplicationMaster: 
User class threw exception: java.io.IOException: Failed to send RPC 
7571440800577648876 to c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
java.nio.channels.ClosedChannelException
java.io.IOException: Failed to send RPC 7571440800577648876 to 
c3-hadoop-prc-st2325.bj/10.136.136.25:43474: 
java.nio.channels.ClosedChannelException
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
at 
io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
at 
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
at 
io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
at 
io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 

[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a single directory, which may be 
unavailable if disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a single directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.


> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a single directory, which may be 
> unavailable if disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the shuffle service can not start even if there are 
> executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a single directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.


> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a single directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the shuffle service can not start even if there are 
> executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start.

What's more, if the the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a fixed directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the shuffle service can not start even if there are 
> executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory and when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start even if there are executors on 
the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a fixed directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the shuffle service can not start even if there are 
> executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Description: 
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the shuffle service can not start.

What's more, if the the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.

  was:
Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
enabled, however, the recovery file is under a fixed directory, which may be 
unavailable if Disk broken. So if a NM restart happen(may be caused by kill or 
some reason), the `ExecutorShuffleInfo` will lost, and causes the 
shuffleservice unavailble even if there are executors on the node.

This may finally cause job failures(if node or executors on it not 
blacklisted), or at least, it will cause resource waste.(shuffle from this node 
always failed.)

For long running spark applications, this problem may be more serious.

So I think we should support multi directories(multi disk) for this recovery. 
and change to good directory and when the disk of current directory is broken.


> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a fixed directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the shuffle service can not start.
> What's more, if the the `ExecutorShuffleInfo` will lost, and causes the 
> shuffleservice unavailble even if there are executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory and when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26689) Disk broken causing broadcast failure

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26689:
-
Summary: Disk broken causing broadcast failure  (was: Bad disk causing 
broadcast failure)

> Disk broken causing broadcast failure
> -
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk; a single broken disk can make the whole application fail.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes, yet the job still fails. I think 
> it's because spark does not currently handle bad disks in `DiskBlockManager`. 
> In a multi-disk environment we could tolerate a bad disk and avoid the 
> application failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Issue Type: Bug  (was: Improvement)

> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a fixed directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the `ExecutorShuffleInfo` will lost, and causes the 
> shuffleservice unavailble even if there are executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory and when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26712) Disk broken causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26712:
-
Summary: Disk broken causing YarnShuffleSerivce not available  (was: Disk 
broken caused NM recovery failure causing YarnShuffleSerivce not available)

> Disk broken causing YarnShuffleSerivce not available
> 
>
> Key: SPARK-26712
> URL: https://issues.apache.org/jira/browse/SPARK-26712
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, `ExecutorShuffleInfo` can be recovered from file if NM recovery 
> enabled, however, the recovery file is under a fixed directory, which may be 
> unavailable if Disk broken. So if a NM restart happen(may be caused by kill 
> or some reason), the `ExecutorShuffleInfo` will lost, and causes the 
> shuffleservice unavailble even if there are executors on the node.
> This may finally cause job failures(if node or executors on it not 
> blacklisted), or at least, it will cause resource waste.(shuffle from this 
> node always failed.)
> For long running spark applications, this problem may be more serious.
> So I think we should support multi directories(multi disk) for this recovery. 
> and change to good directory and when the disk of current directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26712) Disk broken caused NM recovery failure causing YarnShuffleSerivce not available

2019-01-23 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26712:


 Summary: Disk broken caused NM recovery failure causing 
YarnShuffleSerivce not available
 Key: SPARK-26712
 URL: https://issues.apache.org/jira/browse/SPARK-26712
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


Currently, `ExecutorShuffleInfo` can be recovered from a file if NM recovery is 
enabled; however, the recovery file lives under a fixed directory, which may 
become unavailable when that disk breaks. So if an NM restart happens (caused 
by a kill or some other reason), the `ExecutorShuffleInfo` is lost, and the 
shuffle service becomes unavailable even if there are executors on the node.

This may finally cause job failures (if the node or its executors are not 
blacklisted), or at least resource waste (shuffles from this node always fail).

For long-running spark applications, this problem may be more serious.

So I think we should support multiple directories (multiple disks) for this 
recovery, and switch to a good directory when the disk holding the current 
directory is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26689) Bad disk causing broadcast failure

2019-01-23 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750669#comment-16750669
 ] 

liupengcheng commented on SPARK-26689:
--

[~tgraves] We use yarn as the resource manager, and we run spark applications 
on Yarn with spark version 2.1.0. That information is already provided in the 
environment field. Is there any more information you would like me to provide? 
BTW, I don't think this exception is related to the resource manager.

> Bad disk causing broadcast failure
> --
>
> Key: SPARK-26689
> URL: https://issues.apache.org/jira/browse/SPARK-26689
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark on Yarn
> Multiple Disk
>Reporter: liupengcheng
>Priority: Major
>
> We encountered an application failure in our production cluster caused by a 
> bad disk; a single broken disk can make the whole application fail.
> {code:java}
> Job aborted due to stage failure: Task serialization failed: 
> java.io.IOException: Failed to create local dir in 
> /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
> org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
> org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
> org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> We have multiple disks on our cluster nodes; however, the application still 
> fails. I think this is because Spark does not currently handle bad disks in 
> `DiskBlockManager`. Actually, we could handle bad disks in a multi-disk 
> environment to avoid application failure.
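
A minimal sketch of the fallback idea (names are illustrative and not Spark's 
actual DiskBlockManager code): when one configured local root dir sits on a bad 
disk, skip it and try the next one instead of failing the task.
{code:scala}
import java.io.{File, IOException}

// Illustrative resolver: rootDirs would be the configured local dirs,
// typically one per physical disk.
class FallbackFileResolver(rootDirs: Seq[File]) {
  def getFile(filename: String): File = {
    val resolved = rootDirs.iterator.flatMap { root =>
      try {
        val sub = new File(root, (filename.hashCode & 0xff).toHexString)
        // mkdirs() returns false when the dir cannot be created (e.g. a bad
        // disk); in that case fall through to the next root dir.
        if (sub.isDirectory || sub.mkdirs()) Some(new File(sub, filename)) else None
      } catch {
        case _: SecurityException => None
      }
    }
    if (resolved.hasNext) resolved.next()
    else throw new IOException(s"Failed to create a local dir for $filename on any disk")
  }
}
{code}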



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26689) Bad disk causing broadcast failure

2019-01-22 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26689:


 Summary: Bad disk causing broadcast failure
 Key: SPARK-26689
 URL: https://issues.apache.org/jira/browse/SPARK-26689
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
 Environment: Spark on Yarn

Multiple Disks
Reporter: liupengcheng


We encountered an application failure in our production cluster that was caused 
by bad disk problems; a single bad disk can incur failure of the whole 
application.
{code:java}
Job aborted due to stage failure: Task serialization failed: 
java.io.IOException: Failed to create local dir in 
/home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
org.apache.spark.broadcast.TorrentBroadcast.(TorrentBroadcast.scala:85)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}
We have multiple disks on our cluster nodes; however, the application still 
fails. I think this is because Spark does not currently handle bad disks in 
`DiskBlockManager`.

Actually, we could handle bad disks in a multi-disk environment to avoid 
application failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26684) Add logs when allocating large memory for PooledByteBufAllocator

2019-01-22 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26684:


 Summary: Add logs when allocating large memory for 
PooledByteBufAllocator
 Key: SPARK-26684
 URL: https://issues.apache.org/jira/browse/SPARK-26684
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.4.0
Reporter: liupengcheng


Currently, Spark uses `PooledByteBufAllocator` to allocate memory for channel 
reads. However, the allocated heap/off-heap memory size is not tracked. 
Sometimes this makes it difficult to find out the cause of OOM failures (for 
instance, a direct-memory OOM); we have to dump the heap and use more advanced 
tools like MAT to locate the cause.

Actually, we can add some logs to `PooledByteBufAllocator` when it allocates 
large amounts of memory, which would facilitate debugging.
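
A minimal sketch of the proposed logging, assuming a simple size threshold (the 
helper and the threshold are illustrative, not an existing Spark config):
{code:scala}
import io.netty.buffer.{ByteBuf, ByteBufAllocator}
import org.slf4j.LoggerFactory

object AllocationLogging {
  private val log = LoggerFactory.getLogger(getClass)
  private val warnThresholdBytes = 16 * 1024 * 1024 // assumed 16 MiB threshold

  // Log before delegating to the (pooled) allocator, so oversized requests show
  // up in executor logs instead of only surfacing later as a direct-memory OOM.
  def directBuffer(alloc: ByteBufAllocator, size: Int): ByteBuf = {
    if (size > warnThresholdBytes) {
      log.warn(s"Allocating a large direct buffer of $size bytes")
    }
    alloc.directBuffer(size)
  }
}
{code}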



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26674) Consolidate CompositeByteBuf when reading large frame

2019-01-21 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26674:


 Summary: Consolidate CompositeByteBuf when reading large frame
 Key: SPARK-26674
 URL: https://issues.apache.org/jira/browse/SPARK-26674
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: liupengcheng


Currently, TransportFrameDecoder does not consolidate the buffers read from the 
network, which may waste memory. In most cases a ByteBuf's writerIndex is far 
less than its capacity, so we can optimize this by consolidating the 
components.
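
A rough sketch of the consolidation idea (the component-count threshold and the 
helper are assumptions, not the actual TransportFrameDecoder change):
{code:scala}
import io.netty.buffer.{ByteBuf, CompositeByteBuf}

object FrameConsolidation {
  private val consolidateAfterComponents = 64 // assumed threshold

  // Append a network chunk to the frame being assembled; once enough small
  // components accumulate, merge them so their unused capacity is released.
  def append(frame: CompositeByteBuf, chunk: ByteBuf): CompositeByteBuf = {
    frame.addComponent(true, chunk) // true: advance writerIndex past the chunk
    if (frame.numComponents() > consolidateAfterComponents) {
      frame.consolidate() // copies the components into a single backing buffer
    }
    frame
  }
}
{code}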

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26660) Add warning logs for large taskBinary size

2019-01-17 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26660:
-
Attachment: screenshot-1.png

> Add warning logs for large taskBinary size
> --
>
> Key: SPARK-26660
> URL: https://issues.apache.org/jira/browse/SPARK-26660
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
> Environment: Spark2.1
>Reporter: liupengcheng
>Priority: Minor
> Attachments: screenshot-1.png
>
>
> In our production environment, we encountered an OOM problem in an ML 
> application. After careful investigation, it was found that some ML libraries 
> may generate very large ML models, so executors may be unable to deserialize 
> them, resulting in OOM failures.
> !screenshot-1.png!
> In order to facilitate debugging memory problems caused by a large taskBinary 
> broadcast, we should add a warning log for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26660) Add warning logs for large taskBinary size

2019-01-17 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26660:


 Summary: Add warning logs for large taskBinary size
 Key: SPARK-26660
 URL: https://issues.apache.org/jira/browse/SPARK-26660
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
 Environment: Spark2.1
Reporter: liupengcheng


In our production environment, we encountered an OOM problem in an ML 
application. After careful investigation, it was found that some ML libraries 
may generate very large ML models, so executors may be unable to deserialize 
them, resulting in OOM failures.

!screenshot-1.png!

In order to facilitate debugging memory problems caused by a large taskBinary 
broadcast, we should add a warning log for it.
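
A minimal sketch of such a warning (the threshold and the method are 
illustrative, not the actual DAGScheduler change):
{code:scala}
import org.slf4j.LoggerFactory

object TaskBinarySizeCheck {
  private val log = LoggerFactory.getLogger(getClass)
  private val warnThresholdBytes = 100L * 1024 * 1024 // assumed 100 MiB

  def warnIfLarge(stageId: Int, taskBinary: Array[Byte]): Unit = {
    if (taskBinary.length > warnThresholdBytes) {
      log.warn(s"Stage $stageId broadcasts a large task binary of " +
        s"${taskBinary.length} bytes; executors may hit OOM while deserializing it.")
    }
  }
}
{code}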



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26634) OutputCommitCoordinator may allow task of FetchFailureStage commit again

2019-01-16 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26634:
-
Affects Version/s: (was: 2.4.0)

> OutputCommitCoordinator may allow task of FetchFailureStage commit again
> 
>
> Key: SPARK-26634
> URL: https://issues.apache.org/jira/browse/SPARK-26634
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
>
> In our production Spark cluster, we encountered a case where a task of a stage 
> retried due to a FetchFailure was denied the right to commit, even though the 
> task was the first attempt of that retried stage.
> After careful investigation, it was found that the canCommit call of 
> OutputCommitCoordinator had allowed a task of the FetchFailure stage (with the 
> same partition number as the new task of the retried stage) to commit, which 
> results in TaskCommitDenied for every task (of that partition) of the retried 
> stage. Because TaskCommitDenied does not count towards failures, this might 
> cause the application to hang forever.
>  
> {code:java}
> 2019-01-09,08:39:53,676 INFO org.apache.spark.scheduler.TaskSetManager: 
> Starting task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, 
> executor 456, partition 138, PROCESS_LOCAL, 5829 bytes)
> 2019-01-09,08:43:37,514 INFO org.apache.spark.scheduler.TaskSetManager: 
> Finished task 138.0 in stage 5.0 (TID 30634) in 466958 ms on 
> zjy-hadoop-prc-st1212.bj (executor 1632) (674/5000)
> 2019-01-09,08:45:57,372 WARN org.apache.spark.scheduler.TaskSetManager: Lost 
> task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 456): 
> TaskCommitDenied (Driver denied task commit) for job: 5, partition: 138, 
> attemptNumber: 1
> 166483 2019-01-09,08:45:57,373 INFO 
> org.apache.spark.scheduler.OutputCommitCoordinator: Task was denied 
> committing, stage: 5, partition: 138, attempt number: 0, attempt 
> number(counting failed stage): 1
> {code}
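
A simplified sketch of one way to avoid this: authorize commits per partition 
and stage attempt, so a committer left over from the fetch-failed attempt cannot 
block the retried stage (names are illustrative, not the real 
OutputCommitCoordinator):
{code:scala}
import scala.collection.mutable

// Identity of the task currently authorized to commit a partition.
final case class Committer(stageAttempt: Int, taskAttempt: Int)

class SimpleCommitAuthorizer {
  private val byPartition = mutable.Map.empty[Int, Committer]

  def canCommit(partition: Int, stageAttempt: Int, taskAttempt: Int): Boolean =
    synchronized {
      byPartition.get(partition) match {
        case Some(c) if c.stageAttempt == stageAttempt =>
          // Same stage attempt: only the already-authorized task may commit.
          c.taskAttempt == taskAttempt
        case _ =>
          // No committer yet, or the committer belongs to an older (failed)
          // stage attempt: let the new attempt take over.
          byPartition(partition) = Committer(stageAttempt, taskAttempt)
          true
      }
    }
}
{code}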



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26634) OutputCommitCoordinator may allow task of FetchFailureStage commit again

2019-01-15 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26634:
-
Description: 
In our production Spark cluster, we encountered a case where a task of a stage 
retried due to a FetchFailure was denied the right to commit, even though the 
task was the first attempt of that retried stage.

After careful investigation, it was found that the canCommit call of 
OutputCommitCoordinator had allowed a task of the FetchFailure stage (with the 
same partition number as the new task of the retried stage) to commit, which 
results in TaskCommitDenied for every task (of that partition) of the retried 
stage. Because TaskCommitDenied does not count towards failures, this might 
cause the application to hang forever.

 
{code:java}
2019-01-09,08:39:53,676 INFO org.apache.spark.scheduler.TaskSetManager: 
Starting task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 
456, partition 138, PROCESS_LOCAL, 5829 bytes)
2019-01-09,08:43:37,514 INFO org.apache.spark.scheduler.TaskSetManager: 
Finished task 138.0 in stage 5.0 (TID 30634) in 466958 ms on 
zjy-hadoop-prc-st1212.bj (executor 1632) (674/5000)
2019-01-09,08:45:57,372 WARN org.apache.spark.scheduler.TaskSetManager: Lost 
task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 456): 
TaskCommitDenied (Driver denied task commit) for job: 5, partition: 138, 
attemptNumber: 1
166483 2019-01-09,08:45:57,373 INFO 
org.apache.spark.scheduler.OutputCommitCoordinator: Task was denied committing, 
stage: 5, partition: 138, attempt number: 0, attempt number(counting failed 
stage): 1
{code}

  was:
In our production Spark cluster, we encountered a case where a task of a stage 
retried due to a FetchFailure was denied the right to commit, even though the 
task was the first attempt of that retried stage.

After careful investigation, it was found that the canCommit call of 
OutputCommitCoordinator had allowed a task of the FetchFailure stage (with the 
same partition number as the new task of the retried stage) to commit, which 
results in TaskCommitDenied for every task of the retried stage. This is a 
correctness bug.


> OutputCommitCoordinator may allow task of FetchFailureStage commit again
> 
>
> Key: SPARK-26634
> URL: https://issues.apache.org/jira/browse/SPARK-26634
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> In our production Spark cluster, we encountered a case where a task of a stage 
> retried due to a FetchFailure was denied the right to commit, even though the 
> task was the first attempt of that retried stage.
> After careful investigation, it was found that the canCommit call of 
> OutputCommitCoordinator had allowed a task of the FetchFailure stage (with the 
> same partition number as the new task of the retried stage) to commit, which 
> results in TaskCommitDenied for every task (of that partition) of the retried 
> stage. Because TaskCommitDenied does not count towards failures, this might 
> cause the application to hang forever.
>  
> {code:java}
> 2019-01-09,08:39:53,676 INFO org.apache.spark.scheduler.TaskSetManager: 
> Starting task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, 
> executor 456, partition 138, PROCESS_LOCAL, 5829 bytes)
> 2019-01-09,08:43:37,514 INFO org.apache.spark.scheduler.TaskSetManager: 
> Finished task 138.0 in stage 5.0 (TID 30634) in 466958 ms on 
> zjy-hadoop-prc-st1212.bj (executor 1632) (674/5000)
> 2019-01-09,08:45:57,372 WARN org.apache.spark.scheduler.TaskSetManager: Lost 
> task 138.0 in stage 5.1 (TID 31437, zjy-hadoop-prc-st159.bj, executor 456): 
> TaskCommitDenied (Driver denied task commit) for job: 5, partition: 138, 
> attemptNumber: 1
> 166483 2019-01-09,08:45:57,373 INFO 
> org.apache.spark.scheduler.OutputCommitCoordinator: Task was denied 
> committing, stage: 5, partition: 138, attempt number: 0, attempt 
> number(counting failed stage): 1
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26634) OutputCommitCoordinator may allow task of FetchFailureStage commit again

2019-01-15 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26634:


 Summary: OutputCommitCoordinator may allow task of 
FetchFailureStage commit again
 Key: SPARK-26634
 URL: https://issues.apache.org/jira/browse/SPARK-26634
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.1.0
Reporter: liupengcheng


In our production Spark cluster, we encountered a case where a task of a stage 
retried due to a FetchFailure was denied the right to commit, even though the 
task was the first attempt of that retried stage.

After careful investigation, it was found that the canCommit call of 
OutputCommitCoordinator had allowed a task of the FetchFailure stage (with the 
same partition number as the new task of the retried stage) to commit, which 
results in TaskCommitDenied for every task of the retried stage. This is a 
correctness bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26614) Speculation kill might cause job failure

2019-01-15 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26614:
-
Fix Version/s: 2.3.1
   2.4.0

> Speculation kill might cause job failure
> 
>
> Key: SPARK-26614
> URL: https://issues.apache.org/jira/browse/SPARK-26614
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2, 2.3.1, 2.4.0
>
>
> This issue is similar to SPARK-26612.
> Some odd exceptions might be thrown during a speculation kill; however, Spark 
> currently does not handle this case and will report a failure to the driver. 
> This exception counts towards MAX_TASK_FAILURES and might result in job 
> failure.
>  
> I think we can check the state of the task to decide whether to report it as 
> failed or killed.
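
A tiny sketch of the proposed check (the types and names are illustrative, not 
Spark's actual TaskRunner code): if the task had been asked to be killed, report 
it as killed rather than as a failure that counts towards the limit.
{code:scala}
sealed trait TaskEndReason
case object ReportKilled extends TaskEndReason // not counted towards max failures
final case class ReportFailed(cause: Throwable) extends TaskEndReason

object SpeculationKillHandling {
  // `killRequested` would reflect the task's kill flag when the error surfaced.
  def classify(error: Throwable, killRequested: Boolean): TaskEndReason =
    if (killRequested) ReportKilled else ReportFailed(error)
}
{code}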



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26612) Speculation kill causing finished stage recomputed

2019-01-15 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26612:
-
Fix Version/s: 2.3.1
   2.4.0

> Speculation kill causing finished stage recomputed
> --
>
> Key: SPARK-26612
> URL: https://issues.apache.org/jira/browse/SPARK-26612
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2, 2.3.1, 2.4.0
>
>
> In our production Spark cluster, we encountered this issue.
> A more detailed explanation:
> Say we have two stages, stage0.0 and stage1.0, where stage0 is a 
> ShuffleMapStage, stage1 depends on stage0, and spark.speculation is enabled.
> When task0.0 of stage1.0 finishes and tries to kill task0.1 (the speculative 
> copy) of stage1.0, task0.1 throws a wrapped FetchFailedException whose root 
> cause is java.nio.channels.ClosedByInterruptException (caused by the 
> speculation kill).
> Exception stack:
> {code:java}
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply$mcZ$sp(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.tryThrowFetchFailedException(BlockStoreShuffleReader.scala:127)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.hasNext(BlockStoreShuffleReader.scala:148)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:308)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer{file=/home/work/hdd6/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1505730831071_76097/blockmgr-bb226ff8-dd5f-4296-b3cc-ce7ff5cc60cc/37/shuffle_1_1182_0.data,
>  offset=17789166, length=35709}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:114)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:371)
> ... 26 more
> Caused by: java.nio.channels.ClosedByInterruptException
> at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:155)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
> {code}
> It seems this problem still exists in the latest Spark version: a 
> FetchFailedException might be thrown in ShuffleBlockFetcherIterator.next when 
> the task is accessing a local shuffle block or encounters stream corruption.
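
A small, purely illustrative sketch of the kind of guard this implies: walk the 
cause chain before converting an I/O error into a FetchFailedException, and 
treat interrupt-related causes as a kill rather than a fetch failure.
{code:scala}
import java.nio.channels.ClosedByInterruptException
import scala.annotation.tailrec

object InterruptAwareFetch {
  // True if the failure ultimately stems from the task being interrupted
  // (e.g. by a speculation kill); such a failure should not be reported as a
  // FetchFailedException that forces the finished map stage to be recomputed.
  def causedByInterrupt(t: Throwable): Boolean = {
    @tailrec
    def loop(cause: Throwable): Boolean = cause match {
      case null => false
      case _: InterruptedException | _: ClosedByInterruptException => true
      case other => loop(other.getCause)
    }
    loop(t)
  }
}
{code}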



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26612) Speculation kill causing finished stage recomputed

2019-01-15 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743571#comment-16743571
 ] 

liupengcheng commented on SPARK-26612:
--

Already resolved by https://github.com/apache/spark/pull/20987

> Speculation kill causing finished stage recomputed
> --
>
> Key: SPARK-26612
> URL: https://issues.apache.org/jira/browse/SPARK-26612
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2
>
>
> In our production Spark cluster, we encountered this issue.
> A more detailed explanation:
> Say we have two stages, stage0.0 and stage1.0, where stage0 is a 
> ShuffleMapStage, stage1 depends on stage0, and spark.speculation is enabled.
> When task0.0 of stage1.0 finishes and tries to kill task0.1 (the speculative 
> copy) of stage1.0, task0.1 throws a wrapped FetchFailedException whose root 
> cause is java.nio.channels.ClosedByInterruptException (caused by the 
> speculation kill).
> Exception stack:
> {code:java}
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply$mcZ$sp(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.tryThrowFetchFailedException(BlockStoreShuffleReader.scala:127)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.hasNext(BlockStoreShuffleReader.scala:148)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:308)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer{file=/home/work/hdd6/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1505730831071_76097/blockmgr-bb226ff8-dd5f-4296-b3cc-ce7ff5cc60cc/37/shuffle_1_1182_0.data,
>  offset=17789166, length=35709}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:114)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:371)
> ... 26 more
> Caused by: java.nio.channels.ClosedByInterruptException
> at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:155)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
> {code}
> It seems this problem still exists in the latest Spark version: a 
> FetchFailedException might be thrown in ShuffleBlockFetcherIterator.next when 
> the task is accessing a local shuffle block or encounters stream corruption.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26614) Speculation kill might cause job failure

2019-01-15 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743569#comment-16743569
 ] 

liupengcheng commented on SPARK-26614:
--

Already resolved by https://github.com/apache/spark/pull/20987

> Speculation kill might cause job failure
> 
>
> Key: SPARK-26614
> URL: https://issues.apache.org/jira/browse/SPARK-26614
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2
>
>
> This issue is similar to SPARK-26612.
> Some odd exceptions might be thrown during a speculation kill; however, Spark 
> currently does not handle this case and will report a failure to the driver. 
> This exception counts towards MAX_TASK_FAILURES and might result in job 
> failure.
>  
> I think we can check the state of the task to decide whether to report it as 
> failed or killed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26612) Speculation kill causing finished stage recomputed

2019-01-15 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng resolved SPARK-26612.
--
   Resolution: Fixed
Fix Version/s: 2.2.2

> Speculation kill causing finished stage recomputed
> --
>
> Key: SPARK-26612
> URL: https://issues.apache.org/jira/browse/SPARK-26612
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2
>
>
> In our production Spark cluster, we encountered this issue.
> A more detailed explanation:
> Say we have two stages, stage0.0 and stage1.0, where stage0 is a 
> ShuffleMapStage, stage1 depends on stage0, and spark.speculation is enabled.
> When task0.0 of stage1.0 finishes and tries to kill task0.1 (the speculative 
> copy) of stage1.0, task0.1 throws a wrapped FetchFailedException whose root 
> cause is java.nio.channels.ClosedByInterruptException (caused by the 
> speculation kill).
> Exception stack:
> {code:java}
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply$mcZ$sp(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.tryThrowFetchFailedException(BlockStoreShuffleReader.scala:127)
> at 
> org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.hasNext(BlockStoreShuffleReader.scala:148)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:308)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer{file=/home/work/hdd6/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1505730831071_76097/blockmgr-bb226ff8-dd5f-4296-b3cc-ce7ff5cc60cc/37/shuffle_1_1182_0.data,
>  offset=17789166, length=35709}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:114)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:371)
> ... 26 more
> Caused by: java.nio.channels.ClosedByInterruptException
> at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:155)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
> {code}
> It seems this problem still exists in the latest Spark version: a 
> FetchFailedException might be thrown in ShuffleBlockFetcherIterator.next when 
> the task is accessing a local shuffle block or encounters stream corruption.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26614) Speculation kill might cause job failure

2019-01-15 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng resolved SPARK-26614.
--
   Resolution: Fixed
Fix Version/s: 2.2.2

> Speculation kill might cause job failure
> 
>
> Key: SPARK-26614
> URL: https://issues.apache.org/jira/browse/SPARK-26614
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
> Fix For: 2.2.2
>
>
> This issue is similar to SPARK-26612.
> Some odd exceptions might be thrown during a speculation kill; however, Spark 
> currently does not handle this case and will report a failure to the driver. 
> This exception counts towards MAX_TASK_FAILURES and might result in job 
> failure.
>  
> I think we can check the state of the task to decide whether to report it as 
> failed or killed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26530) Validate heartbeat arguments in HeartbeatReceiver

2019-01-14 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26530:
-
Summary: Validate heartbeat arguments in HeartbeatReceiver  (was: Validate 
heartbeat arguments in SparkSubmitArguments)

> Validate heartbeat arguments in HeartbeatReceiver
> -
>
> Key: SPARK-26530
> URL: https://issues.apache.org/jira/browse/SPARK-26530
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, heartbeat-related arguments are not validated in Spark, so if these 
> args are improperly specified, the application may run for a while and not fail 
> until the maximum number of executor failures is reached (especially with 
> spark.dynamicAllocation.enabled=true), which may waste resources.
> We should validate them before submitting to the cluster.
>  
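
A minimal sketch of the validation, assuming the usual constraint that the 
executor heartbeat interval must be smaller than the network timeout (the helper 
itself is illustrative):
{code:scala}
object HeartbeatConfValidation {
  // Fail fast at submit time instead of letting executors be marked dead one
  // by one until the maximum number of executor failures is reached.
  def validate(heartbeatIntervalMs: Long, networkTimeoutMs: Long): Unit = {
    require(heartbeatIntervalMs > 0,
      s"spark.executor.heartbeatInterval must be positive, got $heartbeatIntervalMs ms")
    require(heartbeatIntervalMs < networkTimeoutMs,
      s"spark.executor.heartbeatInterval ($heartbeatIntervalMs ms) must be less than " +
        s"spark.network.timeout ($networkTimeoutMs ms)")
  }
}
{code}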



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26614) Speculation kill might cause job failure

2019-01-14 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26614:


 Summary: Speculation kill might cause job failure
 Key: SPARK-26614
 URL: https://issues.apache.org/jira/browse/SPARK-26614
 Project: Spark
  Issue Type: Bug
  Components: Scheduler, Spark Core
Affects Versions: 2.1.0
Reporter: liupengcheng


This issue is similar to 
[SPARK-26612|http://issues.apache.org/jira/browse/SPARK-26612]

Some odd exceptions might be thrown during a speculation kill; however, Spark 
currently does not handle this case and will report a failure to the driver. 
This exception counts towards MAX_TASK_FAILURES and might result in job failure.

I think we can check the state of the task to decide whether to report it as 
failed or killed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26612) Speculation kill causing finished stage recomputed

2019-01-14 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26612:


 Summary: Speculation kill causing finished stage recomputed
 Key: SPARK-26612
 URL: https://issues.apache.org/jira/browse/SPARK-26612
 Project: Spark
  Issue Type: Bug
  Components: Scheduler, Spark Core
Affects Versions: 2.1.0
Reporter: liupengcheng


In our production Spark cluster, we encountered this issue.

A more detailed explanation:

Say we have two stages, stage0.0 and stage1.0, where stage0 is a 
ShuffleMapStage, stage1 depends on stage0, and spark.speculation is enabled.

When task0.0 of stage1.0 finishes and tries to kill task0.1 (the speculative 
copy) of stage1.0, task0.1 throws a wrapped FetchFailedException whose root 
cause is java.nio.channels.ClosedByInterruptException (caused by the 
speculation kill).

Exception stack:
{code:java}
at 
org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply$mcZ$sp(BlockStoreShuffleReader.scala:148)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator$$anonfun$hasNext$1.apply(BlockStoreShuffleReader.scala:148)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.tryThrowFetchFailedException(BlockStoreShuffleReader.scala:127)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader$ProcessFetchFailedIterator.hasNext(BlockStoreShuffleReader.scala:148)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:308)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer{file=/home/work/hdd6/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1505730831071_76097/blockmgr-bb226ff8-dd5f-4296-b3cc-ce7ff5cc60cc/37/shuffle_1_1182_0.data,
 offset=17789166, length=35709}
at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:114)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:371)
... 26 more
Caused by: java.nio.channels.ClosedByInterruptException
at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:155)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
{code}
It seems this problem still exists in the latest Spark version: a 
FetchFailedException might be thrown in ShuffleBlockFetcherIterator.next when 
the task is accessing a local shuffle block or encounters stream corruption.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26614) Speculation kill might cause job failure

2019-01-14 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26614:
-
Description: 
This issue is similar to SPARK-26612.

Some odd exceptions might be thrown during a speculation kill; however, Spark 
currently does not handle this case and will report a failure to the driver. 
This exception counts towards MAX_TASK_FAILURES and might result in job failure.

I think we can check the state of the task to decide whether to report it as 
failed or killed.

  was:
This issue is similar to 
[SPARK-26612|http://issues.apache.org/jira/browse/SPARK-26612]

Some odd exceptions might be thrown during a speculation kill; however, Spark 
currently does not handle this case and will report a failure to the driver. 
This exception counts towards MAX_TASK_FAILURES and might result in job failure.

I think we can check the state of the task to decide whether to report it as 
failed or killed.


> Speculation kill might cause job failure
> 
>
> Key: SPARK-26614
> URL: https://issues.apache.org/jira/browse/SPARK-26614
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler, Spark Core
>Affects Versions: 2.1.0
>Reporter: liupengcheng
>Priority: Major
>
> This issue is similar to SPARK-26612.
> Some odd exceptions might be thrown during a speculation kill; however, Spark 
> currently does not handle this case and will report a failure to the driver. 
> This exception counts towards MAX_TASK_FAILURES and might result in job 
> failure.
>  
> I think we can check the state of the task to decide whether to report it as 
> failed or killed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-26126) Put scala-library deps into root pom instead of spark-tags module

2019-01-06 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng resolved SPARK-26126.
--
Resolution: Not A Problem

> Put scala-library deps into root pom instead of spark-tags module
> -
>
> Key: SPARK-26126
> URL: https://issues.apache.org/jira/browse/SPARK-26126
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.3.0, 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> When doing some backports in our custom Spark, I noticed some strange code in 
> the spark-tags module:
> {code:xml}
> <dependencies>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>${scala.version}</version>
>   </dependency>
> </dependencies>
> {code}
> As far as I know, spark-tags should only contain annotation-related classes and 
> their dependencies, so should we move the scala-library dependency to the root 
> pom?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26529) Add debug logs for confArchive when preparing local resource

2019-01-04 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26529:
-
Summary: Add debug logs for confArchive when preparing local resource   
(was: Add logs for IOException when preparing local resource )

> Add debug logs for confArchive when preparing local resource 
> -
>
> Key: SPARK-26529
> URL: https://issues.apache.org/jira/browse/SPARK-26529
> Project: Spark
>  Issue Type: Improvement
>  Components: Deploy, Spark Core
>Affects Versions: 2.3.2, 2.4.0
>Reporter: liupengcheng
>Priority: Trivial
>
> Currently, `Client#createConfArchive` does not handle IOException, and some 
> detailed info is not provided in the logs. Sometimes this delays locating the 
> root cause of an I/O error.
> A case that happened in our production environment is that a local disk was 
> full, and the following exception was thrown with no detailed path info; we had 
> to investigate every local disk on the machine to find the root cause.
> {code:java}
> Exception in thread "main" java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:345)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.ZipOutputStream.closeEntry(ZipOutputStream.java:238)
> at java.util.zip.ZipOutputStream.finish(ZipOutputStream.java:343)
> at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:238)
> at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:360)
> at org.apache.spark.deploy.yarn.Client.createConfArchive(Client.scala:769)
> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:657)
> at 
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:177)
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1202)
> at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1261)
> at org.apache.spark.deploy.yarn.Client.main(Client.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:767)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:189)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:214)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
> It makes sense for us to catch the IOException and print some useful 
> information.
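
A small sketch of what the enrichment could look like (the wrapper is 
illustrative, not the actual Client.createConfArchive change): re-throw the 
IOException with the path being written so the log points directly at the bad 
or full disk.
{code:scala}
import java.io.{File, IOException}

object ConfArchiveLogging {
  def withPathContext[T](target: File)(body: => T): T =
    try body catch {
      case e: IOException =>
        throw new IOException(
          s"I/O error while writing ${target.getAbsolutePath}: ${e.getMessage}", e)
    }
}
{code}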



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26126) Put scala-library deps into root pom instead of spark-tags module

2019-01-04 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26126:
-
Summary: Put scala-library deps into root pom instead of spark-tags module  
(was: Should put scala-library deps into root pom instead of spark-tags module)

> Put scala-library deps into root pom instead of spark-tags module
> -
>
> Key: SPARK-26126
> URL: https://issues.apache.org/jira/browse/SPARK-26126
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.3.0, 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> When doing some backports in our custom Spark, I noticed some strange code in 
> the spark-tags module:
> {code:xml}
> <dependencies>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>${scala.version}</version>
>   </dependency>
> </dependencies>
> {code}
> As far as I know, spark-tags should only contain annotation-related classes and 
> their dependencies, so should we move the scala-library dependency to the root 
> pom?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-26126) Should put scala-library deps into root pom instead of spark-tags module

2019-01-03 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16733836#comment-16733836
 ] 

liupengcheng edited comment on SPARK-26126 at 1/4/19 5:41 AM:
--

[~hyukjin.kwon] Yes, there is no actual problem happening, but putting the 
scala-library dependency into the spark-tags module is confusing.

If you agree that we should move it into the root pom for better understanding, 
I can open a PR for this issue, or you can just leave it resolved.


was (Author: liupengcheng):
[~hyukjin.kwon] Yes, there is no actual problem happening, but putting the 
scala-library dependency into the spark-tags module is confusing.

If you agree that we should move it into the root pom for better understanding, 
I can open a PR for this issue.

> Should put scala-library deps into root pom instead of spark-tags module
> 
>
> Key: SPARK-26126
> URL: https://issues.apache.org/jira/browse/SPARK-26126
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.3.0, 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> When doing some backports in our custom Spark, I noticed some strange code in 
> the spark-tags module:
> {code:xml}
> <dependencies>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>${scala.version}</version>
>   </dependency>
> </dependencies>
> {code}
> As far as I know, spark-tags should only contain annotation-related classes and 
> their dependencies, so should we move the scala-library dependency to the root 
> pom?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26126) Should put scala-library deps into root pom instead of spark-tags module

2019-01-03 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16733836#comment-16733836
 ] 

liupengcheng commented on SPARK-26126:
--

[~hyukjin.kwon] Yes, there is no actual problem happening, but putting the 
scala-library dependency into the spark-tags module is confusing.

If you agree that we should move it into the root pom for better understanding, 
I can open a PR for this issue.

> Should put scala-library deps into root pom instead of spark-tags module
> 
>
> Key: SPARK-26126
> URL: https://issues.apache.org/jira/browse/SPARK-26126
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.3.0, 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> When doing some backports in our custom Spark, I noticed some strange code in 
> the spark-tags module:
> {code:xml}
> <dependencies>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>${scala.version}</version>
>   </dependency>
> </dependencies>
> {code}
> As far as I know, spark-tags should only contain annotation-related classes and 
> their dependencies, so should we move the scala-library dependency to the root 
> pom?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26530) Validate heartbeat arguments in SparkSubmitArguments

2019-01-03 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26530:


 Summary: Validate heartbeat arguments in SparkSubmitArguments
 Key: SPARK-26530
 URL: https://issues.apache.org/jira/browse/SPARK-26530
 Project: Spark
  Issue Type: Improvement
  Components: Deploy, Spark Core
Affects Versions: 2.4.0, 2.3.2
Reporter: liupengcheng


Currently, heartbeat-related arguments are not validated in Spark, so if these 
args are improperly specified, the application may run for a while and not fail 
until the maximum number of executor failures is reached (especially with 
spark.dynamicAllocation.enabled=true), which may waste resources.

We should validate them before submitting to the cluster.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26529) Add logs for IOException when preparing local resource

2019-01-03 Thread liupengcheng (JIRA)
liupengcheng created SPARK-26529:


 Summary: Add logs for IOException when preparing local resource 
 Key: SPARK-26529
 URL: https://issues.apache.org/jira/browse/SPARK-26529
 Project: Spark
  Issue Type: Improvement
  Components: Deploy, Spark Core
Affects Versions: 2.4.0, 2.3.2
Reporter: liupengcheng


Currently, `Client#createConfArchive` does not handle IOException, and some 
detailed info is not provided in the logs. Sometimes this delays locating the 
root cause of an I/O error.

A case that happened in our production environment is that a local disk was 
full, and the following exception was thrown with no detailed path info; we had 
to investigate every local disk on the machine to find the root cause.
{code:java}
Exception in thread "main" java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
at java.util.zip.ZipOutputStream.closeEntry(ZipOutputStream.java:238)
at java.util.zip.ZipOutputStream.finish(ZipOutputStream.java:343)
at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:238)
at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:360)
at org.apache.spark.deploy.yarn.Client.createConfArchive(Client.scala:769)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:657)
at 
org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:895)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:177)
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1202)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1261)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:767)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:214)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}
It makes sense for us to catch the IOException and print some useful information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26126) Should put scala-library deps into root pom instead of spark-tags module

2019-01-03 Thread liupengcheng (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16733738#comment-16733738
 ] 

liupengcheng commented on SPARK-26126:
--

[~hyukjin.kwon] It's an issue; it really doesn't matter much, but it's just 
confusing.

> Should put scala-library deps into root pom instead of spark-tags module
> 
>
> Key: SPARK-26126
> URL: https://issues.apache.org/jira/browse/SPARK-26126
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.3.0, 2.4.0
>Reporter: liupengcheng
>Priority: Minor
>
> When doing some backports in our custom Spark, I noticed some strange code in 
> the spark-tags module:
> {code:xml}
> <dependencies>
>   <dependency>
>     <groupId>org.scala-lang</groupId>
>     <artifactId>scala-library</artifactId>
>     <version>${scala.version}</version>
>   </dependency>
> </dependencies>
> {code}
> As far as I know, spark-tags should only contain annotation-related classes and 
> their dependencies, so should we move the scala-library dependency to the root 
> pom?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26525) Fast release memory of ShuffleBlockFetcherIterator

2019-01-03 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26525:
-
Description: 
Currently, Spark does not release a ShuffleBlockFetcherIterator until the whole 
task has finished.

In some conditions, this causes a memory leak.

An example is Shuffle -> map -> Coalesce(shuffle = false). Each 
ShuffleBlockFetcherIterator contains some metadata about 
MapStatus (blocksByAddress), and each ShuffleMapTask will keep up to n (the 
number of shuffle partitions) ShuffleBlockFetcherIterators because they are 
referenced by the onCompleteCallbacks of the TaskContext. In some cases this 
may take a huge amount of memory, and that memory is not released until the 
task finishes.

Actually, we can release a ShuffleBlockFetcherIterator as soon as it has been 
consumed.

  was:
Currently, Spark does not release a ShuffleBlockFetcherIterator until the whole 
task has finished.

In some conditions, this causes a memory leak.

An example is Shuffle -> map -> Coalesce(shuffle = false). Each ShuffleMapTask 
will keep up to n (the number of shuffle partitions) 
ShuffleBlockFetcherIterators because they are referenced by the 
onCompleteCallbacks of the TaskContext, and each ShuffleBlockFetcherIterator 
contains some metadata about MapStatus (blocksByAddress). In some cases this 
may take a huge amount of memory, and that memory is not released until the 
task finishes.

Actually, we can release a ShuffleBlockFetcherIterator as soon as it has been 
consumed.


> Fast release memory of ShuffleBlockFetcherIterator
> --
>
> Key: SPARK-26525
> URL: https://issues.apache.org/jira/browse/SPARK-26525
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.2
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
> whole task finishes.
> In some conditions, this incurs a memory leak.
> An example is Shuffle -> map -> Coalesce(shuffle = false). Each 
> ShuffleBlockFetcherIterator holds some metadata about MapStatus 
> (blocksByAddress), and each ShuffleMapTask keeps up to n (the number of 
> shuffle partitions) ShuffleBlockFetcherIterators because they are referenced 
> by the onCompleteCallbacks of the TaskContext. In some cases this may take 
> huge amounts of memory, and that memory is not released until the task 
> finishes.
> Actually, we can release each ShuffleBlockFetcherIterator as soon as it's 
> consumed.
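
A minimal repro sketch of the job shape described above (the data sizes and 
partition counts are illustrative assumptions, not taken from the report). 
coalesce(shuffle = false) merges many post-shuffle partitions into a single 
task, so each final task creates one ShuffleBlockFetcherIterator per merged 
partition and holds all of them until the task completes:

{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative Shuffle -> map -> Coalesce(shuffle = false) job: each of the 10
// final tasks reads roughly 200 shuffle partitions, accumulating one
// ShuffleBlockFetcherIterator per partition until the task finishes.
object CoalesceAfterShuffleRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("coalesce-after-shuffle").getOrCreate()
    val sc = spark.sparkContext

    val rows = sc.parallelize(0 until 1000000, numSlices = 2000)
      .map(i => (i % 1000, 1L))
      .reduceByKey(_ + _)                // shuffle; ~2000 reduce partitions when
                                         // spark.default.parallelism is unset
      .map { case (k, v) => (k, v * 2) } // narrow map after the shuffle
      .coalesce(10, shuffle = false)     // no extra shuffle: merges partitions per task
      .count()

    println(s"rows: $rows")
    spark.stop()
  }
}
{code}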



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26525) Fast release memory of ShuffleBlockFetcherIterator

2019-01-03 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26525:
-
Description: 
Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
whole task finishes.

In some conditions, this incurs a memory leak.

An example is Shuffle -> map -> Coalesce(shuffle = false). Each ShuffleMapTask 
keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
because they are referenced by the onCompleteCallbacks of the TaskContext, and 
each ShuffleBlockFetcherIterator holds some metadata about MapStatus 
(blocksByAddress). In some cases this may take huge amounts of memory, and 
that memory is not released until the task finishes.

Actually, we can release each ShuffleBlockFetcherIterator as soon as it's 
consumed.

  was:
Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
whole task finishes.

In some conditions, this incurs a memory leak, because the iterator holds some 
metadata about MapStatus (blocksByAddress), which may take huge amounts of 
memory.

An example is Shuffle -> map -> Coalesce(shuffle = false): each ShuffleMapTask 
keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
because they are referenced by the onCompleteCallbacks of the TaskContext.

We can release each ShuffleBlockFetcherIterator as soon as it's consumed.


> Fast release memory of ShuffleBlockFetcherIterator
> --
>
> Key: SPARK-26525
> URL: https://issues.apache.org/jira/browse/SPARK-26525
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.2
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
> whole task finishes.
> In some conditions, this incurs a memory leak.
> An example is Shuffle -> map -> Coalesce(shuffle = false). Each ShuffleMapTask 
> keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
> because they are referenced by the onCompleteCallbacks of the TaskContext, and 
> each ShuffleBlockFetcherIterator holds some metadata about MapStatus 
> (blocksByAddress). In some cases this may take huge amounts of memory, and 
> that memory is not released until the task finishes.
> Actually, we can release each ShuffleBlockFetcherIterator as soon as it's 
> consumed.
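
A hedged sketch of the idea (not the actual Spark patch): wrap the 
per-partition fetch iterator so the heavy reference is dropped as soon as the 
iterator is exhausted, instead of being kept alive by a task-completion 
callback until the task ends.

{code:scala}
import java.util.NoSuchElementException

// Illustrative only: drops the underlying iterator reference once it is fully
// consumed, so its fetch metadata (e.g. blocksByAddress) can be garbage
// collected before the task finishes.
class EagerlyReleasedIterator[A](private var underlying: Iterator[A]) extends Iterator[A] {
  override def hasNext: Boolean = {
    if (underlying == null) return false
    val more = underlying.hasNext
    if (!more) underlying = null // release as soon as the data has been consumed
    more
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("iterator already consumed and released")
    underlying.next()
  }
}
{code}

Wrapping each partition's fetch iterator this way would let a coalesced task 
free one iterator after draining it, rather than accumulating all of them 
until completion.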



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26525) Fast release memory of ShuffleBlockFetcherIterator

2019-01-03 Thread liupengcheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liupengcheng updated SPARK-26525:
-
Description: 
Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
whole task finishes.

In some conditions, this incurs a memory leak, because the iterator holds some 
metadata about MapStatus (blocksByAddress), which may take huge amounts of 
memory.

An example is Shuffle -> map -> Coalesce(shuffle = false): each ShuffleMapTask 
keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
because they are referenced by the onCompleteCallbacks of the TaskContext.

We can release each ShuffleBlockFetcherIterator as soon as it's consumed.

  was:
Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
whole task finishes.

In some conditions, this incurs a memory leak.

An example is Shuffle -> map -> Coalesce(shuffle = false): each ShuffleMapTask 
keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
because they are referenced by the onCompleteCallbacks of the TaskContext.

We can release each ShuffleBlockFetcherIterator as soon as it's consumed.


> Fast release memory of ShuffleBlockFetcherIterator
> --
>
> Key: SPARK-26525
> URL: https://issues.apache.org/jira/browse/SPARK-26525
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.3.2
>Reporter: liupengcheng
>Priority: Major
>
> Currently, Spark does not release the ShuffleBlockFetcherIterator until the 
> whole task finishes.
> In some conditions, this incurs a memory leak, because the iterator holds 
> some metadata about MapStatus (blocksByAddress), which may take huge amounts 
> of memory.
> An example is Shuffle -> map -> Coalesce(shuffle = false): each ShuffleMapTask 
> keeps up to n (the number of shuffle partitions) ShuffleBlockFetcherIterators 
> because they are referenced by the onCompleteCallbacks of the TaskContext.
> We can release each ShuffleBlockFetcherIterator as soon as it's consumed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


