> On 27 Apr 2021, at 08:39, Thomas Fredriksen(External) 
> <thomas.fredrik...@cognite.com> wrote:
> 
> Thank you, this is very informative.
> 
> We tried reducing the JdbcIO batch size from 10000 to 1000, then to 100. In 
> our runs we no longer see the explicit OOM error, but we are now seeing 
> executor heartbeat timeouts. From what we understand, these are also 
> typically caused by OOM errors. However, the stage in question reads from a 
> web server that can be slow to respond. Could it be that the request to the 
> web server blocks the executor long enough to cause the heartbeat timeout?

Well, I think it can, but I don’t see how it would lead to an OOM in this case. 
Did you see anything about this in the logs?
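
In case it helps to narrow this down: the heartbeat behaviour is controlled by 
plain Spark settings rather than anything in Beam, so you can experiment with 
them independently (via --conf on spark-submit, the cluster configuration, or a 
SparkConf if you provide your own context). A minimal sketch in Java, with 
purely illustrative values:

    import org.apache.spark.SparkConf;

    // How often executors report a heartbeat to the driver (default 10s) and
    // how long the driver waits before declaring an executor lost (default
    // 120s). The timeout must stay well above the heartbeat interval.
    SparkConf conf = new SparkConf()
        .set("spark.executor.heartbeatInterval", "30s")
        .set("spark.network.timeout", "600s");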

> 
> On Mon, Apr 26, 2021 at 1:48 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> 
> 
>> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) 
>> <thomas.fredrik...@cognite.com> wrote:
>> 
>> The stack-trace for the OOM:
>> 
>> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, 
>> 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: 
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>> at 
>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>  Source)
> 
> It may be caused by the total size of the records that WriteFn batches 
> before flushing them. 
> Did you try decreasing this with “withBatchSize(long)” (by default it’s 
> 1000)? [1]
> 
> [1] 
> https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
>  
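> For reference, a minimal sketch of where that setting goes (the record type, 
> table and connection details below are hypothetical, not taken from your 
> pipeline):
> 
>     input  // a PCollection<MyRecord> produced by the upstream transforms
>         .apply(JdbcIO.<MyRecord>write()
>             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>                 "org.postgresql.Driver", "jdbc:postgresql://db-host:5432/mydb")
>                 .withUsername("user")
>                 .withPassword("secret"))
>             .withStatement("INSERT INTO my_table (id, value) VALUES (?, ?)")
>             // Each bundle buffers up to this many records before executing
>             // the batch, so a smaller value trades throughput for a smaller
>             // per-bundle memory footprint. The default is 1000.
>             .withBatchSize(100)
>             .withPreparedStatementSetter((MyRecord r, PreparedStatement s) -> {
>                 s.setLong(1, r.getId());
>                 s.setString(2, r.getValue());
>             }));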
>> at 
>> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>> at 
>> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
>> at 
>> org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>> at 
>> org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>> at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>> at 
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>> at 
>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>> at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
>> at org.apache.spark.scheduler.Task.run(Task.scala:113)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>> 
>> I should note that this exception is not always printed. The issue usually 
>> manifests as an ExecutorLostFailure (which I am assuming is caused by an 
>> OOM error):
>> 
>> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace: 
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due 
>> to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: 
>> Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): 
>> ExecutorLostFailure (executor 3 exited caused by one of the running tasks) 
>> Reason: Executor heartbeat timed out after 240254 ms
>> Driver stacktrace:
>> at 
>> org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>> at 
>> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>> at 
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>> at 
>> org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
>> at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
>> at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
>> at 
>> org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
>> at 
>> org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
>> at 
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1)
>> at 
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44)
>> at 
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46)
>> at 
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1)
>> at 
>> lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6)
>> at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
>> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
>> at 
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
>> at 
>> scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
>> at 
>> scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
>> at 
>> scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
>> at 
>> scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
>> at 
>> com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
>> at 
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
>> at 
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>> at 
>> com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
>> at 
>> com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
>> at 
>> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> at 
>> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
>> at 
>> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
>> at 
>> com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>> at scala.util.Try$.apply(Try.scala:192)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
>> at 
>> com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage 
>> failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 
>> 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure 
>> (executor 3 exited caused by one of the running tasks) Reason: Executor 
>> heartbeat timed out after 240254 ms
>> Driver stacktrace:
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
>> at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>> at scala.Option.foreach(Option.scala:257)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
>> at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
>> at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
>> at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
>> at 
>> org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
>> at 
>> org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
>> at 
>> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
>> at 
>> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ... 1 more
>> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for 
>> 4169729928902178844_6376508440917463187_job-95-run-16-action-295
>> 
>> xxx
>> 
>> On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Hi Thomas,
>> 
>> Could you share the stack trace of your OOM and, if possible, the code 
>> snippet of your pipeline? 
>> AFAIK, usually only “large” GroupByKey transforms, caused by “hot keys”, can 
>> lead to OOM with the SparkRunner.
>> 
>> —
>> Alexey
>> 
>> 
>> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) 
>> > <thomas.fredrik...@cognite.com> wrote:
>> > 
>> > Good morning,
>> > 
>> > We are ingesting a very large dataset into our database using Beam on 
>> > Spark. The dataset is available through a REST-like API and is split in 
>> > such a way that, in order to obtain the whole dataset, we must make around 
>> > 24000 API calls.
>> > 
>> > All in all, this results in 24000 CSV files that need to be parsed then 
>> > written to our database.
>> > 
>> > Unfortunately, we are encountering some OutOfMemoryErrors along the way. 
>> > From what we have gathered, this is due to data being queued between 
>> > transforms in the pipeline. In order to mitigate this, we tried to 
>> > implement a streaming scheme where the requests are streamed to the 
>> > request executor and the results then flow on to the database. This too 
>> > produced the OOM error.
>> > 
>> > What are the best ways of implementing such pipelines so as to minimize 
>> > the memory footprint? Are there any differences between runners we should 
>> > be aware of here? (e.g. between Dataflow and Spark)
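>> > 
>> > For what it's worth, a minimal sketch of the kind of pipeline described 
>> > above (the names, the CsvRow type, the helpers and the Reshuffle placement 
>> > are purely illustrative, not the actual code):
>> > 
>> >     PCollection<String> requestUrls =
>> >         pipeline.apply(Create.of(allRequestUrls));   // the ~24000 API calls
>> > 
>> >     requestUrls
>> >         // Spread the requests across workers instead of fusing the fetch
>> >         // onto the step that created the URLs.
>> >         .apply(Reshuffle.viaRandomKey())
>> >         .apply(ParDo.of(new DoFn<String, CsvRow>() {
>> >           @ProcessElement
>> >           public void process(@Element String url, OutputReceiver<CsvRow> out) {
>> >             // Fetch one CSV file and emit it row by row, so a whole file
>> >             // never has to be buffered downstream as a single element.
>> >             for (CsvRow row : fetchAndParse(url)) {
>> >               out.output(row);
>> >             }
>> >           }
>> >         }))
>> >         .apply(JdbcIO.<CsvRow>write()
>> >             .withDataSourceConfiguration(dbConfig)   // hypothetical
>> >             .withStatement(insertStatement)          // hypothetical
>> >             .withBatchSize(100)
>> >             .withPreparedStatementSetter(setter));   // hypothetical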
>> 
> 
