I would suggest updating the documentation to include that statement.
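
As an illustration of what such a note could convey, here is a minimal sketch of the behaviour as I understand it from the reply quoted below. It is a toy model, not Flink code: the class StartingOffsetModel, the method resolve and its parameters are hypothetical names made up for this example, and the LATEST fallback is an assumption taken from the initializer I used in my test.

```java
import java.util.OptionalLong;

/** Toy model (NOT Flink code) of where a partition's starting offset comes from. */
final class StartingOffsetModel {

    /**
     * @param restoredFromState offset stored in the checkpoint/savepoint, present
     *                          only if the job was restored from one
     * @param committedInKafka  offset committed for the consumer group, if any
     * @param latestInPartition fallback assumed for OffsetResetStrategy.LATEST
     */
    static long resolve(OptionalLong restoredFromState,
                        OptionalLong committedInKafka,
                        long latestInPartition) {
        if (restoredFromState.isPresent()) {
            // Restored job: the offset in state wins and the group's
            // offset in Kafka is not consulted at all.
            return restoredFromState.getAsLong();
        }
        // Fresh start: use the committed offset, or fall back to the
        // reset strategy when the group has no committed offset.
        return committedInKafka.orElse(latestInPartition);
    }
}
```

In this model, restarting from a checkpoint after moving the group's offset (step 2 of my test) yields the offset from state, whereas restarting without any checkpoint (step 3) yields the moved offset.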

I imagine dynamic partition discovery has no effect on this?

Regards,
Alexis.

On Thu, 21 Jul 2022 at 10:03, Chesnay Schepler <ches...@apache.org>
wrote:

> Flink only reads the offsets from Kafka when the job is initially started
> from a clean slate.
> Once checkpoints are involved it only relies on offsets stored in the
> state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
>
> Hello again,
>
> I just ran a few tests using
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST), in the
> following order, and noticed some unexpected behavior.
> Note that our job uses Processing Time windows, so watermarks are
> irrelevant.
>
> 1. After the job had been running for a while, we manually moved the
> consumer group's offset to 12 hours in the past [1] (without restarting the
> job).
>   - After this, the consumer simply stopped reading messages - the
> consumer lag in Kafka stayed at around 150k (no new data arrived)
>
> 2. We restarted the job with a checkpoint.
>   - The consumer lag in Kafka dropped down to 0, but no data was
> emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted without any
> checkpoint/savepoint.
>   - This time the consumer correctly processed the backlog and emitted
> events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between Flink's
> state's offset and Kafka's offset, will the job be unable to run?
>
>
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
>   --bootstrap-server ... \
>   --topic our-topic \
>   --group our-group \
>   --command-config kafka-preprod.properties \
>   --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
>   --execute
>
> Regards,
> Alexis.
>
> On Thu, 14 Jul 2022 at 22:56, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi Yaroslav,
>>
>> The test I did was just using earliest, I'll test with committed offset
>> again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, <yaros...@goldsky.io>
>> wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>>> consumer offsets? In this case, it should get the offsets from Kafka and
>>> not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Regarding the new Kafka source (configured with a consumer group), I
>>>> found out that if I manually change the group's offset with Kafka's admin
>>>> API independently of Flink (while the job is running), the Flink source
>>>> will ignore that and reset it to whatever it stored internally. Is there
>>>> any way to prevent this?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>
