[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2021-02-26 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-10437:
---
Attachment: image.png

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
> Also, add unit tests in test-utils making sure we can initialize _all_ the 
> kinds of store with the MPC and MPC.getSSC.
>  





[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2021-02-26 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-10437:
---
Attachment: (was: image.png)

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> In addition to implementing the KIP, search for and resolve these todos:
> {color:#008dde}TODO will be fixed in KAFKA-10437{color}
> Also, add unit tests in test-utils making sure we can initialize _all_ the 
> kinds of store with the MPC and MPC.getSSC.
>  





[GitHub] [kafka] dengziming commented on pull request #10229: MINOR: Account for extra whitespaces in WordCountDemo

2021-02-26 Thread GitBox


dengziming commented on pull request #10229:
URL: https://github.com/apache/kafka/pull/10229#issuecomment-786993345


   Hello @mjsax, this is inspired by #10044, which forgot 
`WordCountProcessorDemo` and `WordCountTransformerDemo`. PTAL.
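
   For reference, a minimal, self-contained sketch of the kind of change involved 
(class name and sample input are illustrative, not the actual demo code): splitting 
on a run of whitespace avoids the empty tokens that a single-space split produces 
when the input contains extra whitespace.

   ```java
   import java.util.Arrays;
   import java.util.Locale;

   // Illustrative only; not the actual WordCountProcessorDemo/WordCountTransformerDemo code.
   public class WhitespaceSplitSketch {
       public static void main(String[] args) {
           String line = "Hello   Kafka  Streams";
           // Splitting on a single space keeps empty tokens for the extra whitespace:
           System.out.println(Arrays.asList(line.toLowerCase(Locale.getDefault()).split(" ")));
           // Splitting on a whitespace run collapses it:
           System.out.println(Arrays.asList(line.toLowerCase(Locale.getDefault()).split("\\s+")));
       }
   }
   ```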







[GitHub] [kafka] dengziming opened a new pull request #10229: MINOR: Account for extra whitespaces in WordCountDemo

2021-02-26 Thread GitBox


dengziming opened a new pull request #10229:
URL: https://github.com/apache/kafka/pull/10229


   *More detailed description of your change*
   As the title.
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on pull request #8830: KAFKA-10116: GraalVM native-image prototype

2021-02-26 Thread GitBox


ijuma commented on pull request #8830:
URL: https://github.com/apache/kafka/pull/8830#issuecomment-786989056


   Rebased and regenerated the configs with graalvm ce 21.0.0.2. Need to 
investigate the error during start-up:
   
   > Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Exit$
   >     at com.oracle.svm.core.classinitialization.ClassInitializationInfo.initialize(ClassInitializationInfo.java:239)
   >     at kafka.Kafka$.main(Kafka.scala:122)
   >     at kafka.Kafka.main(Kafka.scala)
   > 
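
   For context, "Could not initialize class" means the class's static initializer 
already failed once and every later reference gets a bare `NoClassDefFoundError`. 
A small, self-contained illustration of that plain-JVM behavior (not Kafka or 
GraalVM code; the property name is made up):

   ```java
   // Plain-JVM illustration of the failure mode; unrelated to Kafka's actual Exit$ logic.
   public class InitFailureSketch {
       static class Flaky {
           // Fails during class initialization when the (made-up) property is unset.
           static final int VALUE = Integer.parseInt(System.getProperty("flaky.value"));
       }

       public static void main(String[] args) {
           for (int i = 0; i < 2; i++) {
               try {
                   System.out.println(Flaky.VALUE);
               } catch (Throwable t) {
                   // First attempt: ExceptionInInitializerError (with the real cause).
                   // Second attempt: NoClassDefFoundError: Could not initialize class ...
                   System.out.println(t);
               }
           }
       }
   }
   ```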







[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584019199



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);

Review comment:
   Hmm, you mean PartitionChangeRecord? I don't see PartitionChangeRecord 
being generated from the topicDeletion request.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
 () -> replicationControl.unregisterBroker(brokerId));
 }
 
+@Override
+public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+return appendReadEvent("findTopicIds",
+() -> replicationControl.findTopicIds(lastCommittedOffset, names));
+}
+
+@Override
+public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+return appendReadEvent("findTopicNames",
+() -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+}
+
+@Override
+public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+return appendWriteEvent("deleteTopics",
+() -> replicationControl.deleteTopics(ids));

Review comment:
   We also need to delete the configuration associated with the topic.
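
   A self-contained sketch of that point (hypothetical names, not the actual 
ReplicationControlManager): removing the topic from the name/id maps without also 
removing its per-topic configuration would leave a stale config entry behind.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical illustration; the maps stand in for the controller's in-memory state.
   public class TopicRemovalSketch {
       private final Map<String, String> topicsByName = new HashMap<>();            // name -> id
       private final Map<String, Map<String, String>> configsByTopic = new HashMap<>();

       void removeTopic(String name) {
           topicsByName.remove(name);
           // The extra step the review asks for: drop the topic's configuration as well.
           configsByTopic.remove(name);
       }
   }
   ```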









[GitHub] [kafka] showuon commented on pull request #10228: KAFKA-10251: increase timeout for consuming records

2021-02-26 Thread GitBox


showuon commented on pull request #10228:
URL: https://github.com/apache/kafka/pull/10228#issuecomment-786981927


   @abbccdda @ableegoldman @mjsax , could you help review this PR? Thanks.







[GitHub] [kafka] showuon opened a new pull request #10228: KAFKA-10251: increase timeout for consuming records

2021-02-26 Thread GitBox


showuon opened a new pull request #10228:
URL: https://github.com/apache/kafka/pull/10228


   We need to wait for the transaction state to change to `READY` before we start 
consuming the records. But we don't have any way to change the 
transactionManager state in the client, so we can only wait. I've confirmed that if 
we try another time, we can pass the tests. My test code is like this:
   
   ```scala
   var isFailed = false
   try {
     pollRecordsUntilTrue(consumer, pollAction,
       waitTimeMs = waitTimeMs,
       msg = s"Consumed ${records.size} records before timeout instead of the expected $numRecords records")
   } catch {
     case e: AssertionFailedError => {
       isFailed = true
       System.err.println(s"!!! Consumed ${records.size} records before timeout instead of the expected $numRecords records")
     }
   }
   
   if (isFailed) {
     pollRecordsUntilTrue(consumer, pollAction,
       waitTimeMs = 3,
       msg = s"Consumed ${records.size} records before timeout instead of the expected $numRecords records")
   
     // if we go to this step, it means it passed in the 2nd try
     fail("failed at 1st try")
   }
   ```
   
   And they failed with `failed at 1st try`, which confirmed that we can pass 
the tests by increasing the timeout. Thanks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2021-02-26 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12384:
---

 Summary: Flaky Test 
ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
 Key: KAFKA-12384
 URL: https://issues.apache.org/jira/browse/KAFKA-12384
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Reporter: Matthias J. Sax


{quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: 
<(-1,-1)> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) 
at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote}





[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-26 Thread GitBox


showuon commented on a change in pull request #10185:
URL: https://github.com/apache/kafka/pull/10185#discussion_r584015842



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -455,27 +470,44 @@ private static void 
waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
 "Connector " + connector.getSimpleName() + " tasks did not 
start in time on cluster: " + connectCluster);
 }
 }
- 
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, 
String topicName) throws InterruptedException {
+try (final Admin adminClient = cluster.createAdminClient()) {
+waitForCondition(() -> 
adminClient.listTopics().names().get().contains(topicName), 
OFFSET_SYNC_DURATION_MS,
+"Topic: " + topicName + " didn't get created in the cluster"
+);
+}
+}
+
 /*
  * delete all topics of the input kafka cluster
  */
 private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws 
Exception {
-Admin client = cluster.createAdminClient();
-client.deleteTopics(client.listTopics().names().get());
+try (final Admin adminClient = cluster.createAdminClient()) {
+Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
+log.debug("Deleting topics: {} ", topicsToBeDeleted);
+adminClient.deleteTopics(topicsToBeDeleted).all().get();
+} catch (final Throwable e) {

Review comment:
   Also remove the change in PR description









[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-02-26 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17292005#comment-17292005
 ] 

Luke Chen commented on KAFKA-10251:
---

This flaky test also points to the same root cause.

kafka.api.TransactionsBounceTest.testWithGroupId()


org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead 
of the expected 200 records
 at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
 at org.junit.jupiter.api.Assertions.fail(Assertions.java:117)
 at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852)
 at 
kafka.api.TransactionsBounceTest.testWithGroupId(TransactionsBounceTest.scala:109)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Created] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working

2021-02-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12383:


 Summary: Get RaftClusterTest.java and other KIP-500 junit tests 
working
 Key: KAFKA-12383
 URL: https://issues.apache.org/jira/browse/KAFKA-12383
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Colin McCabe
Assignee: David Arthur








[GitHub] [kafka] showuon commented on pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-26 Thread GitBox


showuon commented on pull request #10185:
URL: https://github.com/apache/kafka/pull/10185#issuecomment-786978491


   @mimaison , thanks for your comments. I've addressed them and updated in 
this commit: 
https://github.com/apache/kafka/pull/10185/commits/de95286d9a6463d1fb1ac4b2d5bfd45360aec3a3.
 Please review again. Thank you.







[jira] [Created] (KAFKA-12382) Create KIP-500 README for the 2.8 rleease

2021-02-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12382:


 Summary: Create KIP-500 README for the 2.8 rleease
 Key: KAFKA-12382
 URL: https://issues.apache.org/jira/browse/KAFKA-12382
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Colin McCabe
Assignee: Colin McCabe








[jira] [Updated] (KAFKA-12382) Create KIP-500 README for the 2.8 release

2021-02-26 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12382:
-
Summary: Create KIP-500 README for the 2.8 release  (was: Create KIP-500 
README for the 2.8 rleease)

> Create KIP-500 README for the 2.8 release
> -
>
> Key: KAFKA-12382
> URL: https://issues.apache.org/jira/browse/KAFKA-12382
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
>






[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-26 Thread GitBox


showuon commented on a change in pull request #10185:
URL: https://github.com/apache/kafka/pull/10185#discussion_r584013857



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -455,27 +470,44 @@ private static void 
waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
 "Connector " + connector.getSimpleName() + " tasks did not 
start in time on cluster: " + connectCluster);
 }
 }
- 
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, 
String topicName) throws InterruptedException {
+try (final Admin adminClient = cluster.createAdminClient()) {
+waitForCondition(() -> 
adminClient.listTopics().names().get().contains(topicName), 
OFFSET_SYNC_DURATION_MS,

Review comment:
   You're right, I created a `TOPIC_SYNC_DURATION_MS` for it.









[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic

2021-02-26 Thread GitBox


showuon commented on a change in pull request #10185:
URL: https://github.com/apache/kafka/pull/10185#discussion_r584013620



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -455,27 +470,44 @@ private static void 
waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
 "Connector " + connector.getSimpleName() + " tasks did not 
start in time on cluster: " + connectCluster);
 }
 }
- 
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, 
String topicName) throws InterruptedException {
+try (final Admin adminClient = cluster.createAdminClient()) {
+waitForCondition(() -> 
adminClient.listTopics().names().get().contains(topicName), 
OFFSET_SYNC_DURATION_MS,
+"Topic: " + topicName + " didn't get created in the cluster"
+);
+}
+}
+
 /*
  * delete all topics of the input kafka cluster
  */
 private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws 
Exception {
-Admin client = cluster.createAdminClient();
-client.deleteTopics(client.listTopics().names().get());
+try (final Admin adminClient = cluster.createAdminClient()) {
+Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
+log.debug("Deleting topics: {} ", topicsToBeDeleted);
+adminClient.deleteTopics(topicsToBeDeleted).all().get();
+} catch (final Throwable e) {

Review comment:
   Makes sense to me. Updated. Thanks.









[GitHub] [kafka] cmccabe merged pull request #10226: MINOR: fix kafka-metadata-shell.sh

2021-02-26 Thread GitBox


cmccabe merged pull request #10226:
URL: https://github.com/apache/kafka/pull/10226


   







[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584009213



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);
+log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+}

Review comment:
   We haven't hooked that up yet, correct.  But that logic is in 
`BrokerMetadataListener`. It would probably be better to have a separate PR for 
that.









[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r584007497



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);

Review comment:
   The ISRs should already have been updated by `BrokerChangeRecords` that 
were previously replayed.









[GitHub] [kafka] cmccabe commented on pull request #10199: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol

2021-02-26 Thread GitBox


cmccabe commented on pull request #10199:
URL: https://github.com/apache/kafka/pull/10199#issuecomment-786970988


   I filed https://issues.apache.org/jira/browse/KAFKA-12381 to follow up on 
the auto-topic-creation behavior change discussion.







[GitHub] [kafka] cmccabe merged pull request #10199: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol

2021-02-26 Thread GitBox


cmccabe merged pull request #10199:
URL: https://github.com/apache/kafka/pull/10199


   







[jira] [Commented] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-02-26 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291991#comment-17291991
 ] 

Colin McCabe commented on KAFKA-12381:
--

cc [~rndgstn], [~hachikuji], [~ijuma]

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.





[jira] [Updated] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-02-26 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12381:
-
Affects Version/s: 2.8.0

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.





[jira] [Assigned] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-02-26 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-12381:


Assignee: Boyang Chen

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.





[jira] [Created] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-02-26 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12381:


 Summary: Incompatible change in verifiable_producer.log in 2.8
 Key: KAFKA-12381
 URL: https://issues.apache.org/jira/browse/KAFKA-12381
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe


In test_verifiable_producer.py, we used to see this error message in 
verifiable_producer.log when a topic couldn't be created:

WARN [Producer clientId=producer-1] Error while fetching metadata with 
correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it used 
to pass.

Now we are instead seeing this in the log file:

WARN [Producer clientId=producer-1] Error while fetching metadata with 
correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
(org.apache.kafka.clients.NetworkClient)
And of course now the test fails.

The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
manager.

It is a simple matter to make the test pass -- I have confirmed that it passes 
if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
LEADER_NOT_AVAILABLE.

I think we just need to decide if this change in behavior is acceptable or not.





[GitHub] [kafka] cmccabe opened a new pull request #10227: MINOR: add a README for KIP-500

2021-02-26 Thread GitBox


cmccabe opened a new pull request #10227:
URL: https://github.com/apache/kafka/pull/10227


   







[GitHub] [kafka] cmccabe opened a new pull request #10226: MINOR: fix kafka-metadata-shell.sh

2021-02-26 Thread GitBox


cmccabe opened a new pull request #10226:
URL: https://github.com/apache/kafka/pull/10226


   * Fix CLASSPATH issues in the startup script
   
   * Fix overly verbose log messages during loading
   
   * Update to use the new MetadataRecordSerde (this is needed now that we
 have a frame version)
   
   * Fix initialization







[GitHub] [kafka] hachikuji commented on pull request #10225: MINOR: fix security_test for ZK case due to error change

2021-02-26 Thread GitBox


hachikuji commented on pull request #10225:
URL: https://github.com/apache/kafka/pull/10225#issuecomment-786959937


   @rondagostino Thanks for identifying the issue. Returning 
`INVALID_REPLICATION_FACTOR` seems like a mistake to me if we were previously 
returning `LEADER_NOT_AVAILABLE`. I'd suggest we fix the code. Would you mind 
filing a JIRA for this so that we can mark it as a 2.8 blocker?







[GitHub] [kafka] rondagostino commented on pull request #10225: MINOR: fix security_test for ZK case due to error change

2021-02-26 Thread GitBox


rondagostino commented on pull request #10225:
URL: https://github.com/apache/kafka/pull/10225#issuecomment-786955706


   @abbccdda, @cmccabe.  Ported from #10199 to discuss separately.  We used to 
see this error message in `verifiable_producer.log` when 
`security_protocol='PLAINTEXT', interbroker_security_protocol='SSL'`:
   
   ```
   WARN [Producer clientId=producer-1] Error while fetching metadata with 
correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
   ```
   The test does a `grep LEADER_NOT_AVAILABLE` on the log in this case, and it 
used to pass.
   
   Now we are instead seeing this in the log file:
   
   ```
   WARN [Producer clientId=producer-1] Error while fetching metadata with 
correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
(org.apache.kafka.clients.NetworkClient)
   ```
   
   And of course now the test fails.
   
   The `INVALID_REPLICATION_FACTOR` is coming from the auto topic creation 
manager as I described above.
   
   It is a simple matter to make the test pass -- I have confirmed that it 
passes if we `grep` for `INVALID_REPLICATION_FACTOR` in the log file instead of 
`LEADER_NOT_AVAILABLE`.
   
   I think we just need to decide if this change in behavior is acceptable or 
not.
   







[GitHub] [kafka] rondagostino opened a new pull request #10225: MINOR: fix security_test for ZK case due to error change

2021-02-26 Thread GitBox


rondagostino opened a new pull request #10225:
URL: https://github.com/apache/kafka/pull/10225


   The ZooKeeper version of this system test is failing because the producer is 
no longer seeing `LEADER_NOT_AVAILABLE`. When the broker sees a METADATA 
request for the test topic after it restarts, the auto topic creation manager 
determines that the topic needs to be created because of the TLS hostname 
verification failure on the inter-broker security protocol.  It also thinks 
there aren't enough brokers available to meet the default topic replication 
factor (it sees 0 available due to the TLS issue), so it 
returns `INVALID_REPLICATION_FACTOR` for that topic in the Metadata response. In 
other words, the flow has changed, the inability to produce no longer manifests 
as it did before, and the test fails.  This patch updates the 
test to check for `INVALID_REPLICATION_FACTOR` instead of 
`LEADER_NOT_AVAILABLE`.
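
   The actual check lives in the Python/ducktape system test and is a simple grep 
of `verifiable_producer.log`; the sketch below is only an illustration of the 
equivalent check (the file name and the tolerant either-error match are assumptions).

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Paths;
   import java.util.stream.Stream;

   // Illustrative only: scan the producer log for the metadata error the test expects.
   public class ProducerLogCheck {
       public static void main(String[] args) throws IOException {
           try (Stream<String> lines = Files.lines(Paths.get("verifiable_producer.log"))) {
               boolean sawExpectedError = lines.anyMatch(line ->
                   line.contains("INVALID_REPLICATION_FACTOR") || line.contains("LEADER_NOT_AVAILABLE"));
               System.out.println("metadata error observed: " + sawExpectedError);
           }
       }
   }
   ```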
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


junrao commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583984728



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);
+log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+}

Review comment:
   I guess we haven't hooked up the logic to trigger the deletion of the 
replicas of the deleted topic in the broker?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);

Review comment:
   Should we update brokersToIsrs too?









[GitHub] [kafka] rondagostino opened a new pull request #10224: MINOR: Disable transactional streams system tests for Raft quorums

2021-02-26 Thread GitBox


rondagostino opened a new pull request #10224:
URL: https://github.com/apache/kafka/pull/10224


   Transactions are not supported in the KIP-500 early access release. This 
patch disables a system test for Raft metadata quorums that uses transactions 
and that was still enabled after #10194.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rhauch commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-02-26 Thread GitBox


rhauch commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r583981010



##
File path: build.gradle
##
@@ -2026,52 +2043,53 @@ project(':connect:runtime') {
   archivesBaseName = "connect-runtime"
 
   dependencies {
-
-compile project(':connect:api')
-compile project(':clients')
-compile project(':tools')
-compile project(':connect:json')
-compile project(':connect:transforms')
-
-compile libs.slf4jApi
-compile libs.jacksonJaxrsJsonProvider
-compile libs.jerseyContainerServlet
-compile libs.jerseyHk2
-compile libs.jaxbApi // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.activation // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.jettyServer
-compile libs.jettyServlet
-compile libs.jettyServlets
-compile libs.jettyClient
-compile(libs.reflections)
-compile(libs.mavenArtifact)
-
-testCompile project(':clients').sourceSets.test.output
-testCompile libs.easymock
-testCompile libs.junitJupiterApi
-testCompile libs.junitVintageEngine
-testCompile libs.powermockJunit4
-testCompile libs.powermockEasymock
-testCompile libs.mockitoCore
-testCompile libs.httpclient
-
-testCompile project(':clients').sourceSets.test.output
-testCompile project(':core')
-testCompile project(':core').sourceSets.test.output
-
-testRuntime libs.slf4jlog4j
+implementation project(':connect:api')

Review comment:
   Ah, I see that now. If we wanted to be strict, we could leave as an 
implementation dependency. However, I agree that it's safest to use `api` for 
the `connect-api` module.









[jira] [Commented] (KAFKA-9714) Flaky Test SslTransportLayerTest#testTLSDefaults

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291965#comment-17291965
 ] 

Matthias J. Sax commented on KAFKA-9714:


Different test method:
{quote}org.opentest4j.AssertionFailedError: Condition not met within timeout 
15000. Metric not updated failed-authentication-total expected:<1.0> but 
was:<170.0> ==> expected: <true> but was: <false> at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) 
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196)
 at 
org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155)
 at 
org.apache.kafka.common.network.SslTransportLayerTest.verifySslConfigsWithHandshakeFailure(SslTransportLayerTest.java:1327)
 at 
org.apache.kafka.common.network.SslTransportLayerTest.testInvalidEndpointIdentification(SslTransportLayerTest.java:284){quote}
STDOUT:
{quote}[2021-02-26 22:15:14,750] ERROR Modification time of key store could not 
be obtained: some.truststore.path 
(org.apache.kafka.common.security.ssl.DefaultSslEngineFactory:385) 
java.nio.file.NoSuchFileException: some.truststore.path{quote}

> Flaky Test SslTransportLayerTest#testTLSDefaults
> 
>
> Key: KAFKA-9714
> URL: https://issues.apache.org/jira/browse/KAFKA-9714
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5145/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testTLSDefaults_tlsProtocol_TLSv1_2_/]
> {quote}java.lang.AssertionError: Metric not updated 
> failed-authentication-total expected:<0.0> but was:<1.0> expected:<0.0> but 
> was:<1.0> at org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:555) at 
> org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:194)
>  at 
> org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:156)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.testTLSDefaults(SslTransportLayerTest.java:571){quote}
> STDOUT
> {quote}[2020-03-12 17:03:44,617] ERROR Modification time of key store could 
> not be obtained: some.truststore.path 
> (org.apache.kafka.common.security.ssl.SslEngineBuilder:300) 
> java.nio.file.NoSuchFileException: some.truststore.path at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>  at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>  at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>  at 
> java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
>  at 
> java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
>  at 
> java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
>  at java.base/java.nio.file.Files.readAttributes(Files.java:1763) at 
> java.base/java.nio.file.Files.getLastModifiedTime(Files.java:2314) at 
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.lastModifiedMs(SslEngineBuilder.java:298)
>  at 
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.<init>(SslEngineBuilder.java:275)
>  at 
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createTruststore(SslEngineBuilder.java:182)
>  at 
> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:100)
>  at 
> org.apache.kafka.common.security.ssl.SslFactory.createNewSslEngineBuilder(SslFactory.java:140)
>  at 
> org.apache.kafka.common.security.ssl.SslFactory.validateReconfiguration(SslFactory.java:114)
>  at 
> org.apache.kafka.common.network.SslChannelBuilder.validateReconfiguration(SslChannelBuilder.java:85)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.verifyInvalidReconfigure(SslTransportLayerTest.java:1123)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.testServerTruststoreDynamicUpdate(SslTransportLayerTest.java:1113){quote}





[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291964#comment-17291964
 ] 

Matthias J. Sax commented on KAFKA-10251:
-

Different test method:
{quote}org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
instead of the expected 200 records at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at 
org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at 
kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852) at 
kafka.api.TransactionsBounceTest.testWithGroupId(TransactionsBounceTest.scala:109){quote}
STDOUT is full with
{quote}[2021-02-26 22:24:12,274] ERROR Error when sending message to topic 
output-topic with key: 923, value: 923 with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since 
transaction was aborted [2021-02-26 22:24:12,274] ERROR Error when sending 
message to topic output-topic with key: 926, value: 926 with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since 
transaction was aborted
...{quote}

> Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
> -
>
> Key: KAFKA-10251
> URL: https://issues.apache.org/jira/browse/KAFKA-10251
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> h3. Stacktrace
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 200 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at 
> kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109)
>  
>  
> The logs are pretty much just this on repeat:
> {code:java}
> [2020-07-08 23:41:04,034] ERROR Error when sending message to topic 
> output-topic with key: 9955, value: 9955 with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error 
> when sending message to topic output-topic with key: 9959, value: 9959 with 
> error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Commented] (KAFKA-8711) Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. testControlPlaneRequest

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291963#comment-17291963
 ] 

Matthias J. Sax commented on KAFKA-8711:


Different but related test
{quote}org.opentest4j.AssertionFailedError: Disconnect notification not 
received: 127.0.0.1:46181-127.0.0.1:34506-0 at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at 
org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at 
kafka.network.SocketServerTest$TestableSocketServer.waitForChannelClose(SocketServerTest.scala:1934)
 at 
kafka.network.SocketServerTest.idleExpiryWithBufferedReceives(SocketServerTest.scala:1567){quote}
log4j:
{quote}[2021-02-26 20:51:36,375] ERROR Uncaught exception in thread 
'data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0': 
(org.apache.kafka.common.utils.KafkaThread:49) 
kafka.network.SocketServerTest$$anon$8

[...]

[2021-02-26 20:52:13,338] ERROR Closing socket for 
127.0.0.1:43677-127.0.0.1:54109-0 because of error (kafka.network.Processor:76) 
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key VOTE which is not enabled [2021-02-26 20:52:13,338] DEBUG Closing selector 
connection 127.0.0.1:43677-127.0.0.1:54109-0 (kafka.network.Processor:62) 
[2021-02-26 20:52:13,339] ERROR Exception while processing request from 
127.0.0.1:43677-127.0.0.1:54109-0 (kafka.network.Processor:76) 
org.apache.kafka.common.errors.InvalidRequestException: Received request api 
key VOTE which is not enabled{quote}

> Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. 
> testControlPlaneRequest
> --
>
> Key: KAFKA-8711
> URL: https://issues.apache.org/jira/browse/KAFKA-8711
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Chandrasekhar
>Priority: Critical
> Attachments: KafkaAUTFailures07242019_PASS2.txt, 
> KafkaUTFailures07242019_PASS2.GIF
>
>
> Cloned Kafka 2.3.0 source to our git repo and compiled it using 'gradle 
> build'; we see the following error consistently:
> Gradle Version 4.7
>  
> testControlPlaneRequest
> java.net.BindException: Address already in use (Bind failed)
>     at java.net.PlainSocketImpl.socketBind(Native Method)
>     at 
> java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
>     at java.net.Socket.bind(Socket.java:644)
>     at java.net.Socket.<init>(Socket.java:433)
>     at java.net.Socket.<init>(Socket.java:286)
>     at kafka.network.SocketServerTest.connect(SocketServerTest.scala:140)
>     at 
> kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1(SocketServerTest.scala:200)
>     at 
> kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1$adapted(SocketServerTest.scala:199)
>     at 
> kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1141)
>     at 
> kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:199)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>     at 

[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart

2021-02-26 Thread Leonid Ilyevsky (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291956#comment-17291956
 ] 

Leonid Ilyevsky commented on KAFKA-10582:
-

I have the same problem, but in my case just restarting the mirror maker does 
not help.

Here is what I have to do: stop the mirror maker, then delete the 
*mm2-offset-syncs..internal* topic on the source cluster, and 
then start the mirror maker again.

I am using the mirror maker that comes with Confluent 6.1.0, and I run 3 
instances on the same hosts as the target cluster.
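
For illustration, the internal topic can also be removed with the Java Admin 
client; this is only a sketch of the workaround above, and the bootstrap servers 
and the cluster alias inside the topic name are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Sketch: delete the offset-syncs topic on the source cluster, then restart MirrorMaker 2.
public class DeleteOffsetSyncsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "source-host:9092");
        try (Admin admin = Admin.create(props)) {
            admin.deleteTopics(Collections.singleton("mm2-offset-syncs.target.internal"))
                 .all().get();
        }
    }
}
{code}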

> Mirror Maker 2 not replicating new topics until restart
> ---
>
> Key: KAFKA-10582
> URL: https://issues.apache.org/jira/browse/KAFKA-10582
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.1
> Environment: RHEL 7 Linux.
>Reporter: Robert Martin
>Priority: Minor
>
> We are using Mirror Maker 2 from the 2.5.1 release for replication on some 
> clusters.  Replication is working as expected for existing topics.  When we 
> create a new topic, however, Mirror Maker 2 creates the replicated topic as 
> expected but never starts replicating it.  If we restart Mirror Maker 2 
> within 2-3 minutes the topic starts replicating as expected.  From the 
> documentation we have seen, it appears this should start replicating without 
> a restart, based on the settings we have.
> *Example:*
> Create topic "mytesttopic" on source cluster
> MirrorMaker 2 creates "source.mytesttopic" on the target cluster with no issue
> MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic"
> Restart MirrorMaker 2 and now replication works for "mytesttopic" -> 
> "source.mytesttopic"
> *Example config:*
> name = source->target
> group.id = source-to-target
> clusters = source, target
> source.bootstrap.servers = sourcehosts:9092
> target.bootstrap.servers = targethosts:9092
> source->target.enabled = true
> source->target.topics = .*
> target->source = false
> target->source.topics = .*
> replication.factor=3
> checkpoints.topic.replication.factor=3
> heartbeats.topic.replication.factor=3
> offset-syncs.topic.replication.factor=3
> offset.storage.replication.factor=3
> status.storage.replication.factor=3
> config.storage.replication.factor=3
> tasks.max = 16
> refresh.topics.enabled = true
> sync.topic.configs.enabled = true
> refresh.topics.interval.seconds = 300
> refresh.groups.interval.seconds = 300
> readahead.queue.capacity = 100
> emit.checkpoints.enabled = true
> emit.checkpoints.interval.seconds = 5



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji opened a new pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

2021-02-26 Thread GitBox


hachikuji opened a new pull request #10223:
URL: https://github.com/apache/kafka/pull/10223


   We now accept topicIds in the `DeleteTopic` request. If the client 
principal does not have `Describe` permission, then we return 
`UNKNOWN_TOPIC_ID` regardless of whether the topic exists or not. However, if the 
topic does exist, then we also set its name in the response, which gives the 
user a way to infer existence. This is probably not a major issue since the 
client would still need to find the topicId first, but it is an easy hole to 
plug as well.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583965997



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id."))
+}
+  }
+}
+
+val idToName = new util.HashMap[Uuid, String]
+
+def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+  if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+  appendResponse(name, id, new ApiError(INVALID_REQUEST,
+  "The same topic was specified by name and by id."))
+  idToName.remove(id)
+  duplicatedTopicIds.add(id)
+  }
+}
+controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+  case (name, idOrError) => if (idOrError.isError) {
+appendResponse(name, ZERO_UUID, idOrError.error())
+  } else {
+maybeAppendToIdToName(idOrError.result(), name)
+  }
+}
+controller.findTopicNames(topicIdsToResolve).get().asScala.foreach {
+  case (id, nameOrError) => if (nameOrError.isError) {
+appendResponse(null, id, nameOrError.error())
+  } else {
+maybeAppendToIdToName(id, nameOrError.result())
+  }
+}
+
+if (!hasClusterAuth) {
+  val authorizedDescribeTopics = 
getDescribableTopics(idToName.values().asScala)
+  val authorizedDeleteTopics = 
getDeletableTopics(idToName.values().asScala)
+  val iterator = idToName.entrySet().iterator()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+val topicName = 

[GitHub] [kafka] ijuma commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


ijuma commented on pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#issuecomment-786935020


   Tests passed, merged to trunk and 2.8.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

2021-02-26 Thread GitBox


rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583964769



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -259,6 +267,16 @@ public void execute() {
 }
 }
 
+private void closeProducer(Duration duration) {
+if (producer != null) {
+try {
+producer.close(duration);
+} catch (Throwable t) {

Review comment:
   I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for 
shutting down the worker's executor. Again, it's not an issue in runtime, but a 
*potential* issue in our tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

2021-02-26 Thread GitBox


rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583964769



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -259,6 +267,16 @@ public void execute() {
 }
 }
 
+private void closeProducer(Duration duration) {
+if (producer != null) {
+try {
+producer.close(duration);
+} catch (Throwable t) {

Review comment:
   I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for 
shutting down the worker's executor.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12380) Executor in Connect's Worker is not shut down when the worker is

2021-02-26 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-12380:
-

 Summary: Executor in Connect's Worker is not shut down when the 
worker is
 Key: KAFKA-12380
 URL: https://issues.apache.org/jira/browse/KAFKA-12380
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Randall Hauch


The `Worker` class has an [`executor` 
field|https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L100]
 that the public constructor initializes with a new cached thread pool 
(https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L127).

When the worker is stopped, it does not shutdown this executor. This is 
normally okay in the Connect runtime and MirrorMaker 2 runtimes, because the 
worker is stopped only when the JVM is stopped (via the shutdown hook in the 
herders).

However, we instantiate and stop the herder many times in our integration 
tests, and this means we're not necessarily shutting down the worker's 
executor. Normally this won't hurt, as long as all of the runnables that the 
executor threads run actually do terminate. But it's possible those threads 
*might* not terminate in all tests. TBH, I don't know that such cases actually 
exist.
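
A minimal sketch of the kind of cleanup being suggested (not the actual patch; the class and method names are illustrative) is to shut the executor down when the worker stops and wait briefly for in-flight runnables:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative helper only: the real fix would live in Worker.stop().
final class ExecutorShutdown {
    static void shutdownQuietly(ExecutorService executor, long timeoutSec) {
        executor.shutdown();                 // stop accepting new runnables
        try {
            if (!executor.awaitTermination(timeoutSec, TimeUnit.SECONDS)) {
                executor.shutdownNow();      // interrupt anything still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
{code}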

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma merged pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


ijuma merged pull request #8812:
URL: https://github.com/apache/kafka/pull/8812


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583962844



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id."))
+}
+  }
+}
+
+val idToName = new util.HashMap[Uuid, String]
+
+def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+  if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+  appendResponse(name, id, new ApiError(INVALID_REQUEST,
+  "The same topic was specified by name and by id."))
+  idToName.remove(id)
+  duplicatedTopicIds.add(id)
+  }
+}
+controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+  case (name, idOrError) => if (idOrError.isError) {
+appendResponse(name, ZERO_UUID, idOrError.error())

Review comment:
   It's certainly awkward that for topic IDs, we need to check existence 
first (since otherwise we have nothing to give to the authorizer) but for topic 
names, we check authorization first. But you're right, this is a leak. I'll fix 
it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940888



##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+new NotControllerException("This is not the correct controller for 
this cluster.");
+
+public static class Builder {
+private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+public Builder newInitialTopic(String name, Uuid id) {
+initialTopics.put(name, new MockTopic(name, id));
+return this;
+}
+
+public MockController build() {
+return new MockController(initialTopics.values());
+}
+}
+
+private volatile boolean active = true;
+
+private MockController(Collection<MockTopic> initialTopics) {
+for (MockTopic topic : initialTopics) {
+topics.put(topic.id, topic);
+topicNameToId.put(topic.name, topic.id);
+}
+}
+
+@Override
+public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public CompletableFuture<Void> unregisterBroker(int brokerId) {
+throw new UnsupportedOperationException();
+}
+
+static class MockTopic {
+private final String name;
+private final Uuid id;
+
+MockTopic(String name, Uuid id) {
+this.name = name;
+this.id = id;
+}
+}
+
+private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+@Override
+synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+findTopicIds(Collection<String> topicNames) {
+Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+for (String topicName : topicNames) {
+if (!topicNameToId.containsKey(topicName)) {
+System.out.println("WATERMELON: findTopicIds failed to find " 
+ topicName);

Review comment:
   removed :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940677



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
   It is kind of frustrating that there is this much complexity in the 
"apis" class.  At least there is a good unit test for it now, though.  I hope 
that most other APIs won't be this complex.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10222: MINOR: disable test_produce_bench_transactions for Raft metadata quorum

2021-02-26 Thread GitBox


cmccabe merged pull request #10222:
URL: https://github.com/apache/kafka/pull/10222


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-26 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483


   The documentation is auto-generated from the transforms code, via 
[TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).
 Since this is an existing SMT, there are no changes needed to that 
`TransformationDoc` class, but we still have to change the `Cast` class 
[here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L57-L61)
 and 
[here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L83-L85)
 to mention that byte arrays can be cast to strings.
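
   As a rough sketch (the constant name and wording here are assumptions, not the final text), the edit would just extend the doc strings that `TransformationDoc` reads:

```java
// Hypothetical sketch of the doc-string change in Cast.java; the existing
// overview text is abbreviated and the exact wording is up for review.
public static final String OVERVIEW_DOC =
        "Cast fields or the entire key or value to a specific type"
        + " (existing description of the supported primitive casts omitted here)."
        + " Byte array fields can also be cast to a string.";
```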



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-26 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483


   The documentation is auto-generated from the transforms code, via 
[TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).
 Since this is an existing SMT, there are no changes required for documentation 
(other than changing the overview and config docs in `Cast.java`). Those changes 
should be made 
[here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L57-L61)
 and 
[here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L83-L85).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-26 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483


   The documentation is auto-generated from the transforms code, via 
[TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).
 Since this is an existing SMT, there are no changes required for documentation 
(other than changing the overview and config docs in `Cast.java`).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-26 Thread GitBox


rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483


   The documentation is auto-generated from the transforms code, via 
[TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string

2021-02-26 Thread GitBox


rhauch commented on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483


   The documentation is auto-generated from the code, via 
[TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583886777



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
   There seems to be enough complexity in the handling here that it might 
be worth pulling this logic into a separate class. Not required for this PR, 
but it would be nice to come up with a nicer pattern so that we don't end up 
with a giant class like `KafkaApis`.

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic 

[GitHub] [kafka] rondagostino opened a new pull request #10222: MINOR: disable test_produce_bench_transactions for Raft metadata quorum

2021-02-26 Thread GitBox


rondagostino opened a new pull request #10222:
URL: https://github.com/apache/kafka/pull/10222


   Transactions are not supported in the KIP-500 early access release.  This 
patch disables a system test for Raft metadata quorums that uses transactions 
and that was still enabled after https://github.com/apache/kafka/pull/10194. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

2021-02-26 Thread GitBox


dhruvilshah3 commented on pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#issuecomment-786895408


   Thanks for the review @rajinisivaram. I ran 
`MirrorConnectorsIntegrationSSLTest` a few times locally and it passed. It 
passed in the latest jenkins run as well. Couple of failures in the latest run 
seem unrelated to the changes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583907925



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
 preallocate = config.preallocate))
 }
 
-recoveryPoint = activeSegment.readNextOffset

Review comment:
   We discussed this offline and we decided to stick with the fix in this 
PR for now and to file a separate JIRA to consider flushing unflushed segments 
during recovery. That would provide stronger guarantees after a restart.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583903614



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {

Review comment:
   Good catch.  Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583900849



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {

Review comment:
   Unfortunately, that will not work since we have to shuffle it at the end





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown

2021-02-26 Thread GitBox


ableegoldman commented on pull request #10213:
URL: https://github.com/apache/kafka/pull/10213#issuecomment-786873825


   Merged to trunk and cherrypicked to 2.8 cc @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown

2021-02-26 Thread GitBox


ableegoldman merged pull request #10213:
URL: https://github.com/apache/kafka/pull/10213


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown

2021-02-26 Thread GitBox


ableegoldman commented on pull request #10213:
URL: https://github.com/apache/kafka/pull/10213#issuecomment-786871581


   One unrelated test failure: 
`kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch`
   
   Going to merge this now



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12235) ZkAdminManager.describeConfigs returns no config when 2+ configuration keys are specified

2021-02-26 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12235:
-
Fix Version/s: 2.7.1

> ZkAdminManager.describeConfigs returns no config when 2+ configuration keys 
> are specified
> -
>
> Key: KAFKA-12235
> URL: https://issues.apache.org/jira/browse/KAFKA-12235
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.7.0
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Critical
>  Labels: regression
> Fix For: 2.8.0, 2.7.1
>
>
> When {{ZkAdminManager.describeConfigs}} receives {{DescribeConfigsResource}} 
> with 2 or more {{configurationKeys}} specified, it returns an empty 
> configuration.
> Here's a test for {{ZkAdminManagerTest}} that reproduces this issue:
>   
> {code:scala}
> @Test
> def testDescribeConfigsWithConfigurationKeys(): Unit = {
>   EasyMock.expect(zkClient.getEntityConfigs(ConfigType.Topic, 
> topic)).andReturn(TestUtils.createBrokerConfig(brokerId, "zk"))
>   EasyMock.expect(metadataCache.contains(topic)).andReturn(true)
>   EasyMock.replay(zkClient, metadataCache)
>   val resources = List(new 
> DescribeConfigsRequestData.DescribeConfigsResource()
> .setResourceName(topic)
> .setResourceType(ConfigResource.Type.TOPIC.id)
> .setConfigurationKeys(List("retention.ms", "retention.bytes", 
> "segment.bytes").asJava)
>   )
>   val adminManager = createAdminManager()
>   val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = 
> adminManager.describeConfigs(resources, true, true)
>   assertEquals(Errors.NONE.code, results.head.errorCode())
>   val resultConfigKeys = results.head.configs().asScala.map(r => 
> r.name()).toSet
>   assertEquals(Set("retention.ms", "retention.bytes", "segment.bytes"), 
> resultConfigKeys)
> }
> {code}
> Works fine with one configuration key, though.
> The patch is following shortly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #9990:
URL: https://github.com/apache/kafka/pull/9990#discussion_r583245201



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -47,11 +47,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
 
   def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => 
DescribeConfigsResponseData.DescribeConfigsResourceResult): 
DescribeConfigsResponseData.DescribeConfigsResult = {
-val filteredConfigPairs = if (resource.configurationKeys == null)
+val filteredConfigPairs = if (resource.configurationKeys == null || 
resource.configurationKeys.isEmpty)

Review comment:
   As you said earlier, the code in 2.7 is broken.  So we shouldn't be going 
based off of that. The correct code in 2.6 does not special-case the empty 
string.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2021-02-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291899#comment-17291899
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9880:
---

Since we turned off bulk loading in 2.6.0 and removed the offending 
compactRange call, I think we can go ahead and close this ticket

> Error while range compacting during bulk loading of FIFO compacted RocksDB 
> Store
> 
>
> Key: KAFKA-9880
> URL: https://issues.apache.org/jira/browse/KAFKA-9880
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Nicolas Carlot
>Priority: Major
>
>  
> When restoring a non empty RocksDB state store, if it is customized to use 
> FIFOCompaction, the following exception is thrown:
>  
> {code:java}
> //
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
> compacting during restoring  store merge_store
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
> at org.rocksdb.RocksDB.compactRange(Native Method) 
> ~[kafka-stream-router.jar:?]
> at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
> ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
>  ~[kafka-stream-router.jar:?]
> ... 11 more
> {code}
>  
>  
> Compaction is configured through an implementation of RocksDBConfigSetter. 
> The exception is gone as soon as I remove:
> {code:java}
> // 
> CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> fifoOptions.setMaxTableFilesSize(maxSize);
> fifoOptions.setAllowCompaction(true);
> options.setCompactionOptionsFIFO(fifoOptions);
> options.setCompactionStyle(CompactionStyle.FIFO);
> {code}
>  
>  
> Bulk loading works fine when the store is non-existent / empty. This occurs 
> only when there is a minimum amount of data in it. I guess it happens when 
> the number of SST layers is increased.
> I'm currently using a forked version of Kafka 2.4.1 customizing the 
> RocksDBStore class with this modification as a work around:
>  
> {code:java}
> //
> public void toggleDbForBulkLoading() {
>   try {
> db.compactRange(columnFamily, true, 1, 0);
>   } catch (final RocksDBException e) {
> try {
>   if (columnFamily.getDescriptor().getOptions().compactionStyle() != 
> CompactionStyle.FIFO) {
> throw new ProcessorStateException("Error while range compacting 
> during restoring  store " + name, e);
>   }
>   else {
> log.warn("Compaction of store " + name + " for bulk loading 
> failed. Will continue without compacted store, which will be slower.", e);
>   }
> } catch (RocksDBException e1) {
>   throw new ProcessorStateException("Error while range compacting 
> during restoring  store " + name, e);
> }
>   }

[jira] [Resolved] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

2021-02-26 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-9880.
---
Fix Version/s: 2.6.0
   Resolution: Fixed

> Error while range compacting during bulk loading of FIFO compacted RocksDB 
> Store
> 
>
> Key: KAFKA-9880
> URL: https://issues.apache.org/jira/browse/KAFKA-9880
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Nicolas Carlot
>Priority: Major
> Fix For: 2.6.0
>
>
>  
> When restoring a non empty RocksDB state store, if it is customized to use 
> FIFOCompaction, the following exception is thrown:
>  
> {code:java}
> //
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range 
> compacting during restoring  store merge_store
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
> at org.rocksdb.RocksDB.compactRange(Native Method) 
> ~[kafka-stream-router.jar:?]
> at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) 
> ~[kafka-stream-router.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
>  ~[kafka-stream-router.jar:?]
> ... 11 more
> {code}
>  
>  
> Compaction is configured through an implementation of RocksDBConfigSetter. 
> The exception is gone as soon as I remove:
> {code:java}
> // 
> CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> fifoOptions.setMaxTableFilesSize(maxSize);
> fifoOptions.setAllowCompaction(true);
> options.setCompactionOptionsFIFO(fifoOptions);
> options.setCompactionStyle(CompactionStyle.FIFO);
> {code}
>  
>  
> Bulk loading works fine when the store is non-existent / empty. This occurs 
> only when there is a minimum amount of data in it. I guess it happens when 
> the number of SST layers is increased.
> I'm currently using a forked version of Kafka 2.4.1 customizing the 
> RocksDBStore class with this modification as a work around:
>  
> {code:java}
> //
> public void toggleDbForBulkLoading() {
>   try {
> db.compactRange(columnFamily, true, 1, 0);
>   } catch (final RocksDBException e) {
> try {
>   if (columnFamily.getDescriptor().getOptions().compactionStyle() != 
> CompactionStyle.FIFO) {
> throw new ProcessorStateException("Error while range compacting 
> during restoring  store " + name, e);
>   }
>   else {
> log.warn("Compaction of store " + name + " for bulk loading 
> failed. Will continue without compacted store, which will be slower.", e);
>   }
> } catch (RocksDBException e1) {
>   throw new ProcessorStateException("Error while range compacting 
> during restoring  store " + name, e);
> }
>   }
> }
> {code}
>  
> I'm not very proud of this workaround, but it suits my use cases 

[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
 preallocate = config.preallocate))
 }
 
-recoveryPoint = activeSegment.readNextOffset

Review comment:
   I think it is a gap that there is no minimum replication factor before a 
write can get exposed. Any writes that end up seeing the 
`NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are 
unacknowledged writes, and the producer is expected to retry, but the consumer 
can still read them once the ISR shrinks and we would still view it as "data 
loss" if the broker failed before they could be flushed to disk. With the 
"strict min isr" proposal, the leader is not allowed to shrink the ISR lower 
than some replication factor, which helps to plug this hole.
   
   Going back to @purplefox's suggestion, it does seem like a good idea to 
flush segments beyond the recovery point during recovery. It kind of serves to 
constrain the initial state of the system which makes it easier to reason about 
(e.g. you only need to worry about the loss of unflushed data from the last 
restart). Some of the flush weaknesses probably still exist though regardless 
of this change.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r583877084



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   It's a reasonable suggestion. Let me give it a try.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys

2021-02-26 Thread GitBox


cmccabe merged pull request #9990:
URL: https://github.com/apache/kafka/pull/9990


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10212: MINOR: Add cluster-metadata-decoder to DumpLogSegments

2021-02-26 Thread GitBox


cmccabe merged pull request #10212:
URL: https://github.com/apache/kafka/pull/10212


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`

2021-02-26 Thread GitBox


hachikuji merged pull request #9958:
URL: https://github.com/apache/kafka/pull/9958


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman closed pull request #10214: MINOR: fix message and reduce log level of PartitionGroup enforced processing

2021-02-26 Thread GitBox


ableegoldman closed pull request #10214:
URL: https://github.com/apache/kafka/pull/10214


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10214: MINOR: fix message and reduce log level of PartitionGroup enforced processing

2021-02-26 Thread GitBox


ableegoldman commented on pull request #10214:
URL: https://github.com/apache/kafka/pull/10214#issuecomment-786832334


   Closing since this issue should be addressed on the side in 
https://github.com/apache/kafka/pull/10137



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10212: MINOR: Add cluster-metadata-decoder to DumpLogSegments

2021-02-26 Thread GitBox


cmccabe commented on a change in pull request #10212:
URL: https://github.com/apache/kafka/pull/10212#discussion_r583849492



##
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##
@@ -242,7 +246,8 @@ object DumpLogSegments {
   nonConsecutivePairsForLogFilesMap: mutable.Map[String, 
List[(Long, Long)]],
   isDeepIteration: Boolean,
   maxMessageSize: Int,
-  parser: MessageParser[_, _]): Unit = {
+  parser: MessageParser[_, _],
+  skipBatchMetadata: Boolean): Unit = {

Review comment:
   let me rename this to `--skip-record-metadata` to make it clearer. I 
think per-batch metadata is still useful (in particular, offset)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API

2021-02-26 Thread GitBox


ableegoldman commented on a change in pull request #10137:
URL: https://github.com/apache/kafka/pull/10137#discussion_r583848994



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) {
 return false;
 } else {
 enforcedProcessingSensor.record(1.0d, wallClockTime);
-logger.info("Continuing to process although some partition 
timestamps were not buffered locally." +
+logger.trace("Continuing to process although some partitions are 
empty on the broker." +

Review comment:
   Maybe we could leave this detailed logging at TRACE, and just print a 
single message at `warn` the first time this enforced processing occurs?
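   A minimal sketch of that "log once at WARN, then TRACE" pattern (the class, field, and method names are assumptions for illustration, not the actual PartitionGroup code), assuming an SLF4J logger:

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class EnforcedProcessingLogger {
       private static final Logger log = LoggerFactory.getLogger(EnforcedProcessingLogger.class);
       // One flag per task/thread, so no synchronization is needed.
       private boolean warnedOnce = false;

       void onEnforcedProcessing(final String details) {
           if (!warnedOnce) {
               // Surface the condition once at WARN so it is visible with default log levels...
               log.warn("Continuing to process although some partitions are empty on the broker. {}", details);
               warnedOnce = true;
           } else {
               // ...and keep every subsequent occurrence at TRACE to avoid flooding the logs.
               log.trace("Continuing to process although some partitions are empty on the broker. {}", details);
           }
       }
   }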





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
 preallocate = config.preallocate))
 }
 
-recoveryPoint = activeSegment.readNextOffset

Review comment:
   I think it is a gap that there is no minimum replication factor before a 
write can get exposed. Any writes that end up seeing the 
`NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are 
unacknowledged writes, and the producer is expected to retry, but the consumer 
can still read them once the ISR shrinks and we would still view it as "data 
loss" if the broker failed before they could be flushed to disk. With the 
"strict min isr" proposal, the leader is not allowed to shrink the ISR lower 
than some replication factor, which helps to plug this hole.
   
   Going back to @purplefox's suggestion, it does seem like a good idea to 
flush segments beyond the recovery point during recovery. It kind of serves to 
constrain the initial state of the system which makes it easier to reason about 
(e.g. you only need to worry about the loss of unflushed data from the previous 
restart). Some of the flush weaknesses probably still exist though regardless 
of this change.
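   A rough, self-contained sketch of that idea (the types below are stand-ins for illustration, not Kafka's actual Log/LogSegment classes):

   import java.util.List;

   final class RecoveryFlushSketch {
       interface Segment {
           void flush();
       }

       // Hypothetical: flush everything past the old recovery point while recovering, so the only
       // unflushed data a restarted broker has to reason about is data appended after this restart.
       static long recoverAndFlush(final List<Segment> segmentsPastRecoveryPoint,
                                   final long nextOffsetAfterRecovery) {
           segmentsPastRecoveryPoint.forEach(Segment::flush);
           return nextOffsetAfterRecovery; // becomes the new recovery point
       }
   }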





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-02-26 Thread GitBox


ijuma commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r583834500



##
File path: build.gradle
##
@@ -2026,52 +2043,53 @@ project(':connect:runtime') {
   archivesBaseName = "connect-runtime"
 
   dependencies {
-
-compile project(':connect:api')
-compile project(':clients')
-compile project(':tools')
-compile project(':connect:json')
-compile project(':connect:transforms')
-
-compile libs.slf4jApi
-compile libs.jacksonJaxrsJsonProvider
-compile libs.jerseyContainerServlet
-compile libs.jerseyHk2
-compile libs.jaxbApi // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.activation // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.jettyServer
-compile libs.jettyServlet
-compile libs.jettyServlets
-compile libs.jettyClient
-compile(libs.reflections)
-compile(libs.mavenArtifact)
-
-testCompile project(':clients').sourceSets.test.output
-testCompile libs.easymock
-testCompile libs.junitJupiterApi
-testCompile libs.junitVintageEngine
-testCompile libs.powermockJunit4
-testCompile libs.powermockEasymock
-testCompile libs.mockitoCore
-testCompile libs.httpclient
-
-testCompile project(':clients').sourceSets.test.output
-testCompile project(':core')
-testCompile project(':core').sourceSets.test.output
-
-testRuntime libs.slf4jlog4j
+implementation project(':connect:api')

Review comment:
   The difference is that `connect-runtime` doesn't expose `connect-api` 
and others transitively anymore. If `connect-runtime` is used for tests where 
people _only_ depend on `connect-runtime` (this is an anti-pattern, but easy to 
use), things may break for them. That is the reason why I used `api` for the 
`clients` dependency in `core`, so I'll do the same for `connect-runtime`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs

2021-02-26 Thread GitBox


junrao commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583834117



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
 preallocate = config.preallocate))
 }
 
-recoveryPoint = activeSegment.readNextOffset

Review comment:
   For the case that Tim mentioned, if we defer advancing the recovery 
point, at step 5, the broker will be forced to do log recovery for all 
unflushed data. If the data is corrupted on disk, it will be detected during 
recovery. 
   
   For the other case that Ismael mentioned, it is true that data can be lost 
in that case, but then this is the case where all replicas have failed. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs

2021-02-26 Thread GitBox


dhruvilshah3 commented on a change in pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#discussion_r583832959



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -306,40 +307,84 @@ private void createOffsetSyncsTopic() {
 
MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), 
config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
 }
 
-// visible for testing
-void computeAndCreateTopicPartitions()
-throws InterruptedException, ExecutionException {
-Map partitionCounts = knownSourceTopicPartitions.stream()
-.collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
-.collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), 
Entry::getValue));
-Set knownTargetTopics = toTopics(knownTargetTopicPartitions);
-List newTopics = partitionCounts.entrySet().stream()
-.filter(x -> !knownTargetTopics.contains(x.getKey()))
-.map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), 
(short) replicationFactor))
-.collect(Collectors.toList());
-Map newPartitions = 
partitionCounts.entrySet().stream()
-.filter(x -> knownTargetTopics.contains(x.getKey()))
-.collect(Collectors.toMap(Entry::getKey, x -> 
NewPartitions.increaseTo(x.getValue().intValue(;
-createTopicPartitions(partitionCounts, newTopics, newPartitions);
+void computeAndCreateTopicPartitions() throws ExecutionException, 
InterruptedException {
+// get source and target topics with respective partition counts
+Map sourceTopicToPartitionCounts = 
knownSourceTopicPartitions.stream()
+.collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
+.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+Map targetTopicToPartitionCounts = 
knownTargetTopicPartitions.stream()
+.collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
+.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+Set knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+Set knownTargetTopics = targetTopicToPartitionCounts.keySet();
+Map sourceToRemoteTopics = knownSourceTopics.stream()
+.collect(Collectors.toMap(Function.identity(), sourceTopic -> 
formatRemoteTopic(sourceTopic)));
+
+// compute existing and new source topics
+Map> partitionedSourceTopics = 
knownSourceTopics.stream()
+.collect(Collectors.partitioningBy(sourceTopic -> 
knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+Collectors.toSet()));
+Set existingSourceTopics = partitionedSourceTopics.get(true);
+Set newSourceTopics = partitionedSourceTopics.get(false);
+
+// create new topics
+if (!newSourceTopics.isEmpty())
+createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+// compute topics with new partitions
+Map sourceTopicsWithNewPartitions = 
existingSourceTopics.stream()
+.filter(sourceTopic -> {
+String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+return sourceTopicToPartitionCounts.get(sourceTopic) > 
targetTopicToPartitionCounts.get(targetTopic);
+})
+.collect(Collectors.toMap(Function.identity(), 
sourceTopicToPartitionCounts::get));
+
+// create new partitions
+if (!sourceTopicsWithNewPartitions.isEmpty()) {
+Map newTargetPartitions = 
sourceTopicsWithNewPartitions.entrySet().stream()
+.collect(Collectors.toMap(sourceTopicAndPartitionCount -> 
sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+sourceTopicAndPartitionCount -> 
NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue(;
+createNewPartitions(newTargetPartitions);
+}
+}
+
+private void createNewTopics(Set newSourceTopics, Map sourceTopicToPartitionCounts)
+throws ExecutionException, InterruptedException {
+Map sourceTopicToConfig = 
describeTopicConfigs(newSourceTopics);
+List newTopics = newSourceTopics.stream()
+.map(sourceTopic -> {
+String remoteTopic = formatRemoteTopic(sourceTopic);
+int partitionCount = 
sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+Map configs = 
configToMap(sourceTopicToConfig.get(sourceTopic));
+return new NewTopic(remoteTopic, partitionCount, (short) 
replicationFactor)
+.configs(configs);
+})
+

[GitHub] [kafka] mimaison opened a new pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…

2021-02-26 Thread GitBox


mimaison opened a new pull request #10221:
URL: https://github.com/apache/kafka/pull/10221


   … with MirrorMaker2
   
   This commit implements KIP-716. It introduces a new setting 
`offset-syncs.topic.location` that allows specifying where the offset-syncs 
topic is created.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2

2021-02-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-12379:
---
Description: 
Ticket for KIP-716
https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2

  was:
Ticket for KIP-716
https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2


> KIP-716: Allow configuring the location of the offsetsync topic with 
> MirrorMaker2
> -
>
> Key: KAFKA-12379
> URL: https://issues.apache.org/jira/browse/KAFKA-12379
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> Ticket for KIP-716
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2

2021-02-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-12379:
--

Assignee: Mickael Maison

> KIP-716: Allow configuring the location of the offsetsync topic with 
> MirrorMaker2
> -
>
> Key: KAFKA-12379
> URL: https://issues.apache.org/jira/browse/KAFKA-12379
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> Ticket for KIP-716
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12362) Determine if a Task is idling

2021-02-26 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-12362:
---
Description: determine if a task is idling given the task Id.  (was: 
determine if a task is idling given the task Id.

 

https://github.com/apache/kafka/pull/10180)

> Determine if a Task is idling
> -
>
> Key: KAFKA-12362
> URL: https://issues.apache.org/jira/browse/KAFKA-12362
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> determine if a task is idling given the task Id.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2

2021-02-26 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-12379:
--

 Summary: KIP-716: Allow configuring the location of the offsetsync 
topic with MirrorMaker2
 Key: KAFKA-12379
 URL: https://issues.apache.org/jira/browse/KAFKA-12379
 Project: Kafka
  Issue Type: Improvement
Reporter: Mickael Maison


Ticket for KIP-716
https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-12362) Determine if a Task is idling

2021-02-26 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-12362.
--

> Determine if a Task is idling
> -
>
> Key: KAFKA-12362
> URL: https://issues.apache.org/jira/browse/KAFKA-12362
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> determine if a task is idling given the task Id.
>  
> https://github.com/apache/kafka/pull/10180



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12362) Determine if a Task is idling

2021-02-26 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson resolved KAFKA-12362.

Resolution: Abandoned

> Determine if a Task is idling
> -
>
> Key: KAFKA-12362
> URL: https://issues.apache.org/jira/browse/KAFKA-12362
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Major
> Fix For: 3.0.0
>
>
> determine if a task is idling given the task Id.
>  
> https://github.com/apache/kafka/pull/10180



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic

2021-02-26 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291843#comment-17291843
 ] 

Jose Armando Garcia Sancio commented on KAFKA-12376:


Yes [~rounak]. I am working on it at the moment. I'll update the Jira soon with 
more information.

> Use scheduleAtomicAppend for records that need to be atomic
> ---
>
> Key: KAFKA-12376
> URL: https://issues.apache.org/jira/browse/KAFKA-12376
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-02-26 Thread GitBox


wcarlson5 commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r583808502



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) 
{
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
   `deadThread.shutdown();` I was referring to this below. But if we don't 
need to keep the same ID for any reason, I am fine either way.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291826#comment-17291826
 ] 

Matthias J. Sax commented on KAFKA-12319:
-

Failed again
{quote}java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.5751503006012 (600 connections / 15.968 sec) ==> expected: <30.0> but was: 
<37.5751503006012> at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:206) at 
kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:411)
 at scala.collection.immutable.List.foreach(List.scala:333) at 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:411)

[...]

Caused by: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but 
got 37.5751503006012 (600 connections / 15.968 sec) ==> expected: <30.0> but 
was: <37.5751503006012> at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86) at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1003) at 
kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:903)
 at 
kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$2(ConnectionQuotasTest.scala:409){quote}

> Flaky test 
> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> -
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I've seen this test fail a few times locally. But recently I saw it fail on a 
> PR build on Jenkins.
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
> Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 
> sec) ==> expected: <30.0> but was: <37.436825357209706>
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12378) If a broker is down for more than `delete.retention.ms`, deleted records in a compacted topic can come back.

2021-02-26 Thread Shane (Jira)
Shane created KAFKA-12378:
-

 Summary: If a broker is down for more than `delete.retention.ms`, 
deleted records in a compacted topic can come back.
 Key: KAFKA-12378
 URL: https://issues.apache.org/jira/browse/KAFKA-12378
 Project: Kafka
  Issue Type: Bug
Reporter: Shane


If the leader of a compacted topic goes offline, or has replication lag longer 
than the topic's `delete.retention.ms`, tombstoned records can come back once 
the old leader catches up and then becomes the leader again.

 

Example of this happening:
 Topic config:
    name: compacted-topic
    settings: delete.retention.ms=0
    Leader: broker 1
    ISR: broker 1, broker 2, broker 3

 

Producer 1 writes a record `1:foo` 
 Producer 1 writes a record `2:bar` 
 broker 1 goes offline 
 broker 2 takes over leadership
 Producer 1 writes a tombstone `1:NULL`
 broker 2 compacts the topic, which leaves the topic with `1:NULL` and `2:bar` 
in it.
 broker 2 removes the tombstone leaving just `2:bar` in the topic.
 broker 1 comes back online, catches up with replication, takes back leadership
 broker 1 now has `1:foo` and `2:bar` as the data, since the tombstone is 
deleted

At this point the topic is in a strange state, as the brokers have conflicting 
data.

 

 

Suggestion:
 I believe this to be quite a hard problem to solve, so I'm not going to 
suggest any large changes to the codebase, but I think a warning in the docs 
about `delete.retention.ms` is warranted.
 I think adding something that calls out that brokers are also consumers here: 
[https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_delete.retention.ms]
 would be helpful, but even further documentation about what happens when a 
broker is offline for more than `delete.retention.ms` would be nice to see. If 
it helps I'm happy to take a first draft at updating the docs as well.
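As a purely illustrative sketch (not a fix for the issue above), raising 
`delete.retention.ms` high enough that tombstones outlive any expected broker 
downtime or replication lag could look roughly like this with the Java Admin 
client; the topic name and the seven-day value are placeholders:

{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseDeleteRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "compacted-topic");
            // Keep tombstones around longer than any plausible outage of a follower/old leader.
            AlterConfigOp raiseRetention = new AlterConfigOp(
                new ConfigEntry("delete.retention.ms", Long.toString(Duration.ofDays(7).toMillis())),
                AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(topic, Collections.singletonList(raiseRetention));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}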



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-02-26 Thread GitBox


cadonna commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r583801473



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) 
{
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
   We cannot wait here until the dead thread is shut down, because the 
shutdown happens after `replaceStreamThread()` throws the exception. So we 
would wait forever. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291820#comment-17291820
 ] 

Matthias J. Sax commented on KAFKA-12284:
-

Failed again: 
{quote}java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicExistsException: Topic 
'primary.test-topic-2' already exists.
{quote}

> Flaky Test 
> MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote} {{java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
>  
> {{Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
>   ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats:  (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>   at java.lang.Thread.run(Thread.java:748)}}{quote}
> {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state 
> info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"}
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}}
> {{}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Commented] (KAFKA-8003) Flaky Test TransactionsTest #testFencingOnTransactionExpiration

2021-02-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291818#comment-17291818
 ] 

Matthias J. Sax commented on KAFKA-8003:


Different test method 
{{kafka.api.TransactionsTest.testSendOffsetsWithGroupId()}}
{quote}org.opentest4j.AssertionFailedError: Topic [topic1] metadata not 
propagated after 6 ms at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at 
org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at 
kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:852) at 
kafka.utils.TestUtils$.createTopic(TestUtils.scala:367) at 
kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:133){quote}
STDOUT:
{quote}[2021-02-26 11:17:26,311] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration section named 
'Client' was found in specified JAAS configuration file: 
'/tmp/kafka8173812164698481311.tmp'. Will continue connection to Zookeeper 
server without SASL authentication, if Zookeeper server allows it. 
(org.apache.zookeeper.ClientCnxn:1094) [2021-02-26 11:17:26,311] ERROR 
[ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74) [2021-02-26 11:17:26,647] ERROR 
[RequestSendThread controllerId=0] Controller 0 fails to send a request to 
broker localhost:41953 (id: 2 rack: null) 
(kafka.controller.RequestSendThread:76) java.lang.InterruptedException at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at 
kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82) at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:234) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){quote}

> Flaky Test TransactionsTest #testFencingOnTransactionExpiration
> ---
>
> Key: KAFKA-8003
> URL: https://issues.apache.org/jira/browse/KAFKA-8003
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.2.3
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/34/]
> {quote}java.lang.AssertionError: expected:<1> but was:<0> at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:645) at 
> org.junit.Assert.assertEquals(Assert.java:631) at 
> kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:510){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

2021-02-26 Thread GitBox


rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583796206



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -259,6 +267,16 @@ public void execute() {
 }
 }
 
+private void closeProducer(Duration duration) {
+if (producer != null) {
+try {
+producer.close(duration);
+} catch (Throwable t) {

Review comment:
   > Right now there aren't any code paths that lead to the worker's 
executor being shut down.
   
   Hmm, that seems to have been done a long time ago. I wonder if that was an 
oversight, or whether that was intentional since in Connect the 
`Worker::stop()` is called when the herder is stopped, which only happens (in 
Connect) when the shutdown hook is called -- at which point the JVM is 
terminating anyway. Luckily MM2 works the same way.
   
   But in our test cases that use `EmbeddedConnectCluster`, those tests are not 
cleaning up all resources of the Worker (and thus Herder) -- we might have 
threads that still keep running. Seems like we should address that in a 
different issue. I'll log something.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener

2021-02-26 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12377:
---

 Summary: Flaky Test 
SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
 Key: KAFKA-12377
 URL: https://issues.apache.org/jira/browse/KAFKA-12377
 Project: Kafka
  Issue Type: Test
  Components: core, security, unit tests
Reporter: Matthias J. Sax


{quote}org.opentest4j.AssertionFailedError: expected:  
but was:  at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
 at 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187)
 at 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210)
 at 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846)
 at 
org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote}
STDOUT
{quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context 
without a token 
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) 
java.io.IOException: Extensions provided in login context without a token at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
 at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
 at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)

[...]

Caused by: 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException:
 Extensions provided in login context without a token at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192)
 at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
 ... 116 more{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic

2021-02-26 Thread Rounak Datta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291784#comment-17291784
 ] 

Rounak Datta commented on KAFKA-12376:
--

{quote}records that need to be atomic
{quote}
 

Can you help me understand which types of records these are? That is, how do we 
identify these records and use `scheduleAtomicAppend` for them?

> Use scheduleAtomicAppend for records that need to be atomic
> ---
>
> Key: KAFKA-12376
> URL: https://issues.apache.org/jira/browse/KAFKA-12376
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down

2021-02-26 Thread GitBox


wcarlson5 commented on a change in pull request #10215:
URL: https://github.com/apache/kafka/pull/10215#discussion_r583787814



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + 
" did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is 
complete. We will trim it from the
+// list once it reaches DEAD, and if for some 
reason it's hanging indefinitely in the
+// shutdown then we should just consider this 
thread.id to be burned
+} else {
+threads.remove(streamThread);

Review comment:
   If we purge the dead threads before we add new ones, and if we remove the 
assumption that there are no dead threads in the thread list, we can simply not 
remove the threads in the remove-thread path. That way there should be no 
concern about the cache size changing while a thread is removing itself, and it 
would make the risk we took around memory overflows unnecessary.
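   A minimal sketch of that purge step (the method name and the locking are assumptions, not the actual KafkaStreams change):

   // Hypothetical helper inside KafkaStreams: drop threads that have fully shut down before
   // creating a new one, so their thread.id (and the client ids / JMX beans derived from it)
   // can be handed out again safely.
   private void purgeDeadThreads() {
       synchronized (threads) {
           threads.removeIf(thread -> thread.state() == StreamThread.State.DEAD);
       }
   }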

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + 
" did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is 
complete. We will trim it from the
+// list once it reaches DEAD, and if for some 
reason it's hanging indefinitely in the

Review comment:
   Where do we trim this list? I don't think we do. At the beginning of 
`addStreamThread()`, can we purge the dead threads? That is the only place it 
should matter.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) 
{
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment:
   I remember that we had the replacement use the same ID for a reason (maybe 
it had to do with rebalancing?). I don't think there should be a problem with 
trying to get the same ID by waiting a bit in the replace-thread path.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + 
" did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is 
complete. We will trim it from the
+// list once it reaches DEAD, and if for some 
reason it's hanging indefinitely in the
+// shutdown then we should just consider this 
thread.id to be burned
+} else {
+threads.remove(streamThread);
 }
 }
-threads.remove(streamThread);
+// Don't remove from threads until shutdown is 
complete since this will let another thread
+// reuse its thread.id. We will trim any DEAD threads 
from the list later
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());

Review comment:
   Previously we had relied on the fact that there were no dead threads in the 
list.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12375) ReplaceStreamThread creates a new consumer with the same name as the one it's replacing

2021-02-26 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291777#comment-17291777
 ] 

Bruno Cadonna commented on KAFKA-12375:
---

I agree with [~ableegoldman] that ensuring a unique thread ID also solves other 
issues, like the metrics issue above. We cannot wait for the metrics to be 
removed, because that happens during the shutdown, i.e. after the stream thread 
has been replaced. 
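A tiny illustration of the "never reuse an ID" approach (the counter and naming scheme are assumptions, not the actual fix):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

final class StreamThreadNames {
    // Hypothetical: a counter that only ever moves forward, so a replacement thread can never
    // collide with the consumer client id / app-info MBean of the thread it replaces.
    private final AtomicInteger nextThreadIndex = new AtomicInteger(1);

    String next(final String clientId) {
        return clientId + "-StreamThread-" + nextThreadIndex.getAndIncrement();
    }
}
{code}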

> ReplaceStreamThread creates a new consumer with the same name as the one it's 
> replacing
> ---
>
> Key: KAFKA-12375
> URL: https://issues.apache.org/jira/browse/KAFKA-12375
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Tomasz Nguyen
>Assignee: Tomasz Nguyen
>Priority: Blocker
> Fix For: 2.8.0
>
>
> I was debugging the kafka-streams soak cluster and noticed that replacing a 
> stream thread was causing the streams application to fail. I have managed to 
> find the following stacktrace:
> {code:java}
> javax.management.InstanceAlreadyExistsException: 
> kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer
>  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>  at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>  at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>  at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>  at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) 
> at 
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:815)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:666)
>  at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338)
>  at 
> org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896)
>  at 
> org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977) 
> at 
> org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467)
>  at 
> org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487)
>  at 
> org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
>  
> followed by:
> {code:java}
> Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666)
>  at 
> org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508)
>  at 
> org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579)
> at 
> org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508)
>   at 
> org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> My understanding so far is that we re-use the consumer name across thread 
> generations which can hit a few flavours of a race condition. My suggestion 
> would be to add the generation-id to the consumer name.
> This could be done by adding a thread generation id here
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66
> or by adding an overload here: 
> 

[GitHub] [kafka] rhauch commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0

2021-02-26 Thread GitBox


rhauch commented on a change in pull request #10203:
URL: https://github.com/apache/kafka/pull/10203#discussion_r583780438



##
File path: build.gradle
##
@@ -2026,52 +2043,53 @@ project(':connect:runtime') {
   archivesBaseName = "connect-runtime"
 
   dependencies {
-
-compile project(':connect:api')
-compile project(':clients')
-compile project(':tools')
-compile project(':connect:json')
-compile project(':connect:transforms')
-
-compile libs.slf4jApi
-compile libs.jacksonJaxrsJsonProvider
-compile libs.jerseyContainerServlet
-compile libs.jerseyHk2
-compile libs.jaxbApi // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.activation // Jersey dependency that was available in the JDK 
before Java 9
-compile libs.jettyServer
-compile libs.jettyServlet
-compile libs.jettyServlets
-compile libs.jettyClient
-compile(libs.reflections)
-compile(libs.mavenArtifact)
-
-testCompile project(':clients').sourceSets.test.output
-testCompile libs.easymock
-testCompile libs.junitJupiterApi
-testCompile libs.junitVintageEngine
-testCompile libs.powermockJunit4
-testCompile libs.powermockEasymock
-testCompile libs.mockitoCore
-testCompile libs.httpclient
-
-testCompile project(':clients').sourceSets.test.output
-testCompile project(':core')
-testCompile project(':core').sourceSets.test.output
-
-testRuntime libs.slf4jlog4j
+implementation project(':connect:api')

Review comment:
   Connector projects do require the `connect-api` module as a compile-time 
dependency and should not depend on `connect-runtime` as a compile-time 
dependency. Most (if not all) will include `connect-runtime` as a *test* 
dependency, though. 
   
   I don't really see anything that changed in the compile-time dependencies 
relative to the base commit. In particular, `connect-api` was included (on line 
2030) before and is again. I think the removal of the blank line (2029) caused 
this diff page to render a bit strange.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



