[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Good point. An ugly workaround would be to store a timestamp when the ending number is being set on a shard, and provide a configurable, sufficiently long (e.g. 7 days) window. It would exclude the
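
A rough sketch of the timestamp idea in the comment above, under stated assumptions: `ClosedShardExpirySketch`, `Entry`, and `expire` are hypothetical names for illustration and are not part of the Flink Kinesis connector, and state is simplified to a map keyed by shard id. Each entry records when its ending sequence number was set, and entries closed for longer than the configured window are dropped.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

final class ClosedShardExpirySketch {

    /** Hypothetical per-shard state entry; not a real Flink class. */
    static final class Entry {
        final String lastSequenceNumber;
        final Instant closedAt; // null while the shard is still open

        Entry(String lastSequenceNumber, Instant closedAt) {
            this.lastSequenceNumber = lastSequenceNumber;
            this.closedAt = closedAt;
        }
    }

    /**
     * Removes entries whose shard was closed more than `window` before `now`.
     * Open shards (closedAt == null) are never removed.
     * Returns the number of entries removed.
     */
    static int expire(Map<String, Entry> state, Instant now, Duration window) {
        int before = state.size();
        state.values().removeIf(e ->
                e.closedAt != null
                        && Duration.between(e.closedAt, now).compareTo(window) > 0);
        return before - state.size();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Map<String, Entry> state = new HashMap<>();
        state.put("shardId-000000000000", new Entry("10", null));
        state.put("shardId-000000000001", new Entry("20", now.minus(Duration.ofDays(8))));
        int removed = expire(state, now, Duration.ofDays(7));
        System.out.println("removed=" + removed + " remaining=" + state.keySet());
    }
}
```

The window only bounds state growth if it is at least as long as the period during which a closed shard can still be reported by Kinesis; that relationship is an assumption here, not something the thread confirms.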

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 Here it is: https://issues.apache.org/jira/browse/FLINK-8542 ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens My only concern is that scanning the whole list of shards could be severely limited by AWS Kinesis's API invocation rate limits. Also, we would then only be cleaning up the state on

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-02-01 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Regarding the remark from @StephanEwen: perhaps it would be ok to re-use the `KinesisProxy` to return a list of all shards and compare them to the `sequenceNumsToRestore` to prune any shards that
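
A minimal sketch of the pruning idea in the comment above, with simplifying assumptions: the restored state is modelled as a plain map from shard id to sequence number for a single stream, and the set of currently listed shard ids is passed in directly rather than fetched through `KinesisProxy`. `ShardPruneSketch` and `pruneUnknownShards` are hypothetical names, not connector API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ShardPruneSketch {

    /**
     * Drops every restored entry whose shard id is not in the listed set.
     * Returns the number of entries pruned.
     */
    static int pruneUnknownShards(Map<String, String> sequenceNumsToRestore,
                                  Set<String> listedShardIds) {
        int before = sequenceNumsToRestore.size();
        // keySet() is a live view, so retainAll removes the map entries too.
        sequenceNumsToRestore.keySet().retainAll(listedShardIds);
        return before - sequenceNumsToRestore.size();
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        restored.put("shardId-000000000000", "100");
        restored.put("shardId-000000000001", "200");

        Set<String> listed = new HashSet<>();
        listed.add("shardId-000000000001");
        listed.add("shardId-000000000002");

        int pruned = pruneUnknownShards(restored, listed);
        System.out.println("pruned=" + pruned + " remaining=" + restored.keySet());
    }
}
```

This treats a shard missing from the listing as safe to forget, which is an assumption of the sketch.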

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-31 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 Great, thanks for the update! As a side note, I will be making some additional changes to the code regarding the not-so-nice iteration across the `sequenceNumsToRestore` map. It would make

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-31 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Thanks - we've been running it in production for the last 5 days without issues, so it seems to work fine. We'll be enabling autoscaling of the streams in the coming hours, so if anything is amiss,

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-31 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens the changes look good to merge! Thanks a lot for working on this. Will merge this for `release-1.4`, `release-1.3`, and `master`. ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-28 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @tzulitai Is there anything more I can do from my side? ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-25 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @bowenli86 Makes sense - I've updated the description to contain the initial email/issue. HTH. ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread bowenli86
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5337 Good to have the background info from the email thread. I didn't have a full picture before. ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 @bowenli86 we're passing the last-seen shardId, and the Kinesis call returns only newer shards. Not sure if that answers your remark - because I didn't really understand the question either. ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 re @bowenli86 yes, Kinesis shard metadata is fetched every `DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS` millis. Could you describe a bit more which part you don't think is valid?

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-24 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 re @StephanEwen yes, currently, state is still kept indefinitely for closed shards. A special `SHARD_END` marker sequence number is stored as the sequence number for closed shards, so that the

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread bowenli86
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5337 I'm wondering if this is valid? My understanding is that, by default, flink-connector-kinesis will get kinesis metadata (#shard, shard id, etc) every 10s (defined by

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread StephanEwen
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5337 I am not deeply into the Kinesis Consumer logic, just writing here to double check that we do not build a solution where state grows infinitely. For example, it would not be feasible to

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Alright, I've given it a quick stab - but the whole 'remove/update/re-add' cycle is kinda ugly due to the hashcode change. And I've just copied the test from the other example rather than using the

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens yes, that sounds correct. `parentShardId`, `adjacentParentShardId`, `startingHashKey`, `endingHashKey`, `startingSequenceNumber`: these should all be fixed once the

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Just a small remark - from what I understood, the only property that *can* change is the endingSequenceNumber - all other state should be considered as 'set once', so there should be no point in

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Ok, that makes sense to me. Give me a bit to cook up both the new test and the new approach, and I'll update the PR. Thank you very much for the comments! ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5337 @pluppens yes, I think that would be the proper solution here. ---

[GitHub] flink issue #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kinesis co...

2018-01-23 Thread pluppens
Github user pluppens commented on the issue: https://github.com/apache/flink/pull/5337 Ok, so you'd prefer to synchronize the state of the retrieved shard against the stored shards by comparing its stream name and shard id, before doing the containsKey() check? ---
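
The approach asked about above can be sketched roughly as follows. This is an illustration under assumptions, not the code merged in the PR: `ShardMeta` is a hypothetical stand-in for the connector's shard metadata, with a `hashCode()` that includes the ending sequence number, so a key stored while the shard was open no longer matches once the shard is closed. The stored key is first located by stream name and shard id only, re-keyed if it differs, and only then is `containsKey()` consulted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ShardSyncSketch {

    /** Hypothetical shard metadata; not the real Flink class. */
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        final String endingSequenceNumber; // null while the shard is open

        ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            return streamName.equals(other.streamName)
                    && shardId.equals(other.shardId)
                    && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            // Includes the mutable-in-practice field, which is what breaks lookups.
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    /** Finds the stored key for the same shard, ignoring the ending sequence number. */
    static ShardMeta findStoredKey(Map<ShardMeta, String> state, ShardMeta retrieved) {
        for (ShardMeta stored : state.keySet()) {
            if (stored.streamName.equals(retrieved.streamName)
                    && stored.shardId.equals(retrieved.shardId)) {
                return stored;
            }
        }
        return null;
    }

    /** Re-keys the state with the retrieved metadata, then runs the containsKey() check. */
    static boolean syncAndCheck(Map<ShardMeta, String> state, ShardMeta retrieved) {
        ShardMeta storedKey = findStoredKey(state, retrieved);
        if (storedKey != null && !storedKey.equals(retrieved)) {
            // The hash changed, so the entry has to be removed and re-added.
            String sequenceNumber = state.remove(storedKey);
            state.put(retrieved, sequenceNumber);
        }
        return state.containsKey(retrieved);
    }

    public static void main(String[] args) {
        Map<ShardMeta, String> state = new HashMap<>();
        state.put(new ShardMeta("stream", "shardId-000000000000", null), "42");
        ShardMeta closed = new ShardMeta("stream", "shardId-000000000000", "99");
        System.out.println("before sync: " + state.containsKey(closed));
        System.out.println("after sync:  " + syncAndCheck(state, closed));
    }
}
```

The linear scan over the key set is kept for clarity; a secondary index keyed by stream name and shard id would avoid it.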