Hi all,

This is indeed a bug. haishui has created FLINK-32496 [1];
let's follow the issue there.


[1] https://issues.apache.org/jira/browse/FLINK-32496

Best,
Rui Fan

On Thu, Jun 29, 2023 at 8:34 PM Alexis Sarda-Espinosa <
[email protected]> wrote:

> BTW, it seems I spoke too soon in my previous email. I left the job
> running overnight with each source having its own alignment group to
> evaluate only per-split alignment, and I can see that eventually some
> partitions never resumed consumption and the consumer lag increased.
>
> Regards,
> Alexis.
>
> On Thu, Jun 29, 2023 at 10:08 AM Alexis Sarda-Espinosa <
> [email protected]> wrote:
>
>> Hi Martijn,
>>
>> thanks for the pointers. I think the issue I'm seeing is not caused by
>> those because in my case the watermarks are not negative. Some more
>> information from my setup in case it's relevant:
>>
>> - All Kafka topics have 6 partitions.
>> - Job parallelism is 2, but 2 of the Kafka sources are hard-coded to
>> parallelism=1.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, Jun 29, 2023 at 10:00 AM Martijn Visser <
>> [email protected]> wrote:
>>
>>> Hi Alexis,
>>>
>>> There are a couple of recent Flink tickets on watermark alignment,
>>> specifically https://issues.apache.org/jira/browse/FLINK-32414 and
>>> https://issues.apache.org/jira/browse/FLINK-32420. Could the latter
>>> also be applicable in your case?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <
>>> [email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> just for completeness, I don't see the problem if I assign a different
>>>> alignment group to each source, i.e. using only split-level watermark
>>>> alignment.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>> On Wed, Jun 28, 2023 at 8:13 AM haishui <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> I have the same problem. This is really a bug.
>>>>> `shouldWaitForAlignment` needs another change.
>>>>>
>>>>> By the way, a source is marked as idle when it has been
>>>>> waiting for alignment for a long time. Is this a bug?
>>>>>
>>>>> At 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" <
>>>>> [email protected]> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>>>>> the externalized Kafka connector. My job has 3 sources whose watermark
>>>>> strategies are defined like this:
>>>>>
>>>>> WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>>>>             .withIdleness(idleTimeout)
>>>>>             .withWatermarkAlignment("group", maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>>>>>
>>>>> The max allowed drift is currently 5 seconds, and my sources have an
>>>>> idleTimeout of 1, 1.5, and 5 seconds.
>>>>>
>>>>> What I observe is that, when I restart the job, all sources publish
>>>>> messages, but then 2 of them are marked as idle and never resume. I found
>>>>> https://issues.apache.org/jira/browse/FLINK-31632, which should be
>>>>> fixed in 1.17.1, but I don't think it's the same issue because my logs
>>>>> don't show negative values:
>>>>>
>>>>> 2023-06-27 15:11:42,927 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from subTaskId=1
>>>>> 2023-06-27 15:11:43,009 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>>> 2023-06-27 15:11:43,091 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>>> 2023-06-27 15:11:43,116 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>>>>> 2023-06-27 15:11:43,298 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>>>>> 2023-06-27 15:11:43,304 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>>> 2023-06-27 15:11:43,306 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>>>>> 2023-06-27 15:11:43,486 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>> 2023-06-27 15:11:43,489 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>> 2023-06-27 15:11:43,492 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>>>>
>>>>> Does anyone know if I'm missing something, or is this really a bug?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
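
For anyone reading the quoted log excerpt later: the numbers in it are consistent with the alignment limit being the smallest reported watermark plus the 5-second drift, with idle subtasks reporting Long.MAX_VALUE. The sketch below only reproduces that arithmetic. It is a simplified model under that assumption, not Flink's actual SourceCoordinator code, and the class and method names (AlignmentSketch, maxAllowedWatermark, IDLE_WATERMARK) are hypothetical.

```java
import java.util.List;

/** Simplified model of group-level watermark alignment; NOT Flink's implementation. */
final class AlignmentSketch {

    /** Timestamp reported by the idle subtask in the quoted logs (Long.MAX_VALUE). */
    static final long IDLE_WATERMARK = Long.MAX_VALUE;

    /**
     * Assumption: the group's combined watermark is the minimum of all reported
     * watermarks, and the limit is that minimum plus the allowed drift.
     * Saturates at Long.MAX_VALUE instead of overflowing, which is the case
     * when every subtask reports the idle value.
     */
    static long maxAllowedWatermark(List<Long> reportedWatermarks, long maxDriftMillis) {
        long min = reportedWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(IDLE_WATERMARK);
        try {
            return Math.addExact(min, maxDriftMillis);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long drift = 5_000L; // the 5-second max allowed drift from the original message
        // Values taken from the DEBUG lines: one active subtask, one idle subtask.
        List<Long> reported = List.of(1687878696690L, IDLE_WATERMARK);
        System.out.println(maxAllowedWatermark(reported, drift)); // prints 1687878701690
    }
}
```

The printed value matches the maxAllowedWatermark=1687878701690 in the INFO lines, so the limit itself looks plausible; the open question tracked in FLINK-32496 is why the paused splits do not resume.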
