Hi David,

If you want to try Beam+Flink with Kafka 0.8, you can:
1. get Beam+Flink working (I tested with Docker in [1])
2. run Kafka 0.8 (I tested with Docker, using the image by Spotify [2])
3. run the KafkaWindowedWordCountExample [3] on the Flink runner

I don’t think this will be the official/supported way, but it’s currently 
(still) working.
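
To make the event-time point from the quoted thread below concrete, here is a rough sketch in plain Java with no Beam or Flink dependency. It assumes a hypothetical message format "<epochMillis>,<payload>"; the class and method names are made up for illustration and are not Beam or Flink API, and the lateness check is a deliberate simplification of how a watermark behaves. It only shows why stamping an element with ingestion time and later moving it back to its event time can turn on-time data into late data.

```java
import java.time.Instant;

class EventTimeSketch {
    // Hypothetical message format (an assumption): "<epochMillis>,<payload>".
    public static Instant extractEventTime(String message) {
        int comma = message.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("no timestamp field: " + message);
        }
        return Instant.ofEpochMilli(Long.parseLong(message.substring(0, comma)));
    }

    // Toy model: an element counts as late if its timestamp is before the watermark.
    public static boolean isLate(Instant timestamp, Instant watermark) {
        return timestamp.isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant watermark = Instant.ofEpochMilli(10_000);
        Instant ingestionTime = Instant.ofEpochMilli(10_500); // stamped on arrival
        Instant eventTime = extractEventTime("4000,hello");   // stamped by the producer

        // With the ingestion timestamp the element looks on time.
        System.out.println("late with ingestion time: " + isLate(ingestionTime, watermark));
        // Moving it back to its event time downstream puts it behind the watermark.
        System.out.println("late with event time: " + isLate(eventTime, watermark));
    }
}
```

If the source itself assigns the event time, the watermark can be derived from those timestamps in the first place, and no element has to be moved back in time downstream.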

Best,

[1] http://medium.com/@ecesena/a-quick-demo-of-apache-beam-with-docker-da98b99a502a
[2] https://github.com/spotify/docker-kafka
[3] https://github.com/apache/incubator-beam/blob/master/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java


> On Jul 7, 2016, at 4:56 PM, David Desberg <[email protected]> wrote:
> 
> I see. Are there any options for Kafka 0.8? Thanks for the heads up.
> 
>> On Jul 7, 2016, at 4:54 PM, Raghu Angadi <[email protected]> wrote:
>> 
>> David,
>> 
>> Note that KafkaIO in Beam requires a Kafka server version >= 0.9.
>> 
>> On Thu, Jul 7, 2016 at 4:27 PM, David Desberg <[email protected]> wrote:
>> Dan,
>> 
>> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as it 
>> looks to provide exactly the functionality I want. I was wondering how to 
>> set the timestamp correctly, at the source. Thank you for your help!
>> 
>> David
>> 
>>> On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
>>> 
>>> Hi David,
>>> 
>>> In Beam pipelines, the event time is initially set on the source. 
>>> Downstream code can make an event *later* just fine, but making it 
>>> *earlier* might move it before the current watermark. This would effectively 
>>> turn data that we believe is on-time into late data, and would in general be 
>>> very bad! Allowed lateness is a feature that lets you move data earlier by 
>>> a fixed amount, so if you have a tight bound on the time set by the source, 
>>> this can sometimes help. But it's generally discouraged in favor of proper 
>>> timestamps in the first place.
>>> 
>>> My guess is that UnboundedFlinkSource is using the *processing time*, aka 
>>> current time when the element is received, rather than any event time 
>>> provided by the element. It might be possible using that source to provide 
>>> the element time.
>>> 
>>> Alternately, I think you should be using KafkaIO and setting the event time 
>>> there using withTimestampFn: 
>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>>> 
>>> This way the elements will come into the system from Kafka with good 
>>> timestamps, and you don't need a downstream DoFn to transport them back in 
>>> time.
>>> 
>>> Thanks,
>>> Dan
>>> 
>>> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]> wrote:
>>> Hi David,
>>> I am doing pretty much the same thing using Beam KafkaIO.
>>> For the simple thing I am doing, it's working as expected.
>>> Can you share the code showing how you are reading from Kafka, please?
>>> Cheers
>>> 
>>> 
>>> From: David Desberg <[email protected]>
>>> To: [email protected] 
>>> Sent: Thursday, July 7, 2016 12:54 PM
>>> Subject: Event time processing with Flink runner and Kafka source
>>> 
>>> Hi all,
>>> 
>>> I’m struggling to get a basic Beam application set up, windowed based upon 
>>> event time. I’m reading from an UnboundedFlinkSource of a 
>>> FlinkKafkaConsumer to begin my pipeline. To set up event time processing, I 
>>> applied a DoFn transformation (via ParDo) that calls 
>>> ProcessContext.outputWithTimestamp using a timestamp extracted from each 
>>> Kafka message. However, this results in an exception telling me to override 
>>> getAllowedTimestampSkew, since evidently the messages are already 
>>> timestamped and I am moving these timestamps back in time, but only 
>>> shifting to the future is allowed. getAllowedTimestampSkew, however, is 
>>> deprecated, and if I do override it and allow skew, the windowing I am 
>>> applying later in the pipeline fails. I decided to backtrack and look at 
>>> how the timestamps are even being assigned initially, since the Flink 
>>> source has no concept of the structure of my messages and thus shouldn’t 
>>> know how to assign any time at all. It turns out that the 
>>> pipeline runner marks each incoming message with ingestion time, in a 
>>> manner that cannot be overridden/is not configurable (see 
>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273)
>>> 
>>> Why is this the case? Since part of the point of Beam is to allow 
>>> event-time processing, I’m sure I’m missing something here. How can I 
>>> correctly ingest messages from Kafka and stamp them with event time, rather 
>>> than ingestion time? 
>>> 
>>> 
>>> 
>> 
>> 
> 

-- 
Emanuele Cesena, Data Eng.
http://www.shopkick.com

Il corpo non ha ideali



