Hi.

> I immediately stop sending more new records and stop the Kafka
> producer, but some extra records were still sent

I still don't get why you need this behavior, though. As long as you set
max.in.flight.requests.per.connection to greater than 1, this is impossible
to avoid, because KafkaProducer can do nothing about requests that have
already been sent out.

By the way, with appropriate batch.size and linger.ms configuration, you
can achieve high throughput even with
max.in.flight.requests.per.connection=1, which shouldn't be a problem unless
you have to send large amounts of data over a slow network.
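
For example, something along these lines (a minimal sketch, not taken from
your code: the batch size and linger values are illustrative and need tuning
for your message sizes and network, the broker address is a placeholder, and
the classes are the same producer classes your demo already imports):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder address
    // Only one request in flight at a time, but each request carries a large batch.
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536"); // 64 KiB per batch (illustrative)
    props.put(ProducerConfig.LINGER_MS_CONFIG, "10");     // wait up to 10 ms to fill a batch
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

With linger.ms > 0, the producer waits briefly so that many records share a
single request, which usually makes up for the lost request pipelining.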

On Mon, Mar 11, 2024 at 22:55 William Lee <ligaopeng...@gmail.com> wrote:

> Hi all,
> I am facing a problem: when I detect an exception in the Kafka producer
> callback, I immediately stop sending more new records and stop the Kafka
> producer, but some extra records were still sent.
>
> I found a way to resolve this issue: set
> max.in.flight.requests.per.connection to 1 and close the Kafka producer
> when encountering an exception in the producer callback.
> Setting max.in.flight.requests.per.connection to 1 makes sure only one
> request is in flight per connection, and closing the Kafka producer in the
> producer callback puts the Sender into the "forceClose" state, thus
> avoiding sending extra records.
>
> But, as far as I know, setting max.in.flight.requests.per.connection to 1
> will decrease the performance of the Kafka producer. I would like to know:
> is there any other way to work around this issue without setting
> max.in.flight.requests.per.connection to 1, so that I can preserve the
> producer's performance?
>
> Here is my demo source code; you can also find it in this GitHub Gist:
> https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576
>
> import ch.qos.logback.classic.Level;
> import ch.qos.logback.classic.Logger;
> import org.apache.kafka.clients.CommonClientConfigs;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.CreateTopicsResult;
> import org.apache.kafka.clients.admin.NewTopic;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.config.TopicConfig;
> import org.slf4j.LoggerFactory;
>
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> public class KafkaProducerProblemDemo {
>
>     public static void main(String[] args) {
>         Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
>         rootLogger.setLevel(Level.INFO);
>
>         String topicName = "test_topic_202403112035";
>
>         // create the test topic with small segment/retention limits
>         Properties props = new Properties();
>         props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
>         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         try (AdminClient adminClient = AdminClient.create(props)) {
>             Map<String, String> kafkaTopicConfigs = new HashMap<>();
>             kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "86400000");
>             NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
>             newTopic.configs(kafkaTopicConfigs);
>             CreateTopicsResult createTopicsResult =
>                     adminClient.createTopics(Collections.singletonList(newTopic));
>             createTopicsResult.all().get();
>         } catch (Exception e) {
>             rootLogger.error("create topic error", e);
>         }
>
>         // keep the request timeout short so that an expired batch fails quickly
>         long requestTimeout = 2000;
>         Properties kafkaProps = new Properties();
>         kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                 org.apache.kafka.common.serialization.StringSerializer.class.getName());
>         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                 org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
>         kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
>         // force one batch per record
>         kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
>         kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>
>         try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
>             AtomicBoolean isFirstRecord = new AtomicBoolean(true);
>             AtomicReference<Exception> sendException = new AtomicReference<>();
>
>             for (int i = 0; i < 2048; i++) {
>                 String content = String.valueOf(i);
>                 ProducerRecord<String, byte[]> record =
>                         new ProducerRecord<>(topicName, content.getBytes());
>
>                 if (sendException.get() != null) {
>                     // once the callback has seen an exception, stop sending more records
>                     kafkaProducer.close();
>                     break;
>                 }
>
>                 kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
>                     if (isFirstRecord.getAndSet(false)) {
>                         try {
>                             // sleep for more than twice delivery.timeout.ms so the waiting
>                             // batches expire; simulates spending too much time in the callback
>                             Thread.sleep(requestTimeout * 2 + 1000);
>                         } catch (Exception e) {
>                             throw new RuntimeException(e);
>                         }
>                     }
>
>                     if (exception != null) {
>                         rootLogger.error("send data failed, record content: {}, reason: {}",
>                                 content, exception.toString());
>                         sendException.compareAndSet(null, exception);
>                     } else {
>                         rootLogger.info("send data success, offset: {}, record content: {}",
>                                 metadata.offset(), content);
>                     }
>                 });
>
>                 Thread.sleep(1000);
>             }
>         } catch (Exception e) {
>             rootLogger.error("send data error", e);
>         }
>     }
> }
>


-- 
========================
Okada Haruki
ocadar...@gmail.com
========================
