See below

From: Amit Sela [mailto:[email protected]]
Sent: Friday, 3 February 2017 15:31
To: [email protected]
Subject: Re: possible reasons for exception "Cannot move input watermark time 
backwards from"

OK, this is indeed a different stack trace. The problem is now in 
SparkGroupAlsoByWindowFn, which did not exist in 0.4.0 and which I had hoped 
would fix the issues you've encountered.

More questions: is your data timestamped?
Yes (but only internally).
Is your pipeline aware of the timestamp fields (using DoFn#outputWithTimestamp 
or a source that defines the timestamp)?
No, we do not expose timestamps to the pipeline. For the pipeline these are 
simply fields in a record.

Looks like 
this<https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java#L138>
 is broken anyway; I don't think there's actually a time-order guarantee when 
processing a partition. Could you open a ticket, please? Thanks!
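
To illustrate why the missing ordering guarantee matters, here is a toy model 
of a watermark that refuses to move backwards. It is only a sketch: the class 
ToyWatermark and its methods are made up for this example and are not Beam's 
InMemoryTimerInternals, although the error text is modelled on the message in 
the stack trace. Feeding it timestamps in arrival order works only if that 
order happens to be ascending.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model, NOT the Beam implementation.
public class ToyWatermark {
    private Instant current = Instant.MIN;

    // Advances the watermark; rejects any attempt to move it backwards.
    public void advanceTo(Instant next) {
        if (next.isBefore(current)) {
            throw new IllegalStateException(
                "Cannot move input watermark time backwards from "
                    + current + " to " + next);
        }
        current = next;
    }

    // Returns true if the watermark can follow the timestamps in the given order.
    public static boolean advancesCleanly(List<Instant> timestamps) {
        ToyWatermark watermark = new ToyWatermark();
        try {
            for (Instant t : timestamps) {
                watermark.advanceTo(t);
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Instant earlier = Instant.parse("2017-02-03T12:00:00Z");
        Instant later = Instant.parse("2017-02-03T13:00:00Z");
        // Ascending order: the watermark only ever moves forwards.
        System.out.println(advancesCleanly(Arrays.asList(earlier, later)));
        // Arrival order that is not time order: the second step goes backwards.
        System.out.println(advancesCleanly(Arrays.asList(later, earlier)));
    }
}
```

If elements within a partition can arrive in any order, whether such a check 
passes depends on the run, which would be consistent with only some executions 
failing.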

I’ll do this!

On Fri, Feb 3, 2017 at 4:13 PM Bergmann, Rico (GfK External) 
<[email protected]<mailto:[email protected]>> wrote:

Due to restrictions in my contract I cannot show you the pipeline. But it’s a 
very complex one that we have been working on for several months already, also 
with Beam 0.4.0.

It is interesting to note that we had already run our pipeline successfully 
with that version. Now, in a series of 30 executions, about 20 hit this 
exception while the others succeed…


The full StackTrace
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.core.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.runners.core.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:189)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:140)
        at org.apache.beam.runners.spark.translation.SparkGroupAlsoByWindowFn.call(SparkGroupAlsoByWindowFn.java:56)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.processPartition(SparkProcessContext.java:64)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:97)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:47)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


From: Amit Sela [mailto:[email protected]<mailto:[email protected]>]
Sent: Friday, 3 February 2017 13:10
To: [email protected]<mailto:[email protected]>
Subject: Re: possible reasons for exception "Cannot move input watermark time 
backwards from"


Is it the exact same stack trace? Would you mind sharing the stack trace and 
the pipeline?

Thanks,
Amit

On Fri, Feb 3, 2017, 13:58 Bergmann, Rico (GfK External) 
<[email protected]<mailto:[email protected]>> wrote:
Hi!

Thanks for the insights.

As you suggested, I tried it with the current Beam 0.5.0-SNAPSHOT, but ran into 
the same error … ☹

Any further ideas or suggestions?

Best,
Rico.

From: Amit Sela [mailto:[email protected]<mailto:[email protected]>]
Sent: Thursday, 2 February 2017 17:15
To: [email protected]<mailto:[email protected]>
Subject: Re: possible reasons for exception "Cannot move input watermark time 
backwards from"

Hi Rico,

Batch mode still uses watermarks, in a limited way: the "start watermark" is 
set to the beginning of time and the "end watermark" to the end of time (this 
is the "overflow" you see). Put in a more "Beam" way: the watermark starts at 
the beginning of time, and once all elements have been processed it jumps to 
the end of time, because we know there are no more elements left to process.
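
The two odd-looking instants in the exception message can be reproduced with 
plain arithmetic. The sketch below assumes, for illustration only, that 
"beginning of time" and "end of time" are Long.MIN_VALUE and Long.MAX_VALUE 
microseconds converted to milliseconds; the class and method names are made up 
for this example. Under that assumption the values should correspond to the 
two timestamps in the message, so no arithmetic overflow is involved.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Beam.
public class WatermarkBounds {

    // Assumption: "beginning of time" is Long.MIN_VALUE microseconds, in millis.
    public static Instant startOfTime() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
    }

    // Assumption: "end of time" is Long.MAX_VALUE microseconds, in millis.
    public static Instant endOfTime() {
        return Instant.ofEpochMilli(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Compare the printed instants with the ones in the exception message.
        System.out.println("start of time: " + startOfTime());
        System.out.println("end of time:   " + endOfTime());
    }
}
```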

Could you try 0.5.0-SNAPSHOT, please? There was a large refactor around that 
area in the Spark runner, and the release is on the way, so 0.5.0 should be 
available within a few days anyway.

Thanks,
Amit

On Thu, Feb 2, 2017 at 5:04 PM Bergmann, Rico (GfK External) 
<[email protected]<mailto:[email protected]>> wrote:
Hi @all!

I’m using Beam 0.4.0 and only its batch processing features.
While executing the pipeline I get an exception: Cannot move input watermark 
time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
First, since I’m not using the streaming features, I’m wondering why watermarks 
are involved at all (but this may be a Beam-internal thing, I don’t know).
Second, the timestamp stated in the exception message is really weird and looks 
a bit like an overflow in a long value to me.

Does anyone have a clue what the reason for this exception could be?

Thanks,
Rico.


Full Stacktrace:
2017-02-02 14:31:24,863 ERROR [Driver] yarn.ApplicationMaster (Logging.scala:logError(95)) - User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:72)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:112)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:101)
        at com.gfk.mde.pipeline.MDEPipeline.run(MDEPipeline.java:329)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.run(HiveSourceSparkDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.mainHandlingOozieActionConfiguration(HiveSourceSparkDriver.java:52)
        at com.gfk.mde.pipeline.HiveSourceSparkDriver.main(HiveSourceSparkDriver.java:31)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.lang.IllegalStateException: Cannot move input watermark time backwards from 294247-01-09T04:00:54.775Z to -290308-12-21T19:59:05.225Z
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199)
        at org.apache.beam.sdk.util.state.InMemoryTimerInternals.advanceInputWatermark(InMemoryTimerInternals.java:163)
        at org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:89)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.invokeProcessElement(SparkProcessContext.java:372)
        at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:335)
        at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
        at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
        at org.apache.beam.runners.spark.translation.SparkProcessContext.callWithCtxt(SparkProcessContext.java:91)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:75)
        at org.apache.beam.runners.spark.translation.DoFnFunction.call(DoFnFunction.java:43)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:159)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


________________________________


GfK SE, Nuremberg, Germany, commercial register at the local court Amtsgericht 
Nuremberg HRB 25014; Management Board: Dr. Gerhard Hausruckinger (Speaker of 
the Management Board), Christian Diedrich (CFO), David Krajicek, Alessandra 
Cama; Chairman of the Supervisory Board: Ralf Klein-Bölting

This email and any attachments may contain confidential or privileged 
information. Please note that unauthorized copying, disclosure or distribution 
of the material in this email is not permitted.
