Hi guys,
I am seeing this error while running my Beam job on Dataflow:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Cannot output with timestamp
2019-03-27T17:46:41.893Z. Output timestamps must be no earlier than the
timestamp of the current input (2019-03-27T17:47:00.526Z) minus the allowed
skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
details on changing the allowed skew.
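
To make the rule in that message concrete, here is a small plain-Java sketch of the check as I read it from the error text. It does not use Beam and is not Beam's actual implementation; the class name TimestampSkewSketch and the methods isAllowed and requiredSkew are made up for illustration. It uses the two timestamps from the error above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only: mirrors the rule stated in the error message,
// not the code Beam runs internally.
final class TimestampSkewSketch {

    // True when the output timestamp is no earlier than
    // (input timestamp - allowed skew).
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    // How far the output timestamp lies behind the input timestamp,
    // or zero when it is not behind at all.
    static Duration requiredSkew(Instant output, Instant input) {
        Duration gap = Duration.between(output, input);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    public static void main(String[] args) {
        // Timestamps copied from the exception above.
        Instant input = Instant.parse("2019-03-27T17:47:00.526Z");
        Instant output = Instant.parse("2019-03-27T17:46:41.893Z");

        // With the default skew of 0 ms the output is rejected.
        System.out.println(isAllowed(output, input, Duration.ZERO)); // false

        // The record's own timestamp is 18633 ms behind the input timestamp.
        System.out.println(requiredSkew(output, input).toMillis()); // 18633
    }
}
```

So in this case the timestamp parsed from the record is about 18.6 seconds earlier than the timestamp the element already carries, which is why an allowed skew of 0 ms rejects it.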

Here is the DoFn and the pipeline. It's quite simple.

static class KeyVal extends DoFn<String, KV<String, Long>> {
  @ProcessElement
  public void processElement(@Element String r, OutputReceiver<KV<String, Long>> out) {
    // Each input line is "<key> <value> <epoch millis>", separated by single spaces.
    String[] keyval = r.split(" ");
    out.outputWithTimestamp(
        KV.of(keyval[0], Long.parseLong(keyval[1])),
        Instant.ofEpochMilli(Long.parseLong(keyval[2])));
  }
}

TextIO.Read source =
    TextIO.read()
        .from(INPUT_PATH)
        .watchForNewFiles(
            Duration.standardMinutes(1),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(2)));


pipeline
    .apply(source)
    .apply(ParDo.of(new KeyVal()))
    .apply(
        Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(2))))
            .withAllowedLateness(Duration.standardMinutes(20))
            .accumulatingFiredPanes())...


I got it working using

.apply(
    WithTimestamps.of((String line) -> Instant.ofEpochMilli(Long.parseLong(line.split(" ")[2])))
        .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE / 2L)))

The documentation says withAllowedTimestampSkew is deprecated and that using
withAllowedLateness should take care of it. Is there a better way to add a
timestamp to each record? Thanks

Nikhil
