[jira] [Resolved] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2023-02-20 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan resolved KAFKA-14435.
---
Fix Version/s: 3.3.2
   3.4.0
   Resolution: Fixed

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.0, 3.3.0, 3.2.1, 3.2.2, 3.2.3, 3.3.1
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Critical
> Fix For: 3.3.2, 3.4.0
>
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't allow 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
>     StandardAuthorizer authorizer = new StandardAuthorizer();
>     HashMap<String, Object> configs = new HashMap<>();
>     configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
>     configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
>     authorizer.configure(configs);
>     authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
>     authorizer.completeInitialLoad();
>
>     // Allow User:Alice to read topic "foobar"
>     List<StandardAclWithId> acls = asList(
>         withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
>     );
>     acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
>
>     // User:Bob shouldn't be allowed to read topic "foobar"
>     assertEquals(singletonList(DENIED),
>         authorizer.authorize(new MockAuthorizableRequestContext.Builder().
>             setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
>             singletonList(newAction(READ, TOPIC, "foobar"))));
> }
> {code}
>  
> In the above test, `User:Bob` should be DENIED, but the test case fails.
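
A minimal sketch of the intended decision flow described above (illustration only; the
class and parameter names are hypothetical, not the actual StandardAuthorizer code):

{code:java}
import java.util.List;

// Hedged sketch of the intended semantics: "allow everyone" may only apply when the
// resource has no ACLs bound to it at all. Names below are made up for illustration.
final class AllowEveryoneSemanticsSketch {
    enum Result { ALLOWED, DENIED }

    static Result authorize(boolean allowEveryoneIfNoAclFound,
                            List<String> aclsForResource,          // ACL bindings on the resource
                            boolean principalHasMatchingAllowAcl) {
        if (aclsForResource.isEmpty()) {
            // No ACLs on the resource: the config decides.
            return allowEveryoneIfNoAclFound ? Result.ALLOWED : Result.DENIED;
        }
        // ACLs exist: evaluate them normally; an unmatched principal (e.g. User:Bob) must be DENIED.
        return principalHasMatchingAllowAcl ? Result.ALLOWED : Result.DENIED;
    }
}
{code}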



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled

2023-02-20 Thread Purshotam Chauhan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Purshotam Chauhan updated KAFKA-14435:
--
Affects Version/s: 3.3.1
   3.3.0

> Kraft: StandardAuthorizer allowing a non-authorized user when 
> `allow.everyone.if.no.acl.found` is enabled
> -
>
> Key: KAFKA-14435
> URL: https://issues.apache.org/jira/browse/KAFKA-14435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.2.0, 3.3.0, 3.2.1, 3.2.2, 3.2.3, 3.3.1
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow 
> everyone only if there is no ACL present for a particular resource. But if 
> there are ACLs present for the resource, then it shouldn't allow 
> everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined 
> even when the resource has other ACLs.
>  
> This behavior can be validated with the following test case:
>  
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
>     StandardAuthorizer authorizer = new StandardAuthorizer();
>     HashMap<String, Object> configs = new HashMap<>();
>     configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
>     configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
>     authorizer.configure(configs);
>     authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
>     authorizer.completeInitialLoad();
>
>     // Allow User:Alice to read topic "foobar"
>     List<StandardAclWithId> acls = asList(
>         withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
>     );
>     acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
>
>     // User:Bob shouldn't be allowed to read topic "foobar"
>     assertEquals(singletonList(DENIED),
>         authorizer.authorize(new MockAuthorizableRequestContext.Builder().
>             setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
>             singletonList(newAction(READ, TOPIC, "foobar"))));
> }
> {code}
>  
> In the above test, `User:Bob` should be DENIED, but the test case fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] omkreddy merged pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2023-02-20 Thread via GitHub


omkreddy merged PR #11976:
URL: https://github.com/apache/kafka/pull/11976


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #13244: KAFKA-14495: assert the cache size for each operation

2023-02-20 Thread via GitHub


satishd merged PR #13244:
URL: https://github.com/apache/kafka/pull/13244


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-20 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112485394


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-// First call to describe the topic times out
 expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-.andThrow(new RetriableException(new 
TimeoutException("timeout")));
-
-// Second round
-expectTopicCreation(TOPIC);
-expectSendRecord();
-expectSendRecord();
 
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+when(admin.createOrFindTopics(any(NewTopic.class)))
+.thenAnswer(new Answer() {
+boolean firstCall = true;
+
+@Override
+public TopicAdmin.TopicCreationResponse 
answer(InvocationOnMock invocation) {
+if (firstCall) {
+firstCall = false;
+throw new RetriableException(new 
TimeoutException("timeout"));
+}
+return createdTopic(TOPIC);
+}
+});
 
 workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   This one is a little bit trickier, as we cannot do partial verification 
without resetting the mock. 
   
   What I ended up doing was checking that the calls to `createOrFindTopics` happen 
twice midway and verifying the arguments, then checking once again at the end of the 
test - this last verification is cumulative over all 3 calls.
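
   For readers unfamiliar with the idiom being described, a hedged sketch (not the
actual test code) of mid-test verification plus a cumulative check at the end, without
resetting the mock; it assumes a Mockito mock `admin` whose `createOrFindTopics(NewTopic)`
is invoked by the code under test:

   ```java
   ArgumentCaptor<NewTopic> newTopicCaptor = ArgumentCaptor.forClass(NewTopic.class);

   // ... run the first part of the test so that createOrFindTopics is invoked twice ...
   verify(admin, times(2)).createOrFindTopics(newTopicCaptor.capture());
   assertEquals(2, newTopicCaptor.getAllValues().size());   // inspect the captured arguments here

   // ... run the rest of the test so that a third invocation happens ...
   // Mockito verification counts are cumulative over the whole test,
   // so the final check covers all 3 calls:
   verify(admin, times(3)).createOrFindTopics(any(NewTopic.class));
   ```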



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-20 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112474934


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+expectSendRecord(emptyHeaders());
 
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord(2);
+
+List> capturedValues = 
sent.getAllValues();
+assertEquals(2, capturedValues.size());
 }
 
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
+private void expectSendRecord(Headers headers) {
 if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(headers);
 
-expectApplyTransformationChain(anyTimes);
+expectApplyTransformationChain();
 
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+expectTaskGetTopic();
+}
 
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-}
-producerCallbacks.reset();
-}
-return null;
-};
+private ArgumentCaptor> verifySendRecord() {
+return verifySendRecord(1);
+}
 
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
+private ArgumentCaptor> 
verifySendRecord(int times) {
+ArgumentCaptor> sent = 
ArgumentCaptor.forClass(ProducerRecord.class);
+ArgumentCaptor producerCallbacks = 
ArgumentCaptor.forClass(Callback.class);
+verify(producer, times(times)).send(sent.capture(), 
producerCallbacks.capture());
 
-expectTaskGetTopic(anyTimes);
+for (Callback cb : producerCallbacks.getAllValues()) {
+cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 
0, 0, 0L, 0, 0),
+null);
+}
 
 return sent;
 }
 
-private Capture> expectSendRecordAnyTimes() 
{
-return expectSendRecord(TOPIC, true, emptyHeaders());
+private void expectTaskGetTopic() {
+when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer) invocation -> {
+String connector = invocation.getArgument(0, String.class);
+String topic = invocation.getArgument(1, String.class);
+return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+});
 }
 
-private Capture> expectSendRecord() {
-return expectSendRecord(TOPIC, false, emptyHeaders());
-}
+private void verifyTaskGetTopic() {
+ArgumentCaptor connectorCapture = 
ArgumentCaptor.forClass(String.class);
+ArgumentCaptor topicCapture = 
ArgumentCaptor.forClass(String.class);
+ArgumentCaptor newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
+verify(statusBackingStore).getTopic(connectorCapture.capture(), 
topicCapture.capture());
 
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-

[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-20 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112464114


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() {
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, 
OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-// First round
-expectPreliminaryCalls(OTHER_TOPIC);
-expectTopicCreation(TOPIC);
-expectSendRecord();
-expectSendRecord();
-
-// First call to describe the topic times out
-EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
-.andThrow(new RetriableException(new 
TimeoutException("timeout")));
+expectPreliminaryCalls();
 
-// Second round
-expectTopicCreation(OTHER_TOPIC);
-expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+when(admin.describeTopics(anyString())).thenAnswer(new 
Answer>() {
+int counter = 0;
 
-PowerMock.replayAll();
+@Override
+public Map answer(InvocationOnMock 
invocation) {
+counter++;
+if (counter == 2) {
+throw new RetriableException(new 
TimeoutException("timeout"));
+}
 
-// Try to send 3, make first pass, second fail. Should save last two
+return Collections.emptyMap();
+}
+});

Review Comment:
   It certainly is more readable. 
   
   Fixed
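
   The "more readable" alternative likely refers to Mockito's consecutive stubbing; a
hedged sketch, not necessarily the exact change that was pushed:

   ```java
   when(admin.describeTopics(anyString()))
           .thenReturn(Collections.emptyMap())                                  // 1st call succeeds
           .thenThrow(new RetriableException(new TimeoutException("timeout")))  // 2nd call times out
           .thenReturn(Collections.emptyMap());                                 // later calls succeed
   ```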



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bachmanity1 commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size

2023-02-20 Thread via GitHub


bachmanity1 commented on PR #13261:
URL: https://github.com/apache/kafka/pull/13261#issuecomment-1437758582

   @cmccabe could you please review this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-20 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112459731


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
 public void testSendRecordsConvertsData() {
 createWorkerTask();
 
-List records = new ArrayList<>();
 // Can just use the same record for key and value
-records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-Capture> sent = 
expectSendRecordAnyTimes();
+List records = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD)
+);
 
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
 workerTask.toSend = records;
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
+
 assertEquals(SERIALIZED_KEY, sent.getValue().key());
 assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsPropagatesTimestamp() {
 final Long timestamp = System.currentTimeMillis();
-
 createWorkerTask();
 
-List records = Collections.singletonList(
-new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
-);
-
-Capture> sent = 
expectSendRecordAnyTimes();
-
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
-workerTask.toSend = records;
+workerTask.toSend = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
 assertEquals(timestamp, sent.getValue().timestamp());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsCorruptTimestamp() {
 final Long timestamp = -3L;
 createWorkerTask();
 
-List records = Collections.singletonList(
+expectSendRecord(emptyHeaders());
+expectTopicCreation(TOPIC);

Review Comment:
   Good catch. Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-20 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112456303


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+expectSendRecord(emptyHeaders());
 
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord(2);
+
+List> capturedValues = 
sent.getAllValues();
+assertEquals(2, capturedValues.size());
 }
 
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
+private void expectSendRecord(Headers headers) {
 if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(headers);
 
-expectApplyTransformationChain(anyTimes);
+expectApplyTransformationChain();
 
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+expectTaskGetTopic();
+}
 
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-}
-producerCallbacks.reset();
-}
-return null;
-};
+private ArgumentCaptor> verifySendRecord() {
+return verifySendRecord(1);
+}
 
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
+private ArgumentCaptor> 
verifySendRecord(int times) {
+ArgumentCaptor> sent = 
ArgumentCaptor.forClass(ProducerRecord.class);
+ArgumentCaptor producerCallbacks = 
ArgumentCaptor.forClass(Callback.class);
+verify(producer, times(times)).send(sent.capture(), 
producerCallbacks.capture());
 
-expectTaskGetTopic(anyTimes);
+for (Callback cb : producerCallbacks.getAllValues()) {
+cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 
0, 0, 0L, 0, 0),
+null);
+}
 
 return sent;
 }
 
-private Capture> expectSendRecordAnyTimes() 
{
-return expectSendRecord(TOPIC, true, emptyHeaders());
+private void expectTaskGetTopic() {
+when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer) invocation -> {
+String connector = invocation.getArgument(0, String.class);
+String topic = invocation.getArgument(1, String.class);
+return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+});
 }
 
-private Capture> expectSendRecord() {
-return expectSendRecord(TOPIC, false, emptyHeaders());
-}
+private void verifyTaskGetTopic() {
+ArgumentCaptor connectorCapture = 
ArgumentCaptor.forClass(String.class);
+ArgumentCaptor topicCapture = 
ArgumentCaptor.forClass(String.class);
+ArgumentCaptor newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
+verify(statusBackingStore).getTopic(connectorCapture.capture(), 
topicCapture.capture());
 
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-

[jira] [Created] (KAFKA-14736) Kafka Connect REST API: POST/PUT/DELETE requests are not working

2023-02-20 Thread lingsbigm (Jira)
lingsbigm created KAFKA-14736:
-

 Summary: Kafka Connect REST API: POST/PUT/DELETE requests are not 
working
 Key: KAFKA-14736
 URL: https://issues.apache.org/jira/browse/KAFKA-14736
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 3.1.1
 Environment: development
Reporter: lingsbigm


Hi,
  We are now using Debezium 1.8.1.Final with Kafka Connect in distributed mode. Suddenly 
one day we found that we can't add a new connector, and we see nothing in the log when we 
try to delete a connector or update a connector's configuration; those operations don't 
work either.
  Besides, I found that the connect-configs topic had no messages before the first 
operation, and it does get some messages when updating or deleting the connector, but the 
connector itself is not changed.
  Has anyone run into the same problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13100: MINOR: add size check for tagged fields

2023-02-20 Thread via GitHub


showuon commented on PR #13100:
URL: https://github.com/apache/kafka/pull/13100#issuecomment-1437718642

   @mimaison , sorry, my bad, PR updated. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kowshik commented on pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values

2023-02-20 Thread via GitHub


kowshik commented on PR #13268:
URL: https://github.com/apache/kafka/pull/13268#issuecomment-1437650383

   @junrao Thanks for the review! I've addressed the comments, the PR is ready 
for another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov opened a new pull request, #13281: [MINOR] Adjust logging with ZK log format

2023-02-20 Thread via GitHub


nizhikov opened a new pull request, #13281:
URL: https://github.com/apache/kafka/pull/13281

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino opened a new pull request, #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-20 Thread via GitHub


rondagostino opened a new pull request, #13280:
URL: https://github.com/apache/kafka/pull/13280

   …topic counts
   
   Performance of KRaft metadata image changes is currently O(<# of topics in 
cluster>). This means the amount of time it takes to create just a single topic 
scales linearly with the number of topics in the entire cluster. This impacts 
both controllers and brokers because both use the metadata image to represent 
the KRaft metadata log. The performance of these changes should scale with the 
number of topics being changed – creating a single topic should perform 
similarly regardless of the number of topics in the cluster.
   
   This patch introduces a dependency on the 
[Paguro](https://github.com/GlenKPeterson/Paguro/) library for 
immutable/persistent collection support in Java and leverages persistent data 
structures to avoid copying the entire TopicsImage upon every change.  We 
choose this library because it is relatively small and 
[well-integrated](https://github.com/GlenKPeterson/Paguro/blob/main/inheritanceHierarchy.pdf)
 with the existing Java Collections class hierarchy (the latter property 
especially helps to minimize the changes required to introduce the library into 
the existing code base).  The patch also adds the following JMH benchmarks 
demonstrating the resulting performance changes:
   
   - `TopicsImageSingleRecordChangeBenchmark` tracks how long it takes to 
create a new topic.  This is the benchmark that clearly identifies the O(N) 
behavior in the existing code and that most dramatically illustrates a 
performance improvement.
   
   As shown below, the existing code takes several orders of magnitude longer 
to make a single change than the new code.  The existing code, with 12,500 
topics, took 1.4 milliseconds on my laptop and grows more or less linearly as 
the number of topics grows.  The new code took a constant amount of time (~250 
nanoseconds) regardless of the number of topics in the cluster.
   
   The reason for the improvement is because it is inexpensive to add, update, 
or delete an entry in an immutable, persistent map to create a new persistent 
map.  The new map shares the vast amount of the old map; only the root node and 
any nodes along the path to the node that must change are swapped out, and when 
the reference to the old map is released the swapped-out nodes can be 
garbage-collected.
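
   A hedged illustration of this structural-sharing point (assuming Paguro's 
`PersistentHashMap.empty()`/`assoc()` API; the topic names and value types are made up and 
this is not the actual TopicsImage code):

   ```java
   import org.organicdesign.fp.collections.ImMap;
   import org.organicdesign.fp.collections.PersistentHashMap;

   public class PersistentMapSketch {
       public static void main(String[] args) {
           // Build an immutable map of topic name -> partition count.
           ImMap<String, Integer> image = PersistentHashMap.empty();
           for (int i = 0; i < 100_000; i++) {
               image = image.assoc("topic-" + i, 10);
           }

           // "Creating a topic" produces a new map that shares almost all nodes with the
           // old one; only the path to the changed entry is rebuilt, so the cost is roughly
           // O(log n) rather than the O(n) of copying the whole map.
           ImMap<String, Integer> newImage = image.assoc("new-topic", 10);

           System.out.println(image.size());     // 100000 (old image unchanged)
           System.out.println(newImage.size());  // 100001
       }
   }
   ```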
   
   **Current Code, unpatched**
   Total Topic Count | nanoseconds/op | error
   -- | -- | --
   12,500 | 1,410,901 | 153,461
   25,000 | 3,570,451 | 221,992
   50,000 | 14,143,125 | 1,123,616
   100,000 | 31,126,930 | 4,243,625
   
   **Updated Code**
   Total Topic Count | nanoseconds/op | error
   -- | -- | --
   12,500 | 258 | 13
   25,000 | 265 | 8
   50,000 | 273 | 5
   100,000 | 222 | 4
   
   
   - `TopicsImageZonalOutageBenchmark` simulates a zonal outage where each 
broker in the zone will lose its session – in this benchmark we assume the 
controller deals with them one by one rather than demoting 1/3 of the cluster 
all at once.  Since the number of topics per broker does not change very much, 
we expect O(N) behavior with the current code but not with the updated code, so 
we should see a performance improvement here as well -- and in fact we do.
   
   The existing code scales with the number of topics in the cluster, thus the 
time always doubles as the cluster size doubles, increasing from 5ms to 47ms (a 
factor of 9) as the cluster scales by a factor of 8.  The updated code should 
scale with the number of affected topics, which in this case, based on the 
(debatable) values chosen of 1 replicas per broker and 10 partitions per 
topic, means a factor of 1.6 (from 4167 topics affected to 6667 topics 
affected) as the cluster scaled by a factor of 8.  In fact we see the time 
spent increasing by a factor of 2.6 (from 4.4 ms to 11.6 ms) when the cluster 
scaled by that factor of 8.  This a bit higher than expected, but it is still 
sub-linear (and there is some +/- error in these numbers, so the sub-linear 
behavior is the real point as opposed to the specific number).
   
   **Current Code, unpatched**
   Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics 
Impacted)
   -- | -- | -- | -- | --
   12,500 | 5.2 | 0.4 | 1/36 | 4,167
   25,000 | 10.6 | 0.1 | 1/75 | 5,000
   50,000 | 21.7 | 0.4 | 1/150 | 6,667
   100,000 | 47.7 | 5.2 | 1/300 | 6,667
   
   **Updated Code**
   Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics 
Impacted)
   -- | -- | -- | -- | --
   12,500 | 4.4 | 0.2 | 1/36 | 4,167
   25,000 | 6.9 | 0.2 | 1/75 | 5,000
   50,000 | 10.2 | 2.5 | 1/150 | 6,667
   100,000 | 11.6 | 2.8 | 1/300 | 6,667
   
   
   - `TopicsImageSnapshotLoadBenchmark` simulates the loading of a snapshot 
when the broker starts – i.e. load up 100,000 topics/1M partitions from scratch 
and commit them all at once.  We would expect to see some performance 
degradation here in the updated code, and the 

[jira] [Created] (KAFKA-14735) Improve KRaft metadata image change performance at high topic counts

2023-02-20 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14735:
-

 Summary: Improve KRaft metadata image change performance at high 
topic counts
 Key: KAFKA-14735
 URL: https://issues.apache.org/jira/browse/KAFKA-14735
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.5.0


Performance of KRaft metadata image changes is currently O(<# of topics in 
cluster>).  This means the amount of time it takes to create just a *single* 
topic scales linearly with the number of topics in the entire cluster.  This 
impacts both controllers and brokers because both use the metadata image to 
represent the KRaft metadata log.  The performance of these changes should 
scale with the number of topics being changed -- so creating a single topic 
should perform similarly regardless of the number of topics in the cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools

2023-02-20 Thread via GitHub


vamossagar12 commented on PR #13127:
URL: https://github.com/apache/kafka/pull/13127#issuecomment-1437373209

   > > Regarding moving to CommandDefaultOptions, is it better if we do it in a 
follow up PR as that is not directly connected to migrating to tools module.
   > 
   > Hi @vamossagar12, I'm ok with doing it in a follow up PR. Do we have a 
Jira for that?
   
   Thanks @fvaleri , here you go: 
https://issues.apache.org/jira/browse/KAFKA-14734


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14734) Use CommandDefaultOptions in StreamsResetter

2023-02-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14734:
-

 Summary: Use CommandDefaultOptions in StreamsResetter 
 Key: KAFKA-14734
 URL: https://issues.apache.org/jira/browse/KAFKA-14734
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao


This came up as a suggestion here: 
[https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-02-20 Thread via GitHub


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1437369614

   Thanks @mimaison . I addressed the comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-8752) Ensure plugin classes are instantiable when discovering plugins

2023-02-20 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8752.
--
Resolution: Not A Problem

> Ensure plugin classes are instantiable when discovering plugins
> ---
>
> Key: KAFKA-8752
> URL: https://issues.apache.org/jira/browse/KAFKA-8752
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Minor
> Attachments: stacktrace.log
>
>
> While running integration tests from the IntelliJ IDE, it appears plugins 
> fail to load in {{DelegatingClassLoader.scanUrlsAndAddPlugins}}. The reason 
> was, in this case, that the class 
> {{org.apache.kafka.connect.connector.ConnectorReconfigurationTest$TestConnector}}
>  could not be instantiated - which it does not intend to be.
> The problem does not appear when running integration tests with Gradle as the 
> runtime closure is different from IntelliJ - which includes test sources from 
> module dependencies on the classpath.
> While debugging this minor inconvenience, I could see that 
> {{DelegatingClassLoader}} performs a sanity check on the plugin class to 
> instantiate - as of now, it verifies the class is concrete. A quick fix for 
> the problem highlighted above could be to add an extra condition on the Java 
> modifiers of the class to ensure it will be instantiable.
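
A hedged sketch of the kind of modifier check described above; `klass` stands for a
candidate plugin class, and this is not the actual DelegatingClassLoader code:

{code:java}
import java.lang.reflect.Modifier;

public final class PluginScanSketch {
    // Only treat a class as a candidate plugin if it can actually be instantiated reflectively.
    static boolean looksInstantiable(Class<?> klass) {
        int mods = klass.getModifiers();
        if (klass.isInterface() || Modifier.isAbstract(mods) || !Modifier.isPublic(mods)) {
            return false;
        }
        // A non-static member class needs an enclosing instance, so it cannot be created
        // through a plain no-arg constructor lookup.
        return !(klass.isMemberClass() && !Modifier.isStatic(mods));
    }
}
{code}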



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module

2023-02-20 Thread via GitHub


Hangleton commented on code in PR #13214:
URL: https://github.com/apache/kafka/pull/13214#discussion_r1112171337


##
tools/src/main/java/org/apache/kafka/tools/MessageReader.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Typical implementations of this interface convert data from an {@link 
InputStream} received via
+ * {@link MessageReader#init(InputStream, Properties)} into a {@link 
ProducerRecord} instance on each
+ * invocation of `{@link MessageReader#readMessage()}`.
+ *
+ * This is used by the {@link ConsoleProducer}.
+ */
+public interface MessageReader {

Review Comment:
   Thanks @fvaleri and apologies for the delay, I will look at the KIP. Thanks!
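
   For context on the interface under discussion, a hedged sketch of a trivial
implementation; it assumes only the `init(InputStream, Properties)` / `readMessage()`
contract quoted in the javadoc above, the `ProducerRecord<byte[], byte[]>` return type,
the `"topic"` property name, and returning null at end of input are assumptions, and this
is not part of the PR:

   ```java
   import org.apache.kafka.clients.producer.ProducerRecord;

   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.InputStreamReader;
   import java.io.UncheckedIOException;
   import java.nio.charset.StandardCharsets;
   import java.util.Properties;

   // Minimal line-oriented reader: each input line becomes the value of a record on a fixed topic.
   public class LineMessageReader implements MessageReader {
       private BufferedReader reader;
       private String topic;

       @Override
       public void init(InputStream inputStream, Properties props) {
           reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
           topic = props.getProperty("topic", "test");   // property name is an assumption
       }

       @Override
       public ProducerRecord<byte[], byte[]> readMessage() {
           try {
               String line = reader.readLine();
               if (line == null) {
                   return null;   // end of input (assumed contract)
               }
               return new ProducerRecord<>(topic, line.getBytes(StandardCharsets.UTF_8));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```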



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13100: MINOR: add size check for tagged fields

2023-02-20 Thread via GitHub


mimaison commented on PR #13100:
URL: https://github.com/apache/kafka/pull/13100#issuecomment-1437298488

   @showuon Thanks for the PR. The new test is failing with
   ```
   org.opentest4j.AssertionFailedError: expected:  but was: 
   ```
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13100/3/testReport/org.apache.kafka.common.protocol.types/ProtocolSerializationTest/Build___JDK_11_and_Scala_2_13___testReadTaggedFieldsSizeTooLarge__/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-02-20 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1437297790

   Hello David, I updated the PR to take into account your comments and have 
been adding tests. Almost ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov closed pull request #13271: KAFKA-14730 AdminOperationException moved to java

2023-02-20 Thread via GitHub


nizhikov closed pull request #13271: KAFKA-14730 AdminOperationException moved 
to java
URL: https://github.com/apache/kafka/pull/13271


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java

2023-02-20 Thread via GitHub


nizhikov commented on PR #13271:
URL: https://github.com/apache/kafka/pull/13271#issuecomment-1437279016

   @mimaison OK. Do you have guidelines on how exactly the move should be implemented, 
if straight rewriting doesn't work here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java

2023-02-20 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1437274047

   @mimaison 
   
   > I think we should directly move the classes to the tools module instead of 
temporarily keeping them in core
   
   Are you suggesting we perform the whole move in one PR?
   Or do you have a plan for how to perform the move in several steps? If yes, 
please share some guidelines on how I should implement it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java

2023-02-20 Thread via GitHub


mimaison commented on PR #13271:
URL: https://github.com/apache/kafka/pull/13271#issuecomment-1437268624

   Thanks for the PR. Rather than updating all the tools, I wonder if we should 
just create the new exception in tools. Then each tool can start using it when 
it's moved. There are a few other PRs in flight, so let's avoid creating many 
conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java

2023-02-20 Thread via GitHub


mimaison commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1437252881

   I think we should directly move the classes to the tools module instead of 
temporarily keeping them in core.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-02-20 Thread via GitHub


mimaison commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1112025397


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer

Review Comment:
   Can we update the comment to include the topic argument that's required too?
   
   Also let's format this javadoc comment a bit so it renders nicely
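   
   One possible way the javadoc could be formatted, sketched here for illustration (the 
wording the PR ends up with may differ):
   
   ```java
   /**
    * Records the average end-to-end latency for a single message to travel through Kafka.
    *
    * <p>Required arguments, in order:
    * <ul>
    *   <li>broker_list - bootstrap broker(s) for both the producer and the consumer</li>
    *   <li>topic - topic to produce to and consume from</li>
    *   <li>num_messages - number of messages to send</li>
    *   <li>producer_acks - see {@link org.apache.kafka.clients.producer.ProducerConfig#ACKS_DOC}</li>
    *   <li>message_size_bytes - size of each message in bytes</li>
    * </ul>
    *
    * <p>Example: {@code localhost:9092 test 1 1 20}
    */
   ```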



##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+private final static long POLL_TIMEOUT_MS = 6;
+private final static short DEFAULT_REPLICATION_FACTOR = 1;
+private final static int DEFAULT_NUM_PARTITIONS = 1;
+
+public 

[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1437118603

   @dajac Thanks for the review. I have addressed the comments and left some 
questions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112029042


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classOf[RackAwareAssignor].getName)
-val consumer = createConsumer()
-consumer.subscribe(Set(topic).asJava)
-awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   I wanted to put this test into a class that had FFF enabled, but I couldn't 
find any integration tests for FFF. So included here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112027236


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classOf[RackAwareAssignor].getName)
-val consumer = createConsumer()
-consumer.subscribe(Set(topic).asJava)
-awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {
+val partitionList = servers.indices.toList
+
+val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks"
+createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size)
+
+// Racks are in order of broker ids, assign leaders in reverse order
+val topicWithSingleRackPartitions = "topicWithSingleRackPartitions"
+createTopicWithAssignment(topicWithSingleRackPartitions, 
partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap)
+
+// Create consumers with instance ids in ascending order, with racks in 
the same order.
+
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classOf[RangeAssignor].getName)
+val consumers = servers.map { server =>
+  consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, 
server.config.rack.orNull)
+  consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
s"instance-${server.config.brokerId}")
+  createConsumer()
+}
+
+val executor = Executors.newFixedThreadPool(consumers.size)
+def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = {
+  val futures = consumers.zipWithIndex.map { case (consumer, i) =>
+executor.submit(() => awaitAssignment(consumer, assignments(i)), 0)
+  }
+  futures.foreach(future => assertEquals(0, future.get(20, 
TimeUnit.SECONDS)))
+}
 
-class RackAwareAssignor extends RoundRobinAssignor {
-  override def assign(partitionsPerTopic: util.Map[String, Integer], 
subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): 
util.Map[String, util.List[TopicPartition]] = {
-assertEquals(1, subscriptions.size())
-assertEquals(Optional.of("rack-a"), 
subscriptions.values.asScala.head.rackId)
-super.assign(partitionsPerTopic, subscriptions)
+try {
+  // Rack-based assignment results in partitions assigned in reverse order 
since partition racks are in the reverse order.
+  
consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions)))
+  waitForAssignments(partitionList.reverse.map(p => Set(new 
TopicPartition(topicWithSingleRackPartitions, p
+
+  // Non-rack-aware assignment results in ordered partitions.
+  
consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks)))
+  waitForAssignments(partitionList.map(p => Set(new 
TopicPartition(topicWithAllPartitionsOnAllRacks, p
+
+  // Rack-aware assignment with co-partitioning results in reverse 
assignment for both topics.
+  consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, 
topicWithAllPartitionsOnAllRacks).asJava))
+  waitForAssignments(partitionList.reverse.map(p => Set(new 
TopicPartition(topicWithAllPartitionsOnAllRacks, p), new 
TopicPartition(topicWithSingleRackPartitions, p

Review Comment:
   No, we don't do any leader-specific assignment, we are assuming all replicas 
are equal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112026015


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
-import kafka.server.QuotaType
-import kafka.server.KafkaServer
-import org.apache.kafka.clients.admin.NewPartitions
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import scala.collection.mutable
+import scala.collection.mutable.Buffer
+import scala.jdk.CollectionConverters._
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to 
keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
 
+  override def modifyConfigs(props: collection.Seq[Properties]): Unit = {
+super.modifyConfigs(props)
+props.zipWithIndex.foreach{ case (p, i) => 
p.setProperty(KafkaConfig.RackProp, i.toString) }

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(consumer, tp))
+.collect(Collectors.toList());
 
-Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-if (numPartitionsForTopic == null)
+int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+if (maxAssignable <= 0)
 continue;
 
-Collections.sort(consumersForTopic);
+assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+}
+}
+
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
assignment) {
 
-int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+if (coPartitionedStates.size() > 1)
+assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+else {
+TopicAssignmentState state = coPartitionedStates.get(0);
+if (state.needsRackAwareAssignment)
+assignRanges(state, state::racksMatch, assignment);
+}
+});
+});
+}
+
+private void assignCoPartitionedWithRackMatching(List consumers,
+ int 

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112016639


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(consumer, tp))
+.collect(Collectors.toList());
 
-Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-if (numPartitionsForTopic == null)
+int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+if (maxAssignable <= 0)
 continue;
 
-Collections.sort(consumersForTopic);
+assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+}
+}
+
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
assignment) {
 
-int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+if (coPartitionedStates.size() > 1)
+assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+else {
+TopicAssignmentState state = coPartitionedStates.get(0);
+if (state.needsRackAwareAssignment)
+assignRanges(state, state::racksMatch, assignment);
+}
+});
+});
+}
+
+private void assignCoPartitionedWithRackMatching(List consumers,
+ int 

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112015848


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(consumer, tp))
+.collect(Collectors.toList());
 
-Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-if (numPartitionsForTopic == null)
+int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+if (maxAssignable <= 0)
 continue;
 
-Collections.sort(consumersForTopic);
+assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+}
+}
+
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
assignment) {
 
-int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+if (coPartitionedStates.size() > 1)
+assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+else {
+TopicAssignmentState state = coPartitionedStates.get(0);
+if (state.needsRackAwareAssignment)
+assignRanges(state, state::racksMatch, assignment);
+}
+});
+});
+}
+
+private void assignCoPartitionedWithRackMatching(List consumers,
+ int 

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112014546


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(consumer, tp))
+.collect(Collectors.toList());
 
-Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-if (numPartitionsForTopic == null)
+int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+if (maxAssignable <= 0)
 continue;
 
-Collections.sort(consumersForTopic);
+assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+}
+}
+
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
assignment) {
 
-int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+if (coPartitionedStates.size() > 1)
+assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+else {
+TopicAssignmentState state = coPartitionedStates.get(0);
+if (state.needsRackAwareAssignment)
+assignRanges(state, state::racksMatch, assignment);
+}
+});
+});
+}
+
+private void assignCoPartitionedWithRackMatching(List consumers,
+ int 

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {
 String consumerId = subscriptionEntry.getKey();
-MemberInfo memberInfo = new MemberInfo(consumerId, 
subscriptionEntry.getValue().groupInstanceId());
-for (String topic : subscriptionEntry.getValue().topics()) {
+Subscription subscription = subscriptionEntry.getValue();
+MemberInfo memberInfo = new MemberInfo(consumerId, 
subscription.groupInstanceId(), subscription.rackId());
+for (String topic : subscription.topics()) {
 put(topicToConsumers, topic, memberInfo);
 }
 }
 return topicToConsumers;
 }
 
 @Override
-public Map> assign(Map 
partitionsPerTopic,
-Map 
subscriptions) {
+public Map> assignPartitions(Map> partitionsPerTopic,
+  Map subscriptions) {
 Map> consumersPerTopic = 
consumersPerTopic(subscriptions);
+List topicAssignmentStates = 
partitionsPerTopic.entrySet().stream()
+.filter(e -> !e.getValue().isEmpty())
+.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), 
consumersPerTopic.get(e.getKey(
+.collect(Collectors.toList());
 
 Map> assignment = new HashMap<>();
 for (String memberId : subscriptions.keySet())
 assignment.put(memberId, new ArrayList<>());
 
-for (Map.Entry> topicEntry : 
consumersPerTopic.entrySet()) {
-String topic = topicEntry.getKey();
-List consumersForTopic = topicEntry.getValue();
+boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> 
t.needsRackAwareAssignment);
+if (useRackAware)
+assignWithRackMatching(topicAssignmentStates, assignment);
+
+topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, 
assignment));
+
+if (useRackAware)
+assignment.values().forEach(list -> 
list.sort(PARTITION_COMPARATOR));
+return assignment;
+}
+
+// This method is not used, but retained for compatibility with any custom 
assignors that extend this class.
+@Override
+public Map> assign(Map 
partitionsPerTopic,
+Map 
subscriptions) {
+return 
assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+}
+
+private void assignRanges(TopicAssignmentState assignmentState,
+  BiFunction 
mayAssign,
+  Map> assignment) {
+for (String consumer : assignmentState.consumers) {
+if (assignmentState.unassignedPartitions.isEmpty())
+break;
+List assignablePartitions = 
assignmentState.unassignedPartitions.stream()
+.filter(tp -> mayAssign.apply(consumer, tp))
+.collect(Collectors.toList());
 
-Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-if (numPartitionsForTopic == null)
+int maxAssignable = 
Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+if (maxAssignable <= 0)
 continue;
 
-Collections.sort(consumersForTopic);
+assign(consumer, assignablePartitions.subList(0, maxAssignable), 
assignmentState, assignment);
+}
+}
+
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
assignment) {
 
-int numPartitionsPerConsumer = numPartitionsForTopic / 
consumersForTopic.size();
-int consumersWithExtraPartition = numPartitionsForTopic % 
consumersForTopic.size();
+assignmentStates.stream().collect(Collectors.groupingBy(t -> 
t.consumers)).forEach((consumers, states) -> {
+states.stream().collect(Collectors.groupingBy(t -> 
t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
+if (coPartitionedStates.size() > 1)
+assignCoPartitionedWithRackMatching(consumers, 
numPartitions, states, assignment);
+else {
+TopicAssignmentState state = coPartitionedStates.get(0);
+if (state.needsRackAwareAssignment)
+assignRanges(state, state::racksMatch, assignment);
+}
+});
+});
+}
+
+private void assignCoPartitionedWithRackMatching(List consumers,
+ int 

[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-02-20 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011199


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -63,9 +76,19 @@
  * I0: [t0p0, t0p1, t1p0, t1p1]
  * I1: [t0p2, t1p2]
  * 
+ * 
+ * Rack-aware assignment is used if both consumer and partition replica racks are available and
+ * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with
+ * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment.
+ * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee
+ * over rack alignment. In this case, aligning partition replicas of these topics on the same racks
+ * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on
+ * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer
+ * on rack 'a', partition 1 to a consumer on rack 'b' and so on.

Review Comment:
   Yes, that is correct. 
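   
   To make the example above concrete, here is a small, self-contained sketch (illustrative only: the topic, rack and consumer names are invented and this is not the assignor's code) that gives partition i of every co-partitioned topic to a consumer whose rack hosts partition i's replicas:
   
   ```scala
   object CoPartitionedRackSketch extends App {
     // Hypothetical co-partitioned topics, replica racks and consumer racks.
     val topics = Seq("orders", "payments")                       // same partition count => co-partitioned
     val partitionRack = Map(0 -> "a", 1 -> "b", 2 -> "c")        // rack hosting the replicas of each partition index
     val consumerRack = Map("c1" -> "a", "c2" -> "b", "c3" -> "c")

     // Give each consumer the partitions (of every topic) whose replicas sit on its own rack.
     val assignment = consumerRack.map { case (consumer, rack) =>
       val partitions = partitionRack.collect { case (p, r) if r == rack => p }.toSeq.sorted
       consumer -> partitions.flatMap(p => topics.map(t => s"$t-$p"))
     }

     assignment.foreach { case (c, tps) => println(s"$c -> ${tps.mkString(", ")}") }
     // e.g. c1 -> orders-0, payments-0 ; c2 -> orders-1, payments-1 ; c3 -> orders-2, payments-2
   }
   ```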



##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -76,43 +99,185 @@ private Map> 
consumersPerTopic(Map> topicToConsumers = new HashMap<>();
 for (Map.Entry subscriptionEntry : 
consumerMetadata.entrySet()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13279: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded

2023-02-20 Thread via GitHub


chia7712 opened a new pull request, #13279:
URL: https://github.com/apache/kafka/pull/13279

   https://issues.apache.org/jira/browse/KAFKA-14295
   
   The broker topic metric FetchMessageConversionsPerSec doesn't get recorded 
on a fetch message conversion.
   The bug is that we pass in a callback that expects a MultiRecordsSend in 
KafkaApis:
   ```scala
   def updateConversionStats(send: Send): Unit = {
     send match {
       case send: MultiRecordsSend if send.recordConversionStats != null =>
         send.recordConversionStats.asScala.toMap.foreach {
           case (tp, stats) => updateRecordConversionStats(request, tp, stats)
         }
       case _ =>
     }
   }
   ```
   But we call this callback with a NetworkSend in the SocketServer:
   ```scala
   selector.completedSends.forEach { send =>
     try {
       val response = inflightResponses.remove(send.destinationId).getOrElse {
         throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
       }
       updateRequestMetrics(response)

       // Invoke send completion callback
       response.onComplete.foreach(onComplete => onComplete(send))
   ```
   Note that Selector.completedSends returns a collection of NetworkSend
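   
   A possible direction for a fix, shown here only as a sketch: unwrap the `NetworkSend` before matching on `MultiRecordsSend`. This assumes `NetworkSend` exposes the wrapped `Send` (via something like `send()`); the exact accessor needs to be confirmed against the current API.
   ```scala
   def updateConversionStats(send: Send): Unit = {
     // Unwrap the NetworkSend handed over by the SocketServer before matching
     // (assumption: NetworkSend exposes the wrapped Send).
     val underlying = send match {
       case net: NetworkSend => net.send()
       case other => other
     }
     underlying match {
       case records: MultiRecordsSend if records.recordConversionStats != null =>
         records.recordConversionStats.asScala.toMap.foreach {
           case (tp, stats) => updateRecordConversionStats(request, tp, stats)
         }
       case _ =>
     }
   }
   ```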
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14295) FetchMessageConversionsPerSec meter not recorded

2023-02-20 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-14295:
--

Assignee: Chia-Ping Tsai

> FetchMessageConversionsPerSec meter not recorded
> 
>
> Key: KAFKA-14295
> URL: https://issues.apache.org/jira/browse/KAFKA-14295
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Mao
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> The broker topic metric FetchMessageConversionsPerSec doesn't get recorded on 
> a fetch message conversion.
> The bug is that we pass in a callback that expects a MultiRecordsSend in 
> KafkaApis:
> {code:java}
> def updateConversionStats(send: Send): Unit = {
>   send match {
> case send: MultiRecordsSend if send.recordConversionStats != null =>
>   send.recordConversionStats.asScala.toMap.foreach {
> case (tp, stats) => updateRecordConversionStats(request, tp, stats)
>   }
> case _ =>
>   }
> } {code}
> But we call this callback with a NetworkSend in the SocketServer:
> {code:java}
> selector.completedSends.forEach { send =>
>   try {
> val response = inflightResponses.remove(send.destinationId).getOrElse {
>   throw new IllegalStateException(s"Send for ${send.destinationId} 
> completed, but not in `inflightResponses`")
> }
> updateRequestMetrics(response)
> // Invoke send completion callback
> response.onComplete.foreach(onComplete => onComplete(send))
> ...{code}
> Note that Selector.completedSends returns a collection of NetworkSend



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on pull request #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-20 Thread via GitHub


satishd commented on PR #13255:
URL: https://github.com/apache/kafka/pull/13255#issuecomment-1437063403

   Thanks @junrao @ijuma for your reviews. I have addressed the comments with inline replies/commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-20 Thread via GitHub


lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r950411


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -258,21 +269,27 @@ private List getTasksAndActions() {
 }
 
     private void addTask(final Task task) {
+        final TaskId taskId = task.id();
         if (isStateless(task)) {
             addToRestoredTasks((StreamTask) task);
-            log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+            log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+        } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+            pausedTasks.put(taskId, task);

Review Comment:
   Sorry, I almost missed this comment. I think this would work, but it would mean cycling through the list of all tasks in every single iteration of the main state updater loop. The pause code only runs after `commitMs` to reduce this overhead, so I thought that makes sense. It's different for resume, because we get a resume signal from the stream thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14733) Update AclAuthorizerTest to run tests for both zk and kraft mode

2023-02-20 Thread Purshotam Chauhan (Jira)
Purshotam Chauhan created KAFKA-14733:
-

 Summary: Update AclAuthorizerTest to run tests for both zk and 
kraft mode
 Key: KAFKA-14733
 URL: https://issues.apache.org/jira/browse/KAFKA-14733
 Project: Kafka
  Issue Type: Improvement
Reporter: Purshotam Chauhan
Assignee: Purshotam Chauhan


Currently, we have two test classes, AclAuthorizerTest and StandardAuthorizerTest, that are used exclusively for zk and kraft mode respectively.

AclAuthorizerTest has a lot of tests covering various scenarios. We should change AclAuthorizerTest to run in both zk and kraft modes so as to keep parity between the two modes.
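
A minimal sketch of what this could look like, assuming the parameterized-quorum pattern already used by Kafka's integration tests (the class, constant, and test names here are illustrative, and AclAuthorizerTest may need further harness changes to adopt it):
{code:scala}
import kafka.server.QuorumTestHarness
import kafka.utils.TestInfoUtils
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class AclAuthorizerQuorumSketch extends QuorumTestHarness {
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk", "kraft"))
  def testAclsAreEnforced(quorum: String): Unit = {
    // The harness starts a ZooKeeper- or KRaft-backed controller depending on `quorum`,
    // so the same assertions exercise AclAuthorizer and StandardAuthorizer respectively.
  }
}
{code}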



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-20 Thread via GitHub


lucasbru commented on PR #13025:
URL: https://github.com/apache/kafka/pull/13025#issuecomment-1436959926

   > @lucasbru the pause/ resume integration test fails again for J11/S13, 
could you take a look into it?
   
   Ah yes, that was coming from the rebase. Should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-20 Thread via GitHub


lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r909621


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -3159,7 +3159,7 @@ private void addRecord(final MockConsumer 
mockConsumer,
 }
 
 StreamTask activeTask(final TaskManager taskManager, final TopicPartition 
partition) {
-final Stream standbys = 
taskManager.allTasks().values().stream().filter(Task::isActive);
+final Stream standbys = 
taskManager.allOwnedTasks().values().stream().filter(Task::isActive);

Review Comment:
   Good point, we don't



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-02-20 Thread via GitHub


showuon commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436957769

   > > @tinaselenge , thanks for the PR. Some questions:
   > > 
   > > 1. This PR creates a new `DelegationTokenCommand` class, but there's no 
old `DelegationTokenCommand` class removal. Why is that?
   > > 2. The original `DelegationTokenCommandTest` is an integration test, but 
now we changed to unit test by mockAdminClient, why do we change that?
   > > 
   > > Thanks.
   > 
   > Hi @showuon
   > 
   > 1. I have removed the existing Scala class and its test.
   > 
   > 2. I thought it's good enough to test it using the mock as it's not 
really doing anything specific with the cluster. I understand that changes the 
test behaviour. If you think we should test the tool against an integration 
test cluster, I'm happy to change it back. Please let me know.
   
   For (1), thanks for the update.
   For (2), yes, I think the integration test is important. Especially with KRaft starting to support delegation tokens, we should rely on integration tests to make sure everything works fine in both ZK and KRaft mode. But the mock client tests are also great to have, so those could be kept as well.
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-02-20 Thread via GitHub


tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436890093

   > @tinaselenge , thanks for the PR. Some questions:
   > 
   > 1. This PR creates a new `DelegationTokenCommand` class, but there's no 
old `DelegationTokenCommand` class removal. Why is that?
   > 2. The original `DelegationTokenCommandTest` is an integration test, but 
now we changed to unit test by mockAdminClient, why do we change that?
   > 
   > Thanks.
   
   Hi @showuon 
   
   1. I have removed the existing Scala class and its test.
   
   2. I thought it's good enough to test it using the mock as it's not really 
doing anything specific with the cluster. I understand that changes the test 
behaviour. If you think we should test the tool against an integration test 
cluster, I'm happy to change it back. Please let me know. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-20 Thread via GitHub


mukkachaitanya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r817237


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.
+     *
+     * @param initialRequestTime the time in milliseconds when the original request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     * @param exponentialBackoff {@link ExponentialBackoff} used to calculate the retry backoff duration
+     * @param attempts the number of retry attempts that have been made
+     */
+    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName, ExponentialBackoff exponentialBackoff, int attempts) {

Review Comment:
   I see that currently we are always going to do an ExponentialBackoff. Should we simply move the logic that sets up the `ExponentialBackoff` into this function? I was thinking something like
   ```java
   private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
       ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
               RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
               2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
               0);
       reconfigureConnectorTasksWithExponentialBackoff(initialRequestTime, connName, exponentialBackoff, 0);
   }
   ```
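   
   For context, a minimal sketch of how the retry delays grow with such a backoff. It assumes the constructor (initial interval, multiplier, max interval, jitter) and the `backoff(attempts)` method of Kafka's `org.apache.kafka.common.utils.ExponentialBackoff`, and the millisecond values are placeholders rather than the real `RECONFIGURE_CONNECTOR_TASKS_BACKOFF_*` constants:
   ```scala
   import org.apache.kafka.common.utils.ExponentialBackoff

   object BackoffSketch extends App {
     val initialMs = 250L   // placeholder for RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS
     val maxMs = 60000L     // placeholder for RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS
     val backoff = new ExponentialBackoff(initialMs, 2, maxMs, 0.0)

     (0 until 10).foreach { attempt =>
       // backoff(attempt) grows roughly as initialMs * 2^attempt, capped at maxMs (plus optional jitter).
       println(s"attempt $attempt -> wait ${backoff.backoff(attempt)} ms")
     }
   }
   ```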



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1870,7 +1880,16 @@ private Callable 
getConnectorStoppingCallable(final String connectorName)
 };
 }
 
-private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName) {
+/**
+ * Request task configs from the connector and write them to the config 
storage in case the configs are detected to
+ * have changed. This method retries infinitely in case of any errors.

Review Comment:
   I am curious if there is a way to avoid infinite retries. If we are actually retrying infinitely, especially during the `startConnector` phase, then the connector just never gets any tasks. Is it possible to somehow bubble up errors as part of the connector (not task) status?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-02-20 Thread via GitHub


mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r724503


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class LogDirsCommand {
+
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    private static void execute(String... args) throws Exception {
+        LogDirsCommandOptions options = new LogDirsCommandOptions(args);
+        try (Admin adminClient = createAdminClient(options)) {
+            execute(options, adminClient);
+        }
+    }
+
+    static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception {
+        Set<String> topics = options.topics();
+        Set<Integer> clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> inputBrokers = options.brokers();
+        Set<Integer> existingBrokers = inputBrokers.isEmpty() ? new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers);
+        existingBrokers.retainAll(clusterBrokers);
+        Set<Integer> nonExistingBrokers = new HashSet<>(inputBrokers);
+        nonExistingBrokers.removeAll(clusterBrokers);
+
+        if (!nonExistingBrokers.isEmpty()) {
+            throw new TerseException(
+                    String.format(

Review Comment:
   Nit: we don't need the newline here since we print the exception message 
using `println()` in `mainNoExit()`



##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import