> On 27 Apr 2021, at 08:39, Thomas Fredriksen(External) <thomas.fredrik...@cognite.com> wrote:
>
> Thank you, this is very informative.
>
> We tried reducing the JdbcIO batch size from 10000 to 1000, then to 100. In
> our runs, we no longer see the explicit OOM-error, but we are seeing executor
> heartbeat timeouts. From what we understand, these are typically also caused
> by OOM-errors. However, the stage in question is reading from a web server
> that can be slow to respond. Could it be that the request to the web server
> is blocking the executor long enough to cause the heartbeat timeout?

Well, I think it can, but I don’t see how it would lead to an OOM in this case. Did you see anything about this in the logs?

> On Mon, Apr 26, 2021 at 1:48 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>
>> On 26 Apr 2021, at 13:34, Thomas Fredriksen(External) <thomas.fredrik...@cognite.com> wrote:
>>
>> The stack-trace for the OOM:
>>
>> 21/04/21 21:40:43 WARN TaskSetManager: Lost task 1.2 in stage 2.0 (TID 57, 10.139.64.6, executor 3): org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>     at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteVoid$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>
> It may be caused by a large total size of batched records before WriteFn
> flushes them.
> Did you try to decrease the number with “withBatchSize(long)” (by default, it’s
> 1000)? [1]
>
> [1] https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/io/jdbc/JdbcIO.WriteVoid.html#withBatchSize-long-
>
>>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
>>     at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>>     at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:140)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>>     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:979)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2323)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:113)
>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> I should note that this exception is not always printed. The issue usually
>> shows up as an ExecutorLostFailure (I am assuming this is caused by an
>> OOM-error):
>>
>> 21/04/21 21:44:52 ERROR ScalaDriverLocal: User Code Stack Trace:
>> java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 240254 ms
>> Driver stacktrace:
>>     at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
>>     at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
>>     at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:79)
>>     at org.odp.beam.sdk.OdpPipeline.run(OdpPipeline.java:60)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
>>     at org.odp.beam.sdk.OdpPipeline.runThenExit(OdpPipeline.java:93)
>>     at org.odp.pipelines.emodnet_bronze.EmodNetBronze.runPipeline(EmodNetBronze.java:203)
>>     at org.odp.pipelines.emodnet_bronze.EmodNetBronze.main(EmodNetBronze.java:209)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command--1:1)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw$$iw.<init>(command--1:44)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw$$iw.<init>(command--1:46)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw$$iw.<init>(command--1:48)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw$$iw.<init>(command--1:50)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$$iw.<init>(command--1:52)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read.<init>(command--1:54)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<init>(command--1:58)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$read$.<clinit>(command--1)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print$lzycompute(<notebook>:7)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval$.$print(<notebook>:6)
>>     at lineb2837a4aea8b4382bd297a3df4a6a20d25.$eval.$print(<notebook>)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
>>     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
>>     at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
>>     at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
>>     at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
>>     at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
>>     at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
>>     at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
>>     at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
>>     at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>>     at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
>>     at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:714)
>>     at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:667)
>>     at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
>>     at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:396)
>>     at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:373)
>>     at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>     at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
>>     at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
>>     at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:275)
>>     at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
>>     at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:373)
>>     at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>>     at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
>>     at scala.util.Try$.apply(Try.scala:192)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
>>     at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 2.0 failed 4 times, most recent failure: Lost task 5.3 in stage 2.0 (TID 62, 10.139.64.6, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 240254 ms
>> Driver stacktrace:
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
>>     at scala.Option.foreach(Option.scala:257)
>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2282)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2348)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:979)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:977)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
>>     at org.apache.spark.rdd.RDD.foreach(RDD.scala:977)
>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:359)
>>     at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
>>     at org.apache.beam.runners.spark.translation.BoundedDataset.action(BoundedDataset.java:127)
>>     at org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:228)
>>     at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:241)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     ... 1 more
>> 21/04/21 21:44:52 INFO ProgressReporter$: Removed result fetcher for 4169729928902178844_6376508440917463187_job-95-run-16-action-295
>>
>> xxx
>>
>> On Mon, Apr 26, 2021 at 1:03 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Hi Thomas,
>>
>> Could you share the stack trace of your OOM and, if possible, a code
>> snippet of your pipeline?
>> AFAIK, usually only “large” GroupByKey transforms, caused by “hot keys”, may
>> lead to OOM with SparkRunner.
>>
>> --
>> Alexey
>>
>>
>> > On 26 Apr 2021, at 08:23, Thomas Fredriksen(External) <thomas.fredrik...@cognite.com> wrote:
>> >
>> > Good morning,
>> >
>> > We are ingesting a very large dataset into our database using Beam on
>> > Spark. The dataset is available through a REST-like API and is sliced in
>> > such a way that, in order to obtain the whole dataset, we must make around
>> > 24000 API calls.
>> >
>> > All in all, this results in 24000 CSV files that need to be parsed and then
>> > written to our database.
>> >
>> > Unfortunately, we are encountering some OutOfMemoryErrors along the way.
>> > From what we have gathered, this is due to the data being queued between
>> > transforms in the pipeline. In order to mitigate this, we have tried to
>> > implement a streaming scheme where the requests are streamed to the request
>> > executor and the results then flow to the database. This too produced the
>> > OOM-error.
>> >
>> > What are the best ways of implementing such pipelines so as to minimize
>> > the memory footprint? Are there any differences between runners we should
>> > be aware of here (e.g. between Dataflow and Spark)?
>> >
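
To illustrate the batch-size point raised in this thread, here is a minimal sketch in plain Java (standard library only). It is a hypothetical stand-in, not JdbcIO's actual WriteFn: the class name BatchBufferSketch, its methods, and the no-op sink are all invented for the example. It only shows that a writer which buffers records and flushes every batchSize elements never holds more than batchSize records at once, so with large rows a smaller value passed to withBatchSize(long) may lower peak heap use per writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration only; this is NOT JdbcIO's WriteFn. */
final class BatchBufferSketch<T> {
    private final int batchSize;
    // Stands in for "execute the buffered statements against the database".
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();
    private int peakBuffered = 0;
    private int flushes = 0;

    BatchBufferSketch(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one record and flushes as soon as the batch is full. */
    void add(T record) {
        buffer.add(record);
        peakBuffered = Math.max(peakBuffered, buffer.size());
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the buffered records to the sink and clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
        flushes++;
    }

    int peakBuffered() { return peakBuffered; }

    int flushes() { return flushes; }

    public static void main(String[] args) {
        // The three batch sizes mentioned in the thread.
        for (int size : new int[] {10000, 1000, 100}) {
            BatchBufferSketch<String> writer = new BatchBufferSketch<>(size, batch -> { });
            for (int i = 0; i < 25000; i++) {
                writer.add("row-" + i);
            }
            writer.flush(); // flush the final partial batch, if any
            System.out.println("batchSize=" + size
                + " peakBuffered=" + writer.peakBuffered()
                + " flushes=" + writer.flushes());
        }
    }
}
```

Under these assumptions, peak buffered memory is roughly batchSize times the average record size. If problems persist even at a batch size of 100, as reported above, the write buffer is probably not the main source of memory pressure, though that is an inference from the sketch rather than something the logs confirm.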