Hi Amir,

I believe you should use KafkaIO#withTimestampFn [1]. For unbounded
PCollections, the source itself needs to know about the timestamps so it
can maintain a good watermark.
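To illustrate what such a timestamp function does, here is a stdlib-only sketch that maps each Kafka record to the event time carried inside the record. In the linked KafkaIO source, the function passed to `withTimestampFn` appears to be a Beam `SerializableFunction` from the record's `KV` to a Joda `Instant`. In this sketch, `java.util.function.Function` over a `Map.Entry` and `java.time.Instant` stand in for those types so it runs without Beam. The `EventTimeFn` name and the `"<epochMillis>,<text>"` value format are hypothetical; adapt the parsing to whatever your records actually carry.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical stand-in for a function one might hand to KafkaIO#withTimestampFn.
 * Assumes (for illustration only) that each record value looks like "<epochMillis>,<text>".
 */
class EventTimeFn implements Function<Map.Entry<byte[], String>, Instant> {

  @Override
  public Instant apply(Map.Entry<byte[], String> record) {
    String value = record.getValue();
    int comma = value.indexOf(',');
    if (comma < 0) {
      throw new IllegalArgumentException("No event time in record value: " + value);
    }
    // The event time travels with the record, so the source can assign it
    // before any downstream DoFn sees the element.
    long epochMillis = Long.parseLong(value.substring(0, comma));
    return Instant.ofEpochMilli(epochMillis);
  }

  public static void main(String[] args) {
    Map.Entry<byte[], String> record =
        new AbstractMap.SimpleEntry<byte[], String>(new byte[0], "1470277401137,hello world");
    System.out.println(new EventTimeFn().apply(record)); // 2016-08-04T02:23:21.137Z
  }
}
```

With Beam on the classpath, the same parsing logic would go in the function given to `.withTimestampFn(...)` on the KafkaIO read, so the source, not a downstream DoFn, decides each element's timestamp and can advance the watermark accordingly.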

The example you are editing uses a bounded input, which has different
implications for the watermark. The text in the comment seems to have
strayed from the example code [2]: the code picks a time in the two hours
after the DoFn is constructed, not in the past two hours.
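The rule quoted in the exception can be modelled in a few lines: an output timestamp is accepted only if it is no earlier than the input element's timestamp minus the allowed skew. The sketch below (the `SkewCheck` class is hypothetical, not Beam code) replays the two timestamps from the trace. It also suggests why the bounded example gets away with arbitrary timestamps. Assume the bounded source leaves each element at the minimum timestamp, which is Beam's default for bounded readers. Under that assumption, any output timestamp passes, whereas a Kafka element stamped with its read time rejects anything earlier.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical model of the output-timestamp check described in the exception. */
class SkewCheck {

  // Stand-in for the minimum timestamp a bounded source assigns by default;
  // Beam uses BoundedWindow.TIMESTAMP_MIN_VALUE, not this exact value.
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  /** True if outputTimestamp >= inputTimestamp - allowedSkew. */
  static boolean isAllowed(Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
    return !outputTimestamp.isBefore(inputTimestamp.minus(allowedSkew));
  }

  public static void main(String[] args) {
    Instant kafkaInput = Instant.parse("2016-08-04T02:23:22.896Z");   // time the record was read
    Instant randomOutput = Instant.parse("2016-08-04T02:23:21.137Z"); // from AddTimestampFn

    // Unbounded (Kafka) case from the trace: 1.759 s too early with zero skew.
    System.out.println(isAllowed(kafkaInput, randomOutput, Duration.ZERO)); // false

    // Bounded case: the input sits at the minimum timestamp, so anything passes.
    System.out.println(isAllowed(MIN_TIMESTAMP, randomOutput, Duration.ZERO)); // true
  }
}
```

Raising the allowed skew would silence the check. Assigning event time at the source with `withTimestampFn` instead keeps the watermark meaningful, which is the point of the advice above.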

Hope this helps,

Kenn

[1] https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
[2] https://github.com/apache/incubator-beam/commit/691a5828b45423940e850bbc64ccc5daf7599c76#diff-a1ce476065c2eb973a02d794d29758e0

On Thu, Aug 4, 2016 at 11:38 AM, amir bahmanyari <[email protected]> wrote:

> I found in the following method that the timestamp is deliberately
> calculated to fall within the past 2 hours.
> On the other hand, I get the following exception complaining that it's in
> the past!!!
> I'd appreciate any clarification.
>
> public class WindowedWordCount {
>
>   static class AddTimestampFn extends DoFn<KV<byte[], String>, String> {
>     private static final Duration RAND_RANGE = Duration.standardHours(2);
>     private final Instant minTimestamp;
>
>     AddTimestampFn() {
>       this.minTimestamp = new Instant(System.currentTimeMillis());
>     }
>
>     @Override
>     public void processElement(ProcessContext c) {
>       // Generate a timestamp that falls somewhere in the past two hours.
>       long randMillis = (long) (Math.random() * RAND_RANGE.getMillis());
>       Instant randomTimestamp = minTimestamp.plus(randMillis);
>       /**
>        * Concept #2: Set the data element with that timestamp.
>        */
>       c.outputWithTimestamp(c.element().toString(), randomTimestamp);
>     }
>   }
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Wednesday, August 3, 2016 7:25 PM
> *Subject:* changing the allowed skew
>
> Hi Colleagues,
> I am basically running the WindowedWordCount example code.
> The only difference is that I don't use TextIO but get records via KafkaIO.
> Everything else is the same. I get the following exception.
> I'd appreciate your suggestions to fix it.
> Cheers
>
>
> Caused by: java.lang.IllegalArgumentException: Cannot output with
> timestamp 2016-08-04T02:23:21.137Z. Output timestamps must be no earlier
> than the timestamp of the current input (2016-08-04T02:23:22.896Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestmapSkew()
> Javadoc for details on changing the allowed skew.
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper.checkTimestamp(FlinkAbstractParDoWrapper.java:201)
