I found that the following method deliberately computes the timestamp to fall
within the past 2 hours. Yet I get the exception below, which complains that
the timestamp is in the past! I would appreciate any clarification.

public class WindowedWordCount {
  static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
    private static final Duration RAND_RANGE = Duration.standardHours(2);
    private final Instant minTimestamp;

    AddTimestampFn() {
      this.minTimestamp = new Instant(System.currentTimeMillis());
    }

    @Override
    public void processElement(ProcessContext c) {
      // Generate a timestamp that falls somewhere in the past two hours.
      long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
      Instant randomTimestamp = minTimestamp.plus(randMillis);
      /**
       * Concept #2: Set the data element with that timestamp.
       */
      c.outputWithTimestamp(c.element().toString(), new Instant(randomTimestamp));
    }
  }

      From: amir bahmanyari <[email protected]>
 To: "[email protected]" <[email protected]> 
 Sent: Wednesday, August 3, 2016 7:25 PM
 Subject: changing the allowed skew
   
Hi Colleagues,

I am basically running the code in the WindowedWordCount example. The only
difference is that I don't use TextIO but get records via KafkaIO. Everything
else is the same. I get the following exception. I would appreciate your
suggestions to fix it.

Cheers

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 
2016-08-04T02:23:21.137Z. Output timestamps must be no earlier than the 
timestamp of the current input (2016-08-04T02:23:22.896Z) minus the allowed 
skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew() Javadoc for 
details on changing the allowed skew.
        at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)
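
For reference, the rule stated in the exception message can be reproduced with the two timestamps it reports. This is only a standalone sketch using java.time, not Beam's actual implementation; the class SkewCheck and the helper isAllowed are hypothetical names made up for illustration, and the 2-second skew in the last line is an arbitrary value chosen to show the check passing.

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // Hypothetical helper (not a Beam API): returns true when the output
    // timestamp is no earlier than the input timestamp minus the allowed skew.
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Timestamps copied from the exception message above.
        Instant input = Instant.parse("2016-08-04T02:23:22.896Z");
        Instant output = Instant.parse("2016-08-04T02:23:21.137Z");

        // How far the output timestamp lies behind the input timestamp.
        long behindMillis = Duration.between(output, input).toMillis();
        System.out.println("output is behind input by " + behindMillis + " ms");

        // With the reported skew of 0 ms the check fails.
        System.out.println("allowed with 0 ms skew: "
            + isAllowed(output, input, Duration.ZERO));

        // With an assumed skew of 2 s (illustrative only) the check passes.
        System.out.println("allowed with 2 s skew: "
            + isAllowed(output, input, Duration.ofSeconds(2)));
    }
}
```

With these values the output timestamp is 1759 ms earlier than the input timestamp, so any allowed skew smaller than that is rejected by a check of this form.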



  
