[GitHub] [kafka] showuon commented on pull request #9690: KAFKA-10017: fix flaky EOS-beta upgrade test
showuon commented on pull request #9690: URL: https://github.com/apache/kafka/pull/9690#issuecomment-747243284

@mjsax , I further investigated the issue I found last week:

> It's because sometimes the keys in the stream store are empty, and that's why the following computation based on that variable is wrong.

Here are the logs I got (they map to the code in phase 6):

```
keysFirstClientBeta is []
keysSecondClientAlpha is [1, 3]
```

I finally found the root cause: **the stream has not completed the stable-assignment rebalancing** by the time `keysFromInstance(streams)` runs. Echoing the discussion in my other PR, https://github.com/apache/kafka/pull/9733#discussion_r543859937: we did "cut off" the unstable rebalances via `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);`, but that only ensures the assignment is complete and stable; the streams client has not finished REBALANCING yet. The workflow is:

Coordinator finishes stable assignment of tasks -> notifies tasks -> task handles the new assignment -> **stream thread** changes state from RUNNING to PARTITIONS_ASSIGNED -> **stream client** changes state from RUNNING to REBALANCING -> **stream thread** changes state from PARTITIONS_ASSIGNED to RUNNING -> **stream client** changes state from REBALANCING to RUNNING

All that `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);` guarantees is step 1, `Coordinator finished stable assignment of tasks`. Also remember that the current stream state is still `RUNNING`, which passes the subsequent check (`waitForRunning()`).
And the empty list is because the 1st unstable assignment to the stream is empty:

```
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] [2020-12-17 14:07:32,331] INFO [Consumer clientId=appDir1-StreamThread-1-consumer, groupId=appId-1] Updating assignment with
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] Assigned partitions: []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] Current owned partitions: []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] Added partitions (assigned - owned): []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] Revoked partitions (owned - assigned): []
2020-12-17T14:07:32.331+0800 [DEBUG] [TestEventLogger] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
```

So that's why we got the empty key list from the stream store. As I mentioned in #9733, after `assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);` I think we should explicitly wait for a specific transition pair, e.g. [KeyValue(RUNNING, REBALANCING), KeyValue(REBALANCING, RUNNING)], instead of waiting for the RUNNING state only. Also, as you said, more than 2 rebalances might happen after a stream starts. I think we can count the `onAssignmentComplete` calls after `prepareForRebalance` (unstable + stable), so that once the assignment is stable we know exactly how many rebalances happened and can check the state-transition list accordingly: e.g. with 2 assignments, we can check that the state-transition list contains 2 REBALANCING values and that the last one is the RUNNING state, etc. Anyway, that's my finding; I'm sharing it with you. I'll update my PR #9733 (maybe next week, since I'm a little busy these days). Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
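The transition-pair wait proposed above can be sketched as a self-contained illustration. This is not the real Kafka Streams API: the `State` enum and `Transition` class here are stand-ins for `KafkaStreams.State` and the test's transition records, though the `onChange(newState, oldState)` callback has the same shape as `KafkaStreams.StateListener`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: record every state transition and check for an exact suffix such as
// [RUNNING -> REBALANCING, REBALANCING -> RUNNING], instead of only checking
// that the final state is RUNNING.
public class TransitionWaiter {
    enum State { RUNNING, REBALANCING }

    static final class Transition {
        final State from;
        final State to;
        Transition(State from, State to) { this.from = from; this.to = to; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Transition)) return false;
            Transition t = (Transition) o;
            return from == t.from && to == t.to;
        }
        @Override public int hashCode() { return 31 * from.hashCode() + to.hashCode(); }
    }

    private final List<Transition> observed = new ArrayList<>();

    // Would be wired to the streams client's state listener, whose callback
    // has the shape onChange(newState, oldState).
    public synchronized void onChange(State newState, State oldState) {
        observed.add(new Transition(oldState, newState));
        notifyAll();
    }

    // True once the observed transitions end with the expected suffix.
    public synchronized boolean endsWith(List<Transition> expected) {
        int n = observed.size();
        int m = expected.size();
        return n >= m && observed.subList(n - m, n).equals(expected);
    }

    // Block until the expected suffix is observed or the timeout elapses.
    public synchronized boolean awaitSuffix(List<Transition> expected, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!endsWith(expected)) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) return false;
            wait(remaining);
        }
        return true;
    }
}
```

Waiting for the full pair rules out the race described above, where the stable assignment has completed but the client is still `RUNNING` because it has not yet entered REBALANCING.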
[GitHub] [kafka] chia7712 commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol
chia7712 commented on a change in pull request #9754: URL: https://github.com/apache/kafka/pull/9754#discussion_r544844791 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java ## @@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) { return deserializeTopicPartitionAssignment(userData); } -// visible for testing static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { -Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); -List topicAssignments = new ArrayList<>(); +return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION); +} + +// visible for testing +static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) { + +List topicAssignments = new ArrayList<>(); for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) { -Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); -topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); -topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); -topicAssignments.add(topicAssignment); +StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition() +.setTopic(topicEntry.getKey()) +.setPartitions(topicEntry.getValue()); +topicAssignments.add(topicPartition); +} +StickyAssignorUserData data = new StickyAssignorUserData() +.setPreviousAssignment(topicAssignments); +if (version >= 1) { +memberData.generation.ifPresent(data::setGeneration); } -struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); -if (memberData.generation.isPresent()) -struct.set(GENERATION_KEY_NAME, memberData.generation.get()); -ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct)); -STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct); -buffer.flip(); -return buffer; +return MessageUtil.toVersionPrefixedByteBuffer(version, data); } -private static MemberData 
deserializeTopicPartitionAssignment(ByteBuffer buffer) { -Struct struct; -ByteBuffer copy = buffer.duplicate(); +private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) { +StickyAssignorUserData data; try { -struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer); +data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version); Review comment: > I think we needn't keep this backtrack compatibility since this code is only executed at client. It seems to me that is not allowed, since different (consumer) clients on different Kafka versions should be able to work together. Please take a look at https://cwiki.apache.org/confluence/display/KAFKA/KIP-341%3A+Update+Sticky+Assignor%27s+User+Data+Protocol to see how Kafka considers backward/forward compatibility.
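The diff above replaces hand-rolled `Struct` serialization with a generated message written via `MessageUtil.toVersionPrefixedByteBuffer(version, data)`, which, as the name suggests, prefixes the serialized message with its version. The framing can be sketched as follows; the payload is a raw byte array stand-in for the generated `StickyAssignorUserData` message, so this illustrates only the version-prefix framing, not the field encoding.

```java
import java.nio.ByteBuffer;

// Sketch of version-prefixed framing for assignor userData: a 2-byte version
// header followed by the version-specific payload. The reader extracts the
// version first, then decodes the rest according to it (fields gated by
// checks like `if (version >= 1)` on the write side).
public class VersionPrefixed {
    static ByteBuffer serialize(short version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(version); // writer's version goes first
        buf.put(payload);
        buf.flip();
        return buf;
    }

    static short readVersion(ByteBuffer buf) {
        return buf.getShort(); // remaining bytes are the versioned payload
    }
}
```

Carrying the version on the wire is what makes the cross-version compatibility discussed in the review possible: an old reader can reject or down-convert data it does not understand instead of misparsing it.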
[GitHub] [kafka] dongxuwang opened a new pull request #9763: MINOR: Use ApiUtils' methods static imported consistently.
dongxuwang opened a new pull request #9763: URL: https://github.com/apache/kafka/pull/9763 Since these files already use static imports of ApiUtils' methods, importing the whole ApiUtils class is unnecessary; removing it also keeps the imports consistent.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol
dengziming commented on a change in pull request #9754: URL: https://github.com/apache/kafka/pull/9754#discussion_r544811722 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java ## @@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) { return deserializeTopicPartitionAssignment(userData); } -// visible for testing static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { -Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); -List topicAssignments = new ArrayList<>(); +return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION); +} + +// visible for testing +static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) { + +List topicAssignments = new ArrayList<>(); for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) { Review comment: Because topicAssignments puts all partitions of a topic into a single field, I think the name `TopicPartition` is misleading, so I changed it to `TopicPartitions`. The `TopicPartition` in `ConsumerProtocolAssignment.json` and `ConsumerProtocolSubscription.json` is also misleading; we could create a new PR to alter them.
[GitHub] [kafka] dengziming commented on a change in pull request #9754: KAFKA-10856: Convert sticky assignor userData schemas to use generated protocol
dengziming commented on a change in pull request #9754: URL: https://github.com/apache/kafka/pull/9754#discussion_r544818885 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java ## @@ -222,51 +204,50 @@ protected MemberData memberData(Subscription subscription) { return deserializeTopicPartitionAssignment(userData); } -// visible for testing static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) { -Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1); -List topicAssignments = new ArrayList<>(); +return serializeTopicPartitionAssignment(memberData, StickyAssignorUserData.HIGHEST_SUPPORTED_VERSION); +} + +// visible for testing +static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData, short version) { + +List topicAssignments = new ArrayList<>(); for (Map.Entry> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) { -Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT); -topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey()); -topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); -topicAssignments.add(topicAssignment); +StickyAssignorUserData.TopicPartition topicPartition = new StickyAssignorUserData.TopicPartition() +.setTopic(topicEntry.getKey()) +.setPartitions(topicEntry.getValue()); +topicAssignments.add(topicPartition); +} +StickyAssignorUserData data = new StickyAssignorUserData() +.setPreviousAssignment(topicAssignments); +if (version >= 1) { +memberData.generation.ifPresent(data::setGeneration); } -struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray()); -if (memberData.generation.isPresent()) -struct.set(GENERATION_KEY_NAME, memberData.generation.get()); -ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct)); -STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct); -buffer.flip(); -return buffer; +return MessageUtil.toVersionPrefixedByteBuffer(version, data); } -private static 
MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) { -Struct struct; -ByteBuffer copy = buffer.duplicate(); +private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer, short version) { +StickyAssignorUserData data; try { -struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer); +data = new StickyAssignorUserData(new ByteBufferAccessor(buffer), version); Review comment: Yes, the previous serialization and deserialization carry no version, so the code isn't graceful. I think we needn't keep this backward compatibility since this code is only executed on the client.
[GitHub] [kafka] g1geordie commented on pull request #9707: KAFKA-10790 Detect/Prevent Deadlock on Producer Network Thread
g1geordie commented on pull request #9707: URL: https://github.com/apache/kafka/pull/9707#issuecomment-747188758 @ijuma hello. The close method wants to prevent a self-join, but there is no deadlock there. Calling the flush method from a callback, however, does deadlock. The deadlock happens because `flush` waits for the `sender` to send all messages, but a message is only marked done after its callback completes. **If flush is called in a callback, that callback will never complete.** Since `flush`'s semantics are to send all pending messages, I think waiting for them all is necessary. Marking messages done only after the callback completes is, I think, optional; alternatively, we could run the `callback` asynchronously, so that messages complete regardless of whether the callback has finished.
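The hazard described above (callbacks run on the sender/network thread, and `flush()` waits for all callbacks to complete, so `flush()` inside a callback waits on itself) can be detected with a thread check before blocking. This is a hypothetical sketch of the "detect" half of the PR title, not the actual producer code.

```java
// Hypothetical sketch: fail fast if flush() is invoked from the producer's
// own sender thread, where blocking would deadlock, since the callbacks
// flush() waits on are executed by that very thread.
public class FlushGuard {
    private final Thread senderThread;

    FlushGuard(Thread senderThread) { this.senderThread = senderThread; }

    void checkNotOnSenderThread() {
        if (Thread.currentThread() == senderThread) {
            throw new IllegalStateException(
                "flush() called from the sender thread; it would wait on "
                + "callbacks that cannot complete until flush() returns");
        }
    }
}
```

Throwing here converts a silent hang into an immediate, diagnosable error, which is usually preferable when the blocking call can never legally succeed.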
[GitHub] [kafka] chia7712 commented on pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest
chia7712 commented on pull request #9760: URL: https://github.com/apache/kafka/pull/9760#issuecomment-747184218 @govi20 It seems we don't need to box the primitive type, as the Scala compiler completes the conversion for us, right?
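The boxing point holds on the JVM generally. The comment above is about the Scala compiler, so this Java analogue is only an illustration: `new Integer(n)` is deprecated (since Java 9) in favor of `Integer.valueOf(n)`, and in most call sites the compiler inserts the boxing conversion automatically.

```java
// Java analogue of the boxing discussion: explicit valueOf vs. autoboxing.
// Both avoid the deprecated `new Integer(n)` constructor; autoboxing compiles
// to an Integer.valueOf call anyway.
public class BoxingSketch {
    static Integer boxedExplicitly(int n) { return Integer.valueOf(n); }
    static Integer boxedAutomatically(int n) { return n; } // autoboxing
}
```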
[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp
dengziming commented on a change in pull request #9622: URL: https://github.com/apache/kafka/pull/9622#discussion_r544782830 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging { error(s"Listeners are not identical across brokers: $aliveNodes") } + val newTopicIds = updateMetadataRequest.topicStates().asScala +.map(topicState => (topicState.topicName(), topicState.topicId())) +.filter(_._2 != Uuid.ZERO_UUID).toMap + val topicIds = mutable.Map.empty[String, Uuid] + topicIds.addAll(metadataSnapshot.topicIds) + topicIds.addAll(newTopicIds) Review comment: Thank you for your suggestions; I now understand the solution. I removed the topicId in `removePartitionInfo`.
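The merge in the Scala snippet above (overlay the update request's topic ids onto the snapshot's, skipping the "no id" sentinel) can be sketched in plain Java. `java.util.UUID` with a fixed zero value stands in for Kafka's `Uuid.ZERO_UUID`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of the snapshot merge: start from the existing topicId map, then
// overlay ids from the update, ignoring entries carrying the zero sentinel
// (which means "no id assigned").
public class TopicIdMerge {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static Map<String, UUID> merge(Map<String, UUID> snapshot, Map<String, UUID> update) {
        Map<String, UUID> merged = new HashMap<>(snapshot);
        update.forEach((topic, id) -> {
            if (!ZERO_UUID.equals(id)) merged.put(topic, id); // skip unset ids
        });
        return merged;
    }
}
```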
[GitHub] [kafka] chia7712 commented on a change in pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
chia7712 commented on a change in pull request #9761: URL: https://github.com/apache/kafka/pull/9761#discussion_r544768525 ## File path: clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java ## @@ -120,4 +121,26 @@ public void iteratorRaisesOnTooLargeRecords() { logInputStream.nextBatch(); } +@Test +public void testReadUnsignedIntFromInputStream() { Review comment: Could you move the tests to ```ByteBufferInputStreamTest```? If there is no such test class, feel free to create a new one.
[GitHub] [kafka] chia7712 commented on pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
chia7712 commented on pull request #9761: URL: https://github.com/apache/kafka/pull/9761#issuecomment-747165929 @bertber Please take a look at the original PR (https://github.com/apache/kafka/pull/3752/files#diff-ae627decd5dd27d053a7a9b051860f60df6b89c8884e50df2f9ae0de6a4645e5R40). It would be better to fix the scenario of `len == 0`.
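The `len == 0` scenario referenced above comes from the `InputStream.read(byte[], int, int)` contract: a zero-length read must return 0 even at end of stream, while -1 signals EOF only for a non-zero request. A minimal ByteBuffer-backed sketch (a stand-in, not Kafka's actual `ByteBufferInputStream`) that honors the contract:

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch of a ByteBuffer-backed InputStream handling the len == 0 edge case
// correctly: a zero-length read returns 0 even with no bytes remaining.
public class BufferBackedInputStream extends InputStream {
    private final ByteBuffer buffer;

    public BufferBackedInputStream(ByteBuffer buffer) { this.buffer = buffer; }

    @Override
    public int read() {
        return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
    }

    @Override
    public int read(byte[] bytes, int off, int len) {
        if (len == 0) return 0;                 // contract: zero-length read is 0
        if (!buffer.hasRemaining()) return -1;  // EOF only for a real request
        int n = Math.min(len, buffer.remaining());
        buffer.get(bytes, off, n);
        return n;
    }
}
```

A test class for this behavior would assert exactly those three cases: zero-length read at EOF, non-zero read at EOF, and a short read when fewer bytes remain than requested.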
[GitHub] [kafka] chia7712 commented on pull request #9750: MINOR: Change toArray usage for Increase efficiency
chia7712 commented on pull request #9750: URL: https://github.com/apache/kafka/pull/9750#issuecomment-747165373 @APaMio Could you rebase your code to include the recent big commits?
[jira] [Resolved] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`
[ https://issues.apache.org/jira/browse/KAFKA-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10861. - Resolution: Fixed > Flaky test `TransactionsTest.testFencingOnSendOffsets` > -- > > Key: KAFKA-10861 > URL: https://issues.apache.org/jira/browse/KAFKA-10861 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > {code} > org.scalatest.exceptions.TestFailedException: Got an unexpected exception > from a fenced producer. > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1107) > at org.scalatest.Assertions.fail$(Assertions.scala:1103) > at org.scalatest.Assertions$.fail(Assertions.scala:1389) > at > kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at Caused by: > org.apache.kafka.common.errors.InvalidProducerEpochException: Producer > attempted to produce with an old epoch (producerId=0, epoch=0) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`
hachikuji merged pull request #9762: URL: https://github.com/apache/kafka/pull/9762
[jira] [Commented] (KAFKA-10862) kafak stream consume from the earliest by default
[ https://issues.apache.org/jira/browse/KAFKA-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250720#comment-17250720 ] A. Sophie Blee-Goldman commented on KAFKA-10862: Hey [~rebekkaxi], you're looking at the default for two different things. The docs you linked to are referring to the consumer client configs, which do default to "latest". You can find the config definition in [ConsumerConfig.|https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L434] The code you're looking at corresponds to the configs for Kafka Streams. Internally Streams embeds a consumer client, and overrides the default auto.offset.reset policy to "earliest". You can find the docs for Streams-specific configs and how they override the client defaults [here|https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values]. So the default for Kafka Streams is indeed different than the default reset policy for a plain consumer app. The reason being that Streams generally wants to start from the beginning in order to process all available data. 
Why the default for the plain consumer is "latest" is another question :) > kafak stream consume from the earliest by default > - > > Key: KAFKA-10862 > URL: https://issues.apache.org/jira/browse/KAFKA-10862 > Project: Kafka > Issue Type: Bug > Components: config, consumer >Affects Versions: 2.3.1 > Environment: MAC >Reporter: Yuexi Liu >Priority: Major > > on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows > auto.offset.reset is by default using latest, but from code, it is not > > [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884] > and when I create a kafka stream without specified offset reset policy, it > consumed from the beginning
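The override described in the comment (Kafka Streams defaults the embedded consumer's `auto.offset.reset` to "earliest" unless the user sets it explicitly) can be sketched with plain `Properties`. This is a simplified stand-in for what `StreamsConfig` does internally, using the real config key name.

```java
import java.util.Properties;

// Simplified stand-in for the Streams default override: the embedded
// consumer's auto.offset.reset defaults to "earliest", but an explicit
// user-provided setting always wins.
public class StreamsDefaultSketch {
    static Properties consumerProps(Properties userProps) {
        Properties props = new Properties();
        props.put("auto.offset.reset", "earliest"); // Streams-level default
        props.putAll(userProps);                    // user settings override
        return props;
    }
}
```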
[jira] [Updated] (KAFKA-10862) kafak stream consume from the earliest by default
[ https://issues.apache.org/jira/browse/KAFKA-10862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuexi Liu updated KAFKA-10862: -- Description: on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows auto.offset.reset is by default using latest, but from code, it is not [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884] and when I create a kafka stream without specified offset reset policy, it consumed from the beginning was: on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows auto.offset.reset is by default using latest, but from code, it is not [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884] and when I create a kafka stream without specified offset, it consumed from the beginning > kafak stream consume from the earliest by default > - > > Key: KAFKA-10862 > URL: https://issues.apache.org/jira/browse/KAFKA-10862 > Project: Kafka > Issue Type: Bug > Components: config, consumer >Affects Versions: 2.3.1 > Environment: MAC >Reporter: Yuexi Liu >Priority: Major > > on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows > auto.offset.reset is by default using latest, but from code, it is not > > [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884] > and when I create a kafka stream without specified offset reset policy, it > consumed from the beginning
[jira] [Created] (KAFKA-10862) kafak stream consume from the earliest by default
Yuexi Liu created KAFKA-10862: - Summary: kafak stream consume from the earliest by default Key: KAFKA-10862 URL: https://issues.apache.org/jira/browse/KAFKA-10862 Project: Kafka Issue Type: Bug Components: config, consumer Affects Versions: 2.3.1 Environment: MAC Reporter: Yuexi Liu on [https://kafka.apache.org/documentation/#auto.offset.reset] it shows auto.offset.reset is by default using latest, but from code, it is not [https://github.com/apache/kafka/blob/72918a98161ba71ff4fa8116fdf8ed02b09a0580/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L884] and when I create a kafka stream without specified offset, it consumed from the beginning
[GitHub] [kafka] guozhangwang commented on pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`
guozhangwang commented on pull request #9762: URL: https://github.com/apache/kafka/pull/9762#issuecomment-747137717 LGTM
[GitHub] [kafka] d8tltanc commented on pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on pull request #9485: URL: https://github.com/apache/kafka/pull/9485#issuecomment-747116007 Thanks @rajinisivaram for the NIT and test structure suggestions. I've adopted those and restructured the test classes. Please let me know if we are good to merge now.
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544713670 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala ## @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.auth.SimpleAclAuthorizer +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl.AclOperation._ +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.PatternType.LITERAL +import org.apache.kafka.common.resource.ResourceType._ +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.annotation.nowarn +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerWrapperTest extends ZooKeeperTestHarness { + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer) + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer) + private var resource: ResourcePattern = _ + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new 
AuthorizerTestFactory( +newRequestContext, addAcls, authorizeByResourceType, removeAcls) + + class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) { +override def equals(o: scala.Any): Boolean = false + } + + @Before + @nowarn("cat=deprecation") + override def setUp(): Unit = { +super.setUp() + +val props = TestUtils.createBrokerConfig(0, zkConnect) + +props.put(AclAuthorizer.SuperUsersProp, superUsers) +config = KafkaConfig.fromProps(props) +wrappedSimpleAuthorizer.configure(config.originals) + +props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") +config = KafkaConfig.fromProps(props) +wrappedSimpleAuthorizerAllowEveryone.configure(config.originals) + +resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL) +zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, + Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest") + } + + @After + override def tearDown(): Unit = { +val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone) +authorizers.foreach(a => { + a.close() +}) +zooKeeperClient.close() +super.tearDown() + } + + @Test + def testAuthorizeByResourceTypeMultipleAddAndRemove(): Unit = { + authorizerTestFactory.testAuthorizeByResourceTypeMultipleAddAndRemove(wrappedSimpleAuthorizer) + } Review comment: commit 092fec70a9547ec07cba999e77be1c0cf79fa275 commit e5e3d18f57ab22df20133f9841905af384d9b641 These two commits are condensing the class methods and members into the BaseAuthorizerTest. In BaseAuthorizerTest, the only abstract method is an authorizer provider. After overriding the provider, those test cases in it are sufficient to run. Now the test code looks much cleaner. If the changes look too
[GitHub] [kafka] hachikuji opened a new pull request #9762: KAFKA-10861; Fix race condition in flaky test `testFencingOnSendOffsets`
hachikuji opened a new pull request #9762: URL: https://github.com/apache/kafka/pull/9762 I wasn't able to reproduce the failure locally, but it looks like there is a race condition with the sending of the records in the first producer. The test case assumes that these records have been completed before the call to `sendOffsetsToTransaction`, but they very well might not be. It is even possible for the writes from the second producer to arrive first which would then result in the test failure that we are seeing. The solution is to force the send with `flush()`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
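The race the fix addresses is generic to asynchronous sends: `KafkaProducer.send()` returns before the record is delivered, so a later call can run while sends are still in flight unless `flush()` (or `get()` on the returned future) is used as a barrier. A minimal plain-JDK analogy of that pattern (hypothetical names, not Kafka or the test's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlushRaceDemo {

    // Asynchronous "send": the append happens on another thread, as with
    // KafkaProducer.send(), which returns before the record is delivered.
    static CompletableFuture<Void> send(List<String> log, String record, ExecutorService io) {
        return CompletableFuture.runAsync(() -> log.add(record), io);
    }

    // Returns the number of records visible after a flush-style barrier.
    static int sendThenFlush() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        inFlight.add(send(log, "record-1", io));
        inFlight.add(send(log, "record-2", io));

        // Without this barrier, the next step may observe an incomplete log.
        // join() plays the role of producer.flush(): block until every
        // in-flight send has completed.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        io.shutdown();
        return log.size();
    }

    public static void main(String[] args) {
        // Only after the barrier is it safe to act on the assumption that the
        // sends landed (analogous to calling sendOffsetsToTransaction after flush()).
        System.out.println("records after flush: " + sendThenFlush());
    }
}
```

Without the `join()`, `log.size()` could legitimately be 0, 1, or 2, which mirrors how the test could observe the second producer's writes first.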
[jira] [Created] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`
Jason Gustafson created KAFKA-10861: --- Summary: Flaky test `TransactionsTest.testFencingOnSendOffsets` Key: KAFKA-10861 URL: https://issues.apache.org/jira/browse/KAFKA-10861 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson {code} org.scalatest.exceptions.TestFailedException: Got an unexpected exception from a fenced producer. at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1107) at org.scalatest.Assertions.fail$(Assertions.scala:1103) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch (producerId=0, epoch=0) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10861) Flaky test `TransactionsTest.testFencingOnSendOffsets`
[ https://issues.apache.org/jira/browse/KAFKA-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-10861: --- Assignee: Jason Gustafson > Flaky test `TransactionsTest.testFencingOnSendOffsets` > -- > > Key: KAFKA-10861 > URL: https://issues.apache.org/jira/browse/KAFKA-10861 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > {code} > org.scalatest.exceptions.TestFailedException: Got an unexpected exception > from a fenced producer. > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1107) > at org.scalatest.Assertions.fail$(Assertions.scala:1103) > at org.scalatest.Assertions$.fail(Assertions.scala:1389) > at > kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:373) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at Caused by: > org.apache.kafka.common.errors.InvalidProducerEpochException: Producer > attempted to produce with an old epoch (producerId=0, epoch=0) > {code}
[GitHub] [kafka] hachikuji commented on pull request #8162: KAFKA-9595: switch usage of `alterConfigs` to `incrementalAlterConfigs` for kafka-configs tool
hachikuji commented on pull request #8162: URL: https://github.com/apache/kafka/pull/8162#issuecomment-747090036 @agam Going to close this for now. We can reopen once we are ready to pick it back up.
[GitHub] [kafka] hachikuji closed pull request #8162: KAFKA-9595: switch usage of `alterConfigs` to `incrementalAlterConfigs` for kafka-configs tool
hachikuji closed pull request #8162: URL: https://github.com/apache/kafka/pull/8162
[jira] [Reopened] (KAFKA-10140) Incremental config api excludes plugin config changes
[ https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reopened KAFKA-10140: - I resolved this by mistake, reopening now > Incremental config api excludes plugin config changes > - > > Key: KAFKA-10140 > URL: https://issues.apache.org/jira/browse/KAFKA-10140 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Critical > Fix For: 2.7.0 > > > I was trying to alter the jmx metric filters using the incremental alter > config api and hit this error: > ``` > java.util.NoSuchElementException: key not found: metrics.jmx.blacklist > at scala.collection.MapLike.default(MapLike.scala:235) > at scala.collection.MapLike.default$(MapLike.scala:234) > at scala.collection.AbstractMap.default(Map.scala:65) > at scala.collection.MapLike.apply(MapLike.scala:144) > at scala.collection.MapLike.apply$(MapLike.scala:143) > at scala.collection.AbstractMap.apply(Map.scala:65) > at kafka.server.AdminManager.listType$1(AdminManager.scala:681) > at > kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693) > at > kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687) > at > kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:154) > at scala.collection.TraversableLike.map(TraversableLike.scala:273) > at scala.collection.TraversableLike.map$(TraversableLike.scala:266) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589) > at > kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698) > at kafka.server.KafkaApis.handle(KafkaApis.scala:188) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78) > at 
java.base/java.lang.Thread.run(Thread.java:834) > ``` > It looks like we are only allowing changes to the keys defined in > `KafkaConfig` through this API. This excludes config changes to any plugin > components such as `JmxReporter`. > Note that I was able to use the regular `alterConfig` API to change this > config.
[jira] [Updated] (KAFKA-10140) Incremental config api excludes plugin config changes
[ https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10140: Fix Version/s: (was: 2.7.0) > Incremental config api excludes plugin config changes > - > > Key: KAFKA-10140 > URL: https://issues.apache.org/jira/browse/KAFKA-10140 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Critical > > I was trying to alter the jmx metric filters using the incremental alter > config api and hit this error: > ``` > java.util.NoSuchElementException: key not found: metrics.jmx.blacklist > at scala.collection.MapLike.default(MapLike.scala:235) > at scala.collection.MapLike.default$(MapLike.scala:234) > at scala.collection.AbstractMap.default(Map.scala:65) > at scala.collection.MapLike.apply(MapLike.scala:144) > at scala.collection.MapLike.apply$(MapLike.scala:143) > at scala.collection.AbstractMap.apply(Map.scala:65) > at kafka.server.AdminManager.listType$1(AdminManager.scala:681) > at > kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693) > at > kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687) > at > kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273) > at scala.collection.immutable.Map$Map1.foreach(Map.scala:154) > at scala.collection.TraversableLike.map(TraversableLike.scala:273) > at scala.collection.TraversableLike.map$(TraversableLike.scala:266) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589) > at > kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698) > at kafka.server.KafkaApis.handle(KafkaApis.scala:188) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78) > at java.base/java.lang.Thread.run(Thread.java:834) > ``` > It looks like we are only 
allowing changes to the keys defined in > `KafkaConfig` through this API. This excludes config changes to any plugin > components such as `JmxReporter`. > Note that I was able to use the regular `alterConfig` API to change this > config.
[GitHub] [kafka] vvcephei merged pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
vvcephei merged pull request #9708: URL: https://github.com/apache/kafka/pull/9708
[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread
ableegoldman commented on a change in pull request #9695: URL: https://github.com/apache/kafka/pull/9695#discussion_r544645060 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int * @return name of the added stream thread or empty if a new stream thread could not be added */ public Optional addStreamThread() { -synchronized (changeThreadCount) { -if (isRunningOrRebalancing()) { -final int threadIdx = getNextThreadIndex(); -final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +if (isRunningOrRebalancing()) { +final int threadIdx; +final long cacheSizePerThread; +synchronized (changeThreadCount) { +threadIdx = getNextThreadIndex(); +cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); resizeThreadCache(cacheSizePerThread); -final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); -synchronized (stateLock) { -if (isRunningOrRebalancing()) { -streamThread.start(); -return Optional.of(streamThread.getName()); -} else { -streamThread.shutdown(); +} +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); + +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +streamThread.shutdown(); +threads.remove(streamThread); +resizeThreadCache(getCacheSizePerThread(threads.size())); +return Optional.empty(); +} +} +} +return Optional.empty(); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. 
+ * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread() { +if (isRunningOrRebalancing()) { +for (final StreamThread streamThread : threads) { Review comment: It wouldn't be guaranteed to catch this, but either way I think we should have a test that starts up two threads which both try to `removeThread()` at the same time (and maybe similarly for `addStreamThread`)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9695: KAFKA-10500: Remove thread
ableegoldman commented on a change in pull request #9695: URL: https://github.com/apache/kafka/pull/9695#discussion_r544643577 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -924,22 +924,64 @@ private StreamThread createStreamThread(final long cacheSizePerThread, final int * @return name of the added stream thread or empty if a new stream thread could not be added */ public Optional addStreamThread() { -synchronized (changeThreadCount) { -if (isRunningOrRebalancing()) { -final int threadIdx = getNextThreadIndex(); -final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +if (isRunningOrRebalancing()) { +final int threadIdx; +final long cacheSizePerThread; +synchronized (changeThreadCount) { +threadIdx = getNextThreadIndex(); +cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); resizeThreadCache(cacheSizePerThread); -final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); -synchronized (stateLock) { -if (isRunningOrRebalancing()) { -streamThread.start(); -return Optional.of(streamThread.getName()); -} else { -streamThread.shutdown(); +} +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); + +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +streamThread.shutdown(); +threads.remove(streamThread); +resizeThreadCache(getCacheSizePerThread(threads.size())); +return Optional.empty(); +} +} +} +return Optional.empty(); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. 
+ * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread() { +if (isRunningOrRebalancing()) { +for (final StreamThread streamThread : threads) { Review comment: Ah I missed the previous PR where you changed the list type. Ok I'm not that familiar with `Collections.synchronizedList` but I'm still worried we may not be safe with this. From the [javadocs](https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)): ``` It is imperative that the user manually synchronize on the returned list when iterating over it: ```
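The javadoc caveat quoted above is the crux: individual calls like `add()` on a `Collections.synchronizedList` are atomic, but iteration is a compound operation, so the caller must hold the list's monitor for the duration of the loop. A small sketch of the safe idiom (illustrative names, not the `KafkaStreams` code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListDemo {

    // Sums the elements while holding the list's monitor. Individual calls
    // like add() are atomic on a synchronized list, but iteration spans many
    // calls (hasNext()/next()), so another thread could mutate the list
    // mid-loop unless we synchronize on the list itself, as the javadoc requires.
    static int sumUnderLock(List<Integer> syncList) {
        int sum = 0;
        synchronized (syncList) {
            for (int v : syncList) {
                sum += v;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> threads = Collections.synchronizedList(new ArrayList<>());
        threads.add(1);
        threads.add(2);
        threads.add(3);
        System.out.println(sumUnderLock(threads)); // prints 6
    }
}
```

Without the `synchronized` block, a concurrent `add()` or `remove()` during the loop can throw `ConcurrentModificationException`, which is exactly the hazard flagged for iterating `threads` in `removeStreamThread()`.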
[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
jolshan commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r544634608 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) +else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { +val log = localLog(topicPartition).get +// Check if the topic ID is in memory, if not, it must be new to the broker. +// If the broker previously wrote it to file, it would be recovered on restart after failure. +// If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. +// If the topic ID in the log does not match the one in the request, the broker's topic must be stale. +if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + +s" match the topic Id provided in the request: " + +s"${topicIds.get(topicPartition.topic).toString}.") +} else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) +log.topicId = topicIds.get(topicPartition.topic) + } else { +stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: So I thought through these cases some more and realized that the metadata file will fail to open if formatted incorrectly. So the only case where there could be data written to the file is if the ID is the zero UUID. So I decided to just fail on reading the file if the zero ID is provided. (We will never write zero ID to file.) The rest of this cleaned up pretty nicely. 
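The checks described in the diff above reduce to a small decision tree: ignore the default (zero) ID from older protocols, warn when the stored ID disagrees with the request, and persist the ID only when none is stored yet. A hypothetical sketch of that logic using plain strings (invented names, not the actual `ReplicaManager` code):

```java
public class TopicIdCheckDemo {
    // Stand-in for Uuid.ZERO_UUID, the sentinel for "no topic ID assigned".
    static final String ZERO_UUID = "00000000-0000-0000-0000-000000000000";

    // Decide what to do given the topic ID from the request and the ID
    // currently stored in the log (both hypothetical string stand-ins).
    static String checkTopicId(String requestId, String logId) {
        if (requestId == null || requestId.equals(ZERO_UUID)) {
            return "ignore";              // request from an older protocol: no topic IDs in use
        }
        if (!logId.equals(ZERO_UUID) && !logId.equals(requestId)) {
            return "warn-stale";          // broker's stored ID disagrees with the request
        }
        if (logId.equals(ZERO_UUID)) {
            return "write-metadata-file"; // no ID stored yet: persist the request's ID
        }
        return "ok";                      // IDs already match, nothing to do
    }

    public static void main(String[] args) {
        System.out.println(checkTopicId(ZERO_UUID, ZERO_UUID)); // ignore
        System.out.println(checkTopicId("id-1", "id-2"));       // warn-stale
        System.out.println(checkTopicId("id-1", ZERO_UUID));    // write-metadata-file
        System.out.println(checkTopicId("id-1", "id-1"));       // ok
    }
}
```

The ordering matters: the mismatch check must come before the "persist" branch, otherwise a stale broker would silently overwrite its stored ID.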
[GitHub] [kafka] hachikuji commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure
hachikuji commented on a change in pull request #9676: URL: https://github.com/apache/kafka/pull/9676#discussion_r544611285 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1556,6 +1572,8 @@ class Log(@volatile private var _dir: File, done = fetchDataInfo != null || segmentEntry == null } +checkForLogDirFailure() Review comment: Seems more intuitive to move this check before the segment read. I don't think we can totally avoid race conditions with a failure in append since we don't have the lock here. Perhaps we could even move this check to `maybeHandleIOException` so that we handle all cases? ## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ## @@ -2818,6 +2818,22 @@ class LogTest { new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0) } + @Test + def testAppendToOrReadFromLogInFailedLogDir(): Unit = { +val log = createLog(logDir, LogConfig()) +log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) +assertEquals(0, readLog(log, 0, 4096).records.records.iterator.next().offset) +try { + log.maybeHandleIOException("Simulating failed log dir") { Review comment: Another way to trigger an IO exception is to rename the log file. This trick is used in `testAppendToTransactionIndexFailure`. Then we don't need to expose `maybeHandleIOException` for testing.
[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r544593141 ## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ## @@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable
[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees
[ https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250598#comment-17250598 ] Kyle Ambroff-Kao commented on KAFKA-10853: -- Thanks for the reply Jun! {quote}As for your proposal of using largestAckedOffset, the challenge is that we allow producers with different ack mode even on the same topic. {quote} Hrm, yeah that's a great point. I think for our internal use case the largestAckedOffset change would be OK since a producer that doesn't use acks=all would be a configuration bug. But it's true that one mis-configured client would ruin it for everyone. I guess my proposal would have to either require some strict ordering of acknowledgements to producers, blocking acknowledgement of an acks=1 produce request until an acks=all request that came before it completes. That's obviously terrible. Or I guess it could be done with a new topic configuration that puts the topic into durable mode, enforcing acks=all regardless of what the ProduceRequest says. That sounds less terrible. {quote}It sounds like you already have a separate mechanism to detect the slowness of a broker based on request.max.local.time.ms. So, another approach could be communicating this info to the controller so that it can choose to move the leader off the slow broker. It seems that this could solve the problem if request.max.local.time.ms < replica.lag.time.max.ms. {quote} Hrm, that's an interesting proposal. I like this idea. We could change our code to, instead of halting if request.max.local.time.ms is exceeded, just stop accepting client traffic (except for follower fetch requests), and signal to the controller that leadership should be shifted elsewhere. That sounds really nice. I'm nervous about the idea of changing replica.lag.time.max.ms though. We have had that set to 10 seconds for many years now and I'm nervous about increasing it. 10 seconds already feels like it is too long. 
And decreasing replica.lag.time.max.ms seems scary too because we don't want false positives. We've set it to 60 seconds through experience and trial and error, since we generally don't see GC pauses that long, but we do see IOPS pause for that long when we have a real storage issue. The other problem is that I think handling failures this way only works for certain failure modes, like a really slow or failing disk. It doesn't handle network partitions at all, since we may not be able to reach the controller. Or the pause may be caused by something else that prevents any thread from being scheduled. I need to think about this for a bit. > Replication protocol deficiencies with workloads requiring high durability > guarantees > - > > Key: KAFKA-10853 > URL: https://issues.apache.org/jira/browse/KAFKA-10853 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Kyle Ambroff-Kao >Priority: Major > > *tl;dr: The definition of ISR and the consistency model from the perspective > of the producer seem a bit out of sync* > We have many systems in production that trade off availability in order to > provide stronger consistency guarantees. Most of these configurations look > like this: > Topic configuration: > * replication factor 3 > * min.insync.replicas=2 > * unclean.leader.election.enable=false > Producer configuration: > * acks=all > Broker configuration: > * replica.lag.time.max.ms=1 > So the goal here is to reduce the chance of ever dropping a message that the > leader has acknowledged to the producer. > This works great, except that we've found some situations in production where > we are forced to enable unclean leader election to recover, which we never > want to do. These situations all seem totally avoidable with some small > tweaks to the replication protocol. > *A scenario we've seen many times* > The following sequence of events are in time order: A replica set for a > topic-partition TP with leader L and replicas R1 and R2. 
All three replicas > are in ISR. > # Producer sends ProduceRequest R with acks=all that contains a message > batch to the leader L. > # L receives R and appends the batch it contains to the active segment of TP > but does not ack to the producer yet because the request was acks=all > # A storage fault occurs on L which makes all IOPS take a long time but > doesn't cause a hard failure. > # R1 and R2 send follower fetch requests to L which are infinitely delayed > due to the storage fault on L. > # 10 seconds after appending the batch and appending it to the log, L > shrinks the ISR, removing R1 and R2. This is because ISR is defined as at > most replica.lag.time.max.ms milliseconds behind the log append time of the > leader end offset. The leader end offset is a message
[GitHub] [kafka] hachikuji commented on a change in pull request #9713: KAFKA-10825 ZK ISR manager
hachikuji commented on a change in pull request #9713: URL: https://github.com/apache/kafka/pull/9713#discussion_r544581576 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1374,47 +1314,28 @@ class Partition(val topicPartition: TopicPartition, } } - private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = { -val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion) -val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr) -if (zkVersionOpt.isDefined) { - isrChangeListener.markShrink() -} -maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt) - } - - private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = { -zkVersionOpt match { - case Some(newVersion) => -isrState = CommittedIsr(isr) -zkVersion = newVersion -info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion)) - - case None => -info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR") -isrChangeListener.markFailed() -} - } - private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = { val isrToSend: Set[Int] = proposedIsrState match { case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds case state => +isrChangeListener.markFailed() throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition") } val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion) -val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState)) +val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch) -if (!alterIsrManager.enqueue(alterIsrItem)) { - isrChangeListener.markFailed() - throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " + 
-s"$newLeaderAndIsr for partition $topicPartition") -} - -isrState = proposedIsrState -debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState") +alterIsrManager.submit(alterIsrItem, (wasSubmitted: Boolean) => { + if (wasSubmitted) { +isrState = proposedIsrState Review comment: Could we solve the problem by moving this above the call to `submit`? Then the callback logic should work even if the change is made synchronously. In the case that the request fails to be submitted, then we can reset to the previous state. What do you think?
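The suggestion in the review comment above — install the proposed ISR state before calling `submit`, and roll back if the submission fails — is a common optimistic-update pattern. A hypothetical plain-Java sketch of the idea (invented names, not the actual `Partition`/`AlterIsrManager` code):

```java
public class OptimisticStateDemo {
    enum State { COMMITTED, PENDING }

    // Hypothetical submit(): returns false when the request cannot be enqueued.
    static boolean submit(boolean canEnqueue) {
        return canEnqueue;
    }

    // Optimistically install the proposed state before submitting, and roll
    // back to the previous state if the submission fails. Installing first
    // keeps the callback logic correct even when the manager invokes it
    // synchronously, because the callback already sees the proposed state.
    static State transition(State current, State proposed, boolean canEnqueue) {
        State state = proposed;           // install the proposed state first
        if (!submit(canEnqueue)) {
            state = current;              // submission failed: restore the old state
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(transition(State.COMMITTED, State.PENDING, true));   // PENDING
        System.out.println(transition(State.COMMITTED, State.PENDING, false));  // COMMITTED
    }
}
```

The alternative ordering (submit first, then set state) leaves a window where a synchronously-executed callback observes the stale state, which is the race the comment is pointing at.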
[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees
[ https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250568#comment-17250568 ] Jun Rao commented on KAFKA-10853: - [~ambroff] : Thanks for reporting this. I agree that this is a real problem. The mechanism for replica.lag.time.max.ms works well when the slowness is in the followers, but not as well when the slowness is in the leader. As for your proposal of using largestAckedOffset, the challenge is that we allow producers with different ack modes even on the same topic. It sounds like you already have a separate mechanism to detect the slowness of a broker based on request.max.local.time.ms. So, another approach could be communicating this info to the controller so that it can choose to move the leader off the slow broker. It seems that this could solve the problem if request.max.local.time.ms < replica.lag.time.max.ms. > Replication protocol deficiencies with workloads requiring high durability > guarantees > - > > Key: KAFKA-10853 > URL: https://issues.apache.org/jira/browse/KAFKA-10853 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Kyle Ambroff-Kao >Priority: Major > > *tl;dr: The definition of ISR and the consistency model from the perspective > of the producer seem a bit out of sync* > We have many systems in production that trade off availability in order to > provide stronger consistency guarantees. Most of these configurations look > like this: > Topic configuration: > * replication factor 3 > * min.insync.replicas=2 > * unclean.leader.election.enable=false > Producer configuration: > * acks=all > Broker configuration: > * replica.lag.time.max.ms=1 > So the goal here is to reduce the chance of ever dropping a message that the > leader has acknowledged to the producer. > This works great, except that we've found some situations in production where > we are forced to enable unclean leader election to recover, which we never > want to do.
These situations all seem totally avoidable with some small > tweaks to the replication protocol. > *A scenario we've seen many times* > The following sequence of events is in time order: A replica set for a > topic-partition TP with leader L and replicas R1 and R2. All three replicas > are in ISR. > # Producer sends ProduceRequest R with acks=all that contains a message > batch to the leader L. > # L receives R and appends the batch it contains to the active segment of TP > but does not ack to the producer yet because the request was acks=all > # A storage fault occurs on L which makes all IOPS take a long time but > doesn't cause a hard failure. > # R1 and R2 send follower fetch requests to L which are infinitely delayed > due to the storage fault on L. > # 10 seconds after appending the batch to the log, L > shrinks the ISR, removing R1 and R2. This is because ISR is defined as at > most replica.lag.time.max.ms milliseconds behind the log append time of the > leader end offset. The leader end offset is a message that has not been > replicated yet. > The storage fault example in step 3 could easily be another kind of fault. > Say for example, L is partitioned from R1 and R2 but not from ZooKeeper or > the producer. > The producer never receives acknowledgement of the ProduceRequest because the > min.insync.replicas constraint was never satisfied. So in terms of data > consistency, everything is working fine. > The problem is recovering from this situation. If the fault on L is not a > temporary blip, then L needs to be replaced. But since L shrunk the ISR, the > only way that leadership can move to either R1 or R2 is to set > unclean.leader.election.enable=true. > This works but it is a potentially unsafe way to recover and move leadership. > It would be better to have other options.
> *Recovery could be automatic in this scenario.* > If you think about it, from the perspective of the producer, the write was > not acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it > should actually be totally safe for leadership to transition to either R1 or > R2. > It seems that the producer and the leader don't have fully compatible > definitions for what it means for the replica set to be in-sync. If the > leader L used different rules for defining ISR, it could allow self-healing > in this or similar scenarios, since the ISR would not shrink. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
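The distinction the reporter draws can be made concrete with a toy lag check (an illustration of the argument only, not Kafka's replication code): with the leader stalled, followers that hold every *acked* offset are still evicted when lag is measured against the leader end offset, but would remain in ISR if it were measured against the largest acked offset.

```java
// Toy model of the two candidate "in sync" definitions discussed above.
public class IsrLagSketch {
    // A follower is out of sync if it is behind the reference offset and has
    // not caught up within maxLagMs. Deliberately simplified.
    static boolean outOfSync(long followerEndOffset, long referenceOffset,
                             long lastCaughtUpMs, long nowMs, long maxLagMs) {
        return followerEndOffset < referenceOffset && (nowMs - lastCaughtUpMs) > maxLagMs;
    }

    public static void main(String[] args) {
        long maxLagMs = 10_000L;
        long leaderEndOffset = 100L;    // includes the unacked acks=all batch
        long largestAckedOffset = 99L;  // everything acknowledged to producers
        long followerEndOffset = 99L;   // R1/R2 replicated all acked data
        long lastCaughtUpMs = 0L, nowMs = 20_000L; // leader stalled for 20s

        // Current rule (reference = leader end offset): followers are evicted.
        System.out.println(outOfSync(followerEndOffset, leaderEndOffset, lastCaughtUpMs, nowMs, maxLagMs));
        // Proposed rule (reference = largest acked offset): ISR is preserved,
        // so leadership could still move safely to R1 or R2.
        System.out.println(outOfSync(followerEndOffset, largestAckedOffset, lastCaughtUpMs, nowMs, maxLagMs));
    }
}
```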
[jira] [Created] (KAFKA-10860) JmxTool fails with NPE when object-name contains a wildcard
Bob Barrett created KAFKA-10860: --- Summary: JmxTool fails with NPE when object-name contains a wildcard Key: KAFKA-10860 URL: https://issues.apache.org/jira/browse/KAFKA-10860 Project: Kafka Issue Type: Bug Reporter: Bob Barrett When running JmxTool with a wildcard in the object name, the tool fails with a NullPointerException:

{code:java}
bin/kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi --object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://localhost:/jmxrmi.
Exception in thread "main" java.lang.NullPointerException
	at kafka.tools.JmxTool$.main(JmxTool.scala:194)
	at kafka.tools.JmxTool.main(JmxTool.scala)
{code}

It seems that we never populate the `names` variable when the object name includes a pattern:

{code:java}
var names: Iterable[ObjectName] = null
def namesSet = Option(names).toSet.flatten
def foundAllObjects = queries.toSet == namesSet
val waitTimeoutMs = 1
if (!hasPatternQueries) {
  val start = System.currentTimeMillis
  do {
    if (names != null) {
      System.err.println("Could not find all object names, retrying")
      Thread.sleep(100)
    }
    names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
  } while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects)
}

if (wait && !foundAllObjects) {
  val missing = (queries.toSet - namesSet).mkString(", ")
  System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing")
  System.err.println("Exiting.")
  sys.exit(1)
}

val numExpectedAttributes: Map[ObjectName, Int] =
  if (!attributesWhitelistExists)
    names.map{name: ObjectName =>
      val mbean = mbsc.getMBeanInfo(name)
      (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap
  else {
    if (!hasPatternQueries)
      names.map{name: ObjectName =>
        val mbean = mbsc.getMBeanInfo(name)
        val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))
        val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]]
          .filter(attr => attributesWhitelist.get.contains(attr.getName))
        (name, expectedAttributes.size)}.toMap.filter(_._2 > 0)
    else
      queries.map((_, attributesWhitelist.get.length)).toMap
  }
{code}

We need to add logic to query the object names that match the pattern when a pattern is part of the input.
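The missing step can be shown in isolation: `queryNames` already expands pattern object names, so `names` can be populated the same way for wildcard queries. Below is a hedged standalone sketch against the JVM's platform MBean server, not a patch to JmxTool itself.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// queryNames accepts an ObjectName pattern (e.g. a property-value wildcard)
// and returns every registered name that matches, so the same call can
// populate `names` whether or not the query contains a pattern.
public class PatternQuerySketch {
    public static Set<ObjectName> expand(MBeanServer mbsc, String query) {
        try {
            return mbsc.queryNames(new ObjectName(query), null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad object name pattern: " + query, e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Matches java.lang:type=Memory, java.lang:type=Runtime, ...
        Set<ObjectName> names = expand(server, "java.lang:type=*");
        System.out.println("matched " + names.size() + " MBeans");
    }
}
```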
[GitHub] [kafka] scott-hendricks commented on a change in pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.
scott-hendricks commented on a change in pull request #9736: URL: https://github.com/apache/kafka/pull/9736#discussion_r544547465 ## File path: tools/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampRandomPayloadGenerator.java ## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.common.utils.Time; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; + +/** + * This class behaves identically to TimestampRandomPayloadGenerator, except the message size follows a gaussian + * distribution. + * + * This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency + * of a system. + * + * `messageSizeAverage` - The average size in bytes of each message. + * `messageSizeDeviation` - The standard deviation to use when calculating message size. + * `timestampBytes` - The amount of bytes to use for the timestamp. Usually 8. + * `messagesUntilSizeChange` - The number of messages to keep at the same size. 
+ * `seed` - Used to initialize Random() to remove some non-determinism. + * + * Here is an example spec: + * + * { + *"type": "gaussianTimestampRandom", + *"messageSizeAverage": 512, + *"messageSizeDeviation": 100, + *"timestampBytes": 8, + *"messagesUntilSizeChange": 100 + * } + * + * This will generate messages on a gaussian distribution with an average size each 512-bytes and the first 8 bytes + * encoded with the timestamp. The message sizes will have a standard deviation of 100 bytes, and the size will only + * change every 100 messages. The distribution of messages will be as follows: + * + *The average size of the messages are 512 bytes. + *~68% of the messages are between 412 and 612 bytes + *~95% of the messages are between 312 and 712 bytes + *~99% of the messages are between 212 and 812 bytes + */ + +public class GaussianTimestampRandomPayloadGenerator implements PayloadGenerator { +private final int messageSizeAverage; +private final int messageSizeDeviation; +private final int timestampBytes; +private final int messagesUntilSizeChange; +private final long seed; + +private final Random random = new Random(); +private final ByteBuffer buffer; + +private int messageTracker = 0; +private int messageSize = 0; + +@JsonCreator +public GaussianTimestampRandomPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage, + @JsonProperty("messageSizeDeviation") int messageSizeDeviation, + @JsonProperty("timestampBytes") int timestampBytes, Review comment: Removed timestampBytes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
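The size-selection behaviour described in the Javadoc can be sketched as follows (a simplified stand-in using the parameter names from the spec, not the actual Trogdor generator):

```java
import java.util.Random;

// Draws a message size from a gaussian distribution (mean = messageSizeAverage,
// stddev = messageSizeDeviation) and keeps it constant for
// messagesUntilSizeChange consecutive messages. A fixed seed makes the
// sequence deterministic across runs.
public class GaussianSizeSketch {
    private final int messageSizeAverage;
    private final int messageSizeDeviation;
    private final int messagesUntilSizeChange;
    private final Random random;
    private int messageTracker = 0;
    private int messageSize = 0;

    public GaussianSizeSketch(int average, int deviation, int untilChange, long seed) {
        this.messageSizeAverage = average;
        this.messageSizeDeviation = deviation;
        this.messagesUntilSizeChange = untilChange;
        this.random = new Random(seed);
    }

    public int nextSize() {
        // Only re-draw the size once per messagesUntilSizeChange messages.
        if (messageTracker % messagesUntilSizeChange == 0) {
            messageSize = Math.max(0,
                (int) (random.nextGaussian() * messageSizeDeviation) + messageSizeAverage);
        }
        messageTracker++;
        return messageSize;
    }
}
```

With the example spec (`512`, `100`, `100`), runs of 100 consecutive messages share one size, and sizes across runs follow the 68/95/99 spread described above.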
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544547081 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -116,38 +122,88 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex Set entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); for (int aclId = 0; aclId < aclCount; aclId++) { -AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId, -"*", AclOperation.READ, AclPermissionType.ALLOW); -entries.add(new AclEntry(ace)); +// The principal in the request context we are using Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544546946 ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTestFactory.scala ## @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} +import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE} +import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} +import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.RequestContext +import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} +import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE +import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, TRANSACTIONAL_ID} +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.server.authorizer.Authorizer +import org.junit.Assert.{assertFalse, assertTrue} + +class AuthorizerTestFactory(val newRequestContext3: (KafkaPrincipal, InetAddress, ApiKeys) => RequestContext, +val addAcls: (Authorizer, Set[AccessControlEntry], ResourcePattern) => Unit, +val authorizeByResourceType: (Authorizer, RequestContext, AclOperation, ResourceType) => Boolean, +val removeAcls: (Authorizer, Set[AccessControlEntry], ResourcePattern) => Unit) { + def newRequestContext(kafkaPrincipal: KafkaPrincipal, inetAddress: InetAddress): RequestContext = +newRequestContext3(kafkaPrincipal, inetAddress, ApiKeys.PRODUCE) + + def testAuthorizeByResourceTypeMultipleAddAndRemove(authorizer: Authorizer): Unit = { +val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") +val host1 = InetAddress.getByName("192.168.1.1") +val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL) +val denyRead = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, DENY) +val allowRead = new AccessControlEntry(user1.toString, host1.getHostAddress, READ, ALLOW) +val u1h1Context 
= newRequestContext(user1, host1) + +for (_ <- 1 to 10) { + assertFalse("User1 from host1 should not have READ access to any topic when no ACL exists", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + + addAcls(authorizer, Set(allowRead), resource1) + assertTrue("User1 from host1 now should have READ access to at least one topic", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + + for (_ <- 1 to 10) { +addAcls(authorizer, Set(denyRead), resource1) +assertFalse("User1 from host1 now should not have READ access to any topic", + authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + +removeAcls(authorizer, Set(denyRead), resource1) +addAcls(authorizer, Set(allowRead), resource1) +assertTrue("User1 from host1 now should have READ access to at least one topic", + authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) + } + + removeAcls(authorizer, Set(allowRead), resource1) + assertFalse("User1 from host1 now should not have READ access to any topic", +authorizeByResourceType(authorizer, u1h1Context, READ, ResourceType.TOPIC)) +} + } + + def testAuthorizeByResourceTypeIsolationUnrelatedDenyWontDominateAllow(authorizer: Authorizer): Unit = { +val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user1") +val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user2") +val host1 = InetAddress.getByName("192.168.1.1") +val host2 = InetAddress.getByName("192.168.1.2") +val resource1 = new ResourcePattern(TOPIC, "sb1" + UUID.randomUUID(), LITERAL) +val resource2 = new ResourcePattern(TOPIC, "sb2" +
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544545938 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. 
+ * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, +new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), +0, false, false))) Review comment: commit ec80dc4e55758d83835f3ecde381a988d6dd4779 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +310,137 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName) + +if (isSuperUser(principal)) + return AuthorizationResult.ALLOWED + +val principalStr = principal.toString + +val host = requestContext.clientAddress().getHostAddress +val action = new 
Action(op, new ResourcePattern(resourceType, "NONE", PatternType.UNKNOWN), 0, true, true) + +val denyLiterals = matchingResources( + principalStr, host, op, AclPermissionType.DENY, resourceType, PatternType.LITERAL) + +if (denyAll(denyLiterals)) { + logAuditMessage(requestContext, action, false) Review comment: commit
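The evaluation order sketched in the `AclAuthorizer` override above — a DENY on the wildcard-literal resource dominates everything, otherwise one ALLOW on some resource of the type that is not individually denied suffices — can be illustrated with a deliberately simplified model. Real ACL matching also involves hosts, principals, and PREFIXED patterns, all of which this sketch omits.

```java
import java.util.Set;

// Over-simplified model of authorizeByResourceType's short-circuit order:
// resources are plain literal names, and "*" is the wildcard-literal resource.
public class ResourceTypeAuthSketch {
    public static boolean authorized(Set<String> allowedResources, Set<String> deniedResources) {
        // Step 1: a DENY on the wildcard resource denies the whole type.
        if (deniedResources.contains("*")) {
            return false;
        }
        // Step 2: one ALLOW not masked by a matching DENY is enough.
        for (String resource : allowedResources) {
            if (!deniedResources.contains(resource)) {
                return true;
            }
        }
        return false;
    }
}
```

Checking the wildcard denies first is what lets an indexed implementation avoid iterating all bindings, which is the performance point made in the Javadoc.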
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp
rajinisivaram commented on a change in pull request #9622: URL: https://github.com/apache/kafka/pull/9622#discussion_r544515142 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging { error(s"Listeners are not identical across brokers: $aliveNodes") } + val newTopicIds = updateMetadataRequest.topicStates().asScala +.map(topicState => (topicState.topicName(), topicState.topicId())) +.filter(_._2 != Uuid.ZERO_UUID).toMap + val topicIds = mutable.Map.empty[String, Uuid] + topicIds.addAll(metadataSnapshot.topicIds) + topicIds.addAll(newTopicIds) Review comment: Yes, that was my suggestion too, but my wording wasn't right. I will reword that to avoid confusion.
[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
jolshan commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r544483664 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) +else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { +val log = localLog(topicPartition).get +// Check if the topic ID is in memory, if not, it must be new to the broker. +// If the broker previously wrote it to file, it would be recovered on restart after failure. +// If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. +// If the topic ID in the log does not match the one in the request, the broker's topic must be stale. +if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + +s" match the topic Id provided in the request: " + +s"${topicIds.get(topicPartition.topic).toString}.") +} else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) +log.topicId = topicIds.get(topicPartition.topic) + } else { +stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: I might think about making this code cleaner in general to avoid so many nested if statements This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
jolshan commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r544483238 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) +else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { +val log = localLog(topicPartition).get +// Check if the topic ID is in memory, if not, it must be new to the broker. +// If the broker previously wrote it to file, it would be recovered on restart after failure. +// If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. +// If the topic ID in the log does not match the one in the request, the broker's topic must be stale. +if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + +s" match the topic Id provided in the request: " + +s"${topicIds.get(topicPartition.topic).toString}.") +} else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) +log.topicId = topicIds.get(topicPartition.topic) + } else { +stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: Oops. I think I cleaned up this block and deleted something. There should be a check if log.topicId.equals(id). If so, then the file exists and we shouldn't go into the block that says "// There is not yet a topic ID stored in the log." I should also fix the topicIds.get(topicPartition.topic) above and replace it with id.
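Flattening the nested checks jolshan describes gives a small decision table; the sketch below models it with plain strings standing in for topic IDs (illustrative names only, not ReplicaManager's API):

```java
// Decision logic for reconciling a log's stored topic ID with the ID carried
// in the request, flattened into early returns instead of nested ifs.
public class TopicIdReconcileSketch {
    public static String reconcile(String logTopicId, String requestTopicId) {
        if (requestTopicId == null) {
            return "no-op";               // older protocol: no ID to check
        }
        if (logTopicId == null) {
            return "write-metadata-file"; // first ID seen: persist it to the partition metadata file
        }
        if (!logTopicId.equals(requestTopicId)) {
            return "stale";               // mismatch: the broker's copy of the topic is stale
        }
        return "match";                   // ID already recorded, nothing to do
    }
}
```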
[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp
jolshan commented on a change in pull request #9622: URL: https://github.com/apache/kafka/pull/9622#discussion_r544479887 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging { error(s"Listeners are not identical across brokers: $aliveNodes") } + val newTopicIds = updateMetadataRequest.topicStates().asScala +.map(topicState => (topicState.topicName(), topicState.topicId())) +.filter(_._2 != Uuid.ZERO_UUID).toMap + val topicIds = mutable.Map.empty[String, Uuid] + topicIds.addAll(metadataSnapshot.topicIds) + topicIds.addAll(newTopicIds) Review comment: So it seems like there is some logic to remove the partition states of deleted topics from the MetadataSnapshot. Would we want to do something similar there but with topic Ids? Apologies if I'm missing something.
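What this comment asks for — mirroring the deleted-topic cleanup when merging topic IDs into the new snapshot — could look roughly like this (a hypothetical helper, with `String` standing in for `Uuid`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Merge the cached topic IDs with the ones from the UpdateMetadata request,
// then drop entries for deleted topics, mirroring how deleted topics'
// partition states are removed from the MetadataSnapshot.
public class TopicIdMergeSketch {
    public static Map<String, String> mergeTopicIds(Map<String, String> cached,
                                                    Map<String, String> fromRequest,
                                                    Set<String> deletedTopics) {
        Map<String, String> merged = new HashMap<>(cached);
        merged.putAll(fromRequest);               // newly reported IDs win
        merged.keySet().removeAll(deletedTopics); // deleted topics lose their IDs
        return merged;
    }
}
```

Without the removal step, the cache would retain IDs for topics that no longer exist, which is the leak the comment is pointing at.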
[GitHub] [kafka] bertber opened a new pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest
bertber opened a new pull request #9761: URL: https://github.com/apache/kafka/pull/9761 I added a test for ByteBufferInputStream in ByteBufferLogInputStreamTest. First, I pass a non-empty ByteBuffer to the ByteBufferInputStream in order to verify it. After that, I call ByteBufferInputStream's read function and check whether the return value is correct. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
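The behaviour under test can be shown with a self-contained stand-in (the real `ByteBufferInputStream` lives in `org.apache.kafka.common.utils`; this minimal equivalent only illustrates the read contract the PR's test checks):

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

// Minimal ByteBuffer-backed InputStream mirroring the contract the new test
// verifies: read() returns the next byte as an int in 0-255, or -1 once the
// buffer is exhausted.
public class ByteBufferStreamSketch extends InputStream {
    private final ByteBuffer buffer;

    public ByteBufferStreamSketch(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public int read() {
        return buffer.hasRemaining() ? buffer.get() & 0xFF : -1;
    }
}
```

Wrapping `ByteBuffer.wrap(new byte[]{1, 2, 3})` and reading yields 1, 2, 3 and then -1, which is the assertion pattern the new test follows.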
[jira] [Commented] (KAFKA-10759) ARM support for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-10759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250438#comment-17250438 ] Jun Rao commented on KAFKA-10759: - [~xiaopenglei]: Thanks for the jira. Perhaps we can file an Apache infra ticket to see if ASF jenkins supports or plans to support ARM-based servers. > ARM support for Kafka > - > > Key: KAFKA-10759 > URL: https://issues.apache.org/jira/browse/KAFKA-10759 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: PengLei >Priority: Major > Attachments: build_output.log, run_test_output.log > > > ARM support for Kafka. > I tried to deploy the Kafka cluster on an ARM server, but unfortunately I > did not find an official ARM release for Kafka. I think more and more > people will try the same thing I did. > Right now the CI for Kafka (on GitHub) is handled by jenkins-ci. The tests run > under the x86 architecture, but ARM is missing. This means we have no way to > test whether a pull request will break the Kafka deployment on ARM. > Similarly, we cannot provide an ARM release package without ARM CI. > If the Apache Kafka community is interested, I can help with the > integration. > This is the umbrella issue to track the efforts to make Kafka run on ARM > processors.
[GitHub] [kafka] jolshan commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
jolshan commented on pull request #9626: URL: https://github.com/apache/kafka/pull/9626#issuecomment-746586891 @rajinisivaram Yup, will look at the rebase next.
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-746543315 @mimaison thanks so much for all your efforts on reviewing. I would really appreciate it if you have time before the end of this year for 1-2 final reviews so this PR can be merged.
[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250410#comment-17250410 ] Justin Jack commented on KAFKA-7641: stanislavkozlovski commented on pull request #6163: KAFKA-7641: Introduce "consumer.group.max.size" config to limit consumer group sizes URL: https://github.com/apache/kafka/pull/6163 This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE. Once a consumer group is at the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error. In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced. I have added two integration tests for both scenarios - a member joining an already-full group and a rolling restart with a new config Committer Checklist (excluded from commit message) [ ] Verify design and implementation [ ] Verify test coverage and CI build status [ ] Verify documentation (including upgrade notes) > Add `consumer.group.max.size` to cap consumer metadata size on broker > - > > Key: KAFKA-7641 > URL: https://issues.apache.org/jira/browse/KAFKA-7641 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Boyang Chen >Assignee: Stanislav Kozlovski >Priority: Major > Labels: kip > Fix For: 2.2.0 > > > In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, > Jason concluded an edge case of the current consumer protocol which could cause > a memory burst on the broker side: > ```the case we observed in practice was caused by a consumer that was slow to > rejoin the group after a rebalance had begun. At the same time, there were > new members that were trying to join the group for the first time.
The > request timeout was significantly lower than the rebalance timeout, so the > JoinGroup of the new members kept timing out. The timeout caused a retry and > the group size eventually became quite large because we could not detect the > fact that the new members were no longer there.``` > Since many disorganized join group requests are spamming the group metadata, > we should define a cap on the broker side to prevent one consumer group from > growing too large. So far I feel it's appropriate to introduce this as a > server config since most of the time this value only deals with error > scenarios; client users shouldn't worry about this config. > KIP-389: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]
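The cap described in the patch above can be sketched as a small model: once a group holds the maximum number of distinct members, a join request from a new member is rejected, while a known member may still rejoin. The class and the error-string names here are illustrative stand-ins, not Kafka's coordinator code.

```java
import java.util.HashSet;
import java.util.Set;

public class GroupSizeCapSketch {
    // Hypothetical error codes standing in for the broker's responses.
    static final String GROUP_MAX_SIZE_REACHED = "GROUP_MAX_SIZE_REACHED";
    static final String NONE = "NONE";

    private final int maxSize;
    private final Set<String> members = new HashSet<>();

    GroupSizeCapSketch(int maxSize) { this.maxSize = maxSize; }

    // Simplified JoinGroup handling: reject new members once the cap is hit.
    String handleJoin(String memberId) {
        if (!members.contains(memberId) && members.size() >= maxSize)
            return GROUP_MAX_SIZE_REACHED;   // group is full: reject
        members.add(memberId);
        return NONE;
    }

    public static void main(String[] args) {
        GroupSizeCapSketch group = new GroupSizeCapSketch(2);
        if (!group.handleJoin("a").equals(NONE)) throw new AssertionError();
        if (!group.handleJoin("b").equals(NONE)) throw new AssertionError();
        // A third distinct member exceeds the cap and is rejected.
        if (!group.handleJoin("c").equals(GROUP_MAX_SIZE_REACHED))
            throw new AssertionError();
        // A known member rejoining is still allowed.
        if (!group.handleJoin("a").equals(NONE)) throw new AssertionError();
        System.out.println("ok");
    }
}
```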
[jira] [Updated] (KAFKA-10798) Failed authentication delay doesn't work with some SASL authentication failures
[ https://issues.apache.org/jira/browse/KAFKA-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-10798: --- Fix Version/s: 2.6.2 2.7.1 > Failed authentication delay doesn't work with some SASL authentication > failures > --- > > Key: KAFKA-10798 > URL: https://issues.apache.org/jira/browse/KAFKA-10798 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > KIP-306 introduced the config `connection.failed.authentication.delay.ms` to > delay connection closing on brokers for failed authentication to limit the > rate of retried authentications from clients in order to avoid excessive > authentication load on brokers from failed clients. We rely on authentication > failure response to be delayed in this case to prevent clients from detecting > the failure and retrying sooner. > SaslServerAuthenticator delays response for SaslAuthenticationException, but > not for SaslException, even though SaslException is also converted into > SaslAuthenticationException and processed as an authentication failure by > both server and clients. As a result, connection delay is not applied in many > scenarios like SCRAM authentication failures.
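The bug above reduces to which exception types trigger the close delay. A toy model of the before/after behavior, assuming a simplified local exception hierarchy (the real Kafka classes differ; `shouldDelayClose*` and the hierarchy here are purely illustrative):

```java
public class AuthFailureDelaySketch {
    // Local stand-ins for the SASL exception types discussed in the issue.
    static class SaslException extends Exception {}
    static class SaslAuthenticationException extends SaslException {}

    // Buggy behaviour: the delay keyed only on the most specific type,
    // so a plain SaslException (e.g. a SCRAM failure) closed immediately.
    static boolean shouldDelayCloseBuggy(Exception e) {
        return e instanceof SaslAuthenticationException;
    }

    // Intended behaviour: any SASL failure counts as an authentication
    // failure, so the configured close delay applies to both types.
    static boolean shouldDelayCloseFixed(Exception e) {
        return e instanceof SaslException;
    }

    public static void main(String[] args) {
        Exception scramFailure = new SaslException();
        if (shouldDelayCloseBuggy(scramFailure))
            throw new AssertionError("buggy path should skip the delay");
        if (!shouldDelayCloseFixed(scramFailure))
            throw new AssertionError("fixed path should delay");
        if (!shouldDelayCloseFixed(new SaslAuthenticationException()))
            throw new AssertionError("fixed path should still delay");
        System.out.println("ok");
    }
}
```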
[GitHub] [kafka] vvcephei commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
vvcephei commented on pull request #9708: URL: https://github.com/apache/kafka/pull/9708#issuecomment-746468357 Test failure was unrelated: Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
[GitHub] [kafka] govi20 commented on pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest
govi20 commented on pull request #9760: URL: https://github.com/apache/kafka/pull/9760#issuecomment-746391613 Hi @chia7712 , can you please review this PR?
[GitHub] [kafka] govi20 opened a new pull request #9760: KAFKA-10850: Use 'Int.box' to replace deprecated 'new Integer' from BrokerToControllerRequestThreadTest
govi20 opened a new pull request #9760: URL: https://github.com/apache/kafka/pull/9760
[GitHub] [kafka] ijuma merged pull request #9748: MINOR: Simplify ApiKeys by relying on ApiMessageType
ijuma merged pull request #9748: URL: https://github.com/apache/kafka/pull/9748
[GitHub] [kafka] ijuma commented on pull request #9748: MINOR: Simplify ApiKeys by relying on ApiMessageType
ijuma commented on pull request #9748: URL: https://github.com/apache/kafka/pull/9748#issuecomment-746383611 JDK 8 and 11 passed; JDK 15 had one flaky, unrelated failure: > kafka.server.ClientQuotasRequestTest.testAlterIpQuotasRequest
[GitHub] [kafka] chia7712 merged pull request #9759: HOTFIX: Access apiversions data via method not field
chia7712 merged pull request #9759: URL: https://github.com/apache/kafka/pull/9759
[GitHub] [kafka] chia7712 commented on pull request #9759: HOTFIX: Access apiversions data via method not field
chia7712 commented on pull request #9759: URL: https://github.com/apache/kafka/pull/9759#issuecomment-746365374 ``` 22:18:23 Use '--warning-mode all' to show the individual deprecation warnings. 22:18:23 See https://docs.gradle.org/6.7.1/userguide/command_line_interface.html#sec:command_line_warnings 22:18:23 22:18:23 BUILD SUCCESSFUL in 9m 3s 22:18:23 205 actionable tasks: 171 executed, 34 up-to-date 22:18:23 22:18:23 See the profiling report at: file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9759/build/reports/profile/profile-2020-12-16-14-09-11.html 22:18:23 A fine-grained performance profile is available: use the --scan option. ``` build successful :)
[GitHub] [kafka] tombentley commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client
tombentley commented on pull request #9552: URL: https://github.com/apache/kafka/pull/9552#issuecomment-746347168 https://github.com/apache/kafka/pull/9759
[GitHub] [kafka] tombentley commented on pull request #9759: HOTFIX: Access apiversions data via method not field
tombentley commented on pull request #9759: URL: https://github.com/apache/kafka/pull/9759#issuecomment-746344348 cc @chia7712
[GitHub] [kafka] tombentley opened a new pull request #9759: HOTFIX: Access apiversions data via method not field
tombentley opened a new pull request #9759: URL: https://github.com/apache/kafka/pull/9759
[GitHub] [kafka] tombentley commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client
tombentley commented on pull request #9552: URL: https://github.com/apache/kafka/pull/9552#issuecomment-746338918 @chia7712 I'll get right on it
[GitHub] [kafka] chia7712 commented on pull request #9552: KAFKA-10656: Log the feature flags received by the client
chia7712 commented on pull request #9552: URL: https://github.com/apache/kafka/pull/9552#issuecomment-746336775 @tombentley https://github.com/apache/kafka/commit/1a10c3445e157da1d2fd670c043f19c385465eb0 changes the access modifier of "data" so this PR breaks the build. Could you file a hot fix for it?
[jira] [Resolved] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leah Thomas resolved KAFKA-10417. - Resolution: Fixed > suppress() with cogroup() throws ClassCastException > --- > > Key: KAFKA-10417 > URL: https://issues.apache.org/jira/browse/KAFKA-10417 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Wardha Perinkada Kattu >Assignee: Leah Thomas >Priority: Critical > Labels: kafka-streams > Fix For: 2.8.0, 2.7.1 > > > Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` > throws `ClassCastException` > Works fine without the `suppress()` > Code block tested - > {code:java} > val stream1 = requestStreams.merge(successStreams).merge(errorStreams) > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.notificationSerde())) > val streams2 = confirmationStreams > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.confirmationsSerde())) > val cogrouped = > stream1.cogroup(notificationAggregator).cogroup(streams2, > confirmationsAggregator) > .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong()))) > .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store") > .withValueSerde(serdesConfig.notificationMetricSerde())) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream() > {code} > Exception thrown is: > {code:java} > Caused by: java.lang.ClassCastException: class > org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to > class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier > (org.apache.kafka.streams.kstream.internals.PassThrough and > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in > unnamed module of loader 'app') > {code} >
[https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]
[jira] [Commented] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250330#comment-17250330 ] Bill Bejeck commented on KAFKA-10292: - Since this is an outstanding issue, I'm changing the blocker status. Also, I've updated the fix version to 2.8.0. Thanks, Bill > fix flaky streams/streams_broker_bounce_test.py > --- > > Key: KAFKA-10292 > URL: https://issues.apache.org/jira/browse/KAFKA-10292 > Project: Kafka > Issue Type: Sub-task > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.8.0 > > > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > \{ > "broker_type": "leader", > "failure_mode": "clean_bounce", > "num_threads": 1, > "sleep_time_secs": 120 > \} > {quote} > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > \{ > "broker_type": "controller", > "failure_mode": "hard_shutdown", > "num_threads": 3, > "sleep_time_secs": 120 > \} > {quote}
[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10292: Fix Version/s: 2.8.0
[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10292: Priority: Major (was: Blocker)
[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10292: Fix Version/s: (was: 2.7.0)
[jira] [Updated] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information
[ https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10656: --- Fix Version/s: (was: 2.7.0) 2.8.0 > NetworkClient.java: print out the feature flags received at DEBUG level, as > well as the other version information > - > > Key: KAFKA-10656 > URL: https://issues.apache.org/jira/browse/KAFKA-10656 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Tom Bentley >Priority: Major > Fix For: 2.8.0 > >
[jira] [Resolved] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information
[ https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10656. Fix Version/s: 2.7.0 Resolution: Fixed
[GitHub] [kafka] chia7712 merged pull request #9552: KAFKA-10656: Log the feature flags received by the client
chia7712 merged pull request #9552: URL: https://github.com/apache/kafka/pull/9552
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r544223630 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,134 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * It is important to override this interface default in implementations because + * 1. The interface default iterates all AclBindings multiple times, without any indexing, + *which is a CPU intense work. + * 2. The interface default rebuild several sets of strings, which is a memory intense work. + * 3. The interface default cannot perform the audit logging properly + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + +if (authorize(requestContext, Collections.singletonList(new Action( +AclOperation.READ, +new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL), +0, false, false))) Review comment: Use `logIfAllowed=true` since we are granting access in that case. ## File path: core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala ## @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.net.InetAddress +import java.util.UUID + +import kafka.security.auth.SimpleAclAuthorizer +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.zookeeper.ZooKeeperClient +import org.apache.kafka.common.acl.AclOperation._ +import org.apache.kafka.common.acl._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.{RequestContext, RequestHeader} +import org.apache.kafka.common.resource.PatternType.LITERAL +import org.apache.kafka.common.resource.ResourceType._ +import org.apache.kafka.common.resource.{ResourcePattern, ResourceType} +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.authorizer._ +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.annotation.nowarn +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class AuthorizerWrapperTest extends ZooKeeperTestHarness { + @nowarn("cat=deprecation") + private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer) + 
@nowarn("cat=deprecation") + private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer) + private var resource: ResourcePattern = _ + private val superUsers = "User:superuser1; User:superuser2" + private val username = "alice" + private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) + private val requestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) + private var config: KafkaConfig = _ + private var zooKeeperClient: ZooKeeperClient = _ + + private val aclAdded: ArrayBuffer[(Authorizer, Set[AccessControlEntry], ResourcePattern)] = ArrayBuffer() + private val authorizerTestFactory = new AuthorizerTestFactory( +
[GitHub] [kafka] UnityLung commented on pull request #9703: KAFKA-10697 Remove ProduceResponse.responses
UnityLung commented on pull request #9703: URL: https://github.com/apache/kafka/pull/9703#issuecomment-746161565 @ijuma Thanks for your reply. I'm going to work on removing "responses".
[GitHub] [kafka] UnityLung closed pull request #9703: KAFKA-10697 Remove ProduceResponse.responses
UnityLung closed pull request #9703: URL: https://github.com/apache/kafka/pull/9703
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
rajinisivaram commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r544204490 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) +else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { +val log = localLog(topicPartition).get +// Check if the topic ID is in memory, if not, it must be new to the broker. +// If the broker previously wrote it to file, it would be recovered on restart after failure. +// If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. +// If the topic ID in the log does not match the one in the request, the broker's topic must be stale. +if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + +s" match the topic Id provided in the request: " + +s"${topicIds.get(topicPartition.topic).toString}.") +} else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) +log.topicId = topicIds.get(topicPartition.topic) + } else { +stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: Hmm, looking at the conditional statements here, it looks like we would write the file the first time we get here because `log.partitionMetadataFile.get.isEmpty()` and the second time we would print a warning even if the id in the file matches the expected id. Unless I missed something. This is an automated message from the Apache Git Service. 
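The review comment above boils down to the order of the conditionals: write the topic ID only when none is stored yet, warn only on a genuine mismatch, and stay quiet when the stored ID already matches. A simplified model of that suggested flow, with hypothetical names standing in for Kafka's `ReplicaManager`/`PartitionMetadataFile` logic:

```java
import java.util.Optional;

public class TopicIdCheckSketch {
    // Simplified model of the suggested check, not Kafka's actual code.
    private Optional<String> storedId = Optional.empty();
    String lastWarning = null;

    void receiveTopicId(String requestId) {
        if (!storedId.isPresent()) {
            storedId = Optional.of(requestId);          // first time: persist
        } else if (!storedId.get().equals(requestId)) {
            lastWarning = "Topic ID mismatch: stored " + storedId.get()
                + ", request " + requestId;             // stale broker state
        }
        // Matching ID: nothing to do, and crucially, no spurious warning.
    }

    public static void main(String[] args) {
        TopicIdCheckSketch log = new TopicIdCheckSketch();
        log.receiveTopicId("id-1");
        log.receiveTopicId("id-1");              // same ID again: no warning
        if (log.lastWarning != null) throw new AssertionError();
        log.receiveTopicId("id-2");              // differing ID: warn
        if (log.lastWarning == null) throw new AssertionError();
        System.out.println("ok");
    }
}
```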
[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
[ https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250252#comment-17250252 ] Tom Bentley commented on KAFKA-10859: - I think we should improve the test slightly anyway, to cover the case where the file is created after some number of polls. I'll open a PR after you've merged the fix for KAFKA-10846 since there's a dependency on a method added there. > add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce > the loop count to speedup the test > -- > > Key: KAFKA-10859 > URL: https://issues.apache.org/jira/browse/KAFKA-10859 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Tom Bentley >Priority: Major > Labels: newbie > > FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it > loops 100 times, which takes about 2 minutes, just to complete a unit test.
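The reason a missing `@Test` matters is that annotation-driven runners such as JUnit discover test methods by annotation, so an unannotated method is silently never run. A toy runner with a local annotation (a stand-in for JUnit, so the example is self-contained) makes the point:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;

public class MissingAnnotationSketch {
    // Local stand-in for JUnit's @Test annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Test {}

    public static class SuiteExample {
        static int ran = 0;
        @Test public void annotated() { ran++; }
        public void notAnnotated() { ran += 100; } // never discovered
    }

    public static void main(String[] args) throws Exception {
        SuiteExample suite = new SuiteExample();
        // The "runner" invokes only methods carrying the annotation.
        for (Method m : SuiteExample.class.getMethods())
            if (m.isAnnotationPresent(Test.class))
                m.invoke(suite);
        if (SuiteExample.ran != 1)
            throw new AssertionError("ran=" + SuiteExample.ran);
        System.out.println("ok");
    }
}
```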
[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
[ https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250250#comment-17250250 ] Chia-Ping Tsai commented on KAFKA-10859: I prefer the small patch :)
[jira] [Commented] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-10788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250249#comment-17250249 ]

Sagar Rao commented on KAFKA-10788: Sure [~guozhang]. [~rohitdeshaws], here is the PR comment for your reference: [https://github.com/apache/kafka/pull/9508#discussion_r527599966] The idea is to mock the `underlyingStore` field instead of creating an instance of it. MeteredKeyValueStoreTest already does something like this.

> Streamlining Tests in CachingInMemoryKeyValueStoreTest
> ------------------------------------------------------
>
>                 Key: KAFKA-10788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10788
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>            Reporter: Sagar Rao
>            Assignee: Rohit Deshpande
>            Priority: Major
>              Labels: newbie
>
> While reviewing KIP-614, it was decided that tests for
> [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
> need to be streamlined to use a mocked underlyingStore.
[jira] [Commented] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached
[ https://issues.apache.org/jira/browse/KAFKA-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250247#comment-17250247 ]

Sagar Rao commented on KAFKA-10652: [~hachikuji], no problem. BTW, the queries are old and I have created a PR for this change. You can review it whenever you get a chance.

> Raft leader should flush accumulated writes after a min size is reached
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-10652
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10652
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Jason Gustafson
>            Assignee: Sagar Rao
>            Priority: Major
>
> In KAFKA-10601, we implemented linger semantics similar to the producer to
> let the leader accumulate a batch of writes before fsyncing them to disk.
> Currently the fsync is only based on the linger time, but it would be helpful
> to make it size-based as well. In other words, if we accumulate a
> configurable N bytes, then we should not wait for linger expiration and
> should just fsync immediately.
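The size-or-time trigger the ticket describes can be sketched as follows. This is a simplified illustration only, not Kafka's actual accumulator code; the names `FlushPolicy`, `append`, and `shouldFlush` are invented:

```java
// Sketch of a size-or-time flush trigger: fsync when either the linger
// time has elapsed since the first unflushed append, or the accumulated
// bytes reach a configured threshold -- whichever comes first.
public class FlushPolicy {
    private final long lingerMs;
    private final int maxBytes;
    private long firstAppendTimeMs = -1;  // -1 means nothing is accumulated
    private int accumulatedBytes = 0;

    public FlushPolicy(long lingerMs, int maxBytes) {
        this.lingerMs = lingerMs;
        this.maxBytes = maxBytes;
    }

    /** Record an append; returns true if the writes should be flushed now. */
    public boolean append(int sizeInBytes, long nowMs) {
        if (firstAppendTimeMs < 0) {
            firstAppendTimeMs = nowMs;
        }
        accumulatedBytes += sizeInBytes;
        return shouldFlush(nowMs);
    }

    public boolean shouldFlush(long nowMs) {
        if (firstAppendTimeMs < 0) {
            return false;                              // nothing to flush
        }
        return accumulatedBytes >= maxBytes            // size trigger
            || nowMs - firstAppendTimeMs >= lingerMs;  // time trigger
    }

    /** Called after the fsync completes. */
    public void reset() {
        firstAppendTimeMs = -1;
        accumulatedBytes = 0;
    }
}
```

The key point of the ticket is the `accumulatedBytes >= maxBytes` branch: once N bytes are accumulated, the flush fires immediately instead of waiting out the linger timer.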
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9737: KAFKA-10828: Replacing endorsing with acknowledging for voters
vamossagar12 commented on a change in pull request #9737: URL: https://github.com/apache/kafka/pull/9737#discussion_r544195331

## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java

```
@@ -50,8 +50,8 @@ protected LeaderState(
         this.highWatermark = Optional.empty();
         for (int voterId : voters) {
-            boolean hasEndorsedLeader = voterId == localId;
-            this.voterReplicaStates.put(voterId, new VoterState(voterId, hasEndorsedLeader));
+            boolean hasAcknowledgedLeader = voterId == localId;
```

Review comment: done

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10759) ARM support for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-10759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250234#comment-17250234 ]

PengLei commented on KAFKA-10759: On the ARM machine, I completed the compilation of Kafka and ran all the test cases successfully. For details, please see the attached log files. Environment of the machine: OS: Ubuntu 18.04 (ARM); JDK: Oracle JDK 1.8 (ARM); Gradle: 6.7.1

> ARM support for Kafka
> ---------------------
>
>                 Key: KAFKA-10759
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10759
>             Project: Kafka
>          Issue Type: Improvement
>          Components: build
>            Reporter: PengLei
>            Priority: Major
>         Attachments: build_output.log, run_test_output.log
>
> ARM support for Kafka.
> I tried to deploy the Kafka cluster on an ARM server, but unfortunately I
> did not find an official ARM release of Kafka. I think more and more people
> will try the same thing as I did.
> Currently the CI of Kafka (on GitHub) is handled by Jenkins CI. The tests
> run only on the x86 architecture; the ARM architecture is missing. This
> leads to a problem: we have no way to test whether a pull request will
> break the Kafka deployment on ARM. Similarly, we cannot provide an ARM
> release package without ARM CI.
> If the Apache Kafka community is interested, I can help with the
> integration.
> This is the umbrella issue to track the efforts to make Kafka run on ARM
> processors.
[jira] [Commented] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
[ https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250230#comment-17250230 ] Tom Bentley commented on KAFKA-10859: [~chia7712] good spot! I can add this to my PR if you like?
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp
rajinisivaram commented on a change in pull request #9622: URL: https://github.com/apache/kafka/pull/9622#discussion_r544150030

## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala

```
@@ -223,6 +224,31 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
   }

+  @Test
+  def testTopicIdsInResponse(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    createTopic(topic1, replicaAssignment)
+    createTopic(topic2, replicaAssignment)
+
+    // if version < 9, return ZERO_UUID in MetadataResponse
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    assertEquals(2, resp1.topicMetadata.size)
+    resp1.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
+    }
+
+    // from version 10, UUID will be included in MetadataResponse
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    assertEquals(2, resp2.topicMetadata.size)
+    resp2.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
```

Review comment: we probably also want to assert that the topic id is not null here (even though we currently never return null).
## File path: core/src/main/scala/kafka/server/MetadataCache.scala

```
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
       error(s"Listeners are not identical across brokers: $aliveNodes")
     }

+    val newTopicIds = updateMetadataRequest.topicStates().asScala
+      .map(topicState => (topicState.topicName(), topicState.topicId()))
+      .filter(_._2 != Uuid.ZERO_UUID).toMap
+    val topicIds = mutable.Map.empty[String, Uuid]
+    topicIds.addAll(metadataSnapshot.topicIds)
+    topicIds.addAll(newTopicIds)
```

Review comment: When a topic is deleted, brokers process UpdateMetadataRequest and remove deleted topics from their cache. We track deletion state in ZooKeeper and, as you mentioned, you can get this information by going directly to ZK in kafka-topics.sh. But we don't retain that information in every broker. I would remove the topic id in the code segment just below this when the topic is removed from the MetadataCache, since we clearly cannot have a map that keeps growing in brokers. Is there a reason why we would want to retain the topic id in every broker even after the topic has been deleted? We can't get this information through the existing metadata request from brokers anyway. I guess in future we can add additional metadata to track deleted topic ids if we wanted to, but for now it seems better to delete topic ids from the MetadataCache when we delete the topic from the cache. What do you think?

## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java

```
@@ -320,6 +333,7 @@ public String toString() {
         return "TopicMetadata{" +
             "error=" + error +
             ", topic='" + topic + '\'' +
+            ", topicId='" + topicId.toString() + '\'' +
```

Review comment: nit: toString() unnecessary
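The bookkeeping being debated in the review above can be modeled in miniature. This is a hypothetical toy, not the real `MetadataCache`; the class `TopicIdCache` and its methods are invented for illustration. The point is the one the reviewer makes: merging ids from each update without ever removing the ids of deleted topics would let the map grow without bound, so deletion must drop the id too.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Toy model of the topic-id bookkeeping under discussion: merge the ids
// carried by an UpdateMetadata-style request into the cache, and remove
// the ids of deleted topics so the map tracks live topics only.
public class TopicIdCache {
    private final Map<String, UUID> topicIds = new HashMap<>();

    public void update(Map<String, UUID> newTopicIds, Set<String> deletedTopics) {
        // New assignments overwrite any stale entries for the same topic name.
        topicIds.putAll(newTopicIds);
        // Deleting the topic also deletes its id -- otherwise the map grows forever.
        topicIds.keySet().removeAll(deletedTopics);
    }

    public UUID topicId(String topic) {
        return topicIds.get(topic);
    }

    public int size() {
        return topicIds.size();
    }
}
```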
[jira] [Assigned] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
[ https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley reassigned KAFKA-10859: Assignee: Tom Bentley
[jira] [Reopened] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reopened KAFKA-10292: Thanks for the information. Reopening it.

> fix flaky streams/streams_broker_bounce_test.py
> -----------------------------------------------
>
>                 Key: KAFKA-10292
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10292
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, system tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.7.0
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class: StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> }
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class: StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> }
> {quote}
[jira] [Comment Edited] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250224#comment-17250224 ]

Bruno Cadonna edited comment on KAFKA-10292 at 12/16/20, 10:14 AM: [~chia7712] Sorry for the late reply. The fix solves only one of the issues with that test. Actually it solves {{test_all_brokers_bounce}}, but not {{test_broker_type_bounce}}. A fix for the latter is in progress here: https://github.com/apache/kafka/pull/9441 I think we should reopen this ticket.

was (Author: cadonna): [~chia7712] Sorry for the late reply. I think the fix solves only one of the issues with that test. Actually it solves {{test_all_brokers_bounce}}, but not {{test_broker_type_bounce}}. A fix for the latter is in progress here: https://github.com/apache/kafka/pull/9441 I think we should reopen this ticket.
[jira] [Commented] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250224#comment-17250224 ] Bruno Cadonna commented on KAFKA-10292: [~chia7712] Sorry for the late reply. I think the fix solves only one of the issues with that test. Actually it solves {{test_all_brokers_bounce}}, but not {{test_broker_type_bounce}}. A fix for the latter is in progress here: https://github.com/apache/kafka/pull/9441 I think we should reopen this ticket.
[jira] [Created] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
Chia-Ping Tsai created KAFKA-10859:

             Summary: add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
                 Key: KAFKA-10859
                 URL: https://issues.apache.org/jira/browse/KAFKA-10859
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai

FileStreamSourceTaskTest.testInvalidFile misses a `@Test` annotation. Also, it loops 100 times, which takes about 2 minutes to complete a unit test.
[jira] [Updated] (KAFKA-10859) add @Test annotation to FileStreamSourceTaskTest.testInvalidFile and reduce the loop count to speedup the test
[ https://issues.apache.org/jira/browse/KAFKA-10859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10859: Labels: newbie (was: )
[GitHub] [kafka] chia7712 commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed
chia7712 commented on pull request #9735: URL: https://github.com/apache/kafka/pull/9735#issuecomment-746011953

> I think testNormalLifecycleWithResize() covers that when a buffer resize is required, and testNormalLifecycle() in the case where a buffer resize is not required. Or did I miss a case?

Sorry, I overlooked `testNormalLifecycleWithResize`. It is a little hard to review code on an iPhone. Will take another look later.
[jira] [Comment Edited] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250212#comment-17250212 ]

dengziming edited comment on KAFKA-10858 at 12/16/20, 9:54 AM: [~chia7712] Thank you, I searched the issues but failed to find the other PR. I will spend some time reviewing [https://github.com/apache/kafka/pull/9641].

was (Author: dengziming): [~chia7712] Thank you, I searched the issues but failed to find the other PR. I will spend some time reviewing it.

> Convert connect protocol header schemas to use generated protocol
> -----------------------------------------------------------------
>
>                 Key: KAFKA-10858
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10858
>             Project: Kafka
>          Issue Type: Improvement
>          Components: protocol
>            Reporter: dengziming
>            Assignee: dengziming
>            Priority: Major
>
> Manually managed schema code in ConnectProtocol and
> IncrementalCooperativeConnectProtocol should be replaced by the
> auto-generated protocol.
[jira] [Commented] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17250212#comment-17250212 ] dengziming commented on KAFKA-10858: [~chia7712] Thank you, I searched the issues but failed to find the other PR. I will spend some time reviewing it.
[jira] [Resolved] (KAFKA-10858) Convert connect protocol header schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-10858. Resolution: Duplicate
[GitHub] [kafka] tombentley commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed
tombentley commented on pull request #9735: URL: https://github.com/apache/kafka/pull/9735#issuecomment-746008036 @chia7712 I think `testNormalLifecycleWithResize()` covers the case where a buffer resize is required, and `testNormalLifecycle()` the case where a buffer resize is not required. Or did I miss a case?
[GitHub] [kafka] tombentley commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed
tombentley commented on pull request #9735: URL: https://github.com/apache/kafka/pull/9735#issuecomment-745997459 Thanks for taking a look @chia7712. I've added a test, but had to expose the buffer size.
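The grow-only-when-needed idea behind KAFKA-10846 can be sketched independently of the real `FileStreamSourceTask` (the class name `GrowingReader` and the initial 8-byte capacity are invented for illustration): the buffer doubles only when a read fills it completely, so small inputs never trigger a resize, and exposing `bufferSize()` is what lets a test observe whether a resize happened.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Sketch of grow-only-when-needed buffering: the buffer doubles only
// when it is completely full, so small inputs never cause a resize.
public class GrowingReader {
    private byte[] buffer = new byte[8];
    private int offset = 0;

    public int bufferSize() {
        return buffer.length;  // exposed so a test can observe resizes
    }

    /** Read everything available; returns the number of bytes consumed. */
    public int readFrom(InputStream in) throws IOException {
        int total = 0;
        int n;
        while ((n = in.read(buffer, offset, buffer.length - offset)) > 0) {
            offset += n;
            total += n;
            if (offset == buffer.length) {
                // Buffer is full: double the capacity so the next read has room.
                buffer = Arrays.copyOf(buffer, buffer.length * 2);
            }
        }
        return total;
    }
}
```

This mirrors the two tests being discussed: a small input leaves the buffer size unchanged, while an input larger than the buffer forces one or more doublings.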
[jira] [Resolved] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10292. Fix Version/s: (was: 2.8.0) 2.7.0 Resolution: Fixed