Thank you, this is very informative.

We tried reducing the JdbcIO batch size from 10000 to 1000, then to 100. In
our runs, we no longer see the explicit OOM-error, but we are seeing
executor heartbeat timeouts. From what we understand, this is typically
caused by OOM-errors also. However, the stage in question is ready from a
web server that can be slow to respond. Could it be that the request to the
web server is locking the executor long enough to cause the heartbeat
timeout?

On Mon, Apr 26, 2021 at 1:48 PM Alexey Romanenko <[email protected]>
wrote:

>
>
> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) <
> [email protected]> wrote:
>
> The stack-trace for the OOM:
>
> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57,
>> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException:
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> at
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>
>
> It may be caused by a large total size of batched records before WriteFn
> flushes them.
> Did you try to decrease the number by “withBatchSize(long)” (by default,
> it’s 1000) ? [1]
>
> [1]
> https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
>
> at
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>> at
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
>> at
>> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>> at
>> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>> at
>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
>> at org.apache.spark.scheduler.Task.run(Task.scala:113)
>> at
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>
> I should note this exception is not always printed. The issue is usually
> represented by an ExecutorLostFailure (I am assuming this is caused by an
> OOM-error):
>
>>
>> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace:
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent
>> failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3):
>> ExecutorLostFailure (executor 3 exited caused by one of the running tasks)
>> Reason: Executor heartbeat timed out after 240254 ms
>> Driver stacktrace:
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>> at
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
>> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
>> at
>> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
>> at
>> org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1)
>> at
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
>> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
>> at
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
>> at
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
>> at
>> scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
>> at
>> scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
>> at
>> scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
>> at
>> com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
>> at
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
>> at
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>> at
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
>> at
>> com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
>> at
>> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> at
>> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
>> at
>> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
>> at
>> com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>> at scala.util.Try$.apply(Try.scala:192)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
>> at
>> com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task
>> 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure
>> (executor 3 exited caused by one of the running tasks) Reason: Executor
>> heartbeat timed out after 240254 ms
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> <http://org.apache.spark.scheduler.dagscheduler.org/>
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>> at scala.Option.foreach(Option.scala:257)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
>> at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
>> at
>> org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
>> at
>> org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
>> at
>> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
>> at
>> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ... 1 more
>> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for
>> 4169729928902178844_6376508440917463187_job-95-run-16-action-295
>>
>
> xxx
>
> On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <[email protected]>
> wrote:
>
>> Hi Thomas,
>>
>> Could you share the stack trace of your OOM and, if possible, the code
>> snippet of your pipeline?
>> Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”,
>> may lead to OOM with SparkRunner.
>>
>> —
>> Alexey
>>
>>
>> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) <
>> [email protected]> wrote:
>> >
>> > Good morning,
>> >
>> > We are ingesting a very large dataset into our database using Beam on
>> Spark. The dataset is available through a REST-like API and is splicedin
>> such a way so that in order to obtain the whole dataset, we must do around
>> 24000 API calls.
>> >
>> > All in all, this results in 24000 CSV files that need to be parsed then
>> written to our database.
>> >
>> > Unfortunately, we are encountering some OutOfMemoryErrors along the
>> way. From what we have gathered, this is due to the data being queued
>> between transforms in the pipeline. In order to mitigate this, we have
>> tried to implement a streaming-scheme where the requests streamed to the
>> request executor, the flows to the database. This too produced the
>> OOM-error.
>> >
>> > What are the best ways of implementing such pipelines so as to minimize
>> the memory footprint? Are there any differences between runners we should
>> be aware of here? (e.g. between Dataflow and Spark)
>>
>>
>

Reply via email to