Naancy commented on code in PR #12462:
URL: https://github.com/apache/kafka/pull/12462#discussion_r1062120831
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -273,26 +273,29 @@ public RecordAppendResult append(String topic,
                 // check if we have an in-progress batch
                 Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
+                RecordAppendResult appendResult;
                 synchronized (dq) {
                     // After taking the lock, validate that the partition hasn't changed and retry.
                     if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
                         log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
                                 partitionInfo.partition(), topic);
                         continue;
                     }
-                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
-                    if (appendResult != null) {
+                    appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+                    if (appendResult != null && !appendResult.newBatchCreated) {
                         topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
                         return appendResult;
                     }
                 }

-                // we don't have an in-progress record batch try to allocate a new batch
-                if (abortOnNewBatch) {
-                    // Return a result that will cause another call to append.
+                // either 1. current topicPartition producerBatch is full - return and prepare for another batch/partition.
+                // 2. no producerBatch existed for this topicPartition, create a new producerBatch.
+                if (appendResult == null && abortOnNewBatch) {

Review Comment:
   @sudeshwasnik I am following up on this fix so that Kafka gets proper round-robin behavior. I had a thought: why can't we store the partition ID in the record? Then, if the partitioner is called twice, the partition chosen the first time would already be in the record, and the second partitioner call would not happen.

   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1387

       if (record.partition() != null)
           return record.partition();

   At the moment, record.partition() always returns null here.
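   To make the idea concrete, here is a rough, self-contained sketch. It does not use any Kafka classes: `RoundRobin`, `PendingRecord`, `partitionFor` and `simulate` are all hypothetical names made up for illustration, and the simulation assumes every send hits the abort-and-retry path so that the partitioner is consulted twice per record. It only shows why caching the first choice might keep the distribution even; it is not a proposal for the actual patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionCacheSketch {

    /** Hypothetical stand-in for a round-robin partitioner; not Kafka's class. */
    static final class RoundRobin {
        private final AtomicInteger counter = new AtomicInteger();
        private final int numPartitions;

        RoundRobin(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        /** Every call advances the counter, so an extra call skips a partition. */
        int next() {
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
    }

    /** Hypothetical record wrapper that remembers the partition chosen first. */
    static final class PendingRecord {
        final String value;
        Integer cachedPartition; // null until the partitioner has run once

        PendingRecord(String value) {
            this.value = value;
        }
    }

    /** Returns the cached partition when allowed; otherwise asks the partitioner. */
    static int partitionFor(PendingRecord record, RoundRobin partitioner, boolean useCache) {
        if (useCache && record.cachedPartition != null) {
            return record.cachedPartition;
        }
        int partition = partitioner.next();
        record.cachedPartition = partition;
        return partition;
    }

    /**
     * Simulates sends in which each record needs two partition lookups:
     * a first attempt that is aborted, then the retry that is actually used.
     */
    static List<Integer> simulate(int records, int numPartitions, boolean useCache) {
        RoundRobin partitioner = new RoundRobin(numPartitions);
        List<Integer> finalPartitions = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            PendingRecord record = new PendingRecord("v" + i);
            partitionFor(record, partitioner, useCache);                      // aborted attempt
            finalPartitions.add(partitionFor(record, partitioner, useCache)); // retry
        }
        return finalPartitions;
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + simulate(4, 4, false)); // [1, 3, 1, 3]
        System.out.println("with cache:    " + simulate(4, 4, true));  // [0, 1, 2, 3]
    }
}
```

   Without the cache, only every other partition receives records in this toy setup, because the aborted attempt consumes a round-robin slot. With the cache, the retry reuses the first choice and all four partitions are hit once.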