Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-14 Thread Haruki Okada
Hi.

> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
this could become a performance bottleneck

As Greg pointed out, this is a trade-off between the ordering guarantee and
throughput.
So you should first measure the throughput with
max.in.flight.requests.per.connection=1 while tuning the batching configs
(batch.size, linger.ms) in your environment, and see whether it actually
becomes a bottleneck.
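
For example, a rough harness for such a measurement could look like the
following (just a sketch; the broker address, topic, payload size and the
batching values are placeholders to adjust for your environment):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InFlightThroughputCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address and topic.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Ordering-friendly settings: single in-flight request, idempotence on.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Batching knobs that compensate for the single in-flight request;
        // these are starting points to tune, not recommendations.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "131072"); // 128 KiB
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");

        int recordCount = 100_000;
        byte[] payload = new byte[1024]; // 1 KiB dummy payload

        long start = System.nanoTime();
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < recordCount; i++) {
                producer.send(new ProducerRecord<>("test-topic", payload));
            }
            producer.flush();
        }
        double seconds = (System.nanoTime() - start) / 1_000_000_000.0;
        System.out.printf("sent %d records in %.2fs (%.0f records/s)%n",
                recordCount, seconds, recordCount / seconds);
    }
}

Running the same loop with a larger max.in.flight value then gives a
concrete number for what the ordering guarantee actually costs in your
setup.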


On Wed, Mar 13, 2024 at 18:31, William Lee wrote:

> Hi Richard,
> Thanks for replying.
>
> > but I close the KafkaProducer inside the send
> > callback.
> > ...
> >  Combined with idempotence enabled
> > and max inflight set to 5 (the maximum for idempotence tracking) it gave
> me
> > relatively good performance.
>
> Yes, I also find that closing the KafkaProducer inside the send callback
> can prevent extra records from being sent. But after some investigation
> into the source code of KafkaProducer and Sender, I think closing the
> producer in the callback is not 100% reliable in such cases. For example,
> if you set max.in.flight.requests.per.connection to 5 and you sent 5
> batches 1, 2, 3, 4, 5, say batch No.2 fails with an exception in the
> callback and you initiate the producer close inside the callback, batch
> No.3 might already be in flight and might still be delivered to the broker.
> Even though I haven't observed such results during my experiments, I am
> still not sure this is reliable, since Kafka's official documentation makes
> no guarantee about this behaviour.
>
> In the source code of KafkaProducer and Sender, only when
> max.in.flight.requests.per.connection is set to 1 is the
> "guaranteeMessageOrder" property set to true, thus ensuring that only one
> request will be in flight per partition.
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128
> >
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538
> >
>
> Do you have any thoughts?
>
> Thanks and regards,
> William Lee
>
On Wed, Mar 13, 2024 at 16:38, Richard Bosch wrote:
>
> > Hi William,
> >
> > I see from your example that you close the kafka producer in the send
> > loop, based on the content of sendException that is used in the callback
> of
> > the KafkaProducer send.
> > Since your send loop is a different thread than the KafkaProducer uses to
> > send you will encounter race conditions on this close logic.
> >
> > I actually had a similar requirement as yours and solved it by using a
> > sendException like you do, but I close the KafkaProducer inside the send
> > callback. The send callback is executed as part of the produce thread,
> and
> > closing the producer there will stop all subsequent batches of
> processing,
> > as the current batch isn't finished yet. Combined with idempotence
> enabled
> > and max inflight set to 5 (the maximum for idempotence tracking) it gave
> me
> > relatively good performance.
> >
> > Kind regards,
> >
> >
> > Richard Bosch
> >
> > Developer Advocate
> >
> > Axual BV
> >
> > https://axual.com/
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-13 Thread William Lee
Hi Richard,
Thanks for replying.

> but I close the KafkaProducer inside the send
> callback.
> ...
>  Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave
me
> relatively good performance.

Yes, I also find that closing the KafkaProducer inside the send callback
can prevent extra records from being sent. But after some investigation
into the source code of KafkaProducer and Sender, I think closing the
producer in the callback is not 100% reliable in such cases. For example,
if you set max.in.flight.requests.per.connection to 5 and you sent 5
batches 1, 2, 3, 4, 5, say batch No.2 fails with an exception in the
callback and you initiate the producer close inside the callback, batch
No.3 might already be in flight and might still be delivered to the broker.
Even though I haven't observed such results during my experiments, I am
still not sure this is reliable, since Kafka's official documentation makes
no guarantee about this behaviour.

In the source code of KafkaProducer and Sender, only when
max.in.flight.requests.per.connection is set to 1 is the
"guaranteeMessageOrder" property set to true, thus ensuring that only one
request will be in flight per partition.
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128>

kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
at master · a0x8o/kafka
<https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538>


Do you have any thoughts?

Thanks and regards,
William Lee

On Wed, Mar 13, 2024 at 16:38, Richard Bosch wrote:

> Hi William,
>
> I see from your example that you close the kafka producer in the send
> loop, based on the content of sendException that is used in the callback of
> the KafkaProducer send.
> Since your send loop is a different thread than the KafkaProducer uses to
> send you will encounter race conditions on this close logic.
>
> I actually had a similar requirement as yours and solved it by using a
> sendException like you do, but I close the KafkaProducer inside the send
> callback. The send callback is executed as part of the produce thread, and
> closing the producer there will stop all subsequent batches of processing,
> as the current batch isn't finished yet. Combined with idempotence enabled
> and max inflight set to 5 (the maximum for idempotence tracking) it gave me
> relatively good performance.
>
> Kind regards,
>
>
> Richard Bosch
>
> Developer Advocate
>
> Axual BV
>
> https://axual.com/
>


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-13 Thread Richard Bosch
Hi William,

I see from your example that you close the kafka producer in the send
loop, based on the content of sendException that is used in the callback of
the KafkaProducer send.
Since your send loop is a different thread than the KafkaProducer uses to
send you will encounter race conditions on this close logic.

I actually had a similar requirement as yours and solved it by using a
sendException like you do, but I close the KafkaProducer inside the send
callback. The send callback is executed as part of the produce thread, and
closing the producer there will stop all subsequent batches of processing,
as the current batch isn't finished yet. Combined with idempotence enabled
and max inflight set to 5 (the maximum for idempotence tracking) it gave me
relatively good performance.
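
To make that a bit more concrete, the pattern looks roughly like this (a
simplified, untested sketch; the topic name, configs and send loop are
placeholders rather than my actual code):

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CloseInCallbackSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // 5 is the maximum the idempotent producer can track for ordering.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        AtomicReference<Exception> sendException = new AtomicReference<>();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10_000 && sendException.get() == null; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", "value-" + i);
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null && sendException.compareAndSet(null, exception)) {
                        // The callback runs on the producer's I/O thread; closing here
                        // is meant to stop further batches from being dispatched.
                        producer.close(Duration.ZERO);
                    }
                });
            } catch (IllegalStateException alreadyClosed) {
                // The callback may already have closed the producer; stop the send loop.
                break;
            }
        }
        if (sendException.get() == null) {
            producer.close();
        }
    }
}

This is simplified; real code would also need to decide which exception
types should actually stop the pipeline.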

Kind regards,


Richard Bosch

Developer Advocate

Axual BV

https://axual.com/


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi Greg,
Thanks for replying.

> From your description, it sounds like you want the success/failure of
> a callback to influence whether that record (and later records) are
> present in the topic. Is this correct?
Yes

> The solution that you posted does actually write a record that has an
> erroneous callback, is that desirable, or would you want that record
> to also be rejected?
The source code demonstrated the original problem, not the solution.
What I want is: once an exception shows up in the producer callback, I
assume that record was not delivered to the broker, and I would like all
records after the exception to be suspended from being sent, so that the
records already delivered to the broker are strictly ordered once the
producer has been closed.
In the source code demo, the earlier batch hit an exception, but the later
batch was still sent successfully even though I initiated the producer
close immediately when I detected the exception. That is not what I want.
As for the "does actually write a record that has an erroneous callback"
point you raised, I did notice that problem, but it is not the key point
here, so I did not mention it.

As for transactional producers, they are not suited to my use case. My data
is database CDC (change data capture) data which must maintain strict
ordering (similar to the Debezium project). There is no need for me to use
a transactional producer.

> I think you should carefully consider throwing delivery-critical
> errors from the callback, as that is not a common workflow. Could
> those errors be moved to a different part of the pipeline, such as the
> consumer application?
The problem is not related to the consumer side. I do want to achieve
Exactly Once delivery. I previously thought the idempotent producer could
be a solution, but I later found that the idempotent producer can only
guarantee ordering when Kafka is retrying a producer batch internally.

Thanks and regards,
William

On Tue, Mar 12, 2024 at 00:50, Greg Harris wrote:

> Hi William,
>
> From your description, it sounds like you want the success/failure of
> a callback to influence whether that record (and later records) are
> present in the topic. Is this correct?
> The solution that you posted does actually write a record that has an
> erroneous callback, is that desirable, or would you want that record
> to also be rejected?
>
> This sounds like a use-case for transactional producers [1] utilizing
> Exactly Once delivery. You can start a transaction, emit records, have
> them persisted in Kafka, perform some computation afterwards, and then
> decide whether to commit or abort the transaction based on the result
> of that computation.
>
> There is also a performance penalty to transactional producers, but it
> is different from the max.in.flight.requests.per.connection bottleneck
> and not directly comparable.
> I think you should carefully consider throwing delivery-critical
> errors from the callback, as that is not a common workflow. Could
> those errors be moved to a different part of the pipeline, such as the
> consumer application?
>
> And since you're performance sensitive, please be aware that
> performance (availability) nearly always comes at the cost of delivery
> guarantees (consistency) [2]. If you're not willing to pay the
> performance cost of max.in.flight.requests.per.connection=1, then you
> may need to make a compromise on the consistency and find a way to
> manage the extra data.
>
> Thanks,
> Greg Harris
>
> [1]
> https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> [2] https://en.wikipedia.org/wiki/CAP_theorem
>
> On Mon, Mar 11, 2024 at 7:32 AM William Lee 
> wrote:
> >
> > Hi Haruki,
> > Thanks for your answer.
> > > I still don't get why you need this behavior though
> > The reason is I have to ensure message ordering per partition strictly.
> > Once there is an exception in the producer callback, it indicates that
> the
> > exception is not a retryable exception(from kafka producer's
> perspective).
> > There must be something wrong, so I have to stop sending records and
> > resolve the underlying issue.
> >
> > As for the performance problem, I found a kafka wiki which investigated
> the
> > impact of max.in.flight.requests.per.connection: An analysis of the
> impact
> > of max.in.flight.requests.per.connection and acks on Producer
> performance -
> > Apache Kafka - Apache Software Foundation
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance
> >
> > From the wiki, max.in.flight.requests.per.connection is better set to 2
> or
> > more.
> >
> > By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> > this could become a performance bottleneck
>


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Greg Harris
Hi William,

From your description, it sounds like you want the success/failure of
a callback to influence whether that record (and later records) are
present in the topic. Is this correct?
The solution that you posted does actually write a record that has an
erroneous callback, is that desirable, or would you want that record
to also be rejected?

This sounds like a use-case for transactional producers [1] utilizing
Exactly Once delivery. You can start a transaction, emit records, have
them persisted in Kafka, perform some computation afterwards, and then
decide whether to commit or abort the transaction based on the result
of that computation.
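
A bare-bones sketch of that flow might look like the following (the topic
name, configs, and the validation step are placeholders, and error handling
is omitted):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // A stable transactional.id enables transactions (and implies idempotence).
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-tx-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("my-topic", "value-" + i));
            }
            // Stand-in for whatever computation decides the outcome.
            boolean everythingOk = true;
            if (everythingOk) {
                // Records become visible to read_committed consumers.
                producer.commitTransaction();
            } else {
                // Records are discarded for read_committed consumers.
                producer.abortTransaction();
            }
        }
    }
}

Note that the downstream consumers need isolation.level=read_committed for
aborted records to actually be filtered out.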

There is also a performance penalty to transactional producers, but it
is different from the max.in.flight.requests.per.connection bottleneck
and not directly comparable.
I think you should carefully consider throwing delivery-critical
errors from the callback, as that is not a common workflow. Could
those errors be moved to a different part of the pipeline, such as the
consumer application?

And since you're performance sensitive, please be aware that
performance (availability) nearly always comes at the cost of delivery
guarantees (consistency) [2]. If you're not willing to pay the
performance cost of max.in.flight.requests.per.connection=1, then you
may need to make a compromise on the consistency and find a way to
manage the extra data.

Thanks,
Greg Harris

[1] 
https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
[2] https://en.wikipedia.org/wiki/CAP_theorem

On Mon, Mar 11, 2024 at 7:32 AM William Lee  wrote:
>
> Hi Haruki,
> Thanks for your answer.
> > I still don't get why you need this behavior though
> The reason is I have to ensure message ordering per partition strictly.
> Once there is an exception in the producer callback, it indicates that the
> exception is not a retryable exception(from kafka producer's perspective).
> There must be something wrong, so I have to stop sending records and
> resolve the underlying issue.
>
> As for the performance problem, I found a kafka wiki which investigated the
> impact of max.in.flight.requests.per.connection: An analysis of the impact
> of max.in.flight.requests.per.connection and acks on Producer performance -
> Apache Kafka - Apache Software Foundation
> 
> From the wiki, max.in.flight.requests.per.connection is better set to 2 or
> more.
>
> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> this could become a performance bottleneck


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi Haruki,
Thanks for your answer.
> I still don't get why you need this behavior though
The reason is I have to ensure strict message ordering per partition.
Once there is an exception in the producer callback, it indicates that the
exception is not a retryable one (from the Kafka producer's perspective).
There must be something wrong, so I have to stop sending records and
resolve the underlying issue.

As for the performance problem, I found a Kafka wiki page which investigated
the impact of max.in.flight.requests.per.connection: An analysis of the
impact of max.in.flight.requests.per.connection and acks on Producer
performance - Apache Kafka - Apache Software Foundation
<https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance>

From the wiki, max.in.flight.requests.per.connection is better set to 2 or
more.

By setting max.in.flight.requests.per.connection to 1, I'm concerned that
this could become a performance bottleneck


Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Haruki Okada
Hi.

> I immediately stop sending more new records and stop the kafka
producer, but some extra records were still sent

I still don't get why you need this behavior though. As long as you set
max.in.flight.requests.per.connection to greater than 1, this is impossible
to avoid, because the KafkaProducer can do nothing about requests that have
already been sent out.

By the way, with appropriate batch.size and linger.ms configuration, you
can achieve high throughput even with
max.in.flight.requests.per.connection=1, which shouldn't be a problem
unless you have to send large data over a slow network.
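
To put rough numbers on it (back-of-envelope only, ignoring broker-side
processing time): with max.in.flight.requests.per.connection=1 and a single
partition, throughput is bounded by roughly batch.size divided by the
produce round-trip time. For example, a 16 KiB batch over a 10 ms round
trip caps out around 1.6 MB/s, while a 256 KiB batch (helped by a larger
linger.ms) raises that ceiling to around 25 MB/s. That is why the batching
configs usually matter more than the in-flight limit unless the network is
slow or the records are very large.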

On Mon, Mar 11, 2024 at 22:55, William Lee wrote:

> Hi all,
> I am facing a problem when I detect an exception in kafka producer
> callback, I immediately stop sending more new records and stop the kafka
> producer, but some extra records were still sent.
>
> I found a way to resolve this issue: setting
> max.in.flight.requests.per.connection to 1 and closing kafka producer when
> encountering an exception in kafka producer callback.
> set max.in.flight.requests.per.connection to 1 will make sure only one
> request will be inflight for one partition, and closing kafka producer in
> producer callback will make Sender in "forceClose" state thus avoiding
> sending extra records.
>
> But, as far as I know, setting max.in.flight.requests.per.connection to 1
> will decrease the performance of kafka producer. I would like to know, is
> there any other way to work around this issue without setting
> max.in.flight.requests.per.connection to 1 so that I can ensure performance
> of kafka producer?
>
> here is my demo source code, you can also find it on Github Gist:
> https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576
>
> public class KafkaProducerProblemDemo {
>
> public static void main(String[] args) {
> Logger rootLogger = (Logger)
> LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
> rootLogger.setLevel(Level.INFO);
>
> String topicName = "test_topic_202403112035";
> Map kafkaTopicConfigs = new HashMap<>();
> Properties props = new Properties();
> props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
> props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "
> 192.168.223.3:9094");
> CreateTopicsResult createTopicsResult;
> try (AdminClient adminClient = AdminClient.create(props)) {
> NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
> newTopic.configs(kafkaTopicConfigs);
> kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG,
> "1048576");
> kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG,
> "1048576");
> kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG,
> "8640");
> createTopicsResult =
> adminClient.createTopics(Collections.singletonList(newTopic));
> System.out.println(createTopicsResult.all().get());
> } catch (Exception e) {
> rootLogger.error("create topic error", e);
> }
>
> // adjust requestTimeout to ensure the request timeout is enough
> long requestTimeout = 2000;
> Properties kafkaProps = new Properties();
> kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
> String.valueOf(requestTimeout));
> kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> org.apache.kafka.common.serialization.StringSerializer.class.getName());
> kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
> kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "
> 192.168.223.3:9094");
> kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
> String.valueOf(requestTimeout));
> kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
> // force one batch per record
> kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
> kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>
> kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>
> try (KafkaProducer kafkaProducer = new
> KafkaProducer<>(kafkaProps)) {
> AtomicBoolean isFirstRecord = new AtomicBoolean(true);
> AtomicReference sendException = new
> AtomicReference<>();
>
> for (int i = 0; i < 2048; i++) {
> String content = String.valueOf(i);
> ProducerRecord record = new
> ProducerRecord<>(topicName, content.getBytes());
>
> if (sendException.get() != null) {
> // once found exception in callback, stop sending more
> records
> kafkaProducer.close();
> break;
> }
>
> kafkaProducer.send(record, (RecordMetadata metadata,
> Exception exception) -> {
> if (isFirstRecord.getAndSet(false)) {
> try {
>

Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread William Lee
Hi all,
I am facing a problem: when I detect an exception in the Kafka producer
callback, I immediately stop sending new records and stop the Kafka
producer, but some extra records were still sent.

I found a way to resolve this issue: setting
max.in.flight.requests.per.connection to 1 and closing the Kafka producer
when encountering an exception in the producer callback.
Setting max.in.flight.requests.per.connection to 1 makes sure only one
request is in flight per partition, and closing the Kafka producer inside
the producer callback puts the Sender into the "forceClose" state, thus
avoiding sending extra records.

But, as far as I know, setting max.in.flight.requests.per.connection to 1
will decrease the performance of the Kafka producer. I would like to know:
is there any other way to work around this issue without setting
max.in.flight.requests.per.connection to 1, so that I can keep the
producer's performance?

Here is my demo source code; you can also find it on GitHub Gist:
https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.LoggerFactory;

public class KafkaProducerProblemDemo {

    public static void main(String[] args) {
        Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        rootLogger.setLevel(Level.INFO);

        String topicName = "test_topic_202403112035";
        Map<String, String> kafkaTopicConfigs = new HashMap<>();
        Properties props = new Properties();
        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        CreateTopicsResult createTopicsResult;
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
            newTopic.configs(kafkaTopicConfigs);
            kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "8640");
            createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            System.out.println(createTopicsResult.all().get());
        } catch (Exception e) {
            rootLogger.error("create topic error", e);
        }

        // adjust requestTimeout to ensure the request timeout is enough
        long requestTimeout = 2000;
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
        // force one batch per record
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
        kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
            AtomicBoolean isFirstRecord = new AtomicBoolean(true);
            AtomicReference<Exception> sendException = new AtomicReference<>();

            for (int i = 0; i < 2048; i++) {
                String content = String.valueOf(i);
                ProducerRecord<String, byte[]> record =
                        new ProducerRecord<>(topicName, content.getBytes());

                if (sendException.get() != null) {
                    // once found exception in callback, stop sending more records
                    kafkaProducer.close();
                    break;
                }

                kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
                    if (isFirstRecord.getAndSet(false)) {
                        try {
                            // sleep more than twice the DELIVERY_TIMEOUT_MS_CONFIG to make the waiting batch expire
                            // simulate spending too much time in the kafka callback
                            Thread.sleep(requestTimeout * 2 + 1000);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                    if (exception != null) {
                        rootLogger.error("send data failed, record content: {}, reason: {}",
                                content, exception.toString());
                        sendException.compareAndSet(null, exception);
                    } else {
                        rootLogger.info("send data success, offset: {}, record content: {}",
                                metadata.offset(), content);