> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) 
> <[email protected]> wrote:
> 
> The stack-trace for the OOM:
> 
> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, 
> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: 
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at 
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)

It may be caused by a large total size of batched records before WriteFn 
flushes them. 
Did you try to decrease the number by “withBatchSize(long)” (by default, it’s 
1000) ? [1]

[1] 
https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-

> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
> at 
> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
> at 
> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
> at 
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
> at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
> at org.apache.spark.scheduler.Task.run(Task.scala:113)
> at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 
> I should note this exception is not always printed. The issue is usually 
> represented by an ExecutorLostFailure (I am assuming this is caused by an 
> OOM-error):
> 
> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace: 
> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: 
> Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): 
> ExecutorLostFailure (executor 3 exited caused by one of the running tasks) 
> Reason: Executor heartbeat timed out after 240254 ms
> Driver stacktrace:
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
> at 
> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
> at 
> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
> at org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1)
> at 
> lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6)
> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
> at 
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
> at 
> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
> at 
> scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
> at 
> scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
> at 
> scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
> at 
> com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
> at 
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
> at 
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
> at 
> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
> at 
> com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
> at 
> com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
> at 
> com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
> at 
> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
> at 
> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
> at 
> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at 
> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
> at 
> com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
> at 
> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
> at 
> com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
> at 
> com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
> at scala.util.Try$.apply(Try.scala:192)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
> at 
> com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in 
> stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure (executor 3 
> exited caused by one of the running tasks) Reason: Executor heartbeat timed 
> out after 240254 ms
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org 
> <http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
> at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
> at 
> org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
> at 
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
> at 
> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for 
> 4169729928902178844_6376508440917463187_job-95-run-16-action-295
> 
> xxx
> 
> On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi Thomas,
> 
> Could you share the stack trace of your OOM and, if possible, the code 
> snippet of your pipeline? 
> Afaik, usually only “large" GroupByKey transforms, caused by “hot keys”, may 
> lead to OOM with SparkRunner.
> 
> —
> Alexey
> 
> 
> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) 
> > <[email protected] <mailto:[email protected]>> 
> > wrote:
> > 
> > Good morning,
> > 
> > We are ingesting a very large dataset into our database using Beam on 
> > Spark. The dataset is available through a REST-like API and is splicedin 
> > such a way so that in order to obtain the whole dataset, we must do around 
> > 24000 API calls.
> > 
> > All in all, this results in 24000 CSV files that need to be parsed then 
> > written to our database.
> > 
> > Unfortunately, we are encountering some OutOfMemoryErrors along the way. 
> > From what we have gathered, this is due to the data being queued between 
> > transforms in the pipeline. In order to mitigate this, we have tried to 
> > implement a streaming-scheme where the requests streamed to the request 
> > executor, the flows to the database. This too produced the OOM-error.
> > 
> > What are the best ways of implementing such pipelines so as to minimize the 
> > memory footprint? Are there any differences between runners we should be 
> > aware of here? (e.g. between Dataflow and Spark)
> 

Reply via email to