Re: [DISCUSS] Minimum constraint for segment.ms

2024-03-13 Thread Divij Vaidya
+ users@kafka

Hi users of Apache Kafka

With the upcoming 4.0 release, we have an opportunity to improve the
constraints and default values for various Kafka configurations.

We are soliciting your feedback and suggestions on configurations where the
default values and/or constraints should be adjusted. Please reply in this
thread directly.

--
Divij Vaidya
Apache Kafka PMC



On Wed, Mar 13, 2024 at 12:56 PM Divij Vaidya 
wrote:

> Thanks for the discussion folks. I have started a KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
> to keep track of the changes that we are discussing. Please consider this
> as a collaborative work-in-progress KIP and once it is ready to be
> published, we can start a discussion thread on it.
>
> I am also going to start a thread to solicit feedback from the users@
> mailing list.
>
> --
> Divij Vaidya
>
>
>
> On Wed, Mar 13, 2024 at 12:55 PM Christopher Shannon <
> christopher.l.shan...@gmail.com> wrote:
>
>> I think it's a great idea to raise a KIP to look at adjusting defaults and
>> minimum/maximum config values for version 4.0.
>>
>> As pointed out, the minimum values for segment.ms and segment.bytes don't
>> make sense and would probably bring down a cluster pretty quickly if set
>> that low, so version 4.0 is a good time to fix them and to look at the
>> other configs for adjustments as well.
>>
>> On Wed, Mar 13, 2024 at 4:39 AM Sergio Daniel Troiano
>>  wrote:
>>
>> > hey guys,
>> >
>> > Regarding num.recovery.threads.per.data.dir: I agree. In our company we
>> > use the number of vCPUs for it, as recovery is not competing with the
>> > traffic of a ready cluster.
>> >
>> >
>> > On Wed, 13 Mar 2024 at 09:29, Luke Chen  wrote:
>> >
>> > > Hi Divij,
>> > >
>> > > Thanks for raising this.
>> > > The current valid minimum value of 1 for `segment.ms` is completely
>> > > unreasonable. Similarly for `segment.bytes`, `metadata.log.segment.ms`,
>> > > and `metadata.log.segment.bytes`.
>> > >
>> > > In addition to that, there are also some config default values we'd
>> > > like to propose changing in v4.0. We can collect more comments from the
>> > > community and come out with a KIP for them.
>> > >
>> > > 1. num.recovery.threads.per.data.dir:
>> > > The current default value is 1. But log recovery happens before the
>> > > broker is in the ready state, which means we should use all the
>> > > available resources to speed up log recovery and bring the broker to
>> > > the ready state sooner. The default value should be... maybe 4 (to be
>> > > decided)?
>> > >
>> > > 2. Other configs whose defaults we might consider changing, open for
>> > > comments (a sketch of these follows below):
>> > >    2.1. num.replica.fetchers: the default is 1, but that's not enough
>> > > when there are multiple partitions in the cluster.
>> > >    2.2. `socket.send.buffer.bytes`/`socket.receive.buffer.bytes`:
>> > > currently we set 100 KB as the default value, but that's not enough for
>> > > high-speed networks.
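>> > >
>> > > For illustration, the kind of server.properties overrides being
>> > > discussed would look like this (all values are placeholders to be
>> > > decided, not agreed proposals):
>> > >
>> > >   num.recovery.threads.per.data.dir=4
>> > >   num.replica.fetchers=4
>> > >   socket.send.buffer.bytes=1048576
>> > >   socket.receive.buffer.bytes=1048576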
>> > >
>> > > Thank you.
>> > > Luke
>> > >
>> > >
>> > > On Tue, Mar 12, 2024 at 1:32 AM Divij Vaidya 
>> > > wrote:
>> > >
>> > > > Hey folks
>> > > >
>> > > > Before I file a KIP to change this in 4.0, I wanted to understand
>> the
>> > > > historical context for the value of the following setting.
>> > > >
>> > > > Currently, the minimum threshold for segment.ms is set to 1 ms [1].
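>> > > >
>> > > > (For context, this floor comes from a ConfigDef range validator on
>> > > > the topic config. Schematically, following the usual ConfigDef
>> > > > pattern rather than quoting the actual LogConfig source:
>> > > >
>> > > >   import org.apache.kafka.common.config.ConfigDef;
>> > > >   import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
>> > > >
>> > > >   ConfigDef def = new ConfigDef()
>> > > >       .define("segment.ms", ConfigDef.Type.LONG,
>> > > >               604800000L,                 // default: 7 days
>> > > >               atLeast(1),                 // the 1 ms floor in question
>> > > >               ConfigDef.Importance.MEDIUM,
>> > > >               "Time after which Kafka will force the log to roll.");
>> > > >
>> > > > Raising the floor would mean changing that validator, e.g.
>> > > > atLeast(10_000) for a ~10 s minimum.)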
>> > > >
>> > > > Segments are expensive. Every segment uses multiple file descriptors,
>> > > > and it's easy to run out of OS limits when creating a large number of
>> > > > segments. A large number of segments also delays log loading on
>> > > > startup because of expensive operations such as iterating through all
>> > > > directories & conditionally loading all producer state.
>> > > >
>> > > > I am currently not aware of a reason why someone might want to work
>> > > > with a segment.ms of less than ~10s (a number chosen arbitrarily that
>> > > > looks sane).
>> > > >
>> > > > What was the historical context for setting the minimum threshold to
>> > > > 1 ms for this setting?
>> > > >
>> > > > [1] https://kafka.apache.org/documentation.html#topicconfigs_segment.ms
>> > > >
>> > > > --
>> > > > Divij Vaidya
>> > > >
>> > >
>> >
>>
>


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-13 Thread William Lee
Hi Richard,
Thanks for replying.

> but I close the KafkaProducer inside the send
> callback.
> ...
>  Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave
me
> relatively good performance.

Yes, I also find that closing the KafkaProducer inside the send callback
can prevent extra records from being sent. But after some investigation
into the source code of KafkaProducer and Sender, I think closing the
producer in the callback is not 100% reliable in such cases. For example,
if you set max.in.flight.requests.per.connection to 5 and you send 5
batches 1, 2, 3, 4, 5, and batch No. 2 fails with an exception in the
callback, then even if you initiate the producer close inside the
callback, batch No. 3 might already be in flight and might still be sent
to the broker. Even though I haven't observed such results during my
experiments, I am still not sure this is reliable, since Kafka's official
documentation makes no guarantee about this behaviour.
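
For concreteness, here is a minimal sketch of the pattern under discussion
(untested; bootstrap servers, topic name, and the record source are
illustrative):

  import java.time.Duration;
  import java.util.List;
  import java.util.Properties;
  import java.util.concurrent.atomic.AtomicReference;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class CloseInCallbackSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
          props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");

          KafkaProducer<String, String> producer = new KafkaProducer<>(props);
          AtomicReference<Exception> sendException = new AtomicReference<>();

          for (String value : List.of("r1", "r2", "r3")) {
              if (sendException.get() != null) {
                  break; // stop handing new records to the producer
              }
              producer.send(new ProducerRecord<>("my-topic", value),
                  (metadata, exception) -> {
                      if (exception != null
                              && sendException.compareAndSet(null, exception)) {
                          // This runs on the producer's sender thread. Closing here
                          // stops later batches from being dispatched, but batches
                          // already in flight may still reach the broker.
                          producer.close(Duration.ZERO); // must be 0 inside a callback
                      }
                  });
          }
          producer.close();
      }
  }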

In the source code of KafkaProducer and Sender, only when
max.in.flight.requests.per.connection is set to 1 will the
"guaranteeMessageOrder" property be set to true, thus ensuring only one
request will be in flight per partition.
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
at master · a0x8o/kafka

kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
at master · a0x8o/kafka


Do you have any thoughts?

Thanks and regards,
William Lee

Richard Bosch  wrote on Wed, Mar 13, 2024 at 16:38:

> Hi William,
>
> I see from your example that you close the Kafka producer in the send
> loop, based on the content of the sendException that is set in the callback
> of the KafkaProducer send.
> Since your send loop runs on a different thread than the one the
> KafkaProducer uses to send, you will encounter race conditions in this
> close logic.
>
> I actually had a similar requirement as yours and solved it by using a
> sendException like you do, but I close the KafkaProducer inside the send
> callback. The send callback is executed as part of the produce thread, and
> closing the producer there will stop all subsequent batches of processing,
> as the current batch isn't finished yet. Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.
>
> Kind regards,
>
>
> Richard Bosch
>
> Developer Advocate
>
> Axual BV
>
> https://axual.com/
>


Re: Kafka Topic gets auto-created post deletion

2024-03-13 Thread Abhishek Singla
Issue is now resolved by upgrading zookeeper version from 3.4.14 to 3.8.0

On Sun, Feb 18, 2024 at 6:15 PM sunil chaudhari 
wrote:

> Hi Abhishek,
> as I told you before, the topic is getting created only because of consumer
> polling.
> The only way is to stop the consumer, remove the topic name from the
> consumer, then delete the topic.
>
> Here I assume your consumer knows which topic to read from (by static topic
> name).
> If not, I don't know how your consumer knows the exact topic name to read
> from.
>
> In my case we had static topic names given in the consumer config. So we
> removed those before topic deletion.
>
> Alternatively, you can set auto-create to false via a rolling restart of
> all your brokers, if the number of brokers is not too large.
> That will be a quick fix.
>
>
>
> On Sun, 18 Feb 2024 at 6:07 PM, Abhishek Singla <
> abhisheksingla...@gmail.com>
> wrote:
>
> > Hi Megh,
> >
> > Thanks for the detailed explanation. I tested it out and below are the
> > observations.
> >
> > When the delete topic command is issued, the topic is marked for deletion
> > and it is deleted within the next 1 minute. In this 1-minute interval, the
> > consumer offsets are still in the __consumer_offsets topic, but the
> > describe group command fails with the error below.
> > *Error: Executing consumer group command failed due to
> > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This
> > server does not host this topic-partition.*
> >
> > Now, within that one minute, if the consumer is running then the topic
> > gets created again and the offset is reset to 0; if the consumer is not
> > running then the offsets are cleared automatically from the
> > __consumer_offsets topic and the topic is not re-created upon resuming
> > consumers.
> >
> > kafka-consumer-groups.sh --delete-offsets will expedite the offset
> > clearance a bit, but it also requires consumers to be stopped. One more
> > thing I noticed is that --consumer-property allow.auto.create.topics=false
> > does not restrict topic creation in this particular scenario.
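> >
> > For reference, the commands look roughly like this (group and topic names
> > are placeholders):
> >
> >   bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
> >     --describe --group my-group
> >
> >   bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
> >     --delete-offsets --group my-group --topic my-topic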
> >
> > Is there any other way I can delete the topics without stopping
> > consumers?
> >
> > Regards,
> > Abhishek Singla
> >
> >
> >
> > On Sun, Feb 18, 2024 at 3:18 PM megh vidani 
> > wrote:
> >
> > > Hi Abhishek,
> > >
> > > The consumer offsets get stored in the __consumer_offsets topic by the
> > > Kafka brokers and don't get automatically deleted when you delete a
> > > topic. If you have auto.create.topics.enable set to true in your broker
> > > configs, the consumer automatically creates the topic again.
> > >
> > > You will have to delete the offsets for that topic using
> > > kafka-consumer-groups.sh in order to remove the topic offsets for that
> > > consumer group.
> > >
> > > Also, you can disable automatic creation of topics in the Kafka broker
> > > config using the auto.create.topics.enable property.
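> > >
> > > For example (a broker-side setting in server.properties, applied with a
> > > rolling restart):
> > >
> > >   auto.create.topics.enable=false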
> > >
> > > Thanks,
> > > Megh
> > >
> > >
> > > On Sun, Feb 18, 2024, 15:09 Abhishek Singla <
> abhisheksingla...@gmail.com
> > >
> > > wrote:
> > >
> > > > We only delete topics which do not have any active producers.
> > > >
> > > > Nowhere in our consumer config do we explicitly specify topic names;
> > > > these are subscribed based on a pattern. How do I remove a topic from
> > > > the consumer config?
> > > >
> > > > On Sun, 18 Feb 2024 at 2:51 PM, sunil chaudhari <
> > > > sunilmchaudhar...@gmail.com>
> > > > wrote:
> > > >
> > > > > Correcting a missing point:
> > > > > you have to remove that topic from both the producer and the
> > > > > consumer config.
> > > > >
> > > > > On Sun, 18 Feb 2024 at 2:49 PM, sunil chaudhari <
> > > > > sunilmchaudhar...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > You have to remove that topic from the consumer config,
> > > > > > restart the consumer,
> > > > > > then wait for some time.
> > > > > > Then delete the topic.
> > > > > > This time it won't be created again.
> > > > > >
> > > > > > On Sun, 18 Feb 2024 at 1:07 PM, Abhishek Singla <
> > > > > > abhisheksingla...@gmail.com> wrote:
> > > > > >
> > > > > >> Hi Team,
> > > > > >>
> > > > > >> Kafka version: kafka_2.12-2.6.0
> > > > > >> Zookeeper version: 3.4.14
> > > > > >> Java version: 1.8.0_301
> > > > > >>
> > > > > >> The Kafka subscribe-by-pattern API is used to subscribe to all
> > > > > >> topics matching a specified pattern and get dynamically assigned
> > > > > >> partitions. New topics matching the specified pattern get created
> > > > > >> over time.
> > > > > >>
> > > > > >> > KafkaConsumer<K, V> subscribe(Pattern pattern,
> > > > > >> > Handler<AsyncResult<Void>> completionHandler);
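> > > > > >>
> > > > > >> (Assuming this is the Vert.x Kafka client, given the Handler
> > > > > >> signature, usage is roughly:
> > > > > >>
> > > > > >>   consumer.subscribe(Pattern.compile("orders-.*"), ar -> {
> > > > > >>     if (ar.failed()) {
> > > > > >>       // handle subscription failure
> > > > > >>     }
> > > > > >>   });
> > > > > >>
> > > > > >> with the pattern string illustrative.)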
> > > > > >>
> > > > > >> The issue arises when we try to delete the topic. After the
> > > > > >> delete topic command is issued, the topic is deleted successfully.
> > > > > >> However, it gets auto-created again within 5 mins. Broker offsets
> > > > > >> are reset to zero, and the new topic partitions could be on the
> > > > > >> same broker nodes or different ones.
> > > > > >>
> > > > > >> Below are some of the configs used:
> > > > > >>
> > > > > >>
> > > > > >> 

Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-13 Thread Bruno Cadonna

Hi Venkatesh,

Extending on what Matthias replied, a metadata refresh might trigger a 
rebalance if the metadata changed. However, a metadata refresh that does 
not show a change in the metadata will not trigger a rebalance. In this 
context, i.e., the config METADATA_MAX_AGE_CONFIG, the metadata is the 
metadata about the cluster as received by the client.


The metadata mentioned in the log messages you posted is metadata of the 
group to which the member (a.k.a. consumer, a.k.a. client) belongs. The 
log message originates from the broker (in contrast, 
METADATA_MAX_AGE_CONFIG is a client config). If the rebalance were 
triggered by a cluster metadata change, the log message should contain 
something like "cached metadata has changed" as the client reason [1].


Your log messages look like genuine log messages that are completely 
normal during rebalance events.


How often do they happen?
What do you mean by stop-the-world rebalances?

Best,
Bruno


[1] 
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66



On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote:

Just want to share another variant of the log message which is also related to 
metadata and rebalancing but has a different client reason:

INFO [GroupCoordinator 3]: Preparing to rebalance group  in state 
PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: triggered followup 
rebalance scheduled for 0) (kafka.coordinator.group.GroupCoordinator)

Thank you.

Kind regards,
Venkatesh

From: Venkatesh Nagarajan 
Date: Wednesday, 13 March 2024 at 12:06 pm
To: users@kafka.apache.org 
Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Thanks very much for your important inputs, Matthias.

I will use the default METADATA_MAX_AGE_CONFIG. I had set it to 5 hours when 
I saw a lot of such rebalancing-related messages in the MSK broker logs:

INFO [GroupCoordinator 2]: Preparing to rebalance group  in state 
PreparingRebalance with old generation  (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: need to revoke partitions 
and re-join) (kafka.coordinator.group.GroupCoordinator)

I am guessing that the two are unrelated. If you have any suggestions on how to 
reduce such rebalancing, that will be very helpful.

Thank you very much.

Kind regards,
Venkatesh

From: Matthias J. Sax 
Date: Tuesday, 12 March 2024 at 1:31 pm
To: users@kafka.apache.org 
Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Without detailed logs (maybe even DEBUG) hard to say.

But from what you describe, it could be a metadata issue? Why are you
setting


METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)


Refreshing metadata has nothing to do with rebalances, and a metadata
refresh does not trigger a rebalance.



-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


*   Kafka Streams Version: 3.5.1
*   Kafka: AWS MSK v3.6.0
*   Consumes events from 6 topics
*   Calls APIs to enrich events
*   Sometimes joins two streams
*   Produces enriched events in output topics

Runs on AWS ECS:

*   Each task has 10 streaming threads
*   Autoscaling based on offset lags and a maximum of 6 ECS tasks
*   Input topics have 60 partitions each to match 6 tasks * 10 threads
*   Fairly good spread of events across all topic partitions using 
partitioning keys

Settings and configuration (see the config sketch after these lists):


*   At least once semantics
*   MAX_POLL_RECORDS_CONFIG: 10
*   APPLICATION_ID_CONFIG

// Make rebalances rare and prevent stop-the-world rebalances

*   Static membership (using GROUP_INSTANCE_ID_CONFIG)
*   METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
make rebalances rare)
*   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
*   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

*   TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
*   STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
*   NUM_STANDBY_REPLICAS_CONFIG: 1
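
A rough sketch of how the settings above map onto StreamsConfig (the
application id, instance id, and exact values are illustrative, not the
app's actual values):

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.streams.StreamsConfig;

  public class StreamsSettingsSketch {
      static Properties streamsProps() {
          Properties p = new Properties();
          p.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichment-app");
          p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
          p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
          p.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
          p.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 300 * 1024 * 1024L);
          p.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
          // Consumer-level settings are passed through with the consumer prefix.
          p.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10);
          p.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                  20 * 60 * 1000);
          p.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                  2 * 60 * 1000);
          // Static membership: one id per app instance; Streams adds a thread suffix.
          p.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                  "ecs-task-1");
          // METADATA_MAX_AGE_CONFIG is left at its default here, per the advice above.
          return p;
      }
  }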


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

*   Gradually increasing offset lags which shouldn't happen normally as the 
streaming app is quite fast
*   Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

*   Offset lag stuck at ~5k
*   Stable consumer group
*   No events processed
*   No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

  

Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-13 Thread Richard Bosch
Hi William,

I see from your example that you close the Kafka producer in the send
loop, based on the content of the sendException that is set in the callback
of the KafkaProducer send.
Since your send loop runs on a different thread than the one the
KafkaProducer uses to send, you will encounter race conditions in this
close logic.

I actually had a similar requirement as yours and solved it by using a
sendException like you do, but I close the KafkaProducer inside the send
callback. The send callback is executed as part of the produce thread, and
closing the producer there will stop all subsequent batches of processing,
as the current batch isn't finished yet. Combined with idempotence enabled
and max inflight set to 5 (the maximum for idempotence tracking) it gave me
relatively good performance.

Kind regards,


Richard Bosch

Developer Advocate

Axual BV

https://axual.com/