Hi Nikhil,

If I understand correctly, your DoFn (KeyVal) may emit records whose
timestamps are earlier than the input element's timestamp. In that case, I
think you can override the default behavior of getAllowedTimestampSkew() in
your DoFn to allow infinite skew.

To be specific, you can add the following function to your KeyVal class:

  @Override
  public Duration getAllowedTimestampSkew() {
    return Duration.millis(Long.MAX_VALUE);
  }
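
To illustrate the rule behind that error message, here is a minimal,
self-contained sketch of the check in plain Java. It is a simplified model
and not Beam's actual implementation: the class name SkewCheckSketch and the
method isOutputAllowed are hypothetical, and it uses java.time rather than
the Joda-Time types that Beam's API takes. The two timestamps are the ones
from your error.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the rule stated in the error message; not Beam code.
public class SkewCheckSketch {

  // An output timestamp is accepted when it is no earlier than
  // (input timestamp - allowed skew). Comparing the gap against the skew
  // avoids subtracting a very large skew from an Instant.
  public static boolean isOutputAllowed(Instant input, Instant output, Duration allowedSkew) {
    // Positive when the output is behind the input, negative when it is ahead.
    Duration howFarBehind = Duration.between(output, input);
    return howFarBehind.compareTo(allowedSkew) <= 0;
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2019-03-27T17:47:00.526Z");
    Instant output = Instant.parse("2019-03-27T17:46:41.893Z");

    // Default skew of zero: the output is behind the input, so it is rejected.
    System.out.println(isOutputAllowed(input, output, Duration.ZERO));

    // Effectively infinite skew, mirroring Duration.millis(Long.MAX_VALUE): accepted.
    System.out.println(isOutputAllowed(input, output, Duration.ofMillis(Long.MAX_VALUE)));
  }
}
```

One thing to keep in mind: as far as I know, the allowed skew only relaxes
this check and does not hold back the watermark, so records emitted with
much older timestamps can still arrive as late data and may be dropped if
they fall outside your withAllowedLateness.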

Hope this helps!
Robin

On Wed, Mar 27, 2019 at 11:44 AM Nikhil Goyal <[email protected]> wrote:

> Hi guys,
> I am seeing this error while running my Beam job on Dataflow.
>
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2019-03-27T17:46:41.893Z. Output timestamps must be no earlier than the
> timestamp of the current input (2019-03-27T17:47:00.526Z) minus the allowed
> skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
> details on changing the allowed skew.
>
> This is the code. It's quite simple.
>
> static class KeyVal extends DoFn<String, KV<String, Long>> {
>   @ProcessElement
>   public void processElement(@Element String r, OutputReceiver<KV<String, Long>> out) {
>     String[] keyval = r.split(" ");
>     out.outputWithTimestamp(
>         KV.of(keyval[0], Long.parseLong(keyval[1])),
>         Instant.ofEpochMilli(Long.parseLong(keyval[2])));
>   }
> }
>
> TextIO.Read source =
>     TextIO.read()
>         .from(INPUT_PATH)
>         .watchForNewFiles(
>             Duration.standardMinutes(1),
>             Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(2)));
>
>
> pipeline
>     .apply(source)
>     .apply(ParDo.of(new KeyVal()))
>     .apply(
>         Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
>             .triggering(
>                 Repeatedly.forever(
>                     AfterProcessingTime.pastFirstElementInPane()
>                         .plusDelayOf(Duration.standardMinutes(2))))
>             .withAllowedLateness(Duration.standardMinutes(20))
>             .accumulatingFiredPanes()
>     )...
>
>
> I got it working using
>
> .apply(
>     WithTimestamps.of(
>             (String line) -> Instant.ofEpochMilli(Long.parseLong(line.split(" ")[2])))
>         .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE / 2L)))
>
> The documentation says withAllowedTimestampSkew is deprecated and that using
> withAllowedLateness should take care of it. Is there a better way to add a
> timestamp to a record? Thanks
>
> Nikhil
>
>
>
