[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming updated KAFKA-10437: --- Attachment: image.png > KIP-478: Implement test-utils changes > - > > Key: KAFKA-10437 > URL: https://issues.apache.org/jira/browse/KAFKA-10437 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > > In addition to implementing the KIP, search for and resolve these todos: > {color:#008dde}TODO will be fixed in KAFKA-10437{color} > Also, add unit tests in test-utils making sure we can initialize _all_ the > kinds of store with the MPC and MPC.getSSC. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming updated KAFKA-10437: --- Attachment: (was: image.png)
[GitHub] [kafka] dengziming commented on pull request #10229: MINOR: Account for extra whitespaces in WordCountDemo
dengziming commented on pull request #10229: URL: https://github.com/apache/kafka/pull/10229#issuecomment-786993345 Hello @mjsax, this is inspired by #10044, which forgot `WordCountProcessorDemo` and `WordCountTransformerDemo`. PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming opened a new pull request #10229: MINOR: Account for extra whitespaces in WordCountDemo
dengziming opened a new pull request #10229: URL: https://github.com/apache/kafka/pull/10229

*More detailed description of your change* As the title.

*Summary of testing strategy (including rationale)* QA

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
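The whitespace handling the PR title refers to can be illustrated with a minimal sketch (hypothetical class and method names, not the actual demo code), assuming the demos tokenize each input line by splitting on whitespace: splitting on a single space produces empty tokens when the input contains runs of spaces or tabs, while a `\s+` regex collapses any whitespace run into one delimiter.

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // Splitting on a literal single space leaves empty tokens when the
    // input contains consecutive spaces, and does not handle tabs.
    static List<String> naiveSplit(String line) {
        return Arrays.asList(line.toLowerCase().split(" "));
    }

    // Trimming first and splitting on the \s+ regex treats any run of
    // whitespace (spaces, tabs) as a single delimiter.
    static List<String> robustSplit(String line) {
        return Arrays.asList(line.trim().toLowerCase().split("\\s+"));
    }

    public static void main(String[] args) {
        String line = "Hello   Kafka \tStreams";
        System.out.println(naiveSplit(line));  // contains empty tokens
        System.out.println(robustSplit(line)); // [hello, kafka, streams]
    }
}
```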
[GitHub] [kafka] ijuma commented on pull request #8830: KAFKA-10116: GraalVM native-image prototype
ijuma commented on pull request #8830: URL: https://github.com/apache/kafka/pull/8830#issuecomment-786989056

Rebased and regenerated the configs with GraalVM CE 21.0.0.2. Need to investigate the error during start-up:

> Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Exit$
>     at com.oracle.svm.core.classinitialization.ClassInitializationInfo.initialize(ClassInitializationInfo.java:239)
>     at kafka.Kafka$.main(Kafka.scala:122)
>     at kafka.Kafka.main(Kafka.scala)
[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
junrao commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r584019199

File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
     log.debug("Applied ISR change record: {}", record.toString());
 }

+public void replay(RemoveTopicRecord record) {
+    TopicControlInfo topic = topics.remove(record.topicId());
+    if (topic == null) {
+        throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+            " to remove.");
+    }
+    topicsByName.remove(topic.name);

Review comment: Hmm, you mean PartitionChangeRecord? I don't see PartitionChangeRecord being generated from the topicDeletion request.

File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@@ -809,6 +824,27 @@ private QuorumController(LogContext logContext,
     () -> replicationControl.unregisterBroker(brokerId));
 }

+@Override
+public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+    if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+    return appendReadEvent("findTopicIds",
+        () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+}
+
+@Override
+public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+    if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+    return appendReadEvent("findTopicNames",
+        () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+}
+
+@Override
+public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+    if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+    return appendWriteEvent("deleteTopics",
+        () -> replicationControl.deleteTopics(ids));

Review comment: We also need to delete the configuration associated with the topic.
[GitHub] [kafka] showuon commented on pull request #10228: KAFKA-10251: increase timeout for consuming records
showuon commented on pull request #10228: URL: https://github.com/apache/kafka/pull/10228#issuecomment-786981927 @abbccdda @ableegoldman @mjsax, could you help review this PR? Thanks.
[GitHub] [kafka] showuon opened a new pull request #10228: KAFKA-10251: increase timeout for consuming records
showuon opened a new pull request #10228: URL: https://github.com/apache/kafka/pull/10228

We need to wait for the transaction state to change to `READY` before starting to consume the records. But we don't have any way to change the transactionManager state from the client, so we can only wait. I've confirmed that the tests pass if we try a second time. My test code is like this:

```java
var isFailed = false
try {
  pollRecordsUntilTrue(consumer, pollAction, waitTimeMs = waitTimeMs,
    msg = s"Consumed ${records.size} records before timeout instead of the expected $numRecords records")
} catch {
  case e: AssertionFailedError =>
    isFailed = true
    System.err.println(s"!!! Consumed ${records.size} records before timeout instead of the expected $numRecords records")
}
if (isFailed) {
  pollRecordsUntilTrue(consumer, pollAction, waitTimeMs = 3,
    msg = s"Consumed ${records.size} records before timeout instead of the expected $numRecords records")
  // if we get to this step, it means it passed on the 2nd try
  fail("failed at 1st try")
}
```

And they failed with `failed at 1st try`, which confirms that we can make the tests pass by increasing the timeout. Thanks.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
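The try-then-retry-with-a-longer-timeout diagnostic in the snippet above can be sketched generically (hypothetical helper names, not Kafka's test utilities): attempt the condition under a short deadline, and only if that fails, retry under a longer one to distinguish "too slow" from "never happens".

```java
import java.util.function.BooleanSupplier;

public class RetryWithLongerTimeout {
    // Polls `condition` until it returns true or `timeoutMs` elapses.
    static boolean pollUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    // First attempt with a short timeout; on failure, retry with a longer
    // one. A success on the second attempt suggests the first timeout was
    // simply too small, which is what the PR above concluded.
    static boolean pollWithRetry(BooleanSupplier condition, long shortMs, long longMs) throws InterruptedException {
        return pollUntil(condition, shortMs) || pollUntil(condition, longMs);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition only becomes true after ~150 ms, so the 50 ms
        // attempt fails and the 1000 ms retry succeeds.
        boolean ok = pollWithRetry(() -> System.currentTimeMillis() - start > 150, 50, 1000);
        System.out.println(ok);
    }
}
```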
[jira] [Created] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
Matthias J. Sax created KAFKA-12384:
Summary: Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
Key: KAFKA-12384
URL: https://issues.apache.org/jira/browse/KAFKA-12384
Project: Kafka
Issue Type: Test
Components: core, unit tests
Reporter: Matthias J. Sax

{quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: <(-1,-1)>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124)
at kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote}
[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon commented on a change in pull request #10185: URL: https://github.com/apache/kafka/pull/10185#discussion_r584015842

File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java

@@ -455,27 +470,44 @@ private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
         "Connector " + connector.getSimpleName() + " tasks did not start in time on cluster: " + connectCluster);
     }
 }
-
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, String topicName) throws InterruptedException {
+    try (final Admin adminClient = cluster.createAdminClient()) {
+        waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), OFFSET_SYNC_DURATION_MS,
+            "Topic: " + topicName + " didn't get created in the cluster"
+        );
+    }
+}
+
 /*
  * delete all topics of the input kafka cluster
  */
 private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
-    Admin client = cluster.createAdminClient();
-    client.deleteTopics(client.listTopics().names().get());
+    try (final Admin adminClient = cluster.createAdminClient()) {
+        Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
+        log.debug("Deleting topics: {} ", topicsToBeDeleted);
+        adminClient.deleteTopics(topicsToBeDeleted).all().get();
+    } catch (final Throwable e) {

Review comment: Also remove the change in PR description
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17292005#comment-17292005 ] Luke Chen commented on KAFKA-10251: --- This flaky test also points to the same root cause. kafka.api.TransactionsBounceTest.testWithGroupId() org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead of the expected 200 records at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852) at kafka.api.TransactionsBounceTest.testWithGroupId(TransactionsBounceTest.scala:109) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata > - > > Key: KAFKA-10251 > URL: https://issues.apache.org/jira/browse/KAFKA-10251 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Major > > h3. 
Stacktrace > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 200 records at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at > kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109) > > > The logs are pretty much just this on repeat: > {code:java} > [2020-07-08 23:41:04,034] ERROR Error when sending message to topic > output-topic with key: 9955, value: 9955 with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error > when sending message to topic output-topic with key: 9959, value: 9959 with > error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) > {code} -- This 
message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working
Colin McCabe created KAFKA-12383:
Summary: Get RaftClusterTest.java and other KIP-500 junit tests working
Key: KAFKA-12383
URL: https://issues.apache.org/jira/browse/KAFKA-12383
Project: Kafka
Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Colin McCabe
Assignee: David Arthur
[GitHub] [kafka] showuon commented on pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon commented on pull request #10185: URL: https://github.com/apache/kafka/pull/10185#issuecomment-786978491 @mimaison, thanks for your comments. I've addressed them and updated in this commit: https://github.com/apache/kafka/pull/10185/commits/de95286d9a6463d1fb1ac4b2d5bfd45360aec3a3. Please review again. Thank you.
[jira] [Created] (KAFKA-12382) Create KIP-500 README for the 2.8 rleease
Colin McCabe created KAFKA-12382:
Summary: Create KIP-500 README for the 2.8 rleease
Key: KAFKA-12382
URL: https://issues.apache.org/jira/browse/KAFKA-12382
Project: Kafka
Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Colin McCabe
Assignee: Colin McCabe
[jira] [Updated] (KAFKA-12382) Create KIP-500 README for the 2.8 release
[ https://issues.apache.org/jira/browse/KAFKA-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12382: - Summary: Create KIP-500 README for the 2.8 release (was: Create KIP-500 README for the 2.8 rleease) > Create KIP-500 README for the 2.8 release > - > > Key: KAFKA-12382 > URL: https://issues.apache.org/jira/browse/KAFKA-12382 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon commented on a change in pull request #10185: URL: https://github.com/apache/kafka/pull/10185#discussion_r584013857

File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java

@@ -455,27 +470,44 @@ private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
         "Connector " + connector.getSimpleName() + " tasks did not start in time on cluster: " + connectCluster);
     }
 }
-
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, String topicName) throws InterruptedException {
+    try (final Admin adminClient = cluster.createAdminClient()) {
+        waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), OFFSET_SYNC_DURATION_MS,

Review comment: You're right, I created a `TOPIC_SYNC_DURATION_MS` for it.
[GitHub] [kafka] showuon commented on a change in pull request #10185: KAFKA-12284: wait for mm2 auto-created the topic
showuon commented on a change in pull request #10185: URL: https://github.com/apache/kafka/pull/10185#discussion_r584013620

File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java

@@ -455,27 +470,44 @@ private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect
         "Connector " + connector.getSimpleName() + " tasks did not start in time on cluster: " + connectCluster);
     }
 }
-
+
+/*
+ * wait for the topic created on the cluster
+ */
+private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, String topicName) throws InterruptedException {
+    try (final Admin adminClient = cluster.createAdminClient()) {
+        waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), OFFSET_SYNC_DURATION_MS,
+            "Topic: " + topicName + " didn't get created in the cluster"
+        );
+    }
+}
+
 /*
  * delete all topics of the input kafka cluster
  */
 private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
-    Admin client = cluster.createAdminClient();
-    client.deleteTopics(client.listTopics().names().get());
+    try (final Admin adminClient = cluster.createAdminClient()) {
+        Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
+        log.debug("Deleting topics: {} ", topicsToBeDeleted);
+        adminClient.deleteTopics(topicsToBeDeleted).all().get();
+    } catch (final Throwable e) {

Review comment: Make sense to me. Updated. Thanks.
[GitHub] [kafka] cmccabe merged pull request #10226: MINOR: fix kafka-metadata-shell.sh
cmccabe merged pull request #10226: URL: https://github.com/apache/kafka/pull/10226
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r584009213

File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
     log.debug("Applied ISR change record: {}", record.toString());
 }

+public void replay(RemoveTopicRecord record) {
+    TopicControlInfo topic = topics.remove(record.topicId());
+    if (topic == null) {
+        throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+            " to remove.");
+    }
+    topicsByName.remove(topic.name);
+    log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+}

Review comment: We haven't hooked that up yet, correct. But that logic is in `BrokerMetadataListener`. It would probably be better to have a separate PR for that.
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r584007497

File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
     log.debug("Applied ISR change record: {}", record.toString());
 }

+public void replay(RemoveTopicRecord record) {
+    TopicControlInfo topic = topics.remove(record.topicId());
+    if (topic == null) {
+        throw new RuntimeException("Can't find topic with ID " + record.topicId() +
+            " to remove.");
+    }
+    topicsByName.remove(topic.name);

Review comment: The ISRs should already have been updated by `BrokerChangeRecords` that were previously replayed.
[GitHub] [kafka] cmccabe commented on pull request #10199: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol
cmccabe commented on pull request #10199: URL: https://github.com/apache/kafka/pull/10199#issuecomment-786970988 I filed https://issues.apache.org/jira/browse/KAFKA-12381 to follow up on the auto-topic-creation behavior change discussion.
[GitHub] [kafka] cmccabe merged pull request #10199: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol
cmccabe merged pull request #10199: URL: https://github.com/apache/kafka/pull/10199
[jira] [Commented] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8
[ https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291991#comment-17291991 ] Colin McCabe commented on KAFKA-12381: -- cc [~rndgstn], [~hachikuji], [~ijuma] > Incompatible change in verifiable_producer.log in 2.8 > - > > Key: KAFKA-12381 > URL: https://issues.apache.org/jira/browse/KAFKA-12381 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Colin McCabe >Assignee: Boyang Chen >Priority: Blocker > > In test_verifiable_producer.py , we used to see this error message in > verifiable_producer.log when a topic couldn't be created: > WARN [Producer clientId=producer-1] Error while fetching metadata with > correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} > (org.apache.kafka.clients.NetworkClient) > The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it > used to pass. > Now we are instead seeing this in the log file: > WARN [Producer clientId=producer-1] Error while fetching metadata with > correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} > (org.apache.kafka.clients.NetworkClient) > And of course now the test fails. > The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation > manager. > It is a simple matter to make the test pass -- I have confirmed that it > passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of > LEADER_NOT_AVAILABLE. > I think we just need to decide if this change in behavior is acceptable or > not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8
[ https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12381: - Affects Version/s: 2.8.0
[jira] [Assigned] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8
[ https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-12381: Assignee: Boyang Chen
[jira] [Created] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8
Colin McCabe created KAFKA-12381: Summary: Incompatible change in verifiable_producer.log in 2.8 Key: KAFKA-12381 URL: https://issues.apache.org/jira/browse/KAFKA-12381 Project: Kafka Issue Type: Bug Reporter: Colin McCabe In test_verifiable_producer.py , we used to see this error message in verifiable_producer.log when a topic couldn't be created: WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it used to pass. Now we are instead seeing this in the log file: WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient) And of course now the test fails. The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation manager. It is a simple matter to make the test pass -- I have confirmed that it passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of LEADER_NOT_AVAILABLE. I think we just need to decide if this change in behavior is acceptable or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe opened a new pull request #10227: MINOR: add a README for KIP-500
cmccabe opened a new pull request #10227: URL: https://github.com/apache/kafka/pull/10227
[GitHub] [kafka] cmccabe opened a new pull request #10226: MINOR: fix kafka-metadata-shell.sh
cmccabe opened a new pull request #10226: URL: https://github.com/apache/kafka/pull/10226

* Fix CLASSPATH issues in the startup script
* Fix overly verbose log messages during loading
* Update to use the new MetadataRecordSerde (this is needed now that we have a frame version)
* Fix initialization
[GitHub] [kafka] hachikuji commented on pull request #10225: MINOR: fix security_test for ZK case due to error change
hachikuji commented on pull request #10225: URL: https://github.com/apache/kafka/pull/10225#issuecomment-786959937 @rondagostino Thanks for identifying the issue. Returning `INVALID_REPLICATION_FACTOR` seems like a mistake to me if we were previously returning `LEADER_NOT_AVAILABLE`. I'd suggest we fix the code. Would you mind filing a JIRA for this so that we can mark it as a 2.8 blocker?
[GitHub] [kafka] rondagostino commented on pull request #10225: MINOR: fix security_test for ZK case due to error change
rondagostino commented on pull request #10225: URL: https://github.com/apache/kafka/pull/10225#issuecomment-786955706

@abbccdda, @cmccabe. Ported from #10199 to discuss separately. We used to see this error message in `verifiable_producer.log` when `security_protocol='PLAINTEXT', interbroker_security_protocol='SSL'`:

```
WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
```

The test does a `grep LEADER_NOT_AVAILABLE` on the log in this case, and it used to pass. Now we are instead seeing this in the log file:

```
WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
```

And of course now the test fails. The `INVALID_REPLICATION_FACTOR` is coming from the auto topic creation manager as I described above. It is a simple matter to make the test pass -- I have confirmed that it passes if we `grep` for `INVALID_REPLICATION_FACTOR` in the log file instead of `LEADER_NOT_AVAILABLE`. I think we just need to decide if this change in behavior is acceptable or not.
[GitHub] [kafka] rondagostino opened a new pull request #10225: MINOR: fix security_test for ZK case due to error change
rondagostino opened a new pull request #10225: URL: https://github.com/apache/kafka/pull/10225 The ZooKeeper version of this system test is failing because the producer is no longer seeing `LEADER_NOT_AVAILABLE`. When the broker sees a METADATA request for the test topic after it restarts, the auto topic creation manager determines that the topic needs to be created because of the TLS hostname verification failure on the inter-broker security protocol. It also thinks there aren't enough brokers available to meet the default topic replication factor (it sees 0 available due to the TLS issue), so it returns `INVALID_REPLICATION_FACTOR` for that topic in the Metadata response. In other words, the flow has changed, the inability to produce no longer manifests as it did before, and the test fails. This patch updates the test to check for `INVALID_REPLICATION_FACTOR` instead of `LEADER_NOT_AVAILABLE`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
junrao commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r583984728 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) { log.debug("Applied ISR change record: {}", record.toString()); } +public void replay(RemoveTopicRecord record) { +TopicControlInfo topic = topics.remove(record.topicId()); +if (topic == null) { +throw new RuntimeException("Can't find topic with ID " + record.topicId() + +" to remove."); +} +topicsByName.remove(topic.name); +log.info("Removed topic {} with ID {}.", topic.name, record.topicId()); +} Review comment: I guess we haven't hooked up the logic to trigger the deletion of the replicas of the deleted topic in the broker? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) { log.debug("Applied ISR change record: {}", record.toString()); } +public void replay(RemoveTopicRecord record) { +TopicControlInfo topic = topics.remove(record.topicId()); +if (topic == null) { +throw new RuntimeException("Can't find topic with ID " + record.topicId() + +" to remove."); +} +topicsByName.remove(topic.name); Review comment: Should we update brokersToIsrs too? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
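junrao's second question is whether `replay(RemoveTopicRecord)` should also clean up the per-broker ISR index, not just `topics` and `topicsByName`. A self-contained sketch of that fuller cleanup (editor's illustration with simplified, hypothetical state: plain maps of String ids stand in for Kafka's internal `TopicControlInfo` and `BrokersToIsrs` structures):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class TopicRegistrySketch {
    final Map<String, String> topics = new HashMap<>();              // topicId -> topic name
    final Map<String, String> topicsByName = new HashMap<>();        // topic name -> topicId
    final Map<Integer, Set<String>> brokersToIsrs = new HashMap<>(); // brokerId -> topicIds in ISR

    // Replay a topic-removal record: the topic must disappear from every index.
    void replayRemoveTopic(String topicId) {
        String name = topics.remove(topicId);
        if (name == null) {
            throw new RuntimeException("Can't find topic with ID " + topicId + " to remove.");
        }
        topicsByName.remove(name);
        // The extra step under review: drop the topic from every broker's ISR
        // index so no stale entries survive the deletion.
        brokersToIsrs.values().forEach(ids -> ids.remove(topicId));
    }
}
```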
[GitHub] [kafka] rondagostino opened a new pull request #10224: MINOR: Disable transactional streams system tests for Raft quorums
rondagostino opened a new pull request #10224: URL: https://github.com/apache/kafka/pull/10224 Transactions are not supported in the KIP-500 early access release. This patch disables a system test for Raft metadata quorums that uses transactions and that was still enabled after #10194. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0
rhauch commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r583981010 ## File path: build.gradle ## @@ -2026,52 +2043,53 @@ project(':connect:runtime') { archivesBaseName = "connect-runtime" dependencies { - -compile project(':connect:api') -compile project(':clients') -compile project(':tools') -compile project(':connect:json') -compile project(':connect:transforms') - -compile libs.slf4jApi -compile libs.jacksonJaxrsJsonProvider -compile libs.jerseyContainerServlet -compile libs.jerseyHk2 -compile libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 -compile libs.activation // Jersey dependency that was available in the JDK before Java 9 -compile libs.jettyServer -compile libs.jettyServlet -compile libs.jettyServlets -compile libs.jettyClient -compile(libs.reflections) -compile(libs.mavenArtifact) - -testCompile project(':clients').sourceSets.test.output -testCompile libs.easymock -testCompile libs.junitJupiterApi -testCompile libs.junitVintageEngine -testCompile libs.powermockJunit4 -testCompile libs.powermockEasymock -testCompile libs.mockitoCore -testCompile libs.httpclient - -testCompile project(':clients').sourceSets.test.output -testCompile project(':core') -testCompile project(':core').sourceSets.test.output - -testRuntime libs.slf4jlog4j +implementation project(':connect:api') Review comment: Ah, I see that now. If we wanted to be strict, we could leave as an implementation dependency. However, I agree that it's safest to use `api` for the `connect-api` module. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
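The migration under review replaces Gradle's removed `compile`/`testCompile` configurations with `api` and `implementation`. A minimal illustrative fragment of the distinction being discussed (editor's sketch, not the actual `build.gradle` diff):

```gradle
dependencies {
    // `api` re-exports the dependency on consumers' compile classpaths --
    // appropriate for connect-api, whose types appear in connect-runtime's
    // public signatures.
    api project(':connect:api')

    // `implementation` keeps a dependency internal to this module; consumers
    // no longer compile against it, which tightens encapsulation and avoids
    // needless recompilation when it changes.
    implementation libs.slf4jApi
}
```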
[jira] [Commented] (KAFKA-9714) Flaky Test SslTransportLayerTest#testTLSDefaults
[ https://issues.apache.org/jira/browse/KAFKA-9714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291965#comment-17291965 ] Matthias J. Sax commented on KAFKA-9714: Different test method: {quote}org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<170.0> ==> expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:196) at org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:155) at org.apache.kafka.common.network.SslTransportLayerTest.verifySslConfigsWithHandshakeFailure(SslTransportLayerTest.java:1327) at org.apache.kafka.common.network.SslTransportLayerTest.testInvalidEndpointIdentification(SslTransportLayerTest.java:284){quote} STDOUT: {quote}[2021-02-26 22:15:14,750] ERROR Modification time of key store could not be obtained: some.truststore.path (org.apache.kafka.common.security.ssl.DefaultSslEngineFactory:385) java.nio.file.NoSuchFileException: some.truststore.path{quote} > Flaky Test SslTransportLayerTest#testTLSDefaults > > > Key: KAFKA-9714 > URL: https://issues.apache.org/jira/browse/KAFKA-9714 > Project: Kafka > Issue Type: Bug > Components: core, security, unit tests >Reporter: Matthias J. 
Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5145/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testTLSDefaults_tlsProtocol_TLSv1_2_/] > {quote}java.lang.AssertionError: Metric not updated > failed-authentication-total expected:<0.0> but was:<1.0> expected:<0.0> but > was:<1.0> at org.junit.Assert.fail(Assert.java:89) at > org.junit.Assert.failNotEquals(Assert.java:835) at > org.junit.Assert.assertEquals(Assert.java:555) at > org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:194) > at > org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:156) > at > org.apache.kafka.common.network.SslTransportLayerTest.testTLSDefaults(SslTransportLayerTest.java:571){quote} > STDOUT > {quote}[2020-03-12 17:03:44,617] ERROR Modification time of key store could > not be obtained: some.truststore.path > (org.apache.kafka.common.security.ssl.SslEngineBuilder:300) > java.nio.file.NoSuchFileException: some.truststore.path at > java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) > at > java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) > at > java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55) > at > java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149) > at > java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99) > at java.base/java.nio.file.Files.readAttributes(Files.java:1763) at > java.base/java.nio.file.Files.getLastModifiedTime(Files.java:2314) at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.lastModifiedMs(SslEngineBuilder.java:298) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.(SslEngineBuilder.java:275) > at > 
org.apache.kafka.common.security.ssl.SslEngineBuilder.createTruststore(SslEngineBuilder.java:182) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder.(SslEngineBuilder.java:100) > at > org.apache.kafka.common.security.ssl.SslFactory.createNewSslEngineBuilder(SslFactory.java:140) > at > org.apache.kafka.common.security.ssl.SslFactory.validateReconfiguration(SslFactory.java:114) > at > org.apache.kafka.common.network.SslChannelBuilder.validateReconfiguration(SslChannelBuilder.java:85) > at > org.apache.kafka.common.network.SslTransportLayerTest.verifyInvalidReconfigure(SslTransportLayerTest.java:1123) > at > org.apache.kafka.common.network.SslTransportLayerTest.testServerTruststoreDynamicUpdate(SslTransportLayerTest.java:1113){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10251) Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata
[ https://issues.apache.org/jira/browse/KAFKA-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291964#comment-17291964 ] Matthias J. Sax commented on KAFKA-10251: - Different test method: {quote}org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead of the expected 200 records at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:852) at kafka.api.TransactionsBounceTest.testWithGroupId(TransactionsBounceTest.scala:109){quote} STDOUT is full with {quote}[2021-02-26 22:24:12,274] ERROR Error when sending message to topic output-topic with key: 923, value: 923 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted [2021-02-26 22:24:12,274] ERROR Error when sending message to topic output-topic with key: 926, value: 926 with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted ...{quote} > Flaky Test kafka.api.TransactionsBounceTest.testWithGroupMetadata > - > > Key: KAFKA-10251 > URL: https://issues.apache.org/jira/browse/KAFKA-10251 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Major > > h3. 
Stacktrace > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 200 records at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) at > org.scalatest.Assertions.fail$(Assertions.scala:1087) at > org.scalatest.Assertions$.fail(Assertions.scala:1389) at > kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:842) at > kafka.api.TransactionsBounceTest.testWithGroupMetadata(TransactionsBounceTest.scala:109) > > > The logs are pretty much just this on repeat: > {code:java} > [2020-07-08 23:41:04,034] ERROR Error when sending message to topic > output-topic with key: 9955, value: 9955 with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) [2020-07-08 23:41:04,034] ERROR Error > when sending message to topic output-topic with key: 9959, value: 9959 with > error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) > {code} -- This 
message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8711) Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. testControlPlaneRequest
[ https://issues.apache.org/jira/browse/KAFKA-8711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291963#comment-17291963 ] Matthias J. Sax commented on KAFKA-8711: Different but related test {quote}org.opentest4j.AssertionFailedError: Disconnect notification not received: 127.0.0.1:46181-127.0.0.1:34506-0 at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.network.SocketServerTest$TestableSocketServer.waitForChannelClose(SocketServerTest.scala:1934) at kafka.network.SocketServerTest.idleExpiryWithBufferedReceives(SocketServerTest.scala:1567){quote} log4j: {quote}[2021-02-26 20:51:36,375] ERROR Uncaught exception in thread 'data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0': (org.apache.kafka.common.utils.KafkaThread:49) kafka.network.SocketServerTest$$anon$8 [...] [2021-02-26 20:52:13,338] ERROR Closing socket for 127.0.0.1:43677-127.0.0.1:54109-0 because of error (kafka.network.Processor:76) org.apache.kafka.common.errors.InvalidRequestException: Received request api key VOTE which is not enabled [2021-02-26 20:52:13,338] DEBUG Closing selector connection 127.0.0.1:43677-127.0.0.1:54109-0 (kafka.network.Processor:62) [2021-02-26 20:52:13,339] ERROR Exception while processing request from 127.0.0.1:43677-127.0.0.1:54109-0 (kafka.network.Processor:76) org.apache.kafka.common.errors.InvalidRequestException: Received request api key VOTE which is not enabled{quote} > Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. 
> testControlPlaneRequest > -- > > Key: KAFKA-8711 > URL: https://issues.apache.org/jira/browse/KAFKA-8711 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Chandrasekhar >Priority: Critical > Attachments: KafkaAUTFailures07242019_PASS2.txt, > KafkaUTFailures07242019_PASS2.GIF > > > Cloned Kakfa 2.3.0 source to our git repo and compiled it using 'gradle > build', we see the following error consistently: > Gradle Version 4.7 > > testControlPlaneRequest > java.net.BindException: Address already in use (Bind failed) > at java.net.PlainSocketImpl.socketBind(Native Method) > at > java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387) > at java.net.Socket.bind(Socket.java:644) > at java.net.Socket.(Socket.java:433) > at java.net.Socket.(Socket.java:286) > at kafka.network.SocketServerTest.connect(SocketServerTest.scala:140) > at > kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1(SocketServerTest.scala:200) > at > kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1$adapted(SocketServerTest.scala:199) > at > kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1141) > at > kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:199) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at
[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart
[ https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291956#comment-17291956 ] Leonid Ilyevsky commented on KAFKA-10582: - I have the same problem, but in my case just restarting the mirror maker does not help. Here is what I have to do: stop the mirror maker, then delete the *mm2-offset-syncs..internal* topic on the source cluster, and then start the mirror maker again. I am using the mirror maker that comes with Confluent 6.1.0, and I run 3 instances on the same hosts as the target cluster. > Mirror Maker 2 not replicating new topics until restart > --- > > Key: KAFKA-10582 > URL: https://issues.apache.org/jira/browse/KAFKA-10582 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.1 > Environment: RHEL 7 Linux. >Reporter: Robert Martin >Priority: Minor > > We are using Mirror Maker 2 from the 2.5.1 release for replication on some > clusters. Replication is working as expected for existing topics. When we > create a new topic, however, Mirror Maker 2 creates the replicated topic as > expected but never starts replicating it. If we restart Mirror Maker 2 > within 2-3 minutes the topic starts replicating as expected. From > documentation we have seen it appears this should start replicating without > a restart based on the settings we have.
> *Example:* > Create topic "mytesttopic" on source cluster > MirrorMaker 2 creates "source.mytesttopic" on target cluster with no issue > MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic" > Restart MirrorMaker 2 and now replication works for "mytesttopic" -> > "source.mytesttopic" > *Example config:* > name = source->target > group.id = source-to-target > clusters = source, target > source.bootstrap.servers = sourcehosts:9092 > target.bootstrap.servers = targethosts:9092 > source->target.enabled = true > source->target.topics = .* > target->source = false > target->source.topics = .* > replication.factor=3 > checkpoints.topic.replication.factor=3 > heartbeats.topic.replication.factor=3 > offset-syncs.topic.replication.factor=3 > offset.storage.replication.factor=3 > status.storage.replication.factor=3 > config.storage.replication.factor=3 > tasks.max = 16 > refresh.topics.enabled = true > sync.topic.configs.enabled = true > refresh.topics.interval.seconds = 300 > refresh.groups.interval.seconds = 300 > readahead.queue.capacity = 100 > emit.checkpoints.enabled = true > emit.checkpoints.interval.seconds = 5
[GitHub] [kafka] hachikuji opened a new pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission
hachikuji opened a new pull request #10223: URL: https://github.com/apache/kafka/pull/10223 We now accept topicIds in the `DeleteTopic` request. If the client principal does not have `Describe` permission, then we return `UNKNOWN_TOPIC_ID` regardless of whether the topic exists or not. However, if the topic does exist, then we also set its name in the response, which gives the user a way to infer existence. This is probably not a major issue since the client would still need to find the topicId first, but it is an easy hole to plug as well. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
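The fix makes an unauthorized existing topic indistinguishable from a missing one. A hypothetical, simplified sketch of that response-shaping logic (editor's illustration; class and field names are invented, not Kafka's actual response types):

```java
// Without Describe permission, the result carries UNKNOWN_TOPIC_ID and a null
// name, whether or not the topic exists -- so the response leaks nothing.
class DeleteTopicResultSketch {
    final String name;      // null whenever the caller may not learn it
    final String errorCode;

    DeleteTopicResultSketch(String name, String errorCode) {
        this.name = name;
        this.errorCode = errorCode;
    }

    static DeleteTopicResultSketch forTopic(String resolvedName, boolean hasDescribe) {
        if (!hasDescribe || resolvedName == null) {
            // Identical shapes for "unauthorized" and "does not exist".
            return new DeleteTopicResultSketch(null, "UNKNOWN_TOPIC_ID");
        }
        return new DeleteTopicResultSketch(resolvedName, "NONE");
    }
}
```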
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184: URL: https://github.com/apache/kafka/pull/10184#discussion_r583965997 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel, requestThrottleMs => createResponseCallback(requestThrottleMs)) } + def handleDeleteTopics(request: RequestChannel.Request): Unit = { +val responses = deleteTopics(request.body[DeleteTopicsRequest].data(), + request.context.apiVersion(), + authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME), + names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n), + names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n)) +requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => { + val responseData = new DeleteTopicsResponseData(). +setResponses(new DeletableTopicResultCollection(responses.iterator())). +setThrottleTimeMs(throttleTimeMs) + new DeleteTopicsResponse(responseData) +}) + } + + def deleteTopics(request: DeleteTopicsRequestData, + apiVersion: Int, + hasClusterAuth: Boolean, + getDescribableTopics: Iterable[String] => Set[String], + getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = { +if (!config.deleteTopicEnable) { + if (apiVersion < 3) { +throw new InvalidRequestException("Topic deletion is disabled.") + } else { +throw new TopicDeletionDisabledException() + } +} +val responses = new util.ArrayList[DeletableTopicResult] +val duplicatedTopicNames = new util.HashSet[String] +val topicNamesToResolve = new util.HashSet[String] +val topicIdsToResolve = new util.HashSet[Uuid] +val duplicatedTopicIds = new util.HashSet[Uuid] + +def appendResponse(name: String, id: Uuid, error: ApiError): Unit = { + responses.add(new DeletableTopicResult(). +setName(name). +setTopicId(id). +setErrorCode(error.error().code()). 
+setErrorMessage(error.message())) +} + +def maybeAppendToTopicNamesToResolve(name: String): Unit = { + if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) { +appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name.")) +topicNamesToResolve.remove(name) +duplicatedTopicNames.add(name) + } +} + +def maybeAppendToIdsToResolve(id: Uuid): Unit = { + if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) { +appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID.")) +topicIdsToResolve.remove(id) +duplicatedTopicIds.add(id) + } +} + + request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve) + +request.topics().iterator().asScala.foreach { + topic => if (topic.name() == null) { +if (topic.topicId.equals(ZERO_UUID)) { + appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST, +"Neither topic name nor id were specified.")) +} else { + maybeAppendToIdsToResolve(topic.topicId()) +} + } else { +if (topic.topicId().equals(ZERO_UUID)) { + maybeAppendToTopicNamesToResolve(topic.name()) +} else { + appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST, +"You may not specify both topic name and topic id.")) +} + } +} + +val idToName = new util.HashMap[Uuid, String] + +def maybeAppendToIdToName(id: Uuid, name: String): Unit = { + if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) { + appendResponse(name, id, new ApiError(INVALID_REQUEST, + "The same topic was specified by name and by id.")) + idToName.remove(id) + duplicatedTopicIds.add(id) + } +} +controller.findTopicIds(topicNamesToResolve).get().asScala.foreach { + case (name, idOrError) => if (idOrError.isError) { +appendResponse(name, ZERO_UUID, idOrError.error()) + } else { +maybeAppendToIdToName(idOrError.result(), name) + } +} +controller.findTopicNames(topicIdsToResolve).get().asScala.foreach { + case (id, nameOrError) => if (nameOrError.isError) { +appendResponse(null, id, 
nameOrError.error()) + } else { +maybeAppendToIdToName(id, nameOrError.result()) + } +} + +if (!hasClusterAuth) { + val authorizedDescribeTopics = getDescribableTopics(idToName.values().asScala) + val authorizedDeleteTopics = getDeletableTopics(idToName.values().asScala) + val iterator = idToName.entrySet().iterator() + while (iterator.hasNext) { +val entry = iterator.next() +val topicName =
[GitHub] [kafka] ijuma commented on pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
ijuma commented on pull request #8812: URL: https://github.com/apache/kafka/pull/8812#issuecomment-786935020 Tests passed, merged to trunk and 2.8.
[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks
rhauch commented on a change in pull request #10016: URL: https://github.com/apache/kafka/pull/10016#discussion_r583964769 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -259,6 +267,16 @@ public void execute() { } } +private void closeProducer(Duration duration) { +if (producer != null) { +try { +producer.close(duration); +} catch (Throwable t) { Review comment: I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for shutting down the worker's executor. Again, it's not an issue in runtime, but a *potential* issue in our tests.
[jira] [Created] (KAFKA-12380) Executor in Connect's Worker is not shut down when the worker is
Randall Hauch created KAFKA-12380: - Summary: Executor in Connect's Worker is not shut down when the worker is Key: KAFKA-12380 URL: https://issues.apache.org/jira/browse/KAFKA-12380 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Randall Hauch The `Worker` class has an [`executor` field|https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L100] that the public constructor initializes with a new cached thread pool ([https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L127]). When the worker is stopped, it does not shut down this executor. This is normally okay in the Connect and MirrorMaker 2 runtimes, because the worker is stopped only when the JVM is stopped (via the shutdown hook in the herders). However, we instantiate and stop the herder many times in our integration tests, and this means we're not necessarily shutting down the herder's executor. Normally this won't hurt, as long as all of the runnables that the executor threads run actually do terminate. But it's possible those threads *might* not terminate in all tests. TBH, I don't know that such cases actually exist.
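The cleanup the ticket asks for is the standard two-phase `ExecutorService` shutdown. A self-contained sketch (editor's illustration; `WorkerLike` is a hypothetical stand-in for Connect's `Worker`, and the 5-second grace period is an assumption, not a value from the ticket):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WorkerLike {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void submitWork(Runnable task) {
        executor.submit(task);
    }

    // The missing step: stop accepting new tasks, wait briefly for in-flight
    // runnables, then interrupt stragglers so test JVMs can actually exit.
    void stop() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }
}
```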
[GitHub] [kafka] ijuma merged pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
ijuma merged pull request #8812: URL: https://github.com/apache/kafka/pull/8812
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583962844

## File path: core/src/main/scala/kafka/server/ControllerApis.scala ##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+
+    val idToName = new util.HashMap[Uuid, String]
+
+    def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+      if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+        appendResponse(name, id, new ApiError(INVALID_REQUEST,
+          "The same topic was specified by name and by id."))
+        idToName.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+    controller.findTopicIds(topicNamesToResolve).get().asScala.foreach {
+      case (name, idOrError) => if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error())

Review comment: It's certainly awkward that for topic IDs, we need to check existence first (since otherwise we have nothing to give to the authorizer), but for topic names, we check authorization first. But you're right, this is a leak. I'll fix it.
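The duplicate-handling pattern in the quoted `deleteTopics` hunk (the first repeat moves a name out of the "to resolve" set into a "duplicated" set, and every occurrence of a duplicated name gets an error result) can be sketched in isolation. This is a hypothetical, simplified illustration — `DedupTracker`, its field names, and the error strings are invented here and are not the actual ControllerApis code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the duplicate-detection pattern from the quoted diff.
// Names seen once stay in toResolve; the first repeat evicts the name into
// duplicated and records an error, and every later repeat records another.
class DedupTracker {
    final Set<String> toResolve = new HashSet<>();
    final Set<String> duplicated = new HashSet<>();
    final List<String> errors = new ArrayList<>();

    void maybeAdd(String name) {
        // Set.add returns false when the element was already present.
        if (duplicated.contains(name) || !toResolve.add(name)) {
            errors.add("Duplicate topic name: " + name);
            toResolve.remove(name);
            duplicated.add(name);
        }
    }
}
```

With input `a, b, a, a`, only `b` remains resolvable; `a` is marked duplicated and each of its two repeats yields an error entry, mirroring how the real code reports a `DeletableTopicResult` per duplicate.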
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940888

## File path: core/src/test/java/kafka/test/MockController.java ##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+    private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+        new NotControllerException("This is not the correct controller for this cluster.");
+
+    public static class Builder {
+        private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+        public Builder newInitialTopic(String name, Uuid id) {
+            initialTopics.put(name, new MockTopic(name, id));
+            return this;
+        }
+
+        public MockController build() {
+            return new MockController(initialTopics.values());
+        }
+    }
+
+    private volatile boolean active = true;
+
+    private MockController(Collection<MockTopic> initialTopics) {
+        for (MockTopic topic : initialTopics) {
+            topics.put(topic.id, topic);
+            topicNameToId.put(topic.name, topic.id);
+        }
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    static class MockTopic {
+        private final String name;
+        private final Uuid id;
+
+        MockTopic(String name, Uuid id) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+
+    private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+    private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+    @Override
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+            findTopicIds(Collection<String> topicNames) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String topicName : topicNames) {
+            if (!topicNameToId.containsKey(topicName)) {
+                System.out.println("WATERMELON: findTopicIds failed to find " + topicName);

Review comment: removed :)
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583940677

## File path: core/src/main/scala/kafka/server/ControllerApis.scala ##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment: It is kind of frustrating that there is this much complexity in the "apis" class. At least there is a good unit test for it now, though. I hope that most other APIs won't be this complex.
[GitHub] [kafka] cmccabe merged pull request #10222: MINOR: disable test_produce_bench_transactions for Raft metadata quorum
cmccabe merged pull request #10222: URL: https://github.com/apache/kafka/pull/10222
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483

The documentation is auto-generated from the transforms code, via [TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java). Since this is an existing SMT, there are no changes to the `TransformationDoc` class, but we still have to change the `Cast` class [here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L57-L61) and [here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L83-L85) to mention that byte arrays can be cast to strings.
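For context, the simplest form of a byte-array-to-string cast is a charset decode. The sketch below is a generic illustration only — it assumes UTF-8, and the class and method names are invented; the Connect `Cast` SMT's actual conversion is defined in `Cast.java` in the PR, which this does not reproduce:

```java
import java.nio.charset.StandardCharsets;

// Generic illustration of turning a byte array into a string by decoding it
// as UTF-8. This standalone helper only shows the shape of such a cast; it is
// not the Connect Cast SMT's implementation.
class ByteArrayCastSketch {
    static String castToString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }
}
```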
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483

The documentation is auto-generated from the transforms code, via [TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java). Since this is an existing SMT, there are no changes required for documentation (other than changing the overview and config docs in `Cast.java`). Those changes should be made [here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L57-L61) and [here](https://github.com/apache/kafka/blob/958f90e710f0de164dce1dda5f45d75a8b8fb8d4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java#L83-L85).
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950:
URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483

The documentation is auto-generated from the transforms code, via [TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java). Since this is an existing SMT, there are no changes required for documentation (other than changing the overview and config docs in `Cast.java`).
[GitHub] [kafka] rhauch edited a comment on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch edited a comment on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483 The documentation is auto-generated from the transforms code, via [TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).
[GitHub] [kafka] rhauch commented on pull request #9950: KAFKA-12170: Fix for Connect Cast SMT to correctly transform a Byte array into a string
rhauch commented on pull request #9950: URL: https://github.com/apache/kafka/pull/9950#issuecomment-786911483 The documentation is auto-generated from the code, via [TransformationDoc](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java).
[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583886777

## File path: core/src/main/scala/kafka/server/ControllerApis.scala ##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment: There seems to be enough complexity in the handling here that it might be worth pulling this logic into a separate class. Not required for this PR, but it would be nice to come up with a nicer pattern so that we don't end up with a giant class like `KafkaApis`.

## File path: core/src/main/scala/kafka/server/ControllerApis.scala ##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    val responses = new util.ArrayList[DeletableTopicResult]
+    val duplicatedTopicNames = new util.HashSet[String]
+    val topicNamesToResolve = new util.HashSet[String]
+    val topicIdsToResolve = new util.HashSet[Uuid]
+    val duplicatedTopicIds = new util.HashSet[Uuid]
+
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error().code()).
+        setErrorMessage(error.message()))
+    }
+
+    def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+      if (duplicatedTopicNames.contains(name) || !topicNamesToResolve.add(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, "Duplicate topic name."))
+        topicNamesToResolve.remove(name)
+        duplicatedTopicNames.add(name)
+      }
+    }
+
+    def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+      if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+        appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate topic ID."))
+        topicIdsToResolve.remove(id)
+        duplicatedTopicIds.add(id)
+      }
+    }
+
+    request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+    request.topics().iterator().asScala.foreach {
+      topic => if (topic.name() == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else {
+          maybeAppendToIdsToResolve(topic.topicId())
+        }
+      } else {
+        if (topic.topicId().equals(ZERO_UUID)) {
+          maybeAppendToTopicNamesToResolve(topic.name())
+        } else {
+          appendResponse(topic.name(), topic.topicId(), new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic
[GitHub] [kafka] rondagostino opened a new pull request #10222: MINOR: disable test_produce_bench_transactions for Raft metadata quorum
rondagostino opened a new pull request #10222:
URL: https://github.com/apache/kafka/pull/10222

Transactions are not supported in the KIP-500 early access release. This patch disables a system test for Raft metadata quorums that uses transactions and that was still enabled after https://github.com/apache/kafka/pull/10194.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dhruvilshah3 commented on pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs
dhruvilshah3 commented on pull request #10217:
URL: https://github.com/apache/kafka/pull/10217#issuecomment-786895408

Thanks for the review @rajinisivaram. I ran `MirrorConnectorsIntegrationSSLTest` a few times locally and it passed. It passed in the latest Jenkins run as well. A couple of failures in the latest run seem unrelated to the changes.
[GitHub] [kafka] ijuma commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r583907925

## File path: core/src/main/scala/kafka/log/Log.scala ##
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
       preallocate = config.preallocate))
   }
 
-    recoveryPoint = activeSegment.readNextOffset

Review comment: We discussed this offline and we decided to stick with the fix in this PR for now and to file a separate JIRA to consider flushing unflushed segments during recovery. That would provide stronger guarantees after a restart.
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583903614

## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##
@@ -349,6 +357,16 @@ public void replay(PartitionChangeRecord record) {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {

Review comment: Good catch. Fixed.
[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller
cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583900849

## File path: core/src/main/scala/kafka/server/ControllerApis.scala ##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+      request.context.apiVersion(),
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator())).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {

Review comment: Unfortunately, that will not work since we have to shuffle it at the end.
[GitHub] [kafka] ableegoldman commented on pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown
ableegoldman commented on pull request #10213: URL: https://github.com/apache/kafka/pull/10213#issuecomment-786873825 Merged to trunk and cherrypicked to 2.8 cc @vvcephei
[GitHub] [kafka] ableegoldman merged pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown
ableegoldman merged pull request #10213: URL: https://github.com/apache/kafka/pull/10213
[GitHub] [kafka] ableegoldman commented on pull request #10213: KAFKA-12375: fix concurrency issue in application shutdown
ableegoldman commented on pull request #10213: URL: https://github.com/apache/kafka/pull/10213#issuecomment-786871581 One unrelated test failure: `kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch` Going to merge this now
[jira] [Updated] (KAFKA-12235) ZkAdminManager.describeConfigs returns no config when 2+ configuration keys are specified
[ https://issues.apache.org/jira/browse/KAFKA-12235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe updated KAFKA-12235:
---------------------------------
    Fix Version/s: 2.7.1

> ZkAdminManager.describeConfigs returns no config when 2+ configuration keys
> are specified
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12235
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12235
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.7.0
>            Reporter: Ivan Yurchenko
>            Assignee: Ivan Yurchenko
>            Priority: Critical
>              Labels: regression
>             Fix For: 2.8.0, 2.7.1
>
> When {{ZkAdminManager.describeConfigs}} receives {{DescribeConfigsResource}}
> with 2 or more {{configurationKeys}} specified, it returns an empty
> configuration.
> Here's a test for {{ZkAdminManagerTest}} that reproduces this issue:
>
> {code:scala}
>   @Test
>   def testDescribeConfigsWithConfigurationKeys(): Unit = {
>     EasyMock.expect(zkClient.getEntityConfigs(ConfigType.Topic, topic)).andReturn(TestUtils.createBrokerConfig(brokerId, "zk"))
>     EasyMock.expect(metadataCache.contains(topic)).andReturn(true)
>     EasyMock.replay(zkClient, metadataCache)
>     val resources = List(new DescribeConfigsRequestData.DescribeConfigsResource()
>       .setResourceName(topic)
>       .setResourceType(ConfigResource.Type.TOPIC.id)
>       .setConfigurationKeys(List("retention.ms", "retention.bytes", "segment.bytes").asJava)
>     )
>     val adminManager = createAdminManager()
>     val results: List[DescribeConfigsResponseData.DescribeConfigsResult] = adminManager.describeConfigs(resources, true, true)
>     assertEquals(Errors.NONE.code, results.head.errorCode())
>     val resultConfigKeys = results.head.configs().asScala.map(r => r.name()).toSet
>     assertEquals(Set("retention.ms", "retention.bytes", "segment.bytes"), resultConfigKeys)
>   }
> {code}
> Works fine with one configuration key, though.
> The patch is following shortly.
[GitHub] [kafka] cmccabe commented on a change in pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys
cmccabe commented on a change in pull request #9990:
URL: https://github.com/apache/kafka/pull/9990#discussion_r583245201

## File path: core/src/main/scala/kafka/server/ConfigHelper.scala ##
@@ -47,11 +47,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
   def createResponseConfig(configs: Map[String, Any],
                            createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = {
-    val filteredConfigPairs = if (resource.configurationKeys == null)
+    val filteredConfigPairs = if (resource.configurationKeys == null || resource.configurationKeys.isEmpty)

Review comment: As you said earlier, the code in 2.7 is broken, so we shouldn't be going based off of that. The correct code in 2.6 does not special-case the empty string.
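The one-line fix quoted above hinges on treating a null or empty `configurationKeys` list as "no filter", so that describing a resource without naming keys still returns every config. A hypothetical standalone sketch of that behavior follows — the class, method, and map-based representation are invented for illustration and are not the real `ConfigHelper` code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the describeConfigs key filtering discussed above:
// a null or empty key list means "return every config"; a non-empty list
// selects exactly the requested keys.
class ConfigFilterSketch {
    static Map<String, String> filter(Map<String, String> allConfigs, List<String> keys) {
        if (keys == null || keys.isEmpty()) {
            return allConfigs;  // no filter requested: describe everything
        }
        Map<String, String> selected = new HashMap<>();
        for (String key : keys) {
            if (allConfigs.containsKey(key)) {
                selected.put(key, allConfigs.get(key));
            }
        }
        return selected;
    }
}
```

The Jira test above asserts the analogous behavior end-to-end: requesting `retention.ms`, `retention.bytes`, and `segment.bytes` must return exactly those three entries, not an empty result.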
[jira] [Commented] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store
[ https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291899#comment-17291899 ]

A. Sophie Blee-Goldman commented on KAFKA-9880:
-----------------------------------------------

Since we turned off bulk loading in 2.6.0 and removed the offending compactRange call, I think we can go ahead and close this ticket.

> Error while range compacting during bulk loading of FIFO compacted RocksDB
> Store
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-9880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9880
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Nicolas Carlot
>            Priority: Major
>
> When restoring a non-empty RocksDB state store, if it is customized to use
> FIFOCompaction, the following exception is thrown:
>
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
>     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
>     at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
>     at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
>     ... 11 more
> {code}
>
> Compaction is configured through an implementation of RocksDBConfigSetter.
> The exception is gone as soon as I remove:
> {code:java}
> CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> fifoOptions.setMaxTableFilesSize(maxSize);
> fifoOptions.setAllowCompaction(true);
> options.setCompactionOptionsFIFO(fifoOptions);
> options.setCompactionStyle(CompactionStyle.FIFO);
> {code}
>
> Bulk loading works fine when the store is non-existent / empty. This occurs
> only when there is a minimum amount of data in it. I guess it happens when
> the number of SST layers increases.
> I'm currently using a forked version of Kafka 2.4.1 customizing the
> RocksDBStore class with this modification as a workaround:
>
> {code:java}
> public void toggleDbForBulkLoading() {
>     try {
>         db.compactRange(columnFamily, true, 1, 0);
>     } catch (final RocksDBException e) {
>         try {
>             if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
>                 throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
>             } else {
>                 log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
>             }
>         } catch (RocksDBException e1) {
>             throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
>         }
>     }
[jira] [Resolved] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store
[ https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-9880. --- Fix Version/s: 2.6.0 Resolution: Fixed > Error while range compacting during bulk loading of FIFO compacted RocksDB > Store > > > Key: KAFKA-9880 > URL: https://issues.apache.org/jira/browse/KAFKA-9880 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1 >Reporter: Nicolas Carlot >Priority: Major > Fix For: 2.6.0 > > > > When restoring a non empty RocksDB state store, if it is customized to use > FIFOCompaction, the following exception is thrown: > > {code:java} > // > org.apache.kafka.streams.errors.ProcessorStateException: Error while range > compacting during restoring store merge_store > at > org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) > ~[kafka-stream-router.jar:?] 
> at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > [kafka-stream-router.jar:?] > Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels > at org.rocksdb.RocksDB.compactRange(Native Method) > ~[kafka-stream-router.jar:?] > at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) > ~[kafka-stream-router.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) > ~[kafka-stream-router.jar:?] > ... 11 more > {code} > > > Compaction is configured through an implementation of RocksDBConfigSetter. > The exception is gone as soon as I remove: > {code:java} > // > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO(); > fifoOptions.setMaxTableFilesSize(maxSize); > fifoOptions.setAllowCompaction(true); > options.setCompactionOptionsFIFO(fifoOptions); > options.setCompactionStyle(CompactionStyle.FIFO); > {code} > > > Bulk loading works fine when the store is non-existent / empty. This occurs > only when there is a minimum amount of data in it. I guess it happens when > the number of SST layers is increased. 
> I'm currently using a forked version of Kafka 2.4.1 customizing the > RocksDBStore class with this modification as a work around: > > {code:java} > // > public void toggleDbForBulkLoading() { > try { > db.compactRange(columnFamily, true, 1, 0); > } catch (final RocksDBException e) { > try { > if (columnFamily.getDescriptor().getOptions().compactionStyle() != > CompactionStyle.FIFO) { > throw new ProcessorStateException("Error while range compacting > during restoring store " + name, e); > } > else { > log.warn("Compaction of store " + name + " for bulk loading > failed. Will continue without compacted store, which will be slower.", e); > } > } catch (RocksDBException e1) { > throw new ProcessorStateException("Error while range compacting > during restoring store " + name, e); > } > } > } > {code} > > I'm not very proud of this workaround, but it suits my use cases
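The ticket notes that compaction is configured through an implementation of RocksDBConfigSetter. For context, a complete setter wrapping the quoted FIFO options might look like the following sketch; the class name and size budget are illustrative assumptions, and the setter would be registered via the `rocksdb.config.setter` property in StreamsConfig:

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Illustrative sketch only: wires the FIFO options quoted in the ticket
// into the Streams per-store RocksDB configuration hook.
public class FifoCompactionConfigSetter implements RocksDBConfigSetter {

    // Assumed size budget for the FIFO compaction, not a value from the ticket.
    private static final long MAX_SIZE_BYTES = 512 * 1024 * 1024L;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(MAX_SIZE_BYTES);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }
}
```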
[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
hachikuji commented on a change in pull request #8812: URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File, preallocate = config.preallocate)) } -recoveryPoint = activeSegment.readNextOffset Review comment: I think it is a gap that there is no minimum replication factor before a write can get exposed. Any writes that end up seeing the `NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are unacknowledged writes, and the producer is expected to retry, but the consumer can still read them once the ISR shrinks and we would still view it as "data loss" if the broker failed before they could be flushed to disk. With the "strict min isr" proposal, the leader is not allowed to shrink the ISR lower than some replication factor, which helps to plug this hole. Going back to @purplefox's suggestion, it does seem like a good idea to flush segments beyond the recovery point during recovery. It kind of serves to constrain the initial state of the system which makes it easier to reason about (e.g. you only need to worry about the loss of unflushed data from the last restart). Some of the flush weaknesses probably still exist though regardless of this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API
hachikuji commented on a change in pull request #10206: URL: https://github.com/apache/kafka/pull/10206#discussion_r583877084 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ## @@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int, throw new IllegalStateException(s"Unexpected empty transaction metadata returned while putting $txnMetadata"))) } + def listTransactionStates( +filterProducerIds: Set[Long], +filterStateNames: Set[String] + ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = { +inReadLock(stateLock) { + if (loadingPartitions.nonEmpty) { +Left(Errors.COORDINATOR_LOAD_IN_PROGRESS) + } else { +val filterStates = filterStateNames.flatMap(TransactionState.fromName) Review comment: It's a reasonable suggestion. Let me give it a try. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys
cmccabe merged pull request #9990: URL: https://github.com/apache/kafka/pull/9990 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #10212: MINOR: Add cluster-metadata-decoder to DumpLogSegments
cmccabe merged pull request #10212: URL: https://github.com/apache/kafka/pull/10212 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9958: MINOR: Clean up group instance id handling in `GroupCoordinator`
hachikuji merged pull request #9958: URL: https://github.com/apache/kafka/pull/9958 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman closed pull request #10214: MINOR: fix message and reduce log level of PartitionGroup enforced processing
ableegoldman closed pull request #10214: URL: https://github.com/apache/kafka/pull/10214 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10214: MINOR: fix message and reduce log level of PartitionGroup enforced processing
ableegoldman commented on pull request #10214: URL: https://github.com/apache/kafka/pull/10214#issuecomment-786832334 Closing since this issue should be addressed on the side in https://github.com/apache/kafka/pull/10137 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10212: MINOR: Add cluster-metadata-decoder to DumpLogSegments
cmccabe commented on a change in pull request #10212: URL: https://github.com/apache/kafka/pull/10212#discussion_r583849492 ## File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala ## @@ -242,7 +246,8 @@ object DumpLogSegments { nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]], isDeepIteration: Boolean, maxMessageSize: Int, - parser: MessageParser[_, _]): Unit = { + parser: MessageParser[_, _], + skipBatchMetadata: Boolean): Unit = { Review comment: let me rename this to `--skip-record-metadata` to make it clearer. I think per-batch metadata is still useful (in particular, offset) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10137: KAFKA-12268: Implement task idling semantics via currentLag API
ableegoldman commented on a change in pull request #10137: URL: https://github.com/apache/kafka/pull/10137#discussion_r583848994 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) { return false; } else { enforcedProcessingSensor.record(1.0d, wallClockTime); -logger.info("Continuing to process although some partition timestamps were not buffered locally." + +logger.trace("Continuing to process although some partitions are empty on the broker." + Review comment: Maybe we could leave this detailed logging at TRACE, and just print a single message at `warn` the first time this enforced processing occurs? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
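The "single message at warn, details at trace" idea suggested above can be realized with a first-occurrence flag. This is a hypothetical sketch of the pattern, not the actual PartitionGroup implementation; the class and method names are invented for illustration:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical "warn once, then trace" logger: the first enforced
// processing event is reported prominently, later ones only at a
// verbose level. Returns the chosen level so the behavior is observable.
public class EnforcedProcessingLogger {

    private final AtomicBoolean warnedOnce = new AtomicBoolean(false);

    public String onEnforcedProcessing() {
        // compareAndSet is atomic, so exactly one caller ever takes the WARN path
        if (warnedOnce.compareAndSet(false, true)) {
            return "WARN: continuing to process although some partitions are empty on the broker";
        }
        return "TRACE: continuing to process although some partitions are empty on the broker";
    }

    public static void main(String[] args) {
        EnforcedProcessingLogger logger = new EnforcedProcessingLogger();
        System.out.println(logger.onEnforcedProcessing()); // WARN line
        System.out.println(logger.onEnforcedProcessing()); // TRACE line
    }
}
```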
[GitHub] [kafka] hachikuji commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
hachikuji commented on a change in pull request #8812: URL: https://github.com/apache/kafka/pull/8812#discussion_r583835401 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File, preallocate = config.preallocate)) } -recoveryPoint = activeSegment.readNextOffset Review comment: I think it is a gap that there is no minimum replication factor before a write can get exposed. Any writes that end up seeing the `NOT_ENOUGH_REPLICAS_AFTER_APPEND` error code are more vulnerable. These are unacknowledged writes, and the producer is expected to retry, but the consumer can still read them once the ISR shrinks and we would still view it as "data loss" if the broker failed before they could be flushed to disk. With the "strict min isr" proposal, the leader is not allowed to shrink the ISR lower than some replication factor, which helps to plug this hole. Going back to @purplefox's suggestion, it does seem like a good idea to flush segments beyond the recovery point during recovery. It kind of serves to constrain the initial state of the system which makes it easier to reason about (e.g. you only need to worry about the loss of unflushed data from the previous restart). Some of the flush weaknesses probably still exist though regardless of this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0
ijuma commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r583834500 ## File path: build.gradle ## @@ -2026,52 +2043,53 @@ project(':connect:runtime') { archivesBaseName = "connect-runtime" dependencies { - -compile project(':connect:api') -compile project(':clients') -compile project(':tools') -compile project(':connect:json') -compile project(':connect:transforms') - -compile libs.slf4jApi -compile libs.jacksonJaxrsJsonProvider -compile libs.jerseyContainerServlet -compile libs.jerseyHk2 -compile libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 -compile libs.activation // Jersey dependency that was available in the JDK before Java 9 -compile libs.jettyServer -compile libs.jettyServlet -compile libs.jettyServlets -compile libs.jettyClient -compile(libs.reflections) -compile(libs.mavenArtifact) - -testCompile project(':clients').sourceSets.test.output -testCompile libs.easymock -testCompile libs.junitJupiterApi -testCompile libs.junitVintageEngine -testCompile libs.powermockJunit4 -testCompile libs.powermockEasymock -testCompile libs.mockitoCore -testCompile libs.httpclient - -testCompile project(':clients').sourceSets.test.output -testCompile project(':core') -testCompile project(':core').sourceSets.test.output - -testRuntime libs.slf4jlog4j +implementation project(':connect:api') Review comment: The difference is that `connect-runtime` doesn't expose `connect-api` and others transitively anymore. If `connect-runtime` is used for tests where people _only_ depend on `connect-runtime` (this is an anti-pattern, but easy to use), things may break for them. That is the reason why I used `api` for the `clients` dependency in `core`, so I'll do the same for `connect-runtime`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
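The distinction ijuma describes maps to Gradle's java-library configurations: `api` dependencies appear on downstream consumers' compile classpaths, while `implementation` dependencies stay internal. A hedged illustration of the semantics (which Kafka modules end up `api` versus `implementation` is the PR's decision; the snippet below only shows the mechanism):

```groovy
// build.gradle sketch, not the actual Kafka build file
dependencies {
    // Exposed transitively: consumers of this module can compile
    // against connect-api without declaring it themselves.
    api project(':connect:api')

    // Internal only: nothing leaks into consumers' compile classpaths,
    // so code that accidentally relied on the transitive dependency
    // now fails at compile time instead of working by chance.
    implementation project(':tools')
    implementation libs.slf4jApi
}
```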
[GitHub] [kafka] junrao commented on a change in pull request #8812: KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs
junrao commented on a change in pull request #8812: URL: https://github.com/apache/kafka/pull/8812#discussion_r583834117 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File, preallocate = config.preallocate)) } -recoveryPoint = activeSegment.readNextOffset Review comment: For the case that Tim mentioned, if we defer advancing the recovery point, at step 5, the broker will be forced to do log recovery for all unflushed data. If the data is corrupted on disk, it will be detected during recovery. For the other case that Ismael mentioned, it is true that data can be lost in that case, but then this is the case where all replicas have failed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10217: KAFKA-12254: Ensure MM2 creates topics with source topic configs
dhruvilshah3 commented on a change in pull request #10217: URL: https://github.com/apache/kafka/pull/10217#discussion_r583832959 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java ## @@ -306,40 +307,84 @@ private void createOffsetSyncsTopic() { MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig()); } -// visible for testing -void computeAndCreateTopicPartitions() -throws InterruptedException, ExecutionException { -Map partitionCounts = knownSourceTopicPartitions.stream() -.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream() -.collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), Entry::getValue)); -Set knownTargetTopics = toTopics(knownTargetTopicPartitions); -List newTopics = partitionCounts.entrySet().stream() -.filter(x -> !knownTargetTopics.contains(x.getKey())) -.map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor)) -.collect(Collectors.toList()); -Map newPartitions = partitionCounts.entrySet().stream() -.filter(x -> knownTargetTopics.contains(x.getKey())) -.collect(Collectors.toMap(Entry::getKey, x -> NewPartitions.increaseTo(x.getValue().intValue(; -createTopicPartitions(partitionCounts, newTopics, newPartitions); +void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException { +// get source and target topics with respective partition counts +Map sourceTopicToPartitionCounts = knownSourceTopicPartitions.stream() +.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream() +.collect(Collectors.toMap(Entry::getKey, Entry::getValue)); +Map targetTopicToPartitionCounts = knownTargetTopicPartitions.stream() +.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream() +.collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + +Set 
knownSourceTopics = sourceTopicToPartitionCounts.keySet(); +Set knownTargetTopics = targetTopicToPartitionCounts.keySet(); +Map sourceToRemoteTopics = knownSourceTopics.stream() +.collect(Collectors.toMap(Function.identity(), sourceTopic -> formatRemoteTopic(sourceTopic))); + +// compute existing and new source topics +Map> partitionedSourceTopics = knownSourceTopics.stream() +.collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)), +Collectors.toSet())); +Set existingSourceTopics = partitionedSourceTopics.get(true); +Set newSourceTopics = partitionedSourceTopics.get(false); + +// create new topics +if (!newSourceTopics.isEmpty()) +createNewTopics(newSourceTopics, sourceTopicToPartitionCounts); + +// compute topics with new partitions +Map sourceTopicsWithNewPartitions = existingSourceTopics.stream() +.filter(sourceTopic -> { +String targetTopic = sourceToRemoteTopics.get(sourceTopic); +return sourceTopicToPartitionCounts.get(sourceTopic) > targetTopicToPartitionCounts.get(targetTopic); +}) +.collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get)); + +// create new partitions +if (!sourceTopicsWithNewPartitions.isEmpty()) { +Map newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream() +.collect(Collectors.toMap(sourceTopicAndPartitionCount -> sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()), +sourceTopicAndPartitionCount -> NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue(; +createNewPartitions(newTargetPartitions); +} +} + +private void createNewTopics(Set newSourceTopics, Map sourceTopicToPartitionCounts) +throws ExecutionException, InterruptedException { +Map sourceTopicToConfig = describeTopicConfigs(newSourceTopics); +List newTopics = newSourceTopics.stream() +.map(sourceTopic -> { +String remoteTopic = formatRemoteTopic(sourceTopic); +int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue(); +Map 
configs = configToMap(sourceTopicToConfig.get(sourceTopic)); +return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor) +.configs(configs); +}) +
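The refactor quoted above splits the known source topics into "already exists on the target" and "must be created" via `Collectors.partitioningBy`. A self-contained sketch of that split, using only the JDK; the `primary.` alias and topic names are illustrative assumptions, not MirrorMaker2's actual configuration:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicSplitDemo {

    // Stand-in for MM2's formatRemoteTopic(); the "primary." prefix is
    // an illustrative assumption.
    static String formatRemoteTopic(String sourceTopic) {
        return "primary." + sourceTopic;
    }

    // Partition source topics into those whose remote counterpart already
    // exists on the target (key true) and those still to create (key false).
    static Map<Boolean, Set<String>> split(Set<String> sourceTopics, Set<String> targetTopics) {
        return sourceTopics.stream()
            .collect(Collectors.partitioningBy(
                sourceTopic -> targetTopics.contains(formatRemoteTopic(sourceTopic)),
                Collectors.toSet()));
    }

    public static void main(String[] args) {
        Map<Boolean, Set<String>> parts =
            split(Set.of("orders", "payments"), Set.of("primary.orders"));
        System.out.println("existing: " + parts.get(true));  // [orders]
        System.out.println("new: " + parts.get(false));      // [payments]
    }
}
```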
[GitHub] [kafka] mimaison opened a new pull request #10221: KAFKA-12379: Allow configuring the location of the offset-syncs topic…
mimaison opened a new pull request #10221: URL: https://github.com/apache/kafka/pull/10221 … with MirrorMaker2 This commit implements KIP-716. It introduces a new setting `offset-syncs.topic.location` that allows specifying where the offset-syncs topic is created. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
[ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-12379: --- Description: Ticket for KIP-716 https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2 was: Ticket for KIP-716 https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2 > KIP-716: Allow configuring the location of the offsetsync topic with > MirrorMaker2 > - > > Key: KAFKA-12379 > URL: https://issues.apache.org/jira/browse/KAFKA-12379 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > Ticket for KIP-716 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offset-syncs+topic+with+MirrorMaker2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
[ https://issues.apache.org/jira/browse/KAFKA-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-12379: -- Assignee: Mickael Maison > KIP-716: Allow configuring the location of the offsetsync topic with > MirrorMaker2 > - > > Key: KAFKA-12379 > URL: https://issues.apache.org/jira/browse/KAFKA-12379 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > Ticket for KIP-716 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12362) Determine if a Task is idling
[ https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12362: --- Description: determine if a task is idling given the task Id. (was: determine if a task is idling given the task Id. https://github.com/apache/kafka/pull/10180) > Determine if a Task is idling > - > > Key: KAFKA-12362 > URL: https://issues.apache.org/jira/browse/KAFKA-12362 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > determine if a task is idling given the task Id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12379) KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2
Mickael Maison created KAFKA-12379: -- Summary: KIP-716: Allow configuring the location of the offsetsync topic with MirrorMaker2 Key: KAFKA-12379 URL: https://issues.apache.org/jira/browse/KAFKA-12379 Project: Kafka Issue Type: Improvement Reporter: Mickael Maison Ticket for KIP-716 https://cwiki.apache.org/confluence/display/KAFKA/KIP-716%3A+Allow+configuring+the+location+of+the+offsetsync+topic+with+MirrorMaker2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-12362) Determine if a Task is idling
[ https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson closed KAFKA-12362. -- > Determine if a Task is idling > - > > Key: KAFKA-12362 > URL: https://issues.apache.org/jira/browse/KAFKA-12362 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > determine if a task is idling given the task Id. > > https://github.com/apache/kafka/pull/10180 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12362) Determine if a Task is idling
[ https://issues.apache.org/jira/browse/KAFKA-12362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson resolved KAFKA-12362. Resolution: Abandoned > Determine if a Task is idling > - > > Key: KAFKA-12362 > URL: https://issues.apache.org/jira/browse/KAFKA-12362 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Priority: Major > Fix For: 3.0.0 > > > determine if a task is idling given the task Id. > > https://github.com/apache/kafka/pull/10180 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic
[ https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291843#comment-17291843 ] Jose Armando Garcia Sancio commented on KAFKA-12376: Yes [~rounak] . I am working on it at the moment. I'll update the Jira soon with more information. > Use scheduleAtomicAppend for records that need to be atomic > --- > > Key: KAFKA-12376 > URL: https://issues.apache.org/jira/browse/KAFKA-12376 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
wcarlson5 commented on a change in pull request #10215: URL: https://github.com/apache/kafka/pull/10215#discussion_r583808502 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) { closeToError(); } final StreamThread deadThread = (StreamThread) Thread.currentThread(); -threads.remove(deadThread); Review comment: `deadThread.shutdown();` I was referring to this below. But if we don't need to keep it the same for any reason, I am fine either way This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
[ https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291826#comment-17291826 ] Matthias J. Sax commented on KAFKA-12319: - Failed again {quote}java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.5751503006012 (600 connections / 15.968 sec) ==> expected: <30.0> but was: <37.5751503006012> at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:206) at kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:411) at scala.collection.immutable.List.foreach(List.scala:333) at kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:411) [...] Caused by: org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 37.5751503006012 (600 connections / 15.968 sec) ==> expected: <30.0> but was: <37.5751503006012> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1003) at kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:903) at kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$2(ConnectionQuotasTest.scala:409){quote} > Flaky test > ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() > - > > Key: KAFKA-12319 > URL: https://issues.apache.org/jira/browse/KAFKA-12319 > Project: Kafka > Issue Type: Test >Reporter: Justine Olshan >Priority: Major > Labels: flaky-test > > I've seen this test fail a few times locally. But recently I saw it fail on a > PR build on Jenkins. 
> > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/] > h3. Error Message > java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: > Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 > sec) ==> expected: <30.0> but was: <37.436825357209706> > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12378) If a broker is down for more than `delete.retention.ms`, deleted records in a compacted topic can come back.
Shane created KAFKA-12378: - Summary: If a broker is down for more than `delete.retention.ms`, deleted records in a compacted topic can come back. Key: KAFKA-12378 URL: https://issues.apache.org/jira/browse/KAFKA-12378 Project: Kafka Issue Type: Bug Reporter: Shane If the leader of a compacted topic goes offline, or has replication lag longer than the `delete.retention.ms` of a topic, records that are tombstoned can come back once the old leader catches up and then becomes the leader again. Example of this happening:
- Topic config: name: compacted-topic, settings: delete.retention.ms=0
- Leader: broker 1; ISR: broker 1, broker 2, broker 3
- Producer 1 writes a record `1:foo`
- Producer 1 writes a record `2:bar`
- broker 1 goes offline
- broker 2 takes over leadership
- Producer 1 writes a tombstone `1:NULL`
- broker 2 compacts the topic, which leaves the topic with `1:NULL` and `2:bar` in it
- broker 2 removes the tombstone, leaving just `2:bar` in the topic
- broker 1 comes back online, catches up with replication, and takes back leadership
- broker 1 now has `1:foo` and `2:bar` as the data, since the tombstone was deleted
At this point the topic is in a strange state, as the brokers have conflicting data. Suggestion: I believe this to be quite a hard problem to solve, so I'm not going to suggest any large changes to the codebase, but I think a warning in the docs about `delete.retention.ms` is warranted. I think adding something that calls out that brokers are also consumers here: [https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#topicconfigs_delete.retention.ms] would be helpful, but even further documentation about what happens when a broker is offline for more than `delete.retention.ms` would be nice to see. If it helps I'm happy to take a first pass at updating the docs as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
cadonna commented on a change in pull request #10215: URL: https://github.com/apache/kafka/pull/10215#discussion_r583801473

File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) {
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment: We cannot wait here until the dead thread has shut down, because the shutdown happens after `replaceStreamThread()` throws the exception. So we would wait forever.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
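The "wait forever" point above can be seen in miniature: `replaceStreamThread()` runs on the dying thread itself, and a thread can never observe its own DEAD state, since it is still alive while it waits. A hypothetical sketch in plain Java (not Streams code):

```java
// A thread that waits for its own termination always times out:
// it cannot be dead while it is still executing the wait.
public class SelfWaitDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            Thread self = Thread.currentThread();
            try {
                // Any wait-for-DEAD issued from inside the thread itself can
                // only ever hit the timeout; without one it would hang forever.
                self.join(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("after join(200): alive=" + self.isAlive()); // alive=true
        });
        t.start();
        t.join(); // fine from a *different* thread
    }
}
```

This is why the removal from `threads` has to happen somewhere other than the dying thread's own code path.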
[jira] [Commented] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
[ https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291820#comment-17291820 ] Matthias J. Sax commented on KAFKA-12284:

Failed again:
{quote}java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.{quote}

> Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
> Issue Type: Test
> Components: mirrormaker, unit tests
> Reporter: Matthias J. Sax
> Assignee: Luke Chen
> Priority: Critical
> Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote}
> {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.
> at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
> at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
> at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
> {{Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.
> at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
> ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote}
> {{[2021-02-03 04:19:15,975] ERROR [MirrorHeartbeatConnector|task-0] WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
> at java.lang.Thread.run(Thread.java:748)}}
> {quote}
> {quote}
> {{[2021-02-03 04:19:36,767] ERROR Could not check connector state info. (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not read connector state. Error response: \{"error_code":404,"message":"No status found for connector MirrorSourceConnector"}
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
> at org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
> at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458)
> at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}}
> {quote}
[jira] [Commented] (KAFKA-8003) Flaky Test TransactionsTest #testFencingOnTransactionExpiration
[ https://issues.apache.org/jira/browse/KAFKA-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291818#comment-17291818 ] Matthias J. Sax commented on KAFKA-8003: Different test method {{kafka.api.TransactionsTest.testSendOffsetsWithGroupId()}} {quote}org.opentest4j.AssertionFailedError: Topic [topic1] metadata not propagated after 6 ms at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.Assertions.fail(Assertions.java:117) at kafka.utils.TestUtils$.waitForAllPartitionsMetadata(TestUtils.scala:852) at kafka.utils.TestUtils$.createTopic(TestUtils.scala:367) at kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:133){quote} STDOUT: {quote}[2021-02-26 11:17:26,311] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/kafka8173812164698481311.tmp'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:1094) [2021-02-26 11:17:26,311] ERROR [ZooKeeperClient Kafka server] Auth failed. 
(kafka.zookeeper.ZooKeeperClient:74) [2021-02-26 11:17:26,647] ERROR [RequestSendThread controllerId=0] Controller 0 fails to send a request to broker localhost:41953 (id: 2 rack: null) (kafka.controller.RequestSendThread:76) java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at kafka.utils.ShutdownableThread.pause(ShutdownableThread.scala:82) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:234) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){quote} > Flaky Test TransactionsTest #testFencingOnTransactionExpiration > --- > > Key: KAFKA-8003 > URL: https://issues.apache.org/jira/browse/KAFKA-8003 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J. Sax >Assignee: Jason Gustafson >Priority: Critical > Labels: flaky-test > Fix For: 2.2.3 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/34/] > {quote}java.lang.AssertionError: expected:<1> but was:<0> at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotEquals(Assert.java:834) at > org.junit.Assert.assertEquals(Assert.java:645) at > org.junit.Assert.assertEquals(Assert.java:631) at > kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:510){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks
rhauch commented on a change in pull request #10016: URL: https://github.com/apache/kafka/pull/10016#discussion_r583796206

File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java

@@ -259,6 +267,16 @@ public void execute() {
 }
 }
+private void closeProducer(Duration duration) {
+if (producer != null) {
+try {
+producer.close(duration);
+} catch (Throwable t) {

Review comment:
> Right now there aren't any code paths that lead to the worker's executor being shut down.

Hmm, that seems to have been done a long time ago. I wonder if that was an oversight, or whether that was intentional, since in Connect `Worker::stop()` is called when the herder is stopped, which only happens (in Connect) when the shutdown hook is called -- at which point the JVM is terminating anyway. Luckily MM2 works the same way. But in our test cases that use `EmbeddedConnectCluster`, those tests are not cleaning up all resources of the Worker (and thus Herder) -- we might have threads that still keep running. Seems like we should address that in a different issue. I'll log something.
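The pattern in the quoted diff can be distilled as a sketch. This is a hedged illustration, not the Connect implementation: `Closer` is a stand-in interface, and the point is that a failure while closing must not mask the original error that triggered the shutdown.

```java
import java.time.Duration;

// Defensive-close sketch (hypothetical, not Connect code): swallow any
// failure from close() so the caller's original error stays primary.
public class DefensiveClose {
    interface Closer {
        void close(Duration timeout);
    }

    static void closeQuietly(Closer producer, Duration timeout) {
        if (producer != null) {
            try {
                producer.close(timeout);
            } catch (Throwable t) {
                // Log and continue: shutdown must proceed even if close fails.
                System.err.println("Failed to close producer: " + t);
            }
        }
    }

    public static void main(String[] args) {
        // A producer whose close() throws does not abort the shutdown path.
        closeQuietly(d -> { throw new IllegalStateException("boom"); }, Duration.ofSeconds(5));
        System.out.println("shutdown continued despite close failure");
    }
}
```

Catching `Throwable` here is deliberate: during cancellation the task may already be in a broken state, and any exception type escaping the close would hide the root cause.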
[jira] [Created] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
Matthias J. Sax created KAFKA-12377: --- Summary: Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener Key: KAFKA-12377 URL: https://issues.apache.org/jira/browse/KAFKA-12377 Project: Kafka Issue Type: Test Components: core, security, unit tests Reporter: Matthias J. Sax {quote}org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111) at org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187) at org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210) at org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846) at org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote} STDOUT {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context without a token (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) java.io.IOException: Extensions provided in login context without a token at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165) at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) [...] Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: Extensions provided in login context without a token at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192) at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) ... 116 more{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic
[ https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291784#comment-17291784 ] Rounak Datta commented on KAFKA-12376: --

{quote}records that need to be atomic{quote}
Can you help me understand which types of records these are? That is, how do we identify these records and use `scheduleAtomicAppend` for them?

> Use scheduleAtomicAppend for records that need to be atomic
> ---
>
> Key: KAFKA-12376
> URL: https://issues.apache.org/jira/browse/KAFKA-12376
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Jose Armando Garcia Sancio
> Assignee: Jose Armando Garcia Sancio
> Priority: Blocker
> Labels: kip-500
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10215: KAFKA-12375: don't reuse thread.id until a thread has fully shut down
wcarlson5 commented on a change in pull request #10215: URL: https://github.com/apache/kafka/pull/10215#discussion_r583787814

File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -1047,9 +1047,15 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) {
 if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) {
 log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
 timeout = true;
+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
+// shutdown then we should just consider this thread.id to be burned
+} else {
+threads.remove(streamThread);

Review comment: If we purge the dead threads before we add new ones, and if we remove the assumption that there are no dead threads in the thread list, we can simply not remove the threads in the remove-thread path. This would mean there should be no concern about the cache size changing while a thread is removing itself, and it would make the risk we accepted around memory overflows unnecessary.

File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

+// Don't remove from threads until shutdown is complete. We will trim it from the
+// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the

Review comment: Where do we trim this list? I don't think we do. At the beginning of `addStreamThread()`, can we purge the dead threads? That is the only place it should matter.

File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

@@ -463,9 +464,8 @@ private void replaceStreamThread(final Throwable throwable) {
 closeToError();
 }
 final StreamThread deadThread = (StreamThread) Thread.currentThread();
-threads.remove(deadThread);

Review comment: I remember that we had the replacement use the same ID for a reason (maybe it had to do with rebalancing?). I don't think there should be a problem with trying to get the same ID by waiting a bit in the replace thread.

File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

 }
 }
-threads.remove(streamThread);
+// Don't remove from threads until shutdown is complete since this will let another thread
+// reuse its thread.id. We will trim any DEAD threads from the list later
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());

Review comment: Previously we had relied on the fact that there were no dead threads in the list.
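The scheme the reviewers converge on above — keep shutting-down threads in the list, and purge DEAD ones only at the start of `addStreamThread()` — can be sketched as follows. This is a hypothetical illustration with stub types, not the actual KafkaStreams code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of the purge-before-add idea (hypothetical, not KafkaStreams code):
// a thread.id stays reserved while its owner is still shutting down, and is
// reusable only once the owner is fully DEAD and has been purged.
public class ThreadPurgeSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    static final class StreamThreadStub {
        final int id;
        volatile State state = State.RUNNING;
        StreamThreadStub(final int id) { this.id = id; }
    }

    private final List<StreamThreadStub> threads = new CopyOnWriteArrayList<>();

    public StreamThreadStub addStreamThread() {
        // Purge fully dead threads first; threads that are merely
        // PENDING_SHUTDOWN keep their id reserved.
        threads.removeIf(t -> t.state == State.DEAD);

        // Pick the smallest id not currently reserved.
        final Set<Integer> inUse = new HashSet<>();
        for (final StreamThreadStub t : threads) {
            inUse.add(t.id);
        }
        int nextId = 1;
        while (inUse.contains(nextId)) {
            nextId++;
        }

        final StreamThreadStub created = new StreamThreadStub(nextId);
        threads.add(created);
        return created;
    }
}
```

Because dead threads are trimmed only here, the cache-size computation elsewhere must tolerate DEAD entries in the list, which is exactly the assumption change discussed in the review.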
[jira] [Commented] (KAFKA-12375) ReplaceStreamThread creates a new consumer with the same name as the one it's replacing
[ https://issues.apache.org/jira/browse/KAFKA-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17291777#comment-17291777 ] Bruno Cadonna commented on KAFKA-12375: --- I agree with [~ableegoldman] that ensuring a unique thread ID also solves other issues, like the metrics issue above. We cannot wait for the metrics to be removed from the metrics registry, because that happens during shutdown, that is, after the stream thread has been replaced. > ReplaceStreamThread creates a new consumer with the same name as the one it's > replacing > --- > > Key: KAFKA-12375 > URL: https://issues.apache.org/jira/browse/KAFKA-12375 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Tomasz Nguyen >Assignee: Tomasz Nguyen >Priority: Blocker > Fix For: 2.8.0 > > > I was debugging the kafka-streams soak cluster and noticed that replacing a > stream thread was causing the streams application to fail. I have managed to > find the following stacktrace: > {code:java} > javax.management.InstanceAlreadyExistsException: > kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:666) > at > org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56) > at > org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338) > at > org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896) > at > org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977) > at > org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467) > at > org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487) > at > org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) > {code} > > followed by: > {code:java} > Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446) > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430) > at > org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261) > at > org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666) > at > org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508) > at > org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579) > at > org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508) > at > org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555) > {code} > My understanding so far is that we re-use the consumer name across thread > generations which can hit a few flavours of a race condition. My suggestion > would be to add the generation-id to the consumer name. > This could be done by adding a thread generation id here > https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66 > or by adding an overload here: >
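The fix suggested in the report — adding a generation id to the consumer name so a replacement thread never re-registers the same JMX MBean name — could look roughly like the following. This is a hypothetical sketch with illustrative names, not the actual `ClientUtils` change.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of generation-suffixed client ids (not Kafka Streams
// code): a replaced thread gets a fresh generation, so its restore-consumer
// client id, and therefore its "kafka.consumer:type=app-info,id=..." MBean
// name, can never collide with the thread it replaces.
public class GenerationIds {
    private static final AtomicInteger GENERATION = new AtomicInteger();

    static String restoreConsumerClientId(String processId, int threadIdx) {
        return processId + "-StreamThread-" + threadIdx
                + "-gen-" + GENERATION.incrementAndGet()
                + "-restore-consumer";
    }

    public static void main(String[] args) {
        String first = restoreConsumerClientId("proc", 1);
        // Same thread.id, but a new generation, so a different client id:
        String replacement = restoreConsumerClientId("proc", 1);
        System.out.println(first);
        System.out.println(replacement);
    }
}
```

With distinct client ids, the `InstanceAlreadyExistsException` in the stacktrace above cannot occur, and the `ConcurrentModificationException` race is also narrowed because the old and new consumers are no longer confusable by name.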
[GitHub] [kafka] rhauch commented on a change in pull request #10203: MINOR: Prepare for Gradle 7.0
rhauch commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r583780438 ## File path: build.gradle ## @@ -2026,52 +2043,53 @@ project(':connect:runtime') { archivesBaseName = "connect-runtime" dependencies { - -compile project(':connect:api') -compile project(':clients') -compile project(':tools') -compile project(':connect:json') -compile project(':connect:transforms') - -compile libs.slf4jApi -compile libs.jacksonJaxrsJsonProvider -compile libs.jerseyContainerServlet -compile libs.jerseyHk2 -compile libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 -compile libs.activation // Jersey dependency that was available in the JDK before Java 9 -compile libs.jettyServer -compile libs.jettyServlet -compile libs.jettyServlets -compile libs.jettyClient -compile(libs.reflections) -compile(libs.mavenArtifact) - -testCompile project(':clients').sourceSets.test.output -testCompile libs.easymock -testCompile libs.junitJupiterApi -testCompile libs.junitVintageEngine -testCompile libs.powermockJunit4 -testCompile libs.powermockEasymock -testCompile libs.mockitoCore -testCompile libs.httpclient - -testCompile project(':clients').sourceSets.test.output -testCompile project(':core') -testCompile project(':core').sourceSets.test.output - -testRuntime libs.slf4jlog4j +implementation project(':connect:api') Review comment: Connector projects do require the `connect-api` module as a compile-time dependency and should not depend on `connect-runtime` as a compile-time dependency. Most (if not all) will include `connect-runtime` as a *test* dependency, though. I don't really see anything that changed in the compile-time dependencies relative to the base commit. In particular, `connect-api` was included (on line 2030) before and is again. I think the removal of the blank line (2029) caused this diff page to render a bit strange. This is an automated message from the Apache Git Service. 