Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-03 Thread Till Rohrmann
Hi Alexey,

I think the current idleness detection is based on timeouts: you need a
special watermark generator that emits watermarks periodically. If no
event has been emitted for a configured amount of time, the partition is
marked as idle.
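
For illustration, here is a minimal sketch of such a timeout-based idleness check. It assumes a hypothetical per-partition class, PartitionIdlenessTimer (not part of Flink), and an injectable millisecond clock so the behaviour can be tested. It follows the same idea as Flink's WatermarkStrategy#withIdleness(Duration): every emitted record resets the timer, and the periodic watermark emission asks whether the timeout has elapsed since the last record.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not a Flink class: timeout-based idleness for one partition.
final class PartitionIdlenessTimer {
    private final long timeoutMillis;
    private final LongSupplier clock; // injected so that tests can control time
    private long lastActivityMillis;
    private boolean idle;

    PartitionIdlenessTimer(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        this.lastActivityMillis = clock.getAsLong();
    }

    // Call for every record emitted from the partition: any activity clears idleness.
    void onEvent() {
        lastActivityMillis = clock.getAsLong();
        idle = false;
    }

    // Call from the periodic watermark emission; returns true once no record
    // has arrived for at least timeoutMillis.
    boolean checkIfIdle() {
        if (!idle && clock.getAsLong() - lastActivityMillis >= timeoutMillis) {
            idle = true;
        }
        return idle;
    }
}
```

In a periodic watermark generator, the idle result would typically translate into a call to WatermarkOutput#markIdle() instead of emitting a new watermark for that partition.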

Yes, I was referring to FLINK-18450. At the moment nobody is actively
working on it, but it is on the roadmap of improvements for the new source
API (FLIP-27).

Cheers,
Till


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-01 Thread Alexey Trenikhun
Hi Till,

>However, this will stall the whole reading process if there is a partition 
>which has no more data. Hence, you will probably also need a mechanism to 
>advance the watermark if the partition becomes idle.
This is why I need to find out whether a partition is idle. The Flink Kafka
connector does seem to have this information: the derived class
KafkaTopicPartitionStateWithWatermarkGenerator has immediateOutput and
deferredOutput, which have a field state with an idle flag.

Thank you for the information about the new KafkaConnector. I assume you are
referring to [1], but it also seems stalled. Or are you talking about a
different task?

[1] https://issues.apache.org/jira/browse/FLINK-18450
(FLINK-18450: Add watermark alignment logic to SourceReaderBase)

Thanks,
Alexey


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-01 Thread Till Rohrmann
Hi Alexey,

looking at KafkaTopicPartitionState, it looks like it does not contain
this information. In a nutshell, what you probably have to do is
aggregate the watermarks across all partitions and then pause the
consumption of a partition if its watermark advances too far ahead of the
minimum watermark. However, this will stall the whole reading process if
there is a partition which has no more data. Hence, you will probably also
need a mechanism to advance the watermark if the partition becomes idle.
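
The alignment scheme described above could be sketched roughly as follows. This is a hypothetical helper: WatermarkAligner, maxDriftMillis and the integer partition ids are all assumptions, not Flink API. It tracks the highest watermark seen per partition, computes the minimum over the partitions that are not marked idle, and reports whether a partition is more than maxDriftMillis ahead of that minimum and should therefore be paused. Leaving idle partitions out of the minimum is what prevents a partition without data from stalling all the others.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a Flink class: watermark alignment across partitions.
final class WatermarkAligner {
    private final long maxDriftMillis;
    private final Map<Integer, Long> watermarks = new HashMap<>(); // highest watermark per partition
    private final Set<Integer> idlePartitions = new HashSet<>();

    WatermarkAligner(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    // Record a watermark; watermarks never move backwards, and new data
    // makes an idle partition active again.
    void updateWatermark(int partition, long watermark) {
        watermarks.merge(partition, watermark, Long::max);
        idlePartitions.remove(partition);
    }

    void markIdle(int partition) {
        idlePartitions.add(partition);
    }

    // Minimum watermark over non-idle partitions, or Long.MIN_VALUE if no partition is active.
    long minActiveWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            if (!idlePartitions.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    // True if the partition is more than maxDriftMillis ahead of the slowest active partition.
    boolean shouldPause(int partition) {
        Long wm = watermarks.get(partition);
        long min = minActiveWatermark();
        return wm != null && min != Long.MIN_VALUE && wm - min > maxDriftMillis;
    }
}
```

How idleness is detected (for example with a timeout) and how pausing is carried out (for example with KafkaConsumer#pause and #resume) is left to the caller in this sketch.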

Note that the community is currently working on a new Kafka connector based
on Flink's new source API (FLIP-27). If I am not mistaken, these new
interfaces should eventually also support event-time alignment.

Cheers,
Till


How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-05-28 Thread Alexey Trenikhun
Hello,
I'm thinking about implementing a custom Kafka connector that provides
event-time alignment (similar to FLINK-10921, which seems abandoned). How can
I determine whether a partition is idle from an override of
AbstractFetcher.emitRecordsWithTimestamps()? Does KafkaTopicPartitionState
have this information?

Thanks,
Alexey