Hi Nikhil,
If I understand correctly, your DoFn (KeyVal) may emit records whose
timestamps are earlier than the timestamp of the input element. In that case
I think you could override the default behavior of getAllowedTimestampSkew()
in DoFn to allow unlimited skew.
To be specific, you can add the following method to your KeyVal class:

    @Override
    public Duration getAllowedTimestampSkew() {
      return Duration.millis(Long.MAX_VALUE);
    }

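The check that fails is the one spelled out in your error message: an output
timestamp must be no earlier than the input element's timestamp minus the
allowed skew. As a quick sanity check of why your element is rejected, here is
that comparison reproduced with plain java.time and the two timestamps from your
stack trace. SkewCheck and isAllowed are hypothetical names I made up; this only
mirrors the rule as the message states it and is not Beam's own implementation.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper mirroring the rule quoted in the Beam error message:
 * an output timestamp is accepted only if it is no earlier than
 * (input timestamp - allowed skew). This is NOT Beam's own code.
 */
public class SkewCheck {

  public static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
    // Earliest timestamp the DoFn may emit for this input element.
    Instant earliest = input.minus(allowedSkew);
    return !output.isBefore(earliest);
  }

  public static void main(String[] args) {
    // Timestamps copied from the error message in the quoted mail.
    Instant input = Instant.parse("2019-03-27T17:47:00.526Z");
    Instant output = Instant.parse("2019-03-27T17:46:41.893Z");

    // How far behind the input element the emitted timestamp is.
    Duration behind = Duration.between(output, input);
    System.out.println("Output is " + behind.toMillis() + " ms behind the input");

    // Default skew of 0 ms: rejected, as in the stack trace.
    System.out.println("skew 0 ms: " + isAllowed(input, output, Duration.ZERO));
    // A skew of at least 18633 ms accepts this particular element.
    System.out.println("skew 18633 ms: " + isAllowed(input, output, Duration.ofMillis(18633)));
  }
}
```

With the default skew of 0 ms the element is rejected because its parsed
timestamp is 18633 ms behind the input element; any skew of at least that much
would let this particular element through, which is why raising the allowed
skew makes the error go away.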
Hope this helps!
Robin
On Wed, Mar 27, 2019 at 11:44 AM Nikhil Goyal <[email protected]> wrote:
> Hi guys,
> I am seeing this error while running my Beam job on Dataflow.
>
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2019-03-27T17:46:41.893Z. Output timestamps must be no earlier than the
> timestamp of the current input (2019-03-27T17:47:00.526Z) minus the allowed
> skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
> details on changing the allowed skew.
>
> This is the code. It's quite simple.
>
> static class KeyVal extends DoFn<String, KV<String, Long>> {
>   @ProcessElement
>   public void processElement(@Element String r, OutputReceiver<KV<String, Long>> out) {
>     // Each input line is expected to look like "<key> <value> <epochMillis>".
>     String[] keyval = r.split(" ");
>     out.outputWithTimestamp(
>         KV.of(keyval[0], Long.parseLong(keyval[1])),
>         Instant.ofEpochMilli(Long.parseLong(keyval[2])));
>   }
> }
>
> TextIO.Read source =
>     TextIO.read()
>         .from(INPUT_PATH)
>         .watchForNewFiles(
>             Duration.standardMinutes(1),
>             Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(2)));
>
>
> pipeline
>     .apply(source)
>     .apply(ParDo.of(new KeyVal()))
>     .apply(
>         Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
>             .triggering(
>                 Repeatedly.forever(
>                     AfterProcessingTime.pastFirstElementInPane()
>                         .plusDelayOf(Duration.standardMinutes(2))))
>             .withAllowedLateness(Duration.standardMinutes(20))
>             .accumulatingFiredPanes())...
>
>
> I got it working using
>
> .apply(
>     WithTimestamps.of((String line) -> Instant.ofEpochMilli(Long.parseLong(line.split(" ")[2])))
>         .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE / 2L)))
>
> The documentation says withAllowedTimestampSkew is deprecated and that using
> withAllowedLateness should take care of it. Is there a better way to add
> timestamps to records? Thanks
>
> Nikhil