Re: Timestamp/watermark support in Kinesis consumer

2018-02-23 Thread Thomas Weise
Another nice thing is that readers can potentially also read from different sources (historic/latest). To arrive at a general connector pattern, it will also be necessary to consider the ordering relationship between restrictions/splits/blocks/segments when it is important for the processing logic

Re: Timestamp/watermark support in Kinesis consumer

2018-02-22 Thread Aljoscha Krettek
While we're on this: https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html This is a concrete way of separating partition/shard/split discovery from their reading. The nice thing about this is that you can mix-and-match "discovery components" and "reader components". For example, for

Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
I know this is a very simplistic idea but... In general the issue Eron is describing occurs whenever two (or more) parallel partitions are assigned to the same Flink sub-task and there is a large time delta between them. This problem exists, though, largely because we are not making any decisions

Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
Big +1 on trying to come up with a common framework for partition-based, replayable sources. There is so much common code to be written that makes it possible to write correct connectors and Gordon's bullet points are exactly those -- and it's not just Kinesis and Kafka. It's also true for

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
It is valuable to consider the behavior of a consumer in both a real-time processing context, which consists mostly of tail reads, and a historical processing context, where there's an abundance of backlogged data. In the historical processing context, system internals (e.g. shard selection

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Thomas Weise
I don't think there is a generic solution to the problem you are describing; we don't know how long it will take for resharding to take effect and those changes to become visible to the connector. Depending on how latency sensitive the pipeline is, possibly a configurable watermark hold period

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
I'd like to know how you envision dealing with resharding in relation to the watermark state. Imagine that a given shard S1 has a watermark of T1, and is then split into two shards S2 and S3. The new shards are assigned to subtasks according to a hash function. The current watermarks of those

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Thomas Weise
Based on my draft implementation, the changes that are needed in the Flink connector are as follows: I need to be able to override the following to track last record timestamp and idle time per shard. protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int
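As a rough illustration of what tracking the last record timestamp and idle time per shard could look like, here is a minimal sketch. It is not the draft implementation mentioned above and not part of the Flink Kinesis connector: the class `ShardWatermarkTracker`, its methods, and the idle-timeout parameter are all hypothetical names chosen for this example. The idea is that the subtask watermark is the minimum of the per-shard maximum timestamps, that shards which have been silent longer than the timeout are skipped so they do not hold the watermark back, and that the result never moves backwards.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not a Flink or Kinesis connector API. */
class ShardWatermarkTracker {

    /** Per-shard bookkeeping: highest event timestamp seen and when it was last updated. */
    private static final class ShardState {
        long maxTimestamp = Long.MIN_VALUE;
        long lastSeenAtMillis;
    }

    private final long idleTimeoutMillis;
    private final Map<String, ShardState> shards = new HashMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    ShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Record that a record with the given event timestamp was read from a shard. */
    void onRecord(String shardId, long recordTimestamp, long nowMillis) {
        ShardState state = shards.computeIfAbsent(shardId, k -> new ShardState());
        state.maxTimestamp = Math.max(state.maxTimestamp, recordTimestamp);
        state.lastSeenAtMillis = nowMillis;
    }

    /**
     * Minimum of the per-shard maximum timestamps over all non-idle shards.
     * The returned value never decreases, even if an idle shard resumes
     * with older data.
     */
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (ShardState state : shards.values()) {
            if (nowMillis - state.lastSeenAtMillis > idleTimeoutMillis) {
                continue; // idle shard: do not let it hold back the watermark
            }
            anyActive = true;
            min = Math.min(min, state.maxTimestamp);
        }
        if (anyActive) {
            lastWatermark = Math.max(lastWatermark, min);
        }
        return lastWatermark;
    }
}
```

In this sketch a caller would invoke `onRecord` for every record emitted and poll `currentWatermark` periodically. How records from a shard that resumes after being marked idle should be treated (they may now be behind the watermark) is left open here.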

Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Thomas Weise
On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai wrote:
> Regarding the two hooks you would like to be available:
>
> - Provide hook to override discovery (not to hit Kinesis from every subtask)
>
> Yes, I think we can easily provide a way, for example

Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Regarding the two hooks you would like to be available:
- Provide hook to override discovery (not to hit Kinesis from every subtask)
Yes, I think we can easily provide a way, for example setting -1 for SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. Though, the user would then have to

Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Hi Thomas, It’s great that you’ve brought out these issues, which IMO are all very valid. They have also been in my head for a while. Here’s a list of things, off the top of my head, that I would really like to improve as part of a major Kafka / Kinesis connector rework. Some have JIRAs for

Re: Timestamp/watermark support in Kinesis consumer

2018-02-07 Thread Thomas Weise
Generalizing the pattern would be great. I was also wondering if there aren't other commonalities between sources that would benefit from a shared framework. Kafka and Kinesis don't look all that different from a consumer perspective: replayable source, topic -> stream, partition -> shard, offset

Re: Timestamp/watermark support in Kinesis consumer

2018-02-07 Thread Aljoscha Krettek
Hi, That last point is very valid. For a while now I've wanted to generalise the pattern of our file source to other sources. (This is related to how Beam sources are being refactored to use Splittable DoFn.) I'm very eager for design work to start on this once 1.5 is out the door. There are

Re: Timestamp/watermark support in Kinesis consumer

2018-02-06 Thread Thomas Weise
In addition to the lack of watermark support, the Kinesis consumer suffers from a discovery-related issue that also needs to be resolved. Shard discovery runs periodically in all subtasks. That's not just inefficient but becomes a real problem when there is a large number of subtasks due to rate

Timestamp/watermark support in Kinesis consumer

2018-02-05 Thread Thomas Weise
Hi, The Kinesis consumer currently does not emit watermarks, and this can lead to problems when a single subtask reads from multiple shards and offsets are not closely aligned with respect to the event time. The Kafka consumer has support for periodic and punctuated watermarks, although there is