That’s exactly what I was looking for. Thank you.
> On Jul 7, 2016, at 4:51 PM, amir bahmanyari <[email protected]> wrote:
>
> Setting the event time via KafkaIO.withTimestampFn should do it...
> I don't have that in my code, but I can try it this eve.
> Cheers
>
>
> From: David Desberg <[email protected]>
> To: [email protected]
> Sent: Thursday, July 7, 2016 4:27 PM
> Subject: Re: Event time processing with Flink runner and Kafka source
>
> Dan,
>
> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as it
> looks like it provides exactly the functionality I want; I was wondering how
> to set the timestamp correctly at the source. Thank you for your help!
>
> David
>
>> On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
>>
>> Hi David,
>>
>> In Beam pipelines, the event time is initially set on the source. Downstream
>> code can make an event *later* just fine, but, making it *earlier* might
>> move it before the current watermark. This would effectively turn data that
>> we believe is on-time into late data, and would in general be very bad!
>> Allowed timestamp skew is a feature that lets you move data earlier by a
>> bounded amount, so if you have a tight bound on how far back you need to
>> shift from the time set by the source, it can sometimes help. But it's
>> generally discouraged in favor of setting proper timestamps in the first
>> place.
>>
>> My guess is that UnboundedFlinkSource is using the *processing time*, i.e.
>> the wall-clock time at which the element is received, rather than any event
>> time carried by the element itself. It might be possible to supply the
>> element time using that source.
>>
>> Alternatively, I think you should be using KafkaIO and setting the event time
>> there using withTimestampFn:
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>>
>> This way the elements will come into the system from Kafka with good
>> timestamps, and you don't need a downstream DoFn to transport them back in
>> time.
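To make the KafkaIO suggestion concrete, here is a rough sketch. The Beam-specific wiring is shown only in comments, since it depends on the SDK version; the broker address, topic name, and the `millis|payload` message layout are purely hypothetical assumptions for illustration:

```java
// Hypothetical helper: pull the event time out of a Kafka message whose value
// is assumed (for illustration only) to look like "1467930000000|payload".
public class EventTimeExtractor {

    /** Returns the epoch-millis event time embedded at the front of the value. */
    public static long extractEventTimeMillis(String value) {
        int sep = value.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("no timestamp prefix: " + value);
        }
        return Long.parseLong(value.substring(0, sep));
    }

    // With KafkaIO (per the link above), the extractor would plug in roughly as:
    //
    //   p.apply(KafkaIO.read()
    //       .withBootstrapServers("broker:9092")        // assumed address
    //       .withTopics(ImmutableList.of("my-topic"))   // assumed topic
    //       .withTimestampFn(record ->
    //           new Instant(extractEventTimeMillis(...))));
    //
    // Elements then enter the pipeline already stamped with their event time,
    // so no downstream outputWithTimestamp call is needed.

    public static void main(String[] args) {
        System.out.println(extractEventTimeMillis("1467930000000|hello"));
    }
}
```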
>>
>> Thanks,
>> Dan
>>
>> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]> wrote:
>> Hi David,
>> I am doing pretty much the same thing using Beam KafkaIO.
>> For the simple thing I am doing, it's working as expected.
>> Can you share the code for how you are invoking/receiving from Kafka, please?
>> Cheers
>>
>>
>> From: David Desberg <[email protected]>
>> To: [email protected]
>> Sent: Thursday, July 7, 2016 12:54 PM
>> Subject: Event time processing with Flink runner and Kafka source
>>
>> Hi all,
>>
>> I’m struggling to get a basic Beam application set up, windowed on event
>> time. I’m reading from an UnboundedFlinkSource wrapping a FlinkKafkaConsumer
>> to begin my pipeline. To set up event-time processing, I applied a DoFn
>> transformation (via ParDo) that calls ProcessContext.outputWithTimestamp
>> using a timestamp extracted from each Kafka message. However, this results
>> in an exception telling me to override getAllowedTimestampSkew: evidently
>> the messages are already timestamped, and I am moving those timestamps back
>> in time, but only shifting into the future is allowed.
>> getAllowedTimestampSkew, however, is deprecated, and if I do override it and
>> allow skew, the windowing I apply later in the pipeline fails. I decided to
>> backtrack and look at how the timestamps are being assigned in the first
>> place, since the Flink source has no knowledge of the structure of my
>> messages and thus shouldn’t know how to assign any event time at all. It
>> turns out that the pipeline runner stamps each incoming message with the
>> ingestion time, in a manner that cannot be overridden and is not configurable
>> (see
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273)
>>
>> Why is this the case? Since part of the point of Beam is to allow event-time
>> processing, I’m sure I’m missing something here. How can I correctly ingest
>> messages from Kafka and stamp them with event time, rather than ingestion
>> time?
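For context, the skew exception described above arises because outputWithTimestamp may only move an element's timestamp backward by at most getAllowedTimestampSkew(). A minimal sketch of that arithmetic, with the failing DoFn call shown only in a comment (it requires the Beam SDK, and the variable names are hypothetical):

```java
// Illustrates why the DoFn approach above throws: moving a timestamp
// backward requires the DoFn to declare at least that much allowed skew.
public class TimestampSkewCheck {

    /**
     * Returns the minimum allowed-skew (in millis) a DoFn would have to
     * declare to move an element from currentTsMillis to newTsMillis.
     * Zero means the shift is forward (or a no-op) and always permitted.
     */
    public static long requiredSkewMillis(long currentTsMillis, long newTsMillis) {
        return Math.max(0L, currentTsMillis - newTsMillis);
    }

    // Inside the DoFn described above, the failing call was roughly:
    //
    //   c.outputWithTimestamp(element, new Instant(eventTimeMillis));
    //
    // If eventTimeMillis is earlier than the ingestion-time stamp the Flink
    // source assigned, Beam rejects it unless getAllowedTimestampSkew() is
    // overridden to cover the difference; that override is deprecated and, as
    // noted, breaks downstream windowing once elements fall behind the
    // watermark, which is why stamping at the source is preferred.

    public static void main(String[] args) {
        // Ingested at t=1000 but real event time t=400: needs 600 ms of skew.
        System.out.println(requiredSkewMillis(1000, 400));
    }
}
```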
>>
>>
>>