Github user pluppens commented on a diff in the pull request:
https://github.com/apache/flink/pull/5845#discussion_r187085104
--- Diff:
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducerTest.java
---
@@ -0,0
Github user pluppens commented on a diff in the pull request:
https://github.com/apache/flink/pull/5845#discussion_r187084563
--- Diff:
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
---
@@ -0,0 +1,36
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Good point. An ugly workaround would be to store a timestamp when the
ending sequence number is being set on a shard, and provide a configurable,
sufficiently large (e.g. 7 days) window. It would exclude
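The timestamp-window idea could look roughly like the sketch below. `ClosedShard`, `isExpired`, and the 7-day window are illustrative names and values for this comment's idea, not anything from the actual PR:

```java
import java.time.Duration;
import java.time.Instant;

public class ClosedShardWindow {
    // Hypothetical record of when a shard's ending sequence number was set.
    record ClosedShard(String shardId, Instant closedAt) {}

    // A shard is considered expired once it has been closed for longer
    // than the configurable window.
    static boolean isExpired(ClosedShard shard, Instant now, Duration window) {
        return shard.closedAt().plus(window).isBefore(now);
    }

    public static void main(String[] args) {
        Duration window = Duration.ofDays(7);
        Instant now = Instant.parse("2018-02-10T00:00:00Z");
        ClosedShard recent = new ClosedShard("shard-1", Instant.parse("2018-02-08T00:00:00Z"));
        ClosedShard stale = new ClosedShard("shard-0", Instant.parse("2018-02-01T00:00:00Z"));
        System.out.println(isExpired(recent, now, window)); // prints false
        System.out.println(isExpired(stale, now, window));  // prints true
    }
}
```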
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Regarding the remark from @StephanEwen: perhaps it would be ok to re-use
the `KinesisProxy` to return a list of all shards and compare them to the
`sequenceNumsToRestore` to prune any shards
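A rough sketch of that pruning idea, assuming the restored state is keyed by stream name and shard id. `ShardKey` and `prune` are hypothetical names; the real code would go through `KinesisProxy` and `sequenceNumsToRestore`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ShardPruneSketch {
    // Hypothetical minimal shard key: stream name plus shard id.
    record ShardKey(String stream, String shardId) {}

    // Keep only restored entries whose shard still appears in the live listing.
    static Map<ShardKey, String> prune(Map<ShardKey, String> restored, Set<ShardKey> liveShards) {
        Map<ShardKey, String> pruned = new HashMap<>();
        for (Map.Entry<ShardKey, String> e : restored.entrySet()) {
            if (liveShards.contains(e.getKey())) {
                pruned.put(e.getKey(), e.getValue());
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<ShardKey, String> restored = new HashMap<>();
        restored.put(new ShardKey("stream", "shard-0"), "seq-1");
        restored.put(new ShardKey("stream", "shard-gone"), "seq-9");
        Set<ShardKey> live = Set.of(new ShardKey("stream", "shard-0"));
        System.out.println(prune(restored, live).size()); // prints 1
    }
}
```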
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Thanks - we've been running it in production for the last 5 days without
issues, so it seems to work fine. We'll be enabling autoscaling of the streams
in the coming hours, so if anything is amiss
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
@tzulitai Is there anything more I can do from my side?
---
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
@bowenli86 Makes sense - I've updated the description to contain the
initial email/issue. HTH.
---
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
@bowenli86 we're passing the last-seen shardId, and the Kinesis call
returns only newer shards. Not sure if that answers your remark - because I
didn't really understand the question either.
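The last-seen-shardId filtering described above can be sketched as follows. This leans on the assumption that Kinesis shard ids (e.g. `shardId-000000000001`) compare lexicographically in creation order, and `newerShards` is an illustrative helper, not the connector's API:

```java
import java.util.ArrayList;
import java.util.List;

public class ShardDiscoverySketch {
    // Return only shards strictly newer than the last one we have seen.
    // Assumption: shard ids sort lexicographically in creation order.
    static List<String> newerShards(List<String> listed, String lastSeenShardId) {
        List<String> newer = new ArrayList<>();
        for (String id : listed) {
            if (lastSeenShardId == null || id.compareTo(lastSeenShardId) > 0) {
                newer.add(id);
            }
        }
        return newer;
    }

    public static void main(String[] args) {
        List<String> listed = List.of(
                "shardId-000000000000",
                "shardId-000000000001",
                "shardId-000000000002");
        System.out.println(newerShards(listed, "shardId-000000000000"));
        // prints [shardId-000000000001, shardId-000000000002]
    }
}
```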
---
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Alright, I've given it a quick stab - but the whole 'remove/update/re-add'
cycle is kinda ugly due to the hashcode change. And I've just copied the test
from the other example rather than using
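For context on why that cycle is needed at all: when a mutable field takes part in `hashCode()`, mutating it in place strands the map entry under its old hash bucket, so the entry has to come out before the mutation and go back in afterwards. A minimal sketch, where this `Shard` class is a stand-in and not the connector's `StreamShardMetadata`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MutableKeyCycle {
    // Hypothetical stand-in: endingSequenceNumber is mutable but
    // participates in equals/hashCode.
    static final class Shard {
        final String shardId;
        String endingSequenceNumber;
        Shard(String shardId, String end) {
            this.shardId = shardId;
            this.endingSequenceNumber = end;
        }
        @Override public boolean equals(Object o) {
            return o instanceof Shard s
                    && shardId.equals(s.shardId)
                    && Objects.equals(endingSequenceNumber, s.endingSequenceNumber);
        }
        @Override public int hashCode() {
            return Objects.hash(shardId, endingSequenceNumber);
        }
    }

    // The 'remove/update/re-add' cycle: take the entry out while the key's
    // hash is still the one it was inserted under, mutate it, put it back.
    static void close(Map<Shard, String> state, Shard shard, String endingSeq) {
        String offset = state.remove(shard);
        shard.endingSequenceNumber = endingSeq;
        state.put(shard, offset);
    }

    public static void main(String[] args) {
        Map<Shard, String> state = new HashMap<>();
        Shard shard = new Shard("shard-0", null);
        state.put(shard, "offset-42");
        close(state, shard, "123");
        System.out.println(state.get(shard)); // prints offset-42
    }
}
```

Mutating `endingSequenceNumber` without the remove-first step would leave the entry unreachable through normal lookups, since `get`/`containsKey` would probe the wrong bucket.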
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Just a small remark - from what I understood, the only property that *can*
change is the endingSequenceNumber - all other state should be considered as
'set once', so there should be no point
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Ok, that makes sense to me. Give me a bit to cook up both the new test and
the new approach, and I'll update the PR. Thank you very much for the comments!
---
Github user pluppens commented on the issue:
https://github.com/apache/flink/pull/5337
Ok, so you'd prefer to synchronize the state of the retrieved shard against
the stored shards by comparing its stream name and shard id, before doing the
containsKey() check?
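That synchronize-then-lookup order matters because the mutable ending sequence number also feeds equals/hashCode. A sketch of the idea, with hypothetical names (`Shard`, `sync`) standing in for the connector's types:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class ShardSyncSketch {
    // Hypothetical shard metadata: stream name and shard id are set once;
    // only endingSequenceNumber changes (when the shard is closed).
    static final class Shard {
        final String stream;
        final String shardId;
        String endingSequenceNumber;
        Shard(String stream, String shardId, String end) {
            this.stream = stream;
            this.shardId = shardId;
            this.endingSequenceNumber = end;
        }
        @Override public boolean equals(Object o) {
            return o instanceof Shard s
                    && stream.equals(s.stream)
                    && shardId.equals(s.shardId)
                    && Objects.equals(endingSequenceNumber, s.endingSequenceNumber);
        }
        @Override public int hashCode() {
            return Objects.hash(stream, shardId, endingSequenceNumber);
        }
    }

    // Align the retrieved shard with any stored shard that has the same
    // stream name and shard id, so the lookup that follows is not defeated
    // by a changed endingSequenceNumber.
    static void sync(Set<Shard> stored, Shard retrieved) {
        for (Shard s : stored) {
            if (s.stream.equals(retrieved.stream) && s.shardId.equals(retrieved.shardId)) {
                retrieved.endingSequenceNumber = s.endingSequenceNumber;
                return;
            }
        }
    }

    public static void main(String[] args) {
        Set<Shard> stored = new HashSet<>();
        stored.add(new Shard("stream", "shard-0", "123")); // closed in stored state
        Shard retrieved = new Shard("stream", "shard-0", null); // freshly listed
        System.out.println(stored.contains(retrieved)); // prints false
        sync(stored, retrieved);
        System.out.println(stored.contains(retrieved)); // prints true
    }
}
```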
---
Github user pluppens commented on a diff in the pull request:
https://github.com/apache/flink/pull/5337#discussion_r163226460
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
---
@@ -515,6
GitHub user pluppens opened a pull request:
https://github.com/apache/flink/pull/5337
[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot
restoration is able to handle recently closed shards
FLINK-8484: ensure that a state change in the StreamShardMetadata