[GitHub] [kafka] showuon commented on a diff in pull request #12058: MINOR: Scala cleanups in core

2022-04-19 Thread GitBox


showuon commented on code in PR #12058:
URL: https://github.com/apache/kafka/pull/12058#discussion_r853770995


##
core/src/main/scala/kafka/server/PartitionMetadataFile.scala:
##
@@ -44,8 +44,7 @@ object PartitionMetadataFile {
   }
 
   class PartitionMetadataReadBuffer[T](location: String,
-   reader: BufferedReader,
-   version: Int) extends Logging {
+   reader: BufferedReader) extends Logging 
{

Review Comment:
   nice cleanup for unused argument



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 opened a new pull request, #12070: KAFKA-13838: Improve the poll method of ConsumerNetworkClient

2022-04-19 Thread GitBox


RivenSun2 opened a new pull request, #12070:
URL: https://github.com/apache/kafka/pull/12070

   Improve the poll method of ConsumerNetworkClient
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-19 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524707#comment-17524707
 ] 

RivenSun commented on KAFKA-13838:
--

Hi [~guozhang]  [~hachikuji]  , [~showuon]
Could you give some suggestions for this issue?
Thanks.

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> {color:#ff}queue.peekFirst().send.completed(){color} condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed, more detail see Selector.write(channel) methos.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized.
> Easier to read the code.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-19 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524705#comment-17524705
 ] 

RivenSun commented on KAFKA-13838:
--

After looking at the code carefully, the comments on the second trySend call.
`buffer space may have been cleared` should mean that the send in the 
KafkaChannel has completed and was reset to null.

In this case, we should then call networkClient.poll(...) again, otherwise the 
first trySend call will be meaningless the next time we enter 
ConsumerNetworkClient.poll(...).

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> {color:#ff}queue.peekFirst().send.completed(){color} condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed, more detail see Selector.write(channel) methos.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized.
> Easier to read the code.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13816) Downgrading Connect rebalancing protocol from incremental to eager causes duplicate task instances

2022-04-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524701#comment-17524701
 ] 

Sagar Rao commented on KAFKA-13816:
---

hey [~ChrisEgerton] , thanks for the input! No worries, I will try to take a 
stab at it :) 

> Downgrading Connect rebalancing protocol from incremental to eager causes 
> duplicate task instances
> --
>
> Key: KAFKA-13816
> URL: https://issues.apache.org/jira/browse/KAFKA-13816
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> The rebalancing protocol for a Kafka Connect cluster can be downgraded from 
> incremental to eager by adding a worker to the cluster with 
> {{connect.protocol}} set to {{{}eager{}}}, or by stopping an existing worker 
> in that cluster, reconfiguring it with the new protocol, and restarting it.
> When the worker (re)joins the cluster, a rebalance takes place using the 
> eager protocol, and duplicate task instances are created on the cluster.
> This occurs because:
>  * The leader does not send out an assignment that revokes all connectors and 
> tasks for the cluster during that round
>  * Workers do not respond to the downgrade in protocol by revoking all 
> connectors and tasks that they were running before the rebalance that are not 
> included in the new assignment they received during the rebalance
> It's likely that this bug hasn't surfaced sooner because any subsequent 
> rebalance should cause all connectors and tasks on all each in the cluster to 
> be proactively revoked before the worker rejoins the group.
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-Compatibility,Deprecation,andMigrationPlan]
>  provides one way to address this:
> {quote}To downgrade your cluster to use protocol version 0 from version 1 or 
> higher with {{eager}} rebalancing policy what is required is to switch one of 
> the workers back to {{eager}} mode. 
> {panel}
>  {panel}
> |{{connect.protocol = eager}}|
> Once this worker joins, the group will downgrade to protocol version 0 and 
> {{eager}} rebalancing policy, with immediately release of resources upon 
> joining the group. This process will require a one-time double rebalancing, 
> with the leader detecting the downgrade and first sending a downgraded 
> assignment with empty assigned connectors and tasks and from then on just 
> regular downgraded assignments. 
> {quote}
> However, it's unclear how to accomplish the second round in the double 
> rebalance described above.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-19 Thread RivenSun (Jira)
RivenSun created KAFKA-13838:


 Summary: Improve the poll method of ConsumerNetworkClient
 Key: KAFKA-13838
 URL: https://issues.apache.org/jira/browse/KAFKA-13838
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: RivenSun
Assignee: RivenSun


Briefly describe the process of sending clientRequest on the Kafka Client side, 
which is divided into two steps.

1.Selector.send(send) method
Kafka's underlying tcp connection channel ensures that data is sent to the 
network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at a 
time{*}. And the next InFlightRequest is allowed to be added only if the 
{color:#ff}queue.peekFirst().send.completed(){color} condition is met.
{code:java}
NetworkClient.isReady(node) ->
NetworkClient.canSendRequest(node) -> 
InFlightRequests.canSendMore(node){code}

2. Selector.poll(timeout)
After KafkaChannel sets a send each time, there should be a 
Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments on 
the Selector.send(send) method.
{code:java}
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} 
calls
 * @param send The request to send
 */
public void send(NetworkSend send) { {code}
Send may become *completed* *only after* the Selector.poll(timeout) method is 
executed, more detail see Selector.write(channel) methos.

 

Let's go back and look at this method: ConsumerNetworkClient.poll(Timer timer, 
PollCondition pollCondition, boolean disableWakeup) method.
There are three places involved in sending data in this method:
{code:java}
long pollDelayMs = trySend(timer.currentTimeMs());
->
 client.poll(...)
->
trySend(timer.currentTimeMs());

{code}
There are two problems with this process:

1. After calling the trySend(...) method for the second time, we should 
immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
send generated each time can be consumed by the next Selector.poll() method.

2. The while loop in trySend(...) method can be removed
After a node executes client.send(request, now) for the first time, because the 
first request will never be *completed* here, the subsequent requests will 
never satisfy the client.ready(node, now) condition.
Although the current code will break directly on the second execution of the 
loop, there will be {*}an additional execution of the loop{*}.
Modify the code as follows:
{code:java}
long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;

// send any requests that can be sent now
for (Node node : unsent.nodes()) {
Iterator iterator = unsent.requestIterator(node);
if (iterator.hasNext()) {
pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
if (client.ready(node, now)) {
client.send(iterator.next(), now);
iterator.remove();
}
}
}
return pollDelayMs;
}{code}
3. By the way, the unsent.clean() method that is executed last can also be 
optimized.
Easier to read the code.
{code:java}
public void clean() {
// the lock protects removal from a concurrent put which could otherwise 
mutate the
// queue after it has been removed from the map
synchronized (unsent) {
unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
}
} {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] showuon commented on a diff in pull request #12066: KAFKA-13834: fix drain batch starving issue

2022-04-19 Thread GitBox


showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r853683790


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+

Review Comment:
   nit: remove this line



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -81,7 +82,7 @@ public class RecordAccumulator {
 private final IncompleteBatches incomplete;
 // The following variables are only accessed by the sender thread, so we 
don't need to protect them.
 private final Set muted;
-private int drainIndex;
+private Map nodesDrainIndex;

Review Comment:
   private `final` Map nodesDrainIndex;



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -634,10 +634,20 @@ private List 
drainBatchesForOneNode(Cluster cluster, Node node, i
 ready.add(batch);
 
 batch.drained(now);
+drainIndex = (drainIndex + 1) % parts.size();
 } while (start != drainIndex);
+updateDrainIndex(node.idString(), drainIndex);
 return ready;
 }
 
+int getDrainIndex(String idString) {
+return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
+}
+
+void updateDrainIndex(String idString, int drainIndex) {

Review Comment:
   private method, please



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -98,6 +99,54 @@ public void teardown() {
 this.metrics.close();
 }
 
+@Test
+@SuppressWarnings("unchecked")

Review Comment:
   why unchecked?



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -98,6 +99,54 @@ public void teardown() {
 this.metrics.close();
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testDrainBatchesStarve() throws Exception {
+// test case: node1(tp1,tp2) , node2(tp3,tp4)
+// add tp-4
+int partition4 = 3;
+TopicPartition tp4 = new TopicPartition(topic, partition4);
+PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, 
null, null);
+
+long batchSize = value.length + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+RecordAccumulator accum = createTestRecordAccumulator( (int) 
batchSize, 1024, CompressionType.NONE, 10);
+Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1,part2, part3,part4),
+Collections.emptySet(), Collections.emptySet());
+
+//  initial data
+accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+
+// drain 2 record for tp1 , tp3
+Map> batches1 = accum.drain(cluster, new 
HashSet(Arrays.asList(node1,node2)), (int) batchSize, 0);
+assertEquals(2,batches1.size());
+judgeValidTp(batches1,tp1,tp3);
+
+// add record for tp1, tp3
+accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, 
maxBlockTimeMs, false, time.milliseconds());
+
+// drain 2 record for tp2, tp4
+Map> batchss2 = accum.drain(cluster, new 
HashSet(Arrays.asList(node1,node2)), (int) batchSize, 0);
+judgeValidTp(batchss2,tp2,tp4);

Review Comment:
   We can continue the test to make sure next run, the index will start from 
the beginning (i.e. drainIndex % part.size), that is: node1 => tp1, node2 => tp3



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -98,6 +99,54 @@ public void teardown() {
 this.metrics.close();
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testDrainBatchesStarve() throws Exception {
+// test case: node1(tp1,tp2) , node2(tp3,tp4)
+// add tp-4
+int partition4 = 3;
+TopicPartition tp4 = new TopicPartition(topic, partition4);
+PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, 
null, null);
+
+long batchSize = value.length + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+RecordAccumulator accum = createTestRecordAccumulator( (int) 
batchSize, 1024, CompressionType.NONE, 10);
+Cluster cluster = new Cluster(null, Arrays.a

[GitHub] [kafka] RivenSun2 commented on pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on PR #12010:
URL: https://github.com/apache/kafka/pull/12010#issuecomment-1103427970

   Hi @C0urante and @divijvaidya 
   Thank you for your review.
   I just responded to your comment and resubmitted the code changes as well.
   If you have time, please help to review it again.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-19 Thread shizhenzhen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524687#comment-17524687
 ] 

shizhenzhen commented on KAFKA-13834:
-

[~guozhang] [~showuon]   
Done!
Thanks!

> batch drain for nodes might have starving issue
> ---
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. 问题代码 problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png|width=786,height=266!
>   
> 问题出在这个, private int drainIndex;
> The problem is this,private int drainIndex;
> h3. 代码预期 code expectations
> 这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。
> 因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
> 需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。
> 简单来说呢就是下图这样
>  
> The logic of the code at this end is to calculate the ProducerBatchs sent to 
> each Node, which is sent in batches.
> Because the amount of requests sent at one time is limited 
> (max.request.size), only a few ProducerBatch may be sent at a time. Then 
> after sending this time, you need to record which Batch is traversed here, 
> and the next time you traverse it again Can continue the last traversal send.
> Simply put, it is as follows
>  
> !image-2022-04-18-17-36-47-393.png|width=798,height=526!
>  
>  
>  
> h3. 实际情况 The actual situation
> 但是呢, 因为上面的索引drainIndex 是一个全局变量, 是RecordAccumulator共享的。
> 那么通常会有很多个Node需要进行遍历, 
> 上一个Node的索引会接着被第二个第三个Node接着使用,那么也就无法比较均衡合理的让每个TopicPartition都遍历到.
> 正常情况下其实这样也没有事情, 如果不出现极端情况的下,基本上都能遍历到。
> 怕就怕极端情况, 导致有很多TopicPartition不能够遍历到,也就会造成一部分消息一直发送不出去。
> However, because the index drainIndex above is a global variable shared by 
> RecordAccumulator.
> Then there are usually many Nodes that need to be traversed, and the index of 
> the previous Node will be used by the second and third Nodes, so it is 
> impossible to traverse each TopicPartition in a balanced and reasonable 
> manner.
> Under normal circumstances, there is nothing wrong with this. If there is no 
> extreme situation, it can basically be traversed.
> I'm afraid of extreme situations, which will result in many TopicPartitions 
> that cannot be traversed, and some messages will not be sent out all the time.
> h3. 造成的影响 impact
> 导致部分消息一直发送不出去、或者很久才能够发送出去。
> As a result, some messages cannot be sent out, or can take a long time to be 
> sent out.
> h3. 触发异常情况的一个Case /  A Case that triggers an exception
> 该Case场景如下:
>  # 生产者向3个Node发送消息
>  # 每个Node都是3个TopicPartition
>  # 每个TopicPartition队列都一直源源不断的写入消息、
>  # max.request.size 刚好只能存放一个ProdcuerBatch的大小。
> 就是这么几个条件,会造成每个Node只能收到一个TopicPartition队列里面的PrdoucerBatch消息。
> 开始的时候 drainIndex=0. 开始遍历第一个Node-0。 Node-0 准备开始遍历它下面的几个队列中的ProducerBatch,遍历一次 
> 则drainIndex+1,发现遍历了一个队列之后,就装满了这一批次的请求。
> 那么开始遍历Node-1,这个时候则drainIndex=1,首先遍历到的是 第二个TopicPartition。然后发现一个Batch之后也满了。
> 那么开始遍历Node-1,这个时候则drainIndex=2,首先遍历到的是 第三个TopicPartition。然后发现一个Batch之后也满了。
> 这一次的Node遍历结束之后把消息发送之后
> 又接着上面的请求流程,那么这个时候的drainIndex=3了。
> 遍历Node-0,这个时候取模计算得到的是第几个TopicPartition呢?那不还是第1个吗。相当于后面的流程跟上面一模一样。
> 也就导致了每个Node的第2、3个TopicPartition队列中的ProducerBatch永远遍历不到。
> 也就发送不出去了。
>  
> The case scenario is as follows:
> Producer sends message to 3 Nodes
> Each Node is 3 TopicPartitions
> Each TopicPartition queue has been continuously writing messages,
> max.request.size can only store the size of one ProdcuerBatch.
> It is these conditions that cause each Node to receive only one PrdoucerBatch 
> message in the TopicPartition queue.
> At the beginning drainIndex=0. Start traversing the first Node-0. Node-0 is 
> ready to start traversing the ProducerBatch in several queues below it. After 
> traversing once, drainIndex + 1. After traversing a queue, it is full of 
> requests for this batch.
> Then start traversing Node-1. At this time, drainIndex=1, and the second 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> Then start traversing Node-1. At this time, drainIndex=2, and the third 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> After this Node traversal is over, the message is sent
> Then the above request process is followed, then drainIndex=3 at this time.
> Traversing Node-0, which TopicPartition is obtained by taking the modulo 
> calculation at this time? Isn't that the first one? Equivalent to the 
> following process is exactly the same as above.
> As a result, the ProducerBatch in the second and third TopicPartition queues 
> of each

[jira] [Updated] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-19 Thread shizhenzhen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhenzhen updated KAFKA-13834:

Description: 
h3. 问题代码 problem code

RecordAccumulator#drainBatchesForOneNode

!https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png|width=786,height=266!
  

问题出在这个, private int drainIndex;

The problem is this,private int drainIndex;
h3. 代码预期 code expectations

这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。

因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。

简单来说呢就是下图这样

 

The logic of the code at this end is to calculate the ProducerBatchs sent to 
each Node, which is sent in batches.

Because the amount of requests sent at one time is limited (max.request.size), 
only a few ProducerBatch may be sent at a time. Then after sending this time, 
you need to record which Batch is traversed here, and the next time you 
traverse it again Can continue the last traversal send.

Simply put, it is as follows

 

!image-2022-04-18-17-36-47-393.png|width=798,height=526!

 

 

 
h3. 实际情况 The actual situation

但是呢, 因为上面的索引drainIndex 是一个全局变量, 是RecordAccumulator共享的。

那么通常会有很多个Node需要进行遍历, 
上一个Node的索引会接着被第二个第三个Node接着使用,那么也就无法比较均衡合理的让每个TopicPartition都遍历到.

正常情况下其实这样也没有事情, 如果不出现极端情况的下,基本上都能遍历到。

怕就怕极端情况, 导致有很多TopicPartition不能够遍历到,也就会造成一部分消息一直发送不出去。

However, because the index drainIndex above is a global variable shared by 
RecordAccumulator.

Then there are usually many Nodes that need to be traversed, and the index of 
the previous Node will be used by the second and third Nodes, so it is 
impossible to traverse each TopicPartition in a balanced and reasonable manner.

Under normal circumstances, there is nothing wrong with this. If there is no 
extreme situation, it can basically be traversed.

I'm afraid of extreme situations, which will result in many TopicPartitions 
that cannot be traversed, and some messages will not be sent out all the time.
h3. 造成的影响 impact

导致部分消息一直发送不出去、或者很久才能够发送出去。

As a result, some messages cannot be sent out, or can take a long time to be 
sent out.
h3. 触发异常情况的一个Case /  A Case that triggers an exception

该Case场景如下:
 # 生产者向3个Node发送消息
 # 每个Node都是3个TopicPartition
 # 每个TopicPartition队列都一直源源不断的写入消息、
 # max.request.size 刚好只能存放一个ProdcuerBatch的大小。

就是这么几个条件,会造成每个Node只能收到一个TopicPartition队列里面的PrdoucerBatch消息。

开始的时候 drainIndex=0. 开始遍历第一个Node-0。 Node-0 准备开始遍历它下面的几个队列中的ProducerBatch,遍历一次 
则drainIndex+1,发现遍历了一个队列之后,就装满了这一批次的请求。

那么开始遍历Node-1,这个时候则drainIndex=1,首先遍历到的是 第二个TopicPartition。然后发现一个Batch之后也满了。

那么开始遍历Node-1,这个时候则drainIndex=2,首先遍历到的是 第三个TopicPartition。然后发现一个Batch之后也满了。

这一次的Node遍历结束之后把消息发送之后

又接着上面的请求流程,那么这个时候的drainIndex=3了。

遍历Node-0,这个时候取模计算得到的是第几个TopicPartition呢?那不还是第1个吗。相当于后面的流程跟上面一模一样。

也就导致了每个Node的第2、3个TopicPartition队列中的ProducerBatch永远遍历不到。

也就发送不出去了。

 

The case scenario is as follows:

Producer sends message to 3 Nodes
Each Node is 3 TopicPartitions
Each TopicPartition queue has been continuously writing messages,
max.request.size can only store the size of one ProdcuerBatch.

It is these conditions that cause each Node to receive only one PrdoucerBatch 
message in the TopicPartition queue.

At the beginning drainIndex=0. Start traversing the first Node-0. Node-0 is 
ready to start traversing the ProducerBatch in several queues below it. After 
traversing once, drainIndex + 1. After traversing a queue, it is full of 
requests for this batch.

Then start traversing Node-1. At this time, drainIndex=1, and the second 
TopicPartition is traversed first. Then I found that a Batch was also full.

Then start traversing Node-1. At this time, drainIndex=2, and the third 
TopicPartition is traversed first. Then I found that a Batch was also full.

After this Node traversal is over, the message is sent

Then the above request process is followed, then drainIndex=3 at this time.

Traversing Node-0, which TopicPartition is obtained by taking the modulo 
calculation at this time? Isn't that the first one? Equivalent to the following 
process is exactly the same as above.

As a result, the ProducerBatch in the second and third TopicPartition queues of 
each Node can never be traversed.

It can't be sent.

!https://img-blog.csdnimg.cn/aa2cc2e7a9ff4536a1800d9117e02555.png#pic_center|width=660,height=394!

 
h3. 解决方案  solution

只需要每个Node,维护一个自己的索引就行了。

 

 

Only each Node needs to maintain its own index.

 

  was:
h3. 问题代码 problem code

RecordAccumulator#drainBatchesForOneNode

!https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png! 在这里插入图片描述

问题出在这个, private int drainIndex;

The problem is this,private int drainIndex;
h3. 代码预期 code expectations

这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。

因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。

简单来说呢就是下图这样

 

The logic of the code at this end is to calculate the ProducerBatchs sent to 
each Node, which is sen

[jira] [Updated] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-19 Thread shizhenzhen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shizhenzhen updated KAFKA-13834:

Summary: batch drain for nodes might have starving issue  (was: Some 
problems with producers choosing batches of messages to send)

> batch drain for nodes might have starving issue
> ---
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. 问题代码 problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png! 在这里插入图片描述
> 问题出在这个, private int drainIndex;
> The problem is this,private int drainIndex;
> h3. 代码预期 code expectations
> 这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。
> 因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
> 需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。
> 简单来说呢就是下图这样
>  
> The logic of the code at this end is to calculate the ProducerBatchs sent to 
> each Node, which is sent in batches.
> Because the amount of requests sent at one time is limited 
> (max.request.size), only a few ProducerBatch may be sent at a time. Then 
> after sending this time, you need to record which Batch is traversed here, 
> and the next time you traverse it again Can continue the last traversal send.
> Simply put, it is as follows
>  
> !image-2022-04-18-17-36-47-393.png!
>  
>  
>  
> h3. 实际情况 The actual situation
> 但是呢, 因为上面的索引drainIndex 是一个全局变量, 是RecordAccumulator共享的。
> 那么通常会有很多个Node需要进行遍历, 
> 上一个Node的索引会接着被第二个第三个Node接着使用,那么也就无法比较均衡合理的让每个TopicPartition都遍历到.
> 正常情况下其实这样也没有事情, 如果不出现极端情况的下,基本上都能遍历到。
> 怕就怕极端情况, 导致有很多TopicPartition不能够遍历到,也就会造成一部分消息一直发送不出去。
> However, because the index drainIndex above is a global variable shared by 
> RecordAccumulator.
> Then there are usually many Nodes that need to be traversed, and the index of 
> the previous Node will be used by the second and third Nodes, so it is 
> impossible to traverse each TopicPartition in a balanced and reasonable 
> manner.
> Under normal circumstances, there is nothing wrong with this. If there is no 
> extreme situation, it can basically be traversed.
> I'm afraid of extreme situations, which will result in many TopicPartitions 
> that cannot be traversed, and some messages will not be sent out all the time.
> h3. 造成的影响 impact
> 导致部分消息一直发送不出去、或者很久才能够发送出去。
> As a result, some messages cannot be sent out, or can take a long time to be 
> sent out.
> h3. 触发异常情况的一个Case /  A Case that triggers an exception
> 该Case场景如下:
>  # 生产者向3个Node发送消息
>  # 每个Node都是3个TopicPartition
>  # 每个TopicPartition队列都一直源源不断的写入消息、
>  # max.request.size 刚好只能存放一个ProdcuerBatch的大小。
> 就是这么几个条件,会造成每个Node只能收到一个TopicPartition队列里面的PrdoucerBatch消息。
> 开始的时候 drainIndex=0. 开始遍历第一个Node-0。 Node-0 准备开始遍历它下面的几个队列中的ProducerBatch,遍历一次 
> 则drainIndex+1,发现遍历了一个队列之后,就装满了这一批次的请求。
> 那么开始遍历Node-1,这个时候则drainIndex=1,首先遍历到的是 第二个TopicPartition。然后发现一个Batch之后也满了。
> 那么开始遍历Node-1,这个时候则drainIndex=2,首先遍历到的是 第三个TopicPartition。然后发现一个Batch之后也满了。
> 这一次的Node遍历结束之后把消息发送之后
> 又接着上面的请求流程,那么这个时候的drainIndex=3了。
> 遍历Node-0,这个时候取模计算得到的是第几个TopicPartition呢?那不还是第1个吗。相当于后面的流程跟上面一模一样。
> 也就导致了每个Node的第2、3个TopicPartition队列中的ProducerBatch永远遍历不到。
> 也就发送不出去了。
>  
> The case scenario is as follows:
> Producer sends message to 3 Nodes
> Each Node is 3 TopicPartitions
> Each TopicPartition queue has been continuously writing messages,
> max.request.size can only store the size of one ProdcuerBatch.
> It is these conditions that cause each Node to receive only one PrdoucerBatch 
> message in the TopicPartition queue.
> At the beginning drainIndex=0. Start traversing the first Node-0. Node-0 is 
> ready to start traversing the ProducerBatch in several queues below it. After 
> traversing once, drainIndex + 1. After traversing a queue, it is full of 
> requests for this batch.
> Then start traversing Node-1. At this time, drainIndex=1, and the second 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> Then start traversing Node-1. At this time, drainIndex=2, and the third 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> After this Node traversal is over, the message is sent
> Then the above request process is followed, then drainIndex=3 at this time.
> Traversing Node-0, which TopicPartition is obtained by taking the modulo 
> calculation at this time? Isn't that the first one? Equivalent to the 
> following process is exactly the same as above.
> As a result, the ProducerBatch in the second and third TopicPartition queues 
> of eac

[GitHub] [kafka] RivenSun2 commented on pull request #12069: MINOR: Improve postProcessAndValidateIdempotenceConfigs method

2022-04-19 Thread GitBox


RivenSun2 commented on PR #12069:
URL: https://github.com/apache/kafka/pull/12069#issuecomment-1103420473

   Hi @showuon @guozhangwang 
   could you help to review this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853680081


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),
 Importance.HIGH,
 KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   Hi divijvaidya , here we only need to check if the value is null,
   I can add a case later to override the case you said.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853680081


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),
 Importance.HIGH,
 KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   Yes, here we only need to check if the value is null,
   I can add a case later to override the case you said



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 opened a new pull request, #12069: MINOR: Improve postProcessAndValidateIdempotenceConfigs method

2022-04-19 Thread GitBox


RivenSun2 opened a new pull request, #12069:
URL: https://github.com/apache/kafka/pull/12069

   Improve postProcessAndValidateIdempotenceConfigs method
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853664532


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1324,9 +1324,9 @@ object KafkaConfig {
   .define(SslEngineFactoryClassProp, CLASS, null, LOW, 
SslEngineFactoryClassDoc)
 
   /** * Sasl Configuration /
-  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
+  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
   .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
-  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
MEDIUM, SaslEnabledMechanismsDoc)
+  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
new BrokerSecurityConfigs.SaslEnabledMechanismsValidator(), MEDIUM, 
SaslEnabledMechanismsDoc)

Review Comment:
   On a second thought, SASL modules are highly flexible and customizable.
   And this parameter is not a required parameter, which may cause 
compatibility problems when adding a validator.
   So to not make things more complicated, remove the validator for this 
parameter.
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853664320


##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -202,7 +202,7 @@ public static void addClientSaslSupport(ConfigDef config) {
 .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, 
ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, 
Range.between(0.0, 0.25), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
 .define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, 
ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, 
Range.between(0, 900), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
 .define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, 
ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, 
Range.between(0, 3600), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
-.define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, 
SaslConfigs.SASL_MECHANISM_DOC)
+.define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), 
ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)

Review Comment:
   Hi @C0urante 
   On a second thought, SASL modules are highly flexible and customizable.
   And this parameter is not a required parameter, which may cause 
compatibility problems when adding a validator.
   So to not make things more complicated, remove the validator for this 
parameter.
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853657790


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   1. Both `appendSerializerToConfig` and  `appendDeserializerToConfig` methods 
are `protected`, and users cannot access them directly.
   2. The initialization method of KafkaProducer or KafkaConsumer with 
ConsumerConfig/ProducerConfig is also `protected`.
   3. This validator will only be executed after calling 
appendDeserializerToConfig/appendSerializerToConfig method .
   For example, if the user shows that the value of `key.serializer` is set to 
null but the valid keySerializer is passed in, the validator will not throw a 
`ConfigException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853652601


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),
 Importance.HIGH,
 KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   thanks for your review



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853652462


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -279,7 +281,7 @@ protected static ConfigDef baseConfigDef() {
 "", Importance.LOW,
 CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
 .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-ConfigDef.Type.STRING, "none", 
ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+ConfigDef.Type.STRING, 
SslClientAuth.NONE.name().toLowerCase(Locale.ROOT), 
in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)

Review Comment:
   Sure, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853652268


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
 assertEquals(Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
 new 
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 }
+
+@Test
+public void testInvalidKeyDeserializer() {
+Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+ConfigException ce = assertThrows(ConfigException.class, () -> new 
ConsumerConfig(configs));
+
assertTrue(ce.getMessage().contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+}
+
+@Test

Review Comment:
   Sure, i will improve related testcases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853651665


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null, "entry must be non 
null");
+}
+
+@SuppressWarnings("unchecked")
+List mechanismStrings = (List) value;
+
+if (mechanismStrings.isEmpty()) {
+throw new ConfigException(name, null, "entry must be non-empty 
list");
+}
+
+mechanismStrings.forEach(mechanism -> {
+if (mechanism == null || mechanism.isEmpty()) {
+throw new ConfigException(name, mechanism, "enabled 
mechanism must be non-null or non-empty string");

Review Comment:
   thanks for your review



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853651110


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null, "entry must be non 
null");
+}
+
+@SuppressWarnings("unchecked")
+List mechanismStrings = (List) value;
+
+if (mechanismStrings.isEmpty()) {
+throw new ConfigException(name, null, "entry must be non-empty 
list");
+}
+
+mechanismStrings.forEach(mechanism -> {
+if (mechanism == null || mechanism.isEmpty()) {

Review Comment:
   thanks for your review



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853650633


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {

Review Comment:
   thanks for your review



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853650163


##
clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java:
##
@@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
 }
 return null;
 }
+
+@Override
+public String toString() {
+return name().toLowerCase(Locale.ROOT);

Review Comment:
   Sure, thanks.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853650023


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1324,9 +1324,9 @@ object KafkaConfig {
   .define(SslEngineFactoryClassProp, CLASS, null, LOW, 
SslEngineFactoryClassDoc)
 
   /** * Sasl Configuration /
-  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
+  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
   .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
-  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
MEDIUM, SaslEnabledMechanismsDoc)
+  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
new BrokerSecurityConfigs.SaslEnabledMechanismsValidator(), MEDIUM, 
SaslEnabledMechanismsDoc)

Review Comment:
   I think this validator still executes when SASL is not enabled on the 
broker. Because the validator is executed when KafkaConfig (extends 
AbstractConfig) is initialized.
   
   I'll verify your case later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853646332


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
 assertEquals(Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
 new 
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 }
+
+@Test
+public void testInvalidKeyDeserializer() {

Review Comment:
   Thanks for your suggestion, I will improve the relevant testcase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853645330


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -486,10 +488,14 @@ public class ConsumerConfig extends AbstractConfig {
 
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
 .define(KEY_DESERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruanliang-hualun commented on a diff in pull request #12066: KAFKA-13834: fix drain batch starving issue

2022-04-19 Thread GitBox


ruanliang-hualun commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r853641392


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -48,18 +48,11 @@
 import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;

Review Comment:
   thanks for the advice, I've already corrected it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853583209


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1324,9 +1324,9 @@ object KafkaConfig {
   .define(SslEngineFactoryClassProp, CLASS, null, LOW, 
SslEngineFactoryClassDoc)
 
   /** * Sasl Configuration /
-  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
+  .define(SaslMechanismInterBrokerProtocolProp, STRING, 
Defaults.SaslMechanismInterBrokerProtocol, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), MEDIUM, 
SaslMechanismInterBrokerProtocolDoc)
   .define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
-  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
MEDIUM, SaslEnabledMechanismsDoc)
+  .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, 
new BrokerSecurityConfigs.SaslEnabledMechanismsValidator(), MEDIUM, 
SaslEnabledMechanismsDoc)

Review Comment:
   (@RivenSun2 I think your second comment was meant as a reply to [this 
one](https://github.com/apache/kafka/pull/12010#discussion_r852477821), right?)
   
   In response to the first comment:
   
   Similar point as above, but it's less about having to permit a value of 
`,,,` because people would _want_ to set it, and more because it's not worth 
breaking things for people who already do have it set and can get bitten by the 
upgrade.
   
   Interesting that this validator doesn't seem to take effect when SASL isn't 
enabled on the broker, though. Have you tested this change at all? I understand 
there are unit tests; I'm more wondering if it actually causes a different, 
more-useful error message to get logged to the user when this property is 
invalid (and is actually used by the broker)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853568629


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   This is a fair point... people may be using 
`ProducerConfig`/`ConsumerConfig` instances on their own without using a 
`KafkaProducer`/`KafkaConsumer` to construct them. In that case, if they 
haven't specified a value (or have explicitly specified `null`) for the 
key/value serializer/deserializer class, then upgrading to a version of Kafka 
that contains this change will cause things to break.
   
   I'm unclear on how/why we'd want to change `appendSerializerToConfig`, 
though?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853568629


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   This is a fair point... people may be using 
`ProducerConfig`/`ConsumerConfig` instances on their own without using a 
`KafkaProducer`/`KafkaConsumer` to construct them. In that case, if they 
haven't specified a value (or have explicitly specified `null`) for the 
key/value serializer/deserializer class, then upgrading to a version of Kafka 
that contains this change will cause things to break.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853568629


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   I think this is a fair point... people may be using 
`ProducerConfig`/`ConsumerConfig` instances on their own without using a 
`KafkaProducer`/`KafkaConsumer` to construct them. In that case, if they 
haven't specified a value (or have explicitly specified `null`) for the 
key/value serializer/deserializer class, then upgrading to a version of Kafka 
that contains this change will cause things to break.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853564467


##
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##
@@ -393,10 +394,14 @@ public class ProducerConfig extends AbstractConfig {
 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
 .define(KEY_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),
 Importance.HIGH,
 KEY_SERIALIZER_CLASS_DOC)
 .define(VALUE_SERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   We don't need a non-empty validator; any value that's non-null and doesn't 
refer to a valid Java class will already be met with a `ConfigException` (see 
[here](https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L728-L735)
 and 
[here](https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L744)).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853549366


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
 assertEquals(Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
 new 
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 }
+
+@Test
+public void testInvalidKeyDeserializer() {

Review Comment:
   I was thinking of some way of simulating people using that constructor in 
their consumer with a non-null serializer, but a null value for the property 
for that serializer in the config map. Something like this:
   
   ```java
   Map props = new HashMap<>();
   
   // populate props with basic required properties like bootstrap.servers
   
   props.put("key.deserializer", null);
   props.put("value.deserializer", null);
   Consumer consumer = new KafkaConsumer<>(props, new 
StringDeserializer(), new StringDeserializer());
   ```
   
   It does look like we provide pretty good coverage for that case in the 
`testAppendDeserializerToConfig` though, so we could also hold off for now. 
There is technically a gap in coverage with the case where the value for the 
property is null _and_ the user passes in a null `Deserializer` instance to the 
consumer, but up to you if you feel it's worth addressing or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853542111


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null, "entry must be non 
null");
+}
+
+@SuppressWarnings("unchecked")
+List mechanismStrings = (List) value;
+
+if (mechanismStrings.isEmpty()) {
+throw new ConfigException(name, null, "entry must be non-empty 
list");
+}
+
+mechanismStrings.forEach(mechanism -> {
+if (mechanism == null || mechanism.isEmpty()) {
+throw new ConfigException(name, mechanism, "enabled 
mechanism must be non-null or non-empty string");

Review Comment:
   The message that users will see from this exception will be "Invalid value 
 for configuration sasl.enabled.mechanisms: enabled mechanism must 
be non-null or non-empty string" (see 
[ConfigException](https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/clients/src/main/java/org/apache/kafka/common/config/ConfigException.java#L37)
 for the message generation logic).
   
   Considering the message will already include the name of the property, which 
itself mentions SASL, I'm not sure it's really necessary to specify it again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853537865


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {

Review Comment:
   This is not a part of the public API (notice the `internals` in the package 
name) and Javadocs are overkill when the implementation is this straightforward 
IMO.
   
   That said, it might be helpful to declare the class as `private` and move it 
to the only class that currently uses it (`KafkaConfigs.scala`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853538239


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null, "entry must be non 
null");
+}
+
+@SuppressWarnings("unchecked")
+List mechanismStrings = (List) value;
+
+if (mechanismStrings.isEmpty()) {
+throw new ConfigException(name, null, "entry must be non-empty 
list");
+}
+
+mechanismStrings.forEach(mechanism -> {
+if (mechanism == null || mechanism.isEmpty()) {

Review Comment:
   This is unnecessary; strings are [automatically 
trimmed](https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L655)
 when parsed during validation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853537865


##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {

Review Comment:
   This is not a part of the public API (notice the `internals` in the package 
name) and Javadocs are overkill when the implementation is this straightforward 
IMO.
   
   That said, it might be helpful to declare the class as `private` just to 
signal that it's not used anywhere outside of `BrokerSecurityConfigs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853536713


##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -202,7 +202,7 @@ public static void addClientSaslSupport(ConfigDef config) {
 .define(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, 
ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER, 
Range.between(0.0, 0.25), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC)
 .define(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, 
ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS, 
Range.between(0, 900), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC)
 .define(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, 
ConfigDef.Type.SHORT, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS, 
Range.between(0, 3600), ConfigDef.Importance.LOW, 
SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC)
-.define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, 
SaslConfigs.SASL_MECHANISM_DOC)
+.define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, 
SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()), 
ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC)

Review Comment:
   I'm imagining a situation like this: a user has set `sasl.mechanism` to 
something strange like `null` (not understanding that this isn't necessary), 
they haven't configured their consumer/producer to use SASL, then they upgrade 
to a version of Kafka that includes this change, and things unexpectedly start 
breaking for them.
   
   I understand that most people don't put garbage in their configs, but I also 
don't think it's worth the tradeoff here of causing configs that used to be 
valid to suddenly start failing.
   
   What I believe is more warranted here is context-dependent validation that 
uses the same core logic that you propose here (i.e., disallowing null and 
empty values), but only does it if SASL is enabled for the consumer/producer. 
And, if SASL is disabled and the user has provided a value for this property 
anyways, it's probably worth emitting a warning letting them know that this 
value is not being used by the consumer/producer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


C0urante commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853527225


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -486,10 +488,14 @@ public class ConsumerConfig extends AbstractConfig {
 
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
 .define(KEY_DESERIALIZER_CLASS_CONFIG,
 Type.CLASS,
+ConfigDef.NO_DEFAULT_VALUE,
+new ConfigDef.NonNullValidator(),

Review Comment:
   Yes, I understand the motivation; I was wondering if we would automatically 
catch the case of null values when we'd defined the property with no default 
value.
   
   To answer my own question, an explicitly-set `null` in the original 
`Map` is not the same as having no value set at all, and since 
users can programmatically instantiate `KafkaConsumer` instances, it's possible 
that they may have specified `null` for the `key.deserializer` property and 
others like it. As a result, it is necessary to guard against null values for 
this property, either with preflight validation or downstream manual checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13816) Downgrading Connect rebalancing protocol from incremental to eager causes duplicate task instances

2022-04-19 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524601#comment-17524601
 ] 

Chris Egerton edited comment on KAFKA-13816 at 4/19/22 9:53 PM:


[~sagarrao] I think you're on the right track, but it's a little trickier than 
that since the leader won't necessarily die if it fails to read the config 
topic during a rebalance. In some (probably most) cases, it'll just issue an 
assignment to everyone with the error field set to 
[CONFIG_MISMATCH|https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java#L279].
 This also means that the leader (unless it dies due to unrelated 
circumstances) will also participate in the next rebalance and remain the 
leader.

Beyond that, I'm sorry but I don't really have time to focus on a design for 
this right now; it's why I've created this ticket but haven't assigned it to 
myself. It sounds like you're on the right track but I can't say for certain :(


was (Author: chrisegerton):
[~sagarrao] I think you're on the right track, but it's a little trickier than 
that since the leader won't necessarily die if it fails to read the config 
topic during a rebalance. In some (probably most) cases, it'll just issue an 
assignment to everyone with the error field set to 
[CONFIG_MISMATCH|https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java#L279].
 This also means that the leader (unless it dies due to unrelated 
circumstances) will also participate in the next rebalance and remain the 
leader.

Beyond that, I'm sorry but I don't really have time to focus on a design for 
this right now; it's why I've created this ticket but haven't assigned it to 
myself.

> Downgrading Connect rebalancing protocol from incremental to eager causes 
> duplicate task instances
> --
>
> Key: KAFKA-13816
> URL: https://issues.apache.org/jira/browse/KAFKA-13816
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> The rebalancing protocol for a Kafka Connect cluster can be downgraded from 
> incremental to eager by adding a worker to the cluster with 
> {{connect.protocol}} set to {{{}eager{}}}, or by stopping an existing worker 
> in that cluster, reconfiguring it with the new protocol, and restarting it.
> When the worker (re)joins the cluster, a rebalance takes place using the 
> eager protocol, and duplicate task instances are created on the cluster.
> This occurs because:
>  * The leader does not send out an assignment that revokes all connectors and 
> tasks for the cluster during that round
>  * Workers do not respond to the downgrade in protocol by revoking all 
> connectors and tasks that they were running before the rebalance that are not 
> included in the new assignment they received during the rebalance
> It's likely that this bug hasn't surfaced sooner because any subsequent 
> rebalance should cause all connectors and tasks on all each in the cluster to 
> be proactively revoked before the worker rejoins the group.
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-Compatibility,Deprecation,andMigrationPlan]
>  provides one way to address this:
> {quote}To downgrade your cluster to use protocol version 0 from version 1 or 
> higher with {{eager}} rebalancing policy what is required is to switch one of 
> the workers back to {{eager}} mode. 
> {panel}
>  {panel}
> |{{connect.protocol = eager}}|
> Once this worker joins, the group will downgrade to protocol version 0 and 
> {{eager}} rebalancing policy, with immediately release of resources upon 
> joining the group. This process will require a one-time double rebalancing, 
> with the leader detecting the downgrade and first sending a downgraded 
> assignment with empty assigned connectors and tasks and from then on just 
> regular downgraded assignments. 
> {quote}
> However, it's unclear how to accomplish the second round in the double 
> rebalance described above.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13816) Downgrading Connect rebalancing protocol from incremental to eager causes duplicate task instances

2022-04-19 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524601#comment-17524601
 ] 

Chris Egerton commented on KAFKA-13816:
---

[~sagarrao] I think you're on the right track, but it's a little trickier than 
that since the leader won't necessarily die if it fails to read the config 
topic during a rebalance. In some (probably most) cases, it'll just issue an 
assignment to everyone with the error field set to 
[CONFIG_MISMATCH|https://github.com/apache/kafka/blob/9c3f605fc78f297ecf5accdcdec18471c19cf7d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java#L279].
 This also means that the leader (unless it dies due to unrelated 
circumstances) will also participate in the next rebalance and remain the 
leader.

Beyond that, I'm sorry but I don't really have time to focus on a design for 
this right now; it's why I've created this ticket but haven't assigned it to 
myself.

> Downgrading Connect rebalancing protocol from incremental to eager causes 
> duplicate task instances
> --
>
> Key: KAFKA-13816
> URL: https://issues.apache.org/jira/browse/KAFKA-13816
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> The rebalancing protocol for a Kafka Connect cluster can be downgraded from 
> incremental to eager by adding a worker to the cluster with 
> {{connect.protocol}} set to {{{}eager{}}}, or by stopping an existing worker 
> in that cluster, reconfiguring it with the new protocol, and restarting it.
> When the worker (re)joins the cluster, a rebalance takes place using the 
> eager protocol, and duplicate task instances are created on the cluster.
> This occurs because:
>  * The leader does not send out an assignment that revokes all connectors and 
> tasks for the cluster during that round
>  * Workers do not respond to the downgrade in protocol by revoking all 
> connectors and tasks that they were running before the rebalance that are not 
> included in the new assignment they received during the rebalance
> It's likely that this bug hasn't surfaced sooner because any subsequent 
> rebalance should cause all connectors and tasks on all each in the cluster to 
> be proactively revoked before the worker rejoins the group.
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-Compatibility,Deprecation,andMigrationPlan]
>  provides one way to address this:
> {quote}To downgrade your cluster to use protocol version 0 from version 1 or 
> higher with {{eager}} rebalancing policy what is required is to switch one of 
> the workers back to {{eager}} mode. 
> {panel}
>  {panel}
> |{{connect.protocol = eager}}|
> Once this worker joins, the group will downgrade to protocol version 0 and 
> {{eager}} rebalancing policy, with immediately release of resources upon 
> joining the group. This process will require a one-time double rebalancing, 
> with the leader detecting the downgrade and first sending a downgraded 
> assignment with empty assigned connectors and tasks and from then on just 
> regular downgraded assignments. 
> {quote}
> However, it's unclear how to accomplish the second round in the double 
> rebalance described above.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13837) Return error for Fetch requests from unrecognized followers

2022-04-19 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13837:
---

 Summary: Return error for Fetch requests from unrecognized 
followers
 Key: KAFKA-13837
 URL: https://issues.apache.org/jira/browse/KAFKA-13837
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


If the leader of a partition receives a request from a replica which is not in 
the current replica set, we currently return an empty fetch response with no 
error. I think the rationale for this is that the leader may not have received 
the latest `LeaderAndIsr` update which adds the replica, so we just want the 
follower to retry. The problem with this is that if the `LeaderAndIsr` request 
never arrives, then there might not be an external indication of a problem. 
This probably was the only reasonable option before we added the leader epoch 
to the `Fetch` request API. Now that we have it, it would be clearer to return 
an `UNKNOWN_LEADER_EPOCH` error to indicate that the (replicaId, leaderEpoch) 
tuple is not recognized. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] philipnee commented on a diff in pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-19 Thread GitBox


philipnee commented on code in PR #12064:
URL: https://github.com/apache/kafka/pull/12064#discussion_r853476454


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1905,15 +1905,21 @@ public void testCallbackHandlesError() throws Exception 
{
 Map configs = new HashMap<>();
 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockProducerInterceptor.class.getName());

Review Comment:
   Good call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-19 Thread GitBox


junrao commented on code in PR #12064:
URL: https://github.com/apache/kafka/pull/12064#discussion_r853473066


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -1905,15 +1905,21 @@ public void testCallbackHandlesError() throws Exception 
{
 Map configs = new HashMap<>();
 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockProducerInterceptor.class.getName());

Review Comment:
   Perhaps the method name could be changed to 
testCallbackAndInterceptorHandleError?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe merged PR #12063:
URL: https://github.com/apache/kafka/pull/12063


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on PR #12063:
URL: https://github.com/apache/kafka/pull/12063#issuecomment-1103064630

   Jenkins test failures were `ConnectorTopicsIntegrationTest` and 
`ConnectionQuotasTest`, which are not related and which pass locally. 
Committing. Thanks, all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13827) Data Loss on moving to KRAFT mode from v3.1

2022-04-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524498#comment-17524498
 ] 

Ismael Juma commented on KAFKA-13827:
-

{quote}So if someone is to start using KRAFT mode, it has to be a fresh setup 
and cannot be migrated from the existing cluster. Please correct me if I am 
wrong.
{quote}
 
Correct,

> Data Loss on moving to KRAFT mode from v3.1
> ---
>
> Key: KAFKA-13827
> URL: https://issues.apache.org/jira/browse/KAFKA-13827
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Sharad Garg
>Priority: Major
>
> * Topics created in v2.8/v3.1 that have data are empty (data is deleted) in 
> KRAFT mode. These topics are recreated in KRAFT mode and take some time to 
> show up in listTopics (~90s) after the brokers are up. Below logs are seen in 
> server.logs-
> {code:java}
> [2022-04-07 23:58:49,771] INFO [Controller 1] Created topic 
> kafkaalpha.gargsha.test5 with topic ID APdVTk-9S-uf2q_bm9NpzQ.{nolookups} 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-07 23:58:49,771] INFO [Controller 1] Created partition 
> kafkaalpha.gargsha.test5-0 with topic ID APdVTk-9S-uf2q_bm9NpzQ and 
> PartitionRegistration(replicas=[2], isr=[2], removingReplicas=[], 
> addingReplicas
> =[], leader=2, leaderEpoch=0, partitionEpoch=0).{nolookups} 
> (org.apache.kafka.controller.ReplicationControlManager){code}
>  * Topic configurations altered in v2.8/v3.1 are not persisted in KRAFT mode.
>  * Topics with no data in v2.8 are deleted when migrated to KRAFT v3.1.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-13827) Data Loss on moving to KRAFT mode from v3.1

2022-04-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524498#comment-17524498
 ] 

Ismael Juma edited comment on KAFKA-13827 at 4/19/22 6:29 PM:
--

{quote}So if someone is to start using KRAFT mode, it has to be a fresh setup 
and cannot be migrated from the existing cluster. Please correct me if I am 
wrong.
{quote}
 
Correct.


was (Author: ijuma):
{quote}So if someone is to start using KRAFT mode, it has to be a fresh setup 
and cannot be migrated from the existing cluster. Please correct me if I am 
wrong.
{quote}
 
Correct,

> Data Loss on moving to KRAFT mode from v3.1
> ---
>
> Key: KAFKA-13827
> URL: https://issues.apache.org/jira/browse/KAFKA-13827
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Sharad Garg
>Priority: Major
>
> * Topics created in v2.8/v3.1 that have data are empty (data is deleted) in 
> KRAFT mode. These topics are recreated in KRAFT mode and take some time to 
> show up in listTopics (~90s) after the brokers are up. Below logs are seen in 
> server.logs-
> {code:java}
> [2022-04-07 23:58:49,771] INFO [Controller 1] Created topic 
> kafkaalpha.gargsha.test5 with topic ID APdVTk-9S-uf2q_bm9NpzQ.{nolookups} 
> (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-07 23:58:49,771] INFO [Controller 1] Created partition 
> kafkaalpha.gargsha.test5-0 with topic ID APdVTk-9S-uf2q_bm9NpzQ and 
> PartitionRegistration(replicas=[2], isr=[2], removingReplicas=[], 
> addingReplicas
> =[], leader=2, leaderEpoch=0, partitionEpoch=0).{nolookups} 
> (org.apache.kafka.controller.ReplicationControlManager){code}
>  * Topic configurations altered in v2.8/v3.1 are not persisted in KRAFT mode.
>  * Topics with no data in v2.8 are deleted when migrated to KRAFT v3.1.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] cmccabe commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r853345314


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -948,18 +980,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 val oldListenerMap = listenersToMap(oldListeners)
 val listenersRemoved = oldListeners.filterNot(e => 
newListenerMap.contains(e.listenerName))
 val listenersAdded = newListeners.filterNot(e => 
oldListenerMap.contains(e.listenerName))
-
-// Clear SASL login cache to force re-login
-if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-  LoginManager.closeAll()
-
-server.socketServer.removeListeners(listenersRemoved)
-if (listenersAdded.nonEmpty)
-  server.socketServer.addListeners(listenersAdded)
-
-server match {
-  case kafkaServer: KafkaServer => 
kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-  case _ =>
+if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {

Review Comment:
   `listenerRegistrationsAltered` is about *registered listeners*, which may be 
different from *listeners*.
   
   One example is if you have a registered listener set to 
`PLAINTEXT://myhostname:9092`. Changing that to `PLAINTEXT://myhostname2:9092` 
is a registered listener change which is not possible in KRaft currently.
   
   But, you could have the *listener* change from `PLAINTEXT://0.0.0.0:9092` 
(listen on all interfaces) to `PLAINTEXT://myhostname:9092` (listen only on the 
IP associated with myhostname). *That* would be possible in kraft.
   
   And of course, we only want to alter the znode in ZK mode if the registered 
listeners changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


hachikuji commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r853339103


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -948,18 +980,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 val oldListenerMap = listenersToMap(oldListeners)
 val listenersRemoved = oldListeners.filterNot(e => 
newListenerMap.contains(e.listenerName))
 val listenersAdded = newListeners.filterNot(e => 
oldListenerMap.contains(e.listenerName))
-
-// Clear SASL login cache to force re-login
-if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-  LoginManager.closeAll()
-
-server.socketServer.removeListeners(listenersRemoved)
-if (listenersAdded.nonEmpty)
-  server.socketServer.addListeners(listenersAdded)
-
-server match {
-  case kafkaServer: KafkaServer => 
kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-  case _ =>
+if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {

Review Comment:
   Could this check be moved inside the `listenerRegistrationsAltered` block 
below? I'm trying to understand why they handled separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted

2022-04-19 Thread GitBox


junrao commented on code in PR #12029:
URL: https://github.com/apache/kafka/pull/12029#discussion_r853335645


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2703,6 +2711,26 @@ class ReplicaManagerTest {
   assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
   assertFalse(readRecoveryPointCheckpoint().contains(tp0))
   assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
+
+  // verify that there is log to be deleted
+  val logsToBeDeleted = replicaManager.logManager.logsToBeDeleted
+  assertEquals(1, logsToBeDeleted.size())
+  val (removedLog, _) = logsToBeDeleted.take()
+  
assertFalse(removedLog.logDirFailureChannel.hasOfflineLogDir(logDir.toString))
+
+  // trigger the background deletion
+  var kafkaStorageExceptionCaptured = false
+  try {
+removedLog.delete()
+  } catch {
+case _: KafkaStorageException =>
+  kafkaStorageExceptionCaptured = true
+  }
+  assertEquals(expectedExceptionInBackgroundDeletion, 
kafkaStorageExceptionCaptured)

Review Comment:
   @dajac : Yes, perhaps we could have a separate log level test for this 
exception if not covered by existing tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-19 Thread GitBox


philipnee commented on PR #12064:
URL: https://github.com/apache/kafka/pull/12064#issuecomment-1102902016

   Hey @junrao - Much thanks to look into this, I added some tests to the PR, 
let me know if you think the testing strategy is acceptable...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12004: KAFKA-10095: Add stricter assertion in LogCleanerManagerTest

2022-04-19 Thread GitBox


divijvaidya commented on code in PR #12004:
URL: https://github.com/apache/kafka/pull/12004#discussion_r853306673


##
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogCleanerManagerTest extends Logging {
 
 // updateCheckpoints should remove the topicPartition data in the logDir
 cleanerManager.updateCheckpoints(logDir, partitionToRemove = 
Option(topicPartition))
-
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+assertTrue(!cleanerManager.allCleanerCheckpoints.contains(topicPartition))

Review Comment:
   done.



##
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala:
##
@@ -710,14 +710,16 @@ class LogCleanerManagerTest extends Logging {
 assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
 cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
-cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+val endOffset = 1L
+cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
 assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
-
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
+assertEquals(Some(endOffset), 
cleanerManager.allCleanerCheckpoints.get(topicPartition))
 
 cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
 cleanerManager.doneCleaning(topicPartition, log.dir, 1)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r853303350


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -950,16 +963,17 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 val listenersAdded = newListeners.filterNot(e => 
oldListenerMap.contains(e.listenerName))
 
 // Clear SASL login cache to force re-login
-if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-  LoginManager.closeAll()
-
-server.socketServer.removeListeners(listenersRemoved)
-if (listenersAdded.nonEmpty)
-  server.socketServer.addListeners(listenersAdded)
-
-server match {
-  case kafkaServer: KafkaServer => 
kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-  case _ =>
+LoginManager.closeAll()

Review Comment:
   I fixed this so that we only call this if an endpoint was added or removed, 
which is how it was previously.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on PR #12063:
URL: https://github.com/apache/kafka/pull/12063#issuecomment-1102884473

   The only test failure I could find was the "heap space exhausted" thing 
which has nothing to do with this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r853300475


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 toLoggableProps(resource, props).mkString(","))
   dynamicConfigHandlers(ConfigType.Topic).
 processConfigChanges(resource.name(), props)
-  conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-case BROKER => if (resource.name().isEmpty) {
-  // Apply changes to "cluster configs" (also known as default 
BROKER configs).
-  // These are stored in KRaft with an empty name field.
-  info(s"Updating cluster configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(ConfigEntityName.Default, props)
-} else if (resource.name().equals(brokerId.toString)) {
-  // Apply changes to this broker's dynamic configuration.
-  info(s"Updating broker ${brokerId} with new configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(resource.name(), props)
-}
+case BROKER =>
+  if (resource.name().isEmpty) {
+// Apply changes to "cluster configs" (also known as default 
BROKER configs).
+// These are stored in KRaft with an empty name field.
+info(s"Updating cluster configuration : " +
+  toLoggableProps(resource, props).mkString(","))
+dynamicConfigHandlers(ConfigType.Broker).
+  processConfigChanges(ConfigEntityName.Default, props)
+  } else if (resource.name().equals(brokerId.toString)) {

Review Comment:
   OK, I will change it to == here to make it more idiomatic Scala code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


cmccabe commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r853299678


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -248,6 +248,24 @@ class BrokerMetadataListener(
 }
   }
 
+  // This is used in tests to alter the publisher that is in use by the broker.
+  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
+val event = new AlterPublisherEvent(publisher)
+eventQueue.append(event)
+event.future
+  }
+
+  class AlterPublisherEvent(publisher: MetadataPublisher)
+extends EventQueue.FailureLoggingEvent(log) {
+val future = new CompletableFuture[Void]()
+
+override def run(): Unit = {
+  _publisher = Some(publisher)
+  log.info(s"Set publisher to ${publisher}")

Review Comment:
   @mdedetrich : Thanks for the comment. I would like to put this code outside 
of `BrokerMetadataListener`, but that would mean making `_publisher` public (or 
at least non-private) which is something I really don't want to do. People have 
a tendency to use whatever variables are available to them in junit tests, even 
if there are concurrency problems. We have a lot of examples of that. This 
function is more like a setter. It's simple enough to just put in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12004: KAFKA-10095: Add stricter assertion in LogCleanerManagerTest

2022-04-19 Thread GitBox


mimaison commented on code in PR #12004:
URL: https://github.com/apache/kafka/pull/12004#discussion_r853285333


##
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogCleanerManagerTest extends Logging {
 
 // updateCheckpoints should remove the topicPartition data in the logDir
 cleanerManager.updateCheckpoints(logDir, partitionToRemove = 
Option(topicPartition))
-
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+assertTrue(!cleanerManager.allCleanerCheckpoints.contains(topicPartition))

Review Comment:
   Would 
`assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))` be 
even more readable?



##
core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala:
##
@@ -710,14 +710,16 @@ class LogCleanerManagerTest extends Logging {
 assertThrows(classOf[IllegalStateException], () => 
cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
 cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
-cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+val endOffset = 1L
+cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
 assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
-
assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
+assertEquals(Some(endOffset), 
cleanerManager.allCleanerCheckpoints.get(topicPartition))
 
 cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
 cleanerManager.doneCleaning(topicPartition, log.dir, 1)

Review Comment:
   Should we reuse `endOffsets` here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-04-19 Thread GitBox


ijuma commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1102838142

   cc @apovzner @dajac 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12004: KAFKA-10095: Add stricter assertion in LogCleanerManagerTest

2022-04-19 Thread GitBox


divijvaidya commented on PR #12004:
URL: https://github.com/apache/kafka/pull/12004#issuecomment-1102809653

   Requesting review from @abbccdda and @hachikuji since you folks were part of 
a previous PR for this JIRA.
   
   Please take a look when you get a chance 🙏


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`

2022-04-19 Thread GitBox


divijvaidya commented on PR #12045:
URL: https://github.com/apache/kafka/pull/12045#issuecomment-1102806383

   Requesting review from @mjsax since you commented on the associated JIRA: 
https://issues.apache.org/jira/browse/KAFKA-12319 
   
   Requesting review from @ijuma @jjkoshy since you folks reviewed the last set 
of changes in `Rate.java` file.
   
   Please take a look when you get a chance 🙏


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13654.
--
Fix Version/s: 3.3.0
   Resolution: Fixed

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
> Fix For: 3.3.0
>
>
> Extending KStream#process to use latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allow typed returned KStream that will allow to chain results 
> from processors, becoming a new way to transform records with more control 
> over whats forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13654) Extend KStream process with new Processor API

2022-04-19 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-13654:


Assignee: Jorge Esteban Quilcate Otoya

> Extend KStream process with new Processor API
> -
>
> Key: KAFKA-13654
> URL: https://issues.apache.org/jira/browse/KAFKA-13654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: kafka-streams, kip-required, needs-kip, streams
>
> Extending KStream#process to use latest Processor API adopted here: 
> https://issues.apache.org/jira/browse/KAFKA-8410
> This new API allow typed returned KStream that will allow to chain results 
> from processors, becoming a new way to transform records with more control 
> over whats forwarded.
> KIP: https://cwiki.apache.org/confluence/x/yKbkCw



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] vvcephei merged pull request #11993: KAFKA-13654: Extend KStream process with new Processor API

2022-04-19 Thread GitBox


vvcephei merged PR #11993:
URL: https://github.com/apache/kafka/pull/11993


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #12058: MINOR: Scala cleanups in core

2022-04-19 Thread GitBox


mimaison commented on code in PR #12058:
URL: https://github.com/apache/kafka/pull/12058#discussion_r853202342


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -140,13 +140,13 @@ object DelegationTokenManager {
 
 val allow =
 //exclude tokens which are not requested
-  if (!owners.isEmpty && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {
+  if (owners.isDefined && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {

Review Comment:
   `isDefined` and `nonEmpty` are equivalent on `Option`. On `String` there is 
not `isDefined`, so we need to use `nonEmpty`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12010: KAFKA-13793: Add validators for configs that lack validators

2022-04-19 Thread GitBox


divijvaidya commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r853124702


##
clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java:
##
@@ -45,4 +45,9 @@ public static SslClientAuth forConfig(String key) {
 }
 return null;
 }
+
+@Override
+public String toString() {
+return name().toLowerCase(Locale.ROOT);

Review Comment:
   optional
   
   `super.toString().toLowerCase(Locale.ROOT);` ?



##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {

Review Comment:
   please add javadoc for all public classes and methods



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -108,4 +112,42 @@ public void testDefaultPartitionAssignor() {
 assertEquals(Arrays.asList(RangeAssignor.class, 
CooperativeStickyAssignor.class),
 new 
ConsumerConfig(properties).getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
 }
+
+@Test
+public void testInvalidKeyDeserializer() {
+Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, null);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+ConfigException ce = assertThrows(ConfigException.class, () -> new 
ConsumerConfig(configs));
+
assertTrue(ce.getMessage().contains(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+}
+
+@Test

Review Comment:
   optional suggestion
   
   Create a parameterized test [1] with multiple invalid values such as empty 
string, null, string with whitespace in it. Same could be done for rest of the 
tests as well.
   
   [1] 
https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests



##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session lifetime and that is then subsequently "
 + "used for any purpose other than re-authentication. 
Configuration names can optionally be prefixed with listener prefix and SASL "
 + "mechanism name in lower-case. For example, 
listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=360";
+
+public static class SaslEnabledMechanismsValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null, "entry must be non 
null");
+}
+
+@SuppressWarnings("unchecked")
+List mechanismStrings = (List) value;
+
+if (mechanismStrings.isEmpty()) {
+throw new ConfigException(name, null, "entry must be non-empty 
list");
+}
+
+mechanismStrings.forEach(mechanism -> {
+if (mechanism == null || mechanism.isEmpty()) {

Review Comment:
   A stricter check could be `String.isBlank()` which checks for white spaces 
as well which is invalid in this scenario.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -279,7 +281,7 @@ protected static ConfigDef baseConfigDef() {
 "", Importance.LOW,
 CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
 .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-ConfigDef.Type.STRING, "none", 
ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+ConfigDef.Type.STRING, 
SslClientAuth.NONE.name().toLowerCase(Locale.ROOT), 
in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)

Review Comment:
   since we have overridden the toString method in SslClientAuth, we can safely 
do SslClientAuth.NONE.toString() here.



##
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java:
##
@@ -89,4 +91,31 @@ public class BrokerSecurityConfigs {
 + "The broker will disconnect any such connection that is not 
re-authenticated within the session life

[jira] [Assigned] (KAFKA-13836) Improve KRaft broker heartbeat logic

2022-04-19 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-13836:
--

Assignee: dengziming

> Improve KRaft broker heartbeat logic
> 
>
> Key: KAFKA-13836
> URL: https://issues.apache.org/jira/browse/KAFKA-13836
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> # Don't advertise an offset to the controller until it has been published
>  # only unfence a broker when it has seen it's own registration



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] dengziming commented on pull request #11951: KAFKA-13737: Fix flaky test in LeaderElectionCommandTest

2022-04-19 Thread GitBox


dengziming commented on PR #11951:
URL: https://github.com/apache/kafka/pull/11951#issuecomment-1102760776

   Yeah @hachikuji , it's impossible to remove the gap but we can still 
minimize it by:
   1. Don't advertise an offset to the controller until it has been published
   2. Only unfence a broker when it has seen its own registration
   
   I also added a test case for these 2 logics in BrokerLifecycleManagerTest, 
it seems a little messy, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13836) Improve KRaft broker heartbeat logic

2022-04-19 Thread dengziming (Jira)
dengziming created KAFKA-13836:
--

 Summary: Improve KRaft broker heartbeat logic
 Key: KAFKA-13836
 URL: https://issues.apache.org/jira/browse/KAFKA-13836
 Project: Kafka
  Issue Type: Improvement
Reporter: dengziming


# Don't advertise an offset to the controller until it has been published
 # only unfence a broker when it has seen it's own registration



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mimaison opened a new pull request, #12068: MINOR: Fix warnings in Gradle 7

2022-04-19 Thread GitBox


mimaison opened a new pull request, #12068:
URL: https://github.com/apache/kafka/pull/12068

   JavaExec.main and Report.destination have been deprecated and will be removd 
in Gradle 8. Use the new fields (mainClass and outputLocation) instead.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #12067: KAFKA-13780: Generate OpenAPI file for Connect REST API

2022-04-19 Thread GitBox


mimaison opened a new pull request, #12067:
URL: https://github.com/apache/kafka/pull/12067

   New gradle task: connect:runtime:genConnectOpenAPIDocs that generates 
connect_rest.yaml under docs/generated
   This task is executed when siteDocsTar runs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12066: KAFKA-13834: fix drain batch starving issue

2022-04-19 Thread GitBox


RivenSun2 commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r853013703


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -48,18 +48,11 @@
 import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;

Review Comment:
   Please do not import *.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-19 Thread GitBox


RivenSun2 commented on PR #12052:
URL: https://github.com/apache/kafka/pull/12052#issuecomment-1102545521

   Hi @divijvaidya Thanks for your review, just resubmitted the changes.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13834) Some problems with producers choosing batches of messages to send

2022-04-19 Thread ruanliang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524270#comment-17524270
 ] 

ruanliang commented on KAFKA-13834:
---

[~showuon]  [~guozhang]   add the test case  
[https://github.com/apache/kafka/pull/12066] 

> Some problems with producers choosing batches of messages to send
> -
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. 问题代码 problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png! 在这里插入图片描述
> 问题出在这个, private int drainIndex;
> The problem is this,private int drainIndex;
> h3. 代码预期 code expectations
> 这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。
> 因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
> 需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。
> 简单来说呢就是下图这样
>  
> The logic of the code at this end is to calculate the ProducerBatchs sent to 
> each Node, which is sent in batches.
> Because the amount of requests sent at one time is limited 
> (max.request.size), only a few ProducerBatch may be sent at a time. Then 
> after sending this time, you need to record which Batch is traversed here, 
> and the next time you traverse it again Can continue the last traversal send.
> Simply put, it is as follows
>  
> !image-2022-04-18-17-36-47-393.png!
>  
>  
>  
> h3. 实际情况 The actual situation
> 但是呢, 因为上面的索引drainIndex 是一个全局变量, 是RecordAccumulator共享的。
> 那么通常会有很多个Node需要进行遍历, 
> 上一个Node的索引会接着被第二个第三个Node接着使用,那么也就无法比较均衡合理的让每个TopicPartition都遍历到.
> 正常情况下其实这样也没有事情, 如果不出现极端情况的下,基本上都能遍历到。
> 怕就怕极端情况, 导致有很多TopicPartition不能够遍历到,也就会造成一部分消息一直发送不出去。
> However, because the index drainIndex above is a global variable shared by 
> RecordAccumulator.
> Then there are usually many Nodes that need to be traversed, and the index of 
> the previous Node will be used by the second and third Nodes, so it is 
> impossible to traverse each TopicPartition in a balanced and reasonable 
> manner.
> Under normal circumstances, there is nothing wrong with this. If there is no 
> extreme situation, it can basically be traversed.
> I'm afraid of extreme situations, which will result in many TopicPartitions 
> that cannot be traversed, and some messages will not be sent out all the time.
> h3. 造成的影响 impact
> 导致部分消息一直发送不出去、或者很久才能够发送出去。
> As a result, some messages cannot be sent out, or can take a long time to be 
> sent out.
> h3. 触发异常情况的一个Case /  A Case that triggers an exception
> 该Case场景如下:
>  # 生产者向3个Node发送消息
>  # 每个Node都是3个TopicPartition
>  # 每个TopicPartition队列都一直源源不断的写入消息、
>  # max.request.size 刚好只能存放一个ProdcuerBatch的大小。
> 就是这么几个条件,会造成每个Node只能收到一个TopicPartition队列里面的PrdoucerBatch消息。
> 开始的时候 drainIndex=0. 开始遍历第一个Node-0。 Node-0 准备开始遍历它下面的几个队列中的ProducerBatch,遍历一次 
> 则drainIndex+1,发现遍历了一个队列之后,就装满了这一批次的请求。
> 那么开始遍历Node-1,这个时候则drainIndex=1,首先遍历到的是 第二个TopicPartition。然后发现一个Batch之后也满了。
> 那么开始遍历Node-1,这个时候则drainIndex=2,首先遍历到的是 第三个TopicPartition。然后发现一个Batch之后也满了。
> 这一次的Node遍历结束之后把消息发送之后
> 又接着上面的请求流程,那么这个时候的drainIndex=3了。
> 遍历Node-0,这个时候取模计算得到的是第几个TopicPartition呢?那不还是第1个吗。相当于后面的流程跟上面一模一样。
> 也就导致了每个Node的第2、3个TopicPartition队列中的ProducerBatch永远遍历不到。
> 也就发送不出去了。
>  
> The case scenario is as follows:
> Producer sends message to 3 Nodes
> Each Node is 3 TopicPartitions
> Each TopicPartition queue has been continuously writing messages,
> max.request.size can only store the size of one ProdcuerBatch.
> It is these conditions that cause each Node to receive only one PrdoucerBatch 
> message in the TopicPartition queue.
> At the beginning drainIndex=0. Start traversing the first Node-0. Node-0 is 
> ready to start traversing the ProducerBatch in several queues below it. After 
> traversing once, drainIndex + 1. After traversing a queue, it is full of 
> requests for this batch.
> Then start traversing Node-1. At this time, drainIndex=1, and the second 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> Then start traversing Node-1. At this time, drainIndex=2, and the third 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> After this Node traversal is over, the message is sent
> Then the above request process is followed, then drainIndex=3 at this time.
> Traversing Node-0, which TopicPartition is obtained by taking the modulo 
> calculation at this time? Isn't that the first one? Equivalent to the 
> following process is exactly the same as above.
> As a result, the ProducerBatch in the se

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-04-19 Thread GitBox


vamossagar12 commented on code in PR #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r852939284


##
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java:
##
@@ -87,23 +90,35 @@ public KeyValueIterator backwardFetch(final 
Bytes key,
 return fetch(key, from, to, false);
 }
 
+
 KeyValueIterator fetch(final Bytes key,
   final long from,
   final long to,
   final boolean forward) {
-final List searchSpace = keySchema.segmentsToSearch(segments, from, 
to, forward);
 
-final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
+final long actualFrom = getActualFrom(from);
+
+if (keySchema instanceof WindowKeySchema && to < actualFrom) {
+return KeyValueIterators.emptyIterator();

Review Comment:
   Yeah.. so for the case of WindowKeySchema, if to < from, the underlying code 
throws IllegalArguementException while that's not the case for SessionKeySchema 
and other kinds of schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #11993: KAFKA-13654: Extend KStream process with new Processor API

2022-04-19 Thread GitBox


jeqo commented on code in PR #11993:
URL: https://github.com/apache/kafka/pull/11993#discussion_r852932074


##
streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java:
##
@@ -60,7 +61,7 @@ public void init(final InternalProcessorContext 
context) {
 
 @Override
 public void process(final Record record) {
-processor().process(record);
+mockProcessor.process(record);

Review Comment:
   Fallback to `MockProcessor` instead of `MockApiProcessor` as there is no 
need to migrate this I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #11993: KAFKA-13654: Extend KStream process with new Processor API

2022-04-19 Thread GitBox


jeqo commented on code in PR #11993:
URL: https://github.com/apache/kafka/pull/11993#discussion_r852931420


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##
@@ -37,30 +43,67 @@
 @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
 private final org.apache.kafka.streams.processor.ProcessorSupplier oldProcessorSupplier;
 private final ProcessorSupplier processorSupplier;
+private final FixedKeyProcessorSupplier 
fixedKeyProcessorSupplier;
 private final String processorName;
 
 @SuppressWarnings("deprecation") // Old PAPI compatibility.
 public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier 
processorSupplier,
final String processorName) {
 oldProcessorSupplier = processorSupplier;
 this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
+fixedKeyProcessorSupplier = null;
 this.processorName = processorName;
 }
 
 public ProcessorParameters(final ProcessorSupplier 
processorSupplier,
final String processorName) {
 oldProcessorSupplier = null;
 this.processorSupplier = processorSupplier;
+fixedKeyProcessorSupplier = null;
+this.processorName = processorName;
+}
+
+public ProcessorParameters(final FixedKeyProcessorSupplier 
processorSupplier,
+   final String processorName) {
+oldProcessorSupplier = null;
+this.processorSupplier = null;
+fixedKeyProcessorSupplier = processorSupplier;
 this.processorName = processorName;
 }
 
 public ProcessorSupplier processorSupplier() {
 return processorSupplier;
 }
 
-@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-public org.apache.kafka.streams.processor.ProcessorSupplier 
oldProcessorSupplier() {
-return oldProcessorSupplier;
+public FixedKeyProcessorSupplier 
fixedKeyProcessorSupplier() {
+return fixedKeyProcessorSupplier;
+}
+
+public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String[] parentNodeNames) {
+if (processorSupplier != null) {
+topologyBuilder.addProcessor(processorName, processorSupplier, 
parentNodeNames);
+if (processorSupplier.stores() != null) {
+for (final StoreBuilder storeBuilder : 
processorSupplier.stores()) {
+topologyBuilder.addStateStore(storeBuilder, processorName);
+}
+}
+}
+if (fixedKeyProcessorSupplier != null) {
+topologyBuilder.addProcessor(processorName, 
fixedKeyProcessorSupplier, parentNodeNames);
+if (fixedKeyProcessorSupplier.stores() != null) {
+for (final StoreBuilder storeBuilder : 
fixedKeyProcessorSupplier.stores()) {
+topologyBuilder.addStateStore(storeBuilder, processorName);
+}
+}
+}
+
+// temporary hack until KIP-478 is fully implemented
+// Old PAPI. Needs to be migrated.
+if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != 
null) {
+for (final StoreBuilder storeBuilder : 
oldProcessorSupplier.stores()) {
+topologyBuilder.addStateStore(storeBuilder, processorName);
+}
+}

Review Comment:
   Added a couple of tests to KStream to test these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12058: MINOR: Scala cleanups in core

2022-04-19 Thread GitBox


divijvaidya commented on code in PR #12058:
URL: https://github.com/apache/kafka/pull/12058#discussion_r852885365


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -140,13 +140,13 @@ object DelegationTokenManager {
 
 val allow =
 //exclude tokens which are not requested
-  if (!owners.isEmpty && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {
+  if (owners.isDefined && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {

Review Comment:
   although `isDefined` and `nonEmpty` are equivalent but I was wondering if 
there is a reason for using former at some places and the latter at others?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-19 Thread GitBox


divijvaidya commented on code in PR #12052:
URL: https://github.com/apache/kafka/pull/12052#discussion_r852846763


##
docs/design.html:
##
@@ -124,6 +124,7 @@ 
 and copied out to user-space every time it is read. This allows messages 
to be consumed at a rate that approaches the limit of the network connection.
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
+When the transport layer uses the SSL protocol, sendfile will not be used 
due to the need to encrypt the data read.

Review Comment:
   Nice addition and clarification here! Adding a few suggestion below for your 
consideration:
   
   1. It might be better if we could add the configuration name here which 
enables SSL protocol. This clarification will help the users understand what 
configuration could have an impact on zero copy performance.
   
   2.  This line is an important callout. Perhaps start with a new paragraph so 
that it jumps out.
   
   3. Perhaps add the reasoning for what prevents usage of `sendfile` with SSL 
such as, 
   > "TLS/SSL libraries operate at the user space (in-kernel TLS is currently 
not supported by Kafka). Due to this restriction, `sendfile` API could not be 
used when transport layer uses SSL protocol".
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12044: KAFKA-12738: implement exponential backoff for tasks

2022-04-19 Thread GitBox


mdedetrich commented on code in PR #12044:
URL: https://github.com/apache/kafka/pull/12044#discussion_r852823850


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskScheduler.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed 
and decides if/when they can
+ * be executed.
+ *
+ * Note: A single instance of this class is shared between all StreamThreads, 
so it must be thread-safe
+ */
+public class TaskScheduler {
+// Reaches the maximum backoff interval in about 5 attempts, at which point
+private static final long INITIAL_BACKOFF_MS = 3 * 1000L;  // wait 3s 
after the first task failure

Review Comment:
   Does it make sense to have these values configurable with the defaults being 
what you hardcoded? Even if its not configurable but `TaskScheduler` is given a 
constructor containing 
`INITIAL_BACKOFF_MS`/`RETRY_BACKOFF_EXP_BASE`/`MAXIMUM_BACKOFF_MS`/`RETRY_BACKOFF_JITTER`
 as parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


mdedetrich commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r852818277


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 toLoggableProps(resource, props).mkString(","))
   dynamicConfigHandlers(ConfigType.Topic).
 processConfigChanges(resource.name(), props)
-  conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-case BROKER => if (resource.name().isEmpty) {
-  // Apply changes to "cluster configs" (also known as default 
BROKER configs).
-  // These are stored in KRaft with an empty name field.
-  info(s"Updating cluster configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(ConfigEntityName.Default, props)
-} else if (resource.name().equals(brokerId.toString)) {
-  // Apply changes to this broker's dynamic configuration.
-  info(s"Updating broker ${brokerId} with new configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(resource.name(), props)
-}
+case BROKER =>
+  if (resource.name().isEmpty) {
+// Apply changes to "cluster configs" (also known as default 
BROKER configs).
+// These are stored in KRaft with an empty name field.
+info(s"Updating cluster configuration : " +
+  toLoggableProps(resource, props).mkString(","))
+dynamicConfigHandlers(ConfigType.Broker).
+  processConfigChanges(ConfigEntityName.Default, props)
+  } else if (resource.name().equals(brokerId.toString)) {

Review Comment:
   I echo this, in Scala pretty much **no one** uses `equals`. Scala 
deliberately overrides `==` so that it always uses `equals` and that is the 
normal way of doing non reference object comparison.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


mdedetrich commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r852818277


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 toLoggableProps(resource, props).mkString(","))
   dynamicConfigHandlers(ConfigType.Topic).
 processConfigChanges(resource.name(), props)
-  conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-case BROKER => if (resource.name().isEmpty) {
-  // Apply changes to "cluster configs" (also known as default 
BROKER configs).
-  // These are stored in KRaft with an empty name field.
-  info(s"Updating cluster configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(ConfigEntityName.Default, props)
-} else if (resource.name().equals(brokerId.toString)) {
-  // Apply changes to this broker's dynamic configuration.
-  info(s"Updating broker ${brokerId} with new configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  dynamicConfigHandlers(ConfigType.Broker).
-processConfigChanges(resource.name(), props)
-}
+case BROKER =>
+  if (resource.name().isEmpty) {
+// Apply changes to "cluster configs" (also known as default 
BROKER configs).
+// These are stored in KRaft with an empty name field.
+info(s"Updating cluster configuration : " +
+  toLoggableProps(resource, props).mkString(","))
+dynamicConfigHandlers(ConfigType.Broker).
+  processConfigChanges(ConfigEntityName.Default, props)
+  } else if (resource.name().equals(brokerId.toString)) {

Review Comment:
   I echo this, in Scala pretty much **no one** uses `equals`. Scala overrides 
`==` so that it always uses `equals` and that is the normal way of doing non 
reference object comparison.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on a diff in pull request #12063: KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft

2022-04-19 Thread GitBox


mdedetrich commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r852808807


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -248,6 +248,24 @@ class BrokerMetadataListener(
 }
   }
 
+  // This is used in tests to alter the publisher that is in use by the broker.
+  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
+val event = new AlterPublisherEvent(publisher)
+eventQueue.append(event)
+event.future
+  }
+
+  class AlterPublisherEvent(publisher: MetadataPublisher)
+extends EventQueue.FailureLoggingEvent(log) {
+val future = new CompletableFuture[Void]()
+
+override def run(): Unit = {
+  _publisher = Some(publisher)
+  log.info(s"Set publisher to ${publisher}")

Review Comment:
   If this is only used in tests does it make sense to alter `alterPublisher` 
to give it a second parameter accepting a value that implements 
`EventQueue.FailureLoggingEvent` and then define the `AlterPublisherEvent` 
class in `/core/src/test` rather than `core/src/main`.
   
   That way its more clear the class is only being used for tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruanliang-hualun commented on a diff in pull request #12066: bugfix for KAFKA-13834

2022-04-19 Thread GitBox


ruanliang-hualun commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r852691675


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -560,12 +561,14 @@ private List 
drainBatchesForOneNode(Cluster cluster, Node node, i
 List parts = cluster.partitionsForNode(node.id());
 List ready = new ArrayList<>();
 /* to make starvation less likely this loop doesn't start at 0 */
+int drainIndex = getDrainIndex(node.idString());
 int start = drainIndex = drainIndex % parts.size();
+updateDrainIndex(node.idString(),drainIndex);
 do {
 PartitionInfo part = parts.get(drainIndex);
 TopicPartition tp = new TopicPartition(part.topic(), 
part.partition());
-this.drainIndex = (this.drainIndex + 1) % parts.size();
-
+drainIndex = (drainIndex + 1) % parts.size();
+updateDrainIndex(node.idString(),drainIndex);

Review Comment:
   thanks for the advice , i will modify and resubmit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted

2022-04-19 Thread GitBox


dajac commented on code in PR #12029:
URL: https://github.com/apache/kafka/pull/12029#discussion_r852680173


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2703,6 +2711,26 @@ class ReplicaManagerTest {
   assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
   assertFalse(readRecoveryPointCheckpoint().contains(tp0))
   assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
+
+  // verify that there is log to be deleted
+  val logsToBeDeleted = replicaManager.logManager.logsToBeDeleted
+  assertEquals(1, logsToBeDeleted.size())
+  val (removedLog, _) = logsToBeDeleted.take()
+  
assertFalse(removedLog.logDirFailureChannel.hasOfflineLogDir(logDir.toString))
+
+  // trigger the background deletion
+  var kafkaStorageExceptionCaptured = false
+  try {
+removedLog.delete()
+  } catch {
+case _: KafkaStorageException =>
+  kafkaStorageExceptionCaptured = true
+  }
+  assertEquals(expectedExceptionInBackgroundDeletion, 
kafkaStorageExceptionCaptured)

Review Comment:
   I wonder if this test is the appropriate place for this check because it 
does not really test the stop replica path. Would it make sense to have it in a 
separate test? @junrao What do you think?



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2615,7 +2615,10 @@ class ReplicaManagerTest {
 
   @Test
   def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException():
 Unit = {

Review Comment:
   @gitlw Could `maybeFlushMetadataFile` in `renameDir` still throw an 
`IOException`? The test does not force this condition at the moment. Do we want 
to gate `maybeFlushMetadataFile` as well or do we need to update the test to 
trigger it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13832) Flaky test TopicCommandIntegrationTest.testAlterAssignment

2022-04-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13832.
---
Fix Version/s: 3.3.0
 Assignee: dengziming
   Resolution: Fixed

> Flaky test TopicCommandIntegrationTest.testAlterAssignment
> --
>
> Key: KAFKA-13832
> URL: https://issues.apache.org/jira/browse/KAFKA-13832
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
> Fix For: 3.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon merged pull request #12060: KAFKA-13832: Fix flaky testAlterAssignment

2022-04-19 Thread GitBox


showuon merged PR #12060:
URL: https://github.com/apache/kafka/pull/12060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12066: bugfix for KAFKA-13834

2022-04-19 Thread GitBox


showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r852661339


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -81,7 +82,7 @@ public class RecordAccumulator {
 private final IncompleteBatches incomplete;
 // The following variables are only accessed by the sender thread, so we 
don't need to protect them.
 private final Set muted;
-private int drainIndex;
+private Map nodesDrainIndex;

Review Comment:
   nit: need a space between `String,` and `Integer` `Map`



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -560,12 +561,14 @@ private List 
drainBatchesForOneNode(Cluster cluster, Node node, i
 List parts = cluster.partitionsForNode(node.id());
 List ready = new ArrayList<>();
 /* to make starvation less likely this loop doesn't start at 0 */
+int drainIndex = getDrainIndex(node.idString());
 int start = drainIndex = drainIndex % parts.size();
+updateDrainIndex(node.idString(),drainIndex);

Review Comment:
   I think we can update Drain Index at the end of `drainBatchesForOneNode`. We 
don't need to update it each time.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -560,12 +561,14 @@ private List 
drainBatchesForOneNode(Cluster cluster, Node node, i
 List parts = cluster.partitionsForNode(node.id());
 List ready = new ArrayList<>();
 /* to make starvation less likely this loop doesn't start at 0 */
+int drainIndex = getDrainIndex(node.idString());
 int start = drainIndex = drainIndex % parts.size();
+updateDrainIndex(node.idString(),drainIndex);
 do {
 PartitionInfo part = parts.get(drainIndex);
 TopicPartition tp = new TopicPartition(part.topic(), 
part.partition());
-this.drainIndex = (this.drainIndex + 1) % parts.size();
-
+drainIndex = (drainIndex + 1) % parts.size();
+updateDrainIndex(node.idString(),drainIndex);

Review Comment:
   Same here, don't need to update it each time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13834) Some problems with producers choosing batches of messages to send

2022-04-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524118#comment-17524118
 ] 

Luke Chen commented on KAFKA-13834:
---

And BTW, the Jira title might also need to update. Ex: "batch drain for nodes 
might have starving issue"

> Some problems with producers choosing batches of messages to send
> -
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. 问题代码 problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png! 在这里插入图片描述
> 问题出在这个, private int drainIndex;
> The problem is this,private int drainIndex;
> h3. 代码预期 code expectations
> 这端代码的逻辑, 是计算出发往每个Node的ProducerBatchs,是批量发送。
> 因为发送一次的请求量是有限的(max.request.size), 所以一次可能只能发几个ProducerBatch. 那么这次发送了之后, 
> 需要记录一下这里是遍历到了哪个Batch, 下次再次遍历的时候能够接着上一次遍历发送。
> 简单来说呢就是下图这样
>  
> The logic of the code at this end is to calculate the ProducerBatchs sent to 
> each Node, which is sent in batches.
> Because the amount of requests sent at one time is limited 
> (max.request.size), only a few ProducerBatch may be sent at a time. Then 
> after sending this time, you need to record which Batch is traversed here, 
> and the next time you traverse it again Can continue the last traversal send.
> Simply put, it is as follows
>  
> !image-2022-04-18-17-36-47-393.png!
>  
>  
>  
> h3. 实际情况 The actual situation
> 但是呢, 因为上面的索引drainIndex 是一个全局变量, 是RecordAccumulator共享的。
> 那么通常会有很多个Node需要进行遍历, 
> 上一个Node的索引会接着被第二个第三个Node接着使用,那么也就无法比较均衡合理的让每个TopicPartition都遍历到.
> 正常情况下其实这样也没有事情, 如果不出现极端情况的下,基本上都能遍历到。
> 怕就怕极端情况, 导致有很多TopicPartition不能够遍历到,也就会造成一部分消息一直发送不出去。
> However, because the index drainIndex above is a global variable shared by 
> RecordAccumulator.
> Then there are usually many Nodes that need to be traversed, and the index of 
> the previous Node will be used by the second and third Nodes, so it is 
> impossible to traverse each TopicPartition in a balanced and reasonable 
> manner.
> Under normal circumstances, there is nothing wrong with this. If there is no 
> extreme situation, it can basically be traversed.
> I'm afraid of extreme situations, which will result in many TopicPartitions 
> that cannot be traversed, and some messages will not be sent out all the time.
> h3. 造成的影响 impact
> 导致部分消息一直发送不出去、或者很久才能够发送出去。
> As a result, some messages cannot be sent out, or can take a long time to be 
> sent out.
> h3. 触发异常情况的一个Case /  A Case that triggers an exception
> 该Case场景如下:
>  # 生产者向3个Node发送消息
>  # 每个Node都是3个TopicPartition
>  # 每个TopicPartition队列都一直源源不断的写入消息、
>  # max.request.size 刚好只能存放一个ProdcuerBatch的大小。
> 就是这么几个条件,会造成每个Node只能收到一个TopicPartition队列里面的PrdoucerBatch消息。
> 开始的时候 drainIndex=0. 开始遍历第一个Node-0。 Node-0 准备开始遍历它下面的几个队列中的ProducerBatch,遍历一次 
> 则drainIndex+1,发现遍历了一个队列之后,就装满了这一批次的请求。
> 那么开始遍历Node-1,这个时候则drainIndex=1,首先遍历到的是 第二个TopicPartition。然后发现一个Batch之后也满了。
> 那么开始遍历Node-1,这个时候则drainIndex=2,首先遍历到的是 第三个TopicPartition。然后发现一个Batch之后也满了。
> 这一次的Node遍历结束之后把消息发送之后
> 又接着上面的请求流程,那么这个时候的drainIndex=3了。
> 遍历Node-0,这个时候取模计算得到的是第几个TopicPartition呢?那不还是第1个吗。相当于后面的流程跟上面一模一样。
> 也就导致了每个Node的第2、3个TopicPartition队列中的ProducerBatch永远遍历不到。
> 也就发送不出去了。
>  
> The case scenario is as follows:
> Producer sends message to 3 Nodes
> Each Node is 3 TopicPartitions
> Each TopicPartition queue has been continuously writing messages,
> max.request.size can only store the size of one ProdcuerBatch.
> It is these conditions that cause each Node to receive only one PrdoucerBatch 
> message in the TopicPartition queue.
> At the beginning drainIndex=0. Start traversing the first Node-0. Node-0 is 
> ready to start traversing the ProducerBatch in several queues below it. After 
> traversing once, drainIndex + 1. After traversing a queue, it is full of 
> requests for this batch.
> Then start traversing Node-1. At this time, drainIndex=1, and the second 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> Then start traversing Node-1. At this time, drainIndex=2, and the third 
> TopicPartition is traversed first. Then I found that a Batch was also full.
> After this Node traversal is over, the message is sent
> Then the above request process is followed, then drainIndex=3 at this time.
> Traversing Node-0, which TopicPartition is obtained by taking the modulo 
> calculation at this time? Isn't that the first one? Equivalent to the 
> following process is exactly the same as above.
> As a result, the ProducerB