Hi Trevor,

A Beam pipeline run with the portable Spark runner is, in the end, just a Spark 
pipeline executing on a Spark cluster. All resources are managed by the 
processing engine (Spark, in your case) and the cluster configuration; Beam 
doesn't handle failures at that level.

So, in your place, I'd investigate this issue at the Spark/EMR level. Quite 
likely you are hitting OOM limits on the Spark workers when you run on a mixed 
cluster.
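If memory pressure on the workers does turn out to be the cause, the usual 
first knobs on the Spark side are the executor memory settings, passed when 
submitting the job. A minimal sketch (the jar name and values here are 
placeholders, not recommendations — tune them to your instance types):

```shell
# Placeholder example: raise executor heap and off-heap overhead.
# spark.executor.memoryOverhead is the Spark 2.3+ name; older releases
# use spark.yarn.executor.memoryOverhead on YARN/EMR.
spark-submit \
  --conf spark.executor.memory=8g \
  --conf spark.executor.memoryOverhead=2g \
  your-job.jar
```

That said, the exception you quoted also mentions an indeterminate shuffle 
stage being retried, which is typical when executors disappear (e.g. spot 
nodes being reclaimed), so checking the Spark UI for lost executors would help 
distinguish that from a plain OOM.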

—
Alexey  

> On 1 Jul 2021, at 13:19, Trevor Kramer <trevordkra...@gmail.com> wrote:
> 
> Hi everyone. We have a Beam pipeline running using the portable Spark runner 
> on EMR. If we use 100% on-demand Core nodes the pipeline finishes 
> successfully. If we run a mix of on-demand Core nodes and spot Task nodes the 
> pipeline fails every time with the following error. Does Beam have resiliency 
> against losing nodes and does it schedule with awareness of Core vs Task 
> nodes?
> 
> Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Job 
> aborted due to stage failure: A shuffle map stage with indeterminate output 
> was failed and retried. However, Spark cannot rollback the ShuffleMapStage 5 
> to re-process the input data, and has to fail this job. Please eliminate the 
> indeterminacy by checkpointing the RDD before repartition and try again.
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>       at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>       at 
> org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:199)
>       at 
> org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
>       ... 5 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> A shuffle map stage with indeterminate output was failed and retried. 
> However, Spark cannot rollback the ShuffleMapStage 5 to re-process the input 
> data, and has to fail this job. Please eliminate the indeterminacy by 
> checkpointing the RDD before repartition and try again.
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2136)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2124)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2123)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1674)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1666)
>       at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> 
> Thanks,
> 
> Trevor