https://issues.apache.org/jira/browse/BEAM-1403
From: Bergmann, Rico (GfK External) [mailto:[email protected]]
Sent: Monday, February 6, 2017 12:19
To: [email protected]
Subject: RE: possible reasons for exception "Cannot move input watermark time backwards from"

See below.

From: Amit Sela [mailto:[email protected]]
Sent: Friday, February 3, 2017 15:31
To: [email protected]
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

OK, this is indeed a different stack trace - the problem is now in SparkGroupAlsoByWindow, which did not exist in 0.4.0 and which I had hoped would fix any issues you've encountered. More questions:

Is your data timestamped?

Yes (but only internally).

Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp or a source that defines the timestamp)?

No, we do not expose timestamps to the pipeline. For the pipeline these are simply fields in a record.

Looks like this <https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138> is broken anyway - I don't think there is actually a time-order guarantee when processing a partition. Could you open a ticket please? Thanks!

I'll do this!
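A note on the timestamp question above: a pipeline becomes "timestamp-aware" when a source assigns event timestamps or a DoFn re-emits elements via outputWithTimestamp. A minimal sketch of the latter, assuming the annotation-based DoFn API and a made-up element shape (a KV whose value is an event time in epoch millis); it is illustrative only and not code from the pipeline discussed in this thread:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Instant;

    // Illustrative only: the element shape (KV<String, Long> with an epoch-millis value)
    // is an assumption made for this sketch, not taken from the pipeline in this thread.
    public class AssignEventTimestamps extends DoFn<KV<String, Long>, KV<String, Long>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KV<String, Long> element = c.element();
        // Re-emit the element with its own time field as the Beam event timestamp,
        // so windowing and watermark logic see that time instead of the default one.
        c.outputWithTimestamp(element, new Instant(element.getValue()));
      }
    }

Since the pipeline here never assigns timestamps, its elements presumably carry whatever default timestamp the bounded source gives them, which is worth keeping in mind when reading the watermark discussion further down the thread.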
On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) <[email protected]> wrote:

Due to restrictions in my contract I cannot show you the pipeline. But it's a very complex one that we have been working on for several months already, also with Beam 0.4.0. Interesting to note is that we already ran our pipeline successfully with that version. Now, in a series of 30 executions, about 20 get this exception and the others succeed…

The full stack trace:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
    at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
    at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
    at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
    at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
    at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
    at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
    at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
    at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
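To make the failing check in the trace above concrete: InMemoryTimerInternals#advanceInputWatermark insists that the input watermark never moves backwards, and, per Amit's remark about SparkGroupAlsoByWindowFn above, advancing it while iterating a Spark partition is only safe if the elements happen to be in timestamp order. The following self-contained sketch (illustrative class names and made-up timestamps, not the actual Beam or Spark runner code) shows how an unsorted partition trips exactly this kind of check:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch of a monotonic input-watermark check, similar in spirit to the
    // Preconditions.checkState seen in the trace above; not the real Beam classes.
    public class WatermarkOrderSketch {

      private static long inputWatermarkMillis = Long.MIN_VALUE;

      static void advanceInputWatermark(long newWatermarkMillis) {
        // The invariant: the watermark may never move backwards.
        if (newWatermarkMillis < inputWatermarkMillis) {
          throw new IllegalStateException(
              "Cannot move input watermark time backwards from "
                  + inputWatermarkMillis + " to " + newWatermarkMillis);
        }
        inputWatermarkMillis = newWatermarkMillis;
      }

      public static void main(String[] args) {
        // Element timestamps as they might come out of a Spark partition: nothing
        // guarantees they are sorted, so advancing the watermark per element can fail.
        List<Long> elementTimestamps = Arrays.asList(10L, 25L, 7L);
        for (long ts : elementTimestamps) {
          advanceInputWatermark(ts); // throws on the third element (7 < 25)
        }
      }
    }

Because that iteration order is not deterministic, this would also explain why only about 20 of the 30 otherwise identical runs hit the exception.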
From: Amit Sela [mailto:[email protected]]
Sent: Friday, February 3, 2017 13:10
To: [email protected]
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

Is it the exact same stack trace? Would you mind sharing the stack trace and the pipeline?

Thanks, Amit

On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) <[email protected]> wrote:

Hi!

Thanks for the insights. As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into the same error … ☹ Any further ideas or suggestions?

Best, Rico

From: Amit Sela [mailto:[email protected]]
Sent: Thursday, February 2, 2017 17:15
To: [email protected]
Subject: Re: possible reasons for exception "Cannot move input watermark time backwards from"

Hi Rico,

Batch sort of uses watermarks by noting the "start watermark" at the beginning of time and the "end watermark" at the end of time (this is the "overflow" you see). Or, in a more "Beam" way: the watermark at the beginning is the start of time, and after processing all the elements the watermark jumps to the end of time, because we know there are no more elements left to process.

Could you try 0.5.0-SNAPSHOT please? There was a large refactor around that area in the Spark runner, and a release is on the way, so 0.5.0 should be available within a few days anyway.

Thanks, Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) <[email protected]> wrote:

Hi @all!

I'm using Beam 0.4.0 and only the batch processing features of it. While executing the pipeline I get an exception:

Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z

First, since I'm not using the streaming features, I'm wondering about watermarks (but this may be a Beam-internal thing, I don't know). Second, the timestamp stated in the exception message is really weird and looks a bit like an overflow in a long value to me.

Does anyone have a clue what the reason for this exception could be?

Thanks, Rico.
Full stack trace:

2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
    at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
    at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
    at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
    at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
    at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
    at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
    at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
    at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
    at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
    at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
    at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
    at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
    at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
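On the "overflow" observation and Amit's explanation of batch watermarks: the two timestamps in the exception are not corrupted values but Beam's fixed event-time bounds (BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE). A small stand-alone sketch, assuming those bounds are Long.MIN_VALUE / Long.MAX_VALUE expressed in microseconds and truncated to milliseconds, reproduces the exact dates from the error message:

    import java.util.concurrent.TimeUnit;
    import org.joda.time.Instant;

    // Reproduces the "weird" dates from the exception message.
    // Assumption: the event-time bounds are Long.MIN_VALUE / Long.MAX_VALUE microseconds,
    // truncated to milliseconds, as in the Beam SDK of that era.
    public class TimestampBoundsSketch {
      public static void main(String[] args) {
        Instant minTimestamp = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
        Instant maxTimestamp = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
        System.out.println(minTimestamp); // -290308-12-21T19:59:05.225Z
        System.out.println(maxTimestamp); // 294247-01-09T04:00:54.775Z
      }
    }

In batch, the runner starts the watermark at the minimum bound and jumps it to the maximum bound once the input is exhausted, so these values themselves are expected rather than a numeric overflow; the failure reported above is that the watermark was asked to move from the end-of-time bound back to the beginning-of-time bound, which the monotonicity check rejects.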
