I just want to highlight a few points from the docs that matter for this thread:

 - getAllowedTimestampSkew() does not adjust the watermark, so it can be
dangerous and you have to carefully set up allowed lateness too
 - the replacement for getAllowedTimestampSkew does not exist yet, but is
described in https://issues.apache.org/jira/browse/BEAM-644
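
To make the first point concrete, here is a minimal sketch in plain Java
(not Beam code) of the check that the error message quoted below describes:
an output timestamp is accepted only if it is no earlier than the input
timestamp minus the allowed skew. The class and method names are made up for
illustration, and it uses java.time rather than the Joda types Beam uses, so
that it stands alone.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, plain-Java model of the rule stated in the error message.
// This is not Beam's implementation; the names are illustrative only.
final class SkewCheckSketch {

  // True when output is no earlier than (input - allowedSkew).
  static boolean isOutputTimestampAllowed(
      Instant input, Instant output, Duration allowedSkew) {
    return !output.isBefore(input.minus(allowedSkew));
  }

  public static void main(String[] args) {
    // Timestamps taken from the error message in this thread.
    Instant input = Instant.parse("2019-03-27T17:47:00.526Z");
    Instant output = Instant.parse("2019-03-27T17:46:41.893Z");

    // Zero skew: the output is about 18.6 seconds too early, so it is rejected.
    System.out.println(isOutputTimestampAllowed(input, output, Duration.ZERO));
    // A 30 second skew would be enough to accept this particular element.
    System.out.println(
        isOutputTimestampAllowed(input, output, Duration.ofSeconds(30)));
  }
}
```

Raising the skew only relaxes this check. Since the watermark is not
adjusted, an element emitted with an earlier timestamp may already be behind
the watermark, which is why allowed lateness has to be set with care as well.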

I was browsing the code & docs of TextIO / FileIO / MatchConfiguration and
I did not see any way to influence the input event timestamps or watermark.
Does it exist?

Kenn

On Wed, Mar 27, 2019 at 12:06 PM Robin Qiu <[email protected]> wrote:

> Hi Nikhil,
>
> If I understand correctly, your DoFn (KeyVal) class may emit records
> whose timestamp is earlier than the input's. In this case, I think you
> could override the default behavior of getAllowedTimestampSkew() in DoFn
> to allow infinite skew.
>
> To be specific, you can add the following function to your KeyVal class:
> @Override
> public Duration getAllowedTimestampSkew() {
>   return Duration.millis(Long.MAX_VALUE);
> }
>
> Hope this helps!
> Robin
>
> On Wed, Mar 27, 2019 at 11:44 AM Nikhil Goyal <[email protected]> wrote:
>
>> Hi guys,
>> I am seeing this error while running my Beam job on Dataflow.
>>
>> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.IllegalArgumentException: Cannot output with timestamp
>> 2019-03-27T17:46:41.893Z. Output timestamps must be no earlier than the
>> timestamp of the current input (2019-03-27T17:47:00.526Z) minus the allowed
>> skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
>> details on changing the allowed skew.
>>
>> This is the code. It's quite simple.
>>
>> static class KeyVal extends DoFn<String, KV<String, Long>> {
>>   @ProcessElement
>>   public void processElement(@Element String r, OutputReceiver<KV<String, Long>> out) {
>>     String[] keyval = r.split(" ");
>>     out.outputWithTimestamp(
>>         KV.of(keyval[0], Long.parseLong(keyval[1])),
>>         Instant.ofEpochMilli(Long.parseLong(keyval[2])));
>>   }
>> }
>>
>> TextIO.Read source =
>>     TextIO.read()
>>         .from(INPUT_PATH)
>>         .watchForNewFiles(
>>             Duration.standardMinutes(1),
>>             Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(2)));
>>
>>
>> pipeline
>>     .apply(source)
>>     .apply(ParDo.of(new KeyVal()))
>>     .apply(
>>         Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10)))
>>             .triggering(
>>                 Repeatedly.forever(
>>                     AfterProcessingTime.pastFirstElementInPane()
>>                         .plusDelayOf(Duration.standardMinutes(2))))
>>             .withAllowedLateness(Duration.standardMinutes(20))
>>             .accumulatingFiredPanes())...
>>
>>
>> I got it working using
>>
>> .apply(
>>     WithTimestamps.of((String line) -> Instant.ofEpochMilli(Long.parseLong(line.split(" ")[2])))
>>         .withAllowedTimestampSkew(Duration.millis(Long.MAX_VALUE / 2L)))
>>
>> The documentation says withAllowedTimestampSkew is deprecated and that
>> using withAllowedLateness should take care of it. Is there a better way
>> to add timestamps to records? Thanks
>>
>> Nikhil
>>
>>
>>
