Hi Hatem,

The reason for setting different client ids is Kafka client metrics
conflicts; the issue is documented here:
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics.
I think the warning log is benign if you are using Flink's metric
system for monitoring the Kafka connector. It would be nice to introduce
an option in the connector to configure the same `client.id` across all
tasks for the quota feature you mentioned.
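
Until such an option exists, one possible workaround is to register a
quota for each generated client id. The sketch below is only an
illustration under assumptions: it assumes each reader's `client.id`
follows the pattern `<client.id.prefix>-<subtaskId>`, and the class name,
method names, prefix, parallelism and byte rate are all made up for the
example. It prints one kafka-configs.sh command per subtask.

```java
import java.util.ArrayList;
import java.util.List;

public class ClientIdQuotaSketch {

    // Assumption: the Kafka source names each reader "<prefix>-<subtaskId>",
    // with subtask ids running from 0 to parallelism - 1.
    static List<String> clientIds(String prefix, int parallelism) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            ids.add(prefix + "-" + subtask);
        }
        return ids;
    }

    // Builds a kafka-configs.sh call that sets consumer_byte_rate for one client id.
    static String quotaCommand(String bootstrap, String clientId, long bytesPerSec) {
        return "kafka-configs.sh --bootstrap-server " + bootstrap
                + " --alter --add-config 'consumer_byte_rate=" + bytesPerSec + "'"
                + " --entity-type clients --entity-name " + clientId;
    }

    public static void main(String[] args) {
        // Hypothetical values; replace with your own prefix, parallelism and rate.
        for (String id : clientIds("my-flink-job", 4)) {
            System.out.println(quotaCommand("kafka-server-address:9092", id, 1_048_576L));
        }
    }
}
```

Each client id gets its own quota this way, so the aggregate limit for the
job scales with the source parallelism, and the commands need to be re-run
if the parallelism or the prefix changes.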

Best,
Mason

On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa <m...@hatem.co> wrote:

> Hello Martijn,
>
> Yes, checkpointing is enabled and the offsets are committed without a
> problem. I think I might have figured out the answer to my second question
> based on my understanding of this code
> <https://github.com/apache/flink/blob/0612a997ddcc791ee54f500fbf1299ce04987679/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java>:
> Flink uses the low-level consumer, which does not call consumer.subscribe,
> so the consumer group does not show any active members in the
> kafka-consumer-groups tool. The consumer group functionality is fine though.
> However, I am more interested in an answer to my first question. Kafka
> quotas are one of the important features of Kafka, and Flink setting a
> different client id for every consumer in the same consumer group
> makes it hard to set quotas for that consumer group. What is the reason
> behind setting different client ids?
>
> On Wed, May 24, 2023 at 1:13 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Hatem,
>>
>> Could it be that you don't have checkpointing enabled? Flink only commits
>> its offset when a checkpoint has been completed successfully, as explained
>> on
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>
>> Best regards,
>>
>> Martijn
>>
>>
>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa <m...@hatem.co> wrote:
>>
>>> Hello,
>>>
>>> I have two questions that are related to each other:
>>>
>>> *First question:*
>>>
>>> I have been trying to set `client.id` to set a kafka client quota
>>> <https://kafka.apache.org/documentation.html#design_quotas> for
>>> consumer_byte_rate since whenever our kafka job gets redeployed it reads a
>>> lot of data from our kafka cluster causing a denial of service for our
>>> kafka cluster. However `client.id` gets overridden by flink source here
>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>.
>>> How would I enforce quotas for flink kafka source?
>>>
>>> *Second question:*
>>>
>>> Also, something I didn't quite understand: when describing our consumer
>>> group in Kafka, why don't I see the metadata for the consumer group
>>> (consumer id, client id & host)? The tool reports that the consumer
>>> group has no active members, but it's actually active and consuming.
>>>
>>> *Example describing a flink consumer group*
>>>
>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group flink-consumer-group
>>>> Consumer group 'flink-consumer-group' has no active members.
>>>> GROUP                 TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
>>>> flink_consumer_group  topic_name  1          514588965       514689721       100756  -            -     -
>>>
>>>
>>>
>>> *Example describing a normal consumer group written using a confluent
>>> kafka python library.*
>>>
>>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group python_confluent_kafka_consumer
>>>> GROUP                            TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                           HOST        CLIENT-ID
>>>> python_confluent_kafka_consumer  topic_name  1          17279532        17279908        376  python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e  /<HOST-IP>  python_confluent_kafka_consumer_client_id
>>>
>>>
>>>
>>> I am using flink version 1.15.
>>>
>>> Thanks,
>>> Hatem
>>>
>>>
>>>
>>>
