[GitHub] [kafka] ijuma merged pull request #9977: MINOR: Update zookeeper to 3.5.9
ijuma merged pull request #9977: URL: https://github.com/apache/kafka/pull/9977 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #9977: MINOR: Update zookeeper to 3.5.9
ijuma commented on pull request #9977: URL: https://github.com/apache/kafka/pull/9977#issuecomment-768290763 All JUnit tests passed.
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272942#comment-17272942 ] ASF GitHub Bot commented on KAFKA-8930: --- omkreddy commented on pull request #324: URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768372236 We should also add these docs to `kafka/docs` repo > MM2 documentation > - > > Key: KAFKA-8930 > URL: https://issues.apache.org/jira/browse/KAFKA-8930 > Project: Kafka > Issue Type: Improvement > Components: documentation, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig > classes. Include example usage and example configuration. > Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and > MirrorHeartbeatConnector, including example configuration for running on > Connect w/o mm2 driver. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565448514 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java ## @@ -68,6 +69,19 @@ protected CreateTopicsResult(Map> fu return futures.get(topic).thenApply(TopicMetadataAndConfig::config); } +/** + * Returns a future that provides topic ID for the topic when the request completes. + * + * If broker version doesn't support replication factor in the response, throw + * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. + * If broker returned an error for topic configs, throw appropriate exception. For example, + * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not + * have permission to describe topic configs. + */ +public KafkaFuture<Uuid> topicId(String topic) { +return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId); +} + /** * Returns a future that provides number of partitions in the topic when the request completes. Review comment: I thought it was odd too.
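The accessor in this diff follows the same derive-a-future pattern as the existing config() and numPartitions() methods: the per-topic future is mapped with thenApply. A minimal model of that pattern using the JDK's CompletableFuture (KafkaFuture offers an analogous thenApply); the TopicMetadata class and method names below are illustrative stand-ins, not the real admin-client types:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class TopicIdAccessorSketch {
    // Illustrative stand-in for Kafka's TopicMetadataAndConfig.
    static class TopicMetadata {
        final UUID topicId;
        TopicMetadata(UUID topicId) { this.topicId = topicId; }
        UUID topicId() { return topicId; }
    }

    // Derive a topic-ID future from the per-topic metadata future,
    // mirroring futures.get(topic).thenApply(TopicMetadataAndConfig::topicId).
    static CompletableFuture<UUID> topicId(Map<String, CompletableFuture<TopicMetadata>> futures,
                                           String topic) {
        return futures.get(topic).thenApply(TopicMetadata::topicId);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<String, CompletableFuture<TopicMetadata>> futures =
                Map.of("my-topic", CompletableFuture.completedFuture(new TopicMetadata(id)));
        System.out.println(topicId(futures, "my-topic").join().equals(id)); // true
    }
}
```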
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565448192 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => +val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) Review comment: Good idea.
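The name-resolution branch in this diff can be read as: if the request carried no topic ID (the zero-UUID sentinel), trust the supplied name; otherwise look the ID up in the controller's ID-to-name mapping, yielding null for unknown IDs. A standalone sketch of that rule, with a plain Map standing in for controllerContext.topicNames (the names and types here are simplified, not the broker's actual classes):

```java
import java.util.Map;
import java.util.UUID;

public class TopicNameResolution {
    // Sentinel meaning "no topic ID supplied", modeling Kafka's Uuid.ZERO_UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // No ID supplied -> trust the request's name; otherwise resolve via the
    // id->name mapping, returning null when the ID is unknown.
    static String resolveName(UUID topicId, String suppliedName, Map<UUID, String> topicNames) {
        if (topicId.equals(ZERO_UUID)) {
            return suppliedName;
        }
        return topicNames.getOrDefault(topicId, null);
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        Map<UUID, String> topicNames = Map.of(known, "orders");
        System.out.println(resolveName(known, null, topicNames));             // orders
        System.out.println(resolveName(ZERO_UUID, "payments", topicNames));   // payments
        System.out.println(resolveName(UUID.randomUUID(), null, topicNames)); // null
    }
}
```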
[GitHub] [kafka] chia7712 commented on a change in pull request #9964: MINOR: remove duplicate code of serializing auto-generated data
chia7712 commented on a change in pull request #9964: URL: https://github.com/apache/kafka/pull/9964#discussion_r565287398 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ## @@ -265,14 +265,7 @@ public ByteBuffer encode() { "Should never try to encode a SubscriptionInfo with version [" + data.version() + "] > LATEST_SUPPORTED_VERSION [" + LATEST_SUPPORTED_VERSION + "]" ); -} else { -final ObjectSerializationCache cache = new ObjectSerializationCache(); -final ByteBuffer buffer = ByteBuffer.allocate(data.size(cache, (short) data.version())); -final ByteBufferAccessor accessor = new ByteBufferAccessor(buffer); -data.write(accessor, cache, (short) data.version()); -buffer.rewind(); Review comment: In this case, the state after ```flip``` coincides with the state after ```rewind```. The buffer is completely filled, so the ```limit``` is always equal to the ```capacity``` even though ```rewind``` does not reset the ```limit```.
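The equivalence chia7712 describes can be checked directly against the JDK: for a buffer filled to capacity, flip() and rewind() leave position and limit in the same state, and they only diverge when the buffer is partially filled.

```java
import java.nio.ByteBuffer;

public class FlipVsRewind {
    public static void main(String[] args) {
        // A buffer filled to capacity: position == limit == capacity == 4.
        ByteBuffer flipped = ByteBuffer.allocate(4).putInt(42);
        ByteBuffer rewound = ByteBuffer.allocate(4).putInt(42);

        flipped.flip();   // limit = position (4), position = 0
        rewound.rewind(); // position = 0, limit untouched (still 4 == capacity)

        // Identical state, so the serialization code may use either.
        System.out.println(flipped.position() == rewound.position()); // true
        System.out.println(flipped.limit() == rewound.limit());       // true

        // Partially filled, the two differ: rewind() keeps limit at capacity.
        ByteBuffer partial = ByteBuffer.allocate(8).putInt(42);
        partial.rewind();
        System.out.println(partial.limit()); // 8, not the 4 bytes actually written
    }
}
```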
[GitHub] [kafka] bbejeck commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors
bbejeck commented on a change in pull request #9738: URL: https://github.com/apache/kafka/pull/9738#discussion_r565371239 ## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala ## @@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { testDriver.close() } + + "setting a name on a filter processor" should "pass the name to the topology" in { +val builder = new StreamsBuilder() +val sourceTopic = "source" +val sinkTopic = "sink" + +val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() +table + .filter((key, value) => key.equals("a") && value == 1, Named.as("my-name")) + .toStream + .to(sinkTopic) + +import scala.jdk.CollectionConverters._ + +val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3) +filterNode.name() shouldBe "my-name" + } + + "setting a name on a count processor" should "pass the name to the topology" in { +val builder = new StreamsBuilder() +val sourceTopic = "source" +val sinkTopic = "sink" + +val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count(Named.as("my-name")) +table.toStream.to(sinkTopic) + +import scala.jdk.CollectionConverters._ + +val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1) +filterNode.name() shouldBe "my-name" Review comment: shouldn't this line verify the `countNode` name?
[GitHub] [kafka] tang7526 edited a comment on pull request #9939: MINOR: fix @link tag in javadoc
tang7526 edited a comment on pull request #9939: URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328 > @tang7526 How about using import for this issue? Other references in Javadocs use import also. Done
[GitHub] [kafka] mdespriee commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors
mdespriee commented on pull request #9738: URL: https://github.com/apache/kafka/pull/9738#issuecomment-768282420 @bbejeck of course. Just rebased on trunk and added a couple of tests.
[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901 ] feyman edited comment on KAFKA-9689 at 1/27/21, 2:57 PM: - The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. The code change mainly contains 3 parts: 1) The StreamThread should know whether it is the leader in the consumer group; if so, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). Current implementation leverages the assignment. 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that was (Author: feyman): The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. The code change mainly contains 3 parts: 1) The StreamThread should know whether it is the leader in the consumer group; if so, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). 
3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that > Automatic broker version detection to initialize stream client > -- > > Key: KAFKA-9689 > URL: https://issues.apache.org/jira/browse/KAFKA-9689 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > Eventually we shall deprecate the flag to suppress EOS thread producer > feature, instead we take version detection approach on broker to decide which > semantic to use.
[GitHub] [kafka] tombentley commented on pull request #9847: KAFKA-10703: Better handling and doc for config defaults of topics
tombentley commented on pull request #9847: URL: https://github.com/apache/kafka/pull/9847#issuecomment-768284951 @chia7712 any chance of a 2nd review here?
[GitHub] [kafka] dengziming opened a new pull request #9982: MINOR: remove some explicit type argument in generator
dengziming opened a new pull request #9982: URL: https://github.com/apache/kafka/pull/9982 *More detailed description of your change* From `ArrayList<Integer> newCollection = new ArrayList<Integer>(arrayLength)` to `ArrayList<Integer> newCollection = new ArrayList<>(arrayLength)` *Summary of testing strategy (including rationale)* QA ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
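For context, the kind of change the PR title describes: since Java 7 the diamond operator lets the compiler infer a constructor's type argument from the declaration, so generated code can drop the explicit type argument on the right-hand side. The exact generator output isn't shown in the message, so this is a generic illustration:

```java
import java.util.ArrayList;

public class DiamondOperator {
    public static void main(String[] args) {
        int arrayLength = 8;
        // Explicit type argument, as older generated code might emit:
        ArrayList<Integer> explicit = new ArrayList<Integer>(arrayLength);
        // Diamond operator: the compiler infers <Integer> from the declaration.
        ArrayList<Integer> inferred = new ArrayList<>(arrayLength);

        explicit.add(1);
        inferred.add(1);
        System.out.println(explicit.equals(inferred)); // true
    }
}
```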
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272938#comment-17272938 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck merged pull request #324: URL: https://github.com/apache/kafka-site/pull/324
[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13
chia7712 commented on pull request #9981: URL: https://github.com/apache/kafka/pull/9981#issuecomment-768294345 The failed tests pass on my local machine. Will trigger QA again.
[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901 ] feyman commented on KAFKA-9689: --- The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. The code change mainly contains 3 parts: 1) The StreamThread should know whether it is the leader in the consumer group; if so, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that
[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc
chia7712 commented on pull request #9939: URL: https://github.com/apache/kafka/pull/9939#issuecomment-768277706 @tang7526 How about using import for this issue? Other references in Javadocs use import also.
[GitHub] [kafka] tang7526 commented on pull request #9939: MINOR: fix @link tag in javadoc
tang7526 commented on pull request #9939: URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328 > @tang7526 How about using import for this issue? Other references in Javadocs use import also. I tried that before and it didn't work.
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272977#comment-17272977 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck commented on pull request #324: URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768404929 > We should also add these docs to kafka/docs repo @omkreddy, yes a PR for kafka/docs is coming soon
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273055#comment-17273055 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck commented on pull request #326: URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768461055 ping @miguno for a +1
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671
wcarlson5 commented on a change in pull request #9969: URL: https://github.com/apache/kafka/pull/9969#discussion_r565518232 ## File path: docs/streams/upgrade-guide.html ## @@ -91,6 +91,29 @@ Streams API We extended StreamJoined to include the options withLoggingEnabled() and withLoggingDisabled() in KIP-689 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs). + +We added two new methods to Kafka Streams, namely addThread() and removeThread(), in +KIP-663 (https://cwiki.apache.org/confluence/x/FDd4CQ). +These enable adding and removing StreamThreads to a running KafkaStreams client. + + +We deprecated setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) +in favor of setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) +in KIP-671 (https://cwiki.apache.org/confluence/x/lkN4CQ). +The default handler will close the client and the client will transition to state ERROR. +If you implement a custom handler, the new interface allows you to return a StreamThreadExceptionResponse, +which will determine how the application will respond to a thread failure. + + +Changes in KIP-663 (https://cwiki.apache.org/confluence/x/FDd4CQ) necessitated an update to the KafkaStreams client +state machine, which was done in KIP-696 (https://cwiki.apache.org/confluence/x/lCvZCQ). +The ERROR state is now terminal, with PENDING_ERROR being a transitional state where the resources are closing. +The ERROR state indicates that there something wrong and should not be blindly restarted without classifying Review comment: I went with `is something wrong and the KafkaStreams client should not be blindly restarted`
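The KIP-671 change described in this diff swaps a void exception callback for a handler that returns a response the runtime acts on. A simplified, self-contained model of that shape (the enum and interface below are illustrative stand-ins; the real API lives in org.apache.kafka.streams.errors):

```java
public class ExceptionHandlerSketch {
    // Models the options of KIP-671's StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Models the new handler: instead of only observing the error, it decides
    // how the application should respond to a thread failure.
    interface StreamsExceptionHandlerModel {
        Response handle(Throwable exception);
    }

    public static void main(String[] args) {
        // Example policy: replace the thread on transient errors, shut the
        // client down on anything that signals corrupted state.
        StreamsExceptionHandlerModel handler = exception ->
                exception instanceof IllegalStateException
                        ? Response.SHUTDOWN_CLIENT
                        : Response.REPLACE_THREAD;

        System.out.println(handler.handle(new RuntimeException("transient"))); // REPLACE_THREAD
        System.out.println(handler.handle(new IllegalStateException("bad")));  // SHUTDOWN_CLIENT
    }
}
```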
[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664
hachikuji commented on a change in pull request #9979: URL: https://github.com/apache/kafka/pull/9979#discussion_r565517686 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -511,6 +513,9 @@ public void testSerialization() throws Exception { checkRequest(createAlterClientQuotasRequest(), true); checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true); checkResponse(createAlterClientQuotasResponse(), 0, true); +checkRequest(createDescribeProducersRequest(), true); +checkErrorResponse(createDescribeProducersRequest(), unknownServerException, true); +checkResponse(createDescribeProducersResponse(), 0, true); Review comment: Sounds reasonable. I'm also not super fond of this test pattern.
[jira] [Commented] (KAFKA-6223) Please delete old releases from mirroring system
[ https://issues.apache.org/jira/browse/KAFKA-6223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273066#comment-17273066 ] ASF GitHub Bot commented on KAFKA-6223: --- mimaison merged pull request #322: URL: https://github.com/apache/kafka-site/pull/322 > Please delete old releases from mirroring system > > > Key: KAFKA-6223 > URL: https://issues.apache.org/jira/browse/KAFKA-6223 > Project: Kafka > Issue Type: Bug > Environment: https://dist.apache.org/repos/dist/release/kafka/ >Reporter: Sebb >Assignee: Rajini Sivaram >Priority: Major > > To reduce the load on the ASF mirrors, projects are required to delete old > releases [1] > Please can you remove all non-current releases? > It's unfair to expect the 3rd party mirrors to carry old releases. > Note that older releases can still be linked from the download page, but such > links should use the archive server at: > https://archive.apache.org/dist/kafka/ > A suggested process is: > + Change the download page to use archive.a.o for old releases > + Delete the corresponding directories from > {{https://dist.apache.org/repos/dist/release/kafka/}} > e.g. {{svn delete https://dist.apache.org/repos/dist/release/kafka/0.8.0}} > Thanks! > [1] http://www.apache.org/dev/release.html#when-to-archive
[GitHub] [kafka] wcarlson5 opened a new pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 opened a new pull request #9984: URL: https://github.com/apache/kafka/pull/9984 add timeout and static group rebalance to remove thread ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] miguno opened a new pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)
miguno opened a new pull request #9983: URL: https://github.com/apache/kafka/pull/9983 This adds a new user-facing documentation "Geo-replication (Cross-Cluster Data Mirroring)" section to the Kafka Operations documentation that covers MirrorMaker v2. Was already merged to `kafka-site` via https://github.com/apache/kafka-site/pull/324. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272987#comment-17272987 ] ASF GitHub Bot commented on KAFKA-8930: --- miguno commented on pull request #324: URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768416513 kafka/docs PR is up at https://github.com/apache/kafka/pull/9983
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273030#comment-17273030 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck opened a new pull request #326: URL: https://github.com/apache/kafka-site/pull/326 The MM2 docs are already in for 2.7 via https://github.com/apache/kafka-site/pull/324, this PR adds them to 2.6
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => +val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() - .setName(topic)) + .setName(name) + .setTopicId(topic.topicId())) } val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => - if (!authorizedTopics.contains(topic.name)) + val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null + val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) Review comment: Thinking on this more, I can simplify the line. If topic name is null, then we didn't have a valid topic ID. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
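The review above hinges on how a delete request names a topic: either by name directly, or by a topic ID that must be resolved through the controller's id-to-name map. A minimal stand-alone sketch of that rule, with hypothetical class and method names rather than the actual KafkaApis code:

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the name-resolution rule discussed in the review:
// when the request carries no topic ID (the all-zero sentinel), trust the
// name supplied by the caller; otherwise look the name up in the
// controller's id-to-name map, which yields null for an unknown ID.
public class TopicNameResolution {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static String resolveName(UUID topicId, String requestedName, Map<UUID, String> topicNames) {
        if (topicId.equals(ZERO_UUID)) {
            return requestedName;       // topic addressed by name
        }
        // Null here means the topic ID was not valid, which is exactly the
        // simplification jolshan points out in the review.
        return topicNames.get(topicId);
    }

    public static void main(String[] args) {
        UUID knownId = new UUID(1L, 1L);
        Map<UUID, String> topicNames = Map.of(knownId, "orders");
        System.out.println(resolveName(ZERO_UUID, "payments", topicNames)); // payments
        System.out.println(resolveName(knownId, null, topicNames));         // orders
        System.out.println(resolveName(new UUID(2L, 2L), null, topicNames)); // null
    }
}
```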
[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei commented on pull request #9836: URL: https://github.com/apache/kafka/pull/9836#issuecomment-768546193 Most of those failures were known flaky tests, but one was an EasyMock error. I'm not able to repro it locally after a rebase, though. Rebased, pushed, and trying one more time to get a clean build.
[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)
bbejeck commented on pull request #9983: URL: https://github.com/apache/kafka/pull/9983#issuecomment-768429896 cherry-picked to 2.7 and 2.6
[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator
vvcephei commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r565519066

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java

@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * The {@code Branched} class is used to define the optional parameters when building branches with
+ * {@link BranchedKStream}.
+ *
+ * @param <K> type of record key
+ * @param <V> type of record value
+ */
+public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
+
+    protected final String name;
+    protected final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction;
+    protected final Consumer<? super KStream<K, V>> chainConsumer;
+
+    protected Branched(final String name,
+                       final Function<? super KStream<K, V>, ? extends KStream<K, V>> chainFunction,
+                       final Consumer<? super KStream<K, V>> chainConsumer) {
+        this.name = name;
+        this.chainFunction = chainFunction;
+        this.chainConsumer = chainConsumer;
+    }
+
+    /**
+     * Create an instance of {@code Branched} with provided branch name suffix.
+     *
+     * @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
+     *             (see {@link BranchedKStream} description for details)
+     * @param <K> key type
+     * @param <V> value type
+     * @return a new instance of {@code Branched}
+     */
+    public static <K, V> Branched<K, V> as(final String name) {
+        return new Branched<>(name, null, null);
+    }

Review comment: I agree, it seems like a good idea to check for `null` here.
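The null check agreed on in the review can be sketched in isolation. `BranchedSketch` below is a simplified stand-in for `Branched` (the real class carries chain functions and generics), showing only the validation being discussed:

```java
import java.util.Objects;

// Simplified stand-in for Branched, showing only the null check agreed on in
// the review: reject a null name suffix eagerly in as(...) rather than
// failing later during topology building.
public class BranchedSketch {
    final String nameSuffix;

    private BranchedSketch(String nameSuffix) {
        this.nameSuffix = nameSuffix;
    }

    public static BranchedSketch as(String nameSuffix) {
        Objects.requireNonNull(nameSuffix, "nameSuffix can't be null");
        return new BranchedSketch(nameSuffix);
    }
}
```

Failing fast like this turns a hard-to-trace topology-build error into an immediate, well-located exception at the call site.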
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273059#comment-17273059 ] ASF GitHub Bot commented on KAFKA-8930: --- miguno commented on pull request #326: URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768466468 This LGTM, though (1) there were some minor HTML changes not directly related to the original PR and (2) I didn't test this PR locally myself to ensure proper HTML rendering etc. > MM2 documentation > - > > Key: KAFKA-8930 > URL: https://issues.apache.org/jira/browse/KAFKA-8930 > Project: Kafka > Issue Type: Improvement > Components: documentation, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Michael G. Noll >Priority: Minor > > Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig > classes. Include example usage and example configuration. > Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and > MirrorHeartbeatConnector, including example configuration for running on > Connect w/o mm2 driver. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273073#comment-17273073 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck commented on pull request #326: URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852 > I didn't test this PR locally myself to ensure proper HTML rendering etc. FWIW I rendered it locally and it seemed fine
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## Review comment: Thinking on this more, I can simplify the line. Especially if I make changes with the code above.
[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)
bbejeck commented on pull request #9983: URL: https://github.com/apache/kafka/pull/9983#issuecomment-768420998 merged #9983 into trunk
[GitHub] [kafka] bbejeck merged pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)
bbejeck merged pull request #9983: URL: https://github.com/apache/kafka/pull/9983
[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664
hachikuji commented on a change in pull request #9979: URL: https://github.com/apache/kafka/pull/9979#discussion_r565525057

## File path: core/src/main/scala/kafka/server/KafkaApis.scala

@@ -3365,6 +3366,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }

+  def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
+    val describeProducersRequest = request.body[DescribeProducersRequest]
+
+    def partitionError(topicPartition: TopicPartition, error: Errors): DescribeProducersResponseData.PartitionResponse = {
+      new DescribeProducersResponseData.PartitionResponse()
+        .setPartitionIndex(topicPartition.partition)
+        .setErrorCode(error.code)
+    }
+
+    val response = new DescribeProducersResponseData()
+    describeProducersRequest.data.topics.forEach { topicRequest =>
+      val topicResponse = new DescribeProducersResponseData.TopicResponse()
+        .setName(topicRequest.name)
+      val topicError = if (!authHelper.authorize(request.context, READ, TOPIC, topicRequest.name))
+        Some(Errors.TOPIC_AUTHORIZATION_FAILED)
+      else if (!metadataCache.contains(topicRequest.name))
+        Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+      else
+        None
+
+      topicRequest.partitionIndexes.forEach { partitionId =>
+        val topicPartition = new TopicPartition(topicRequest.name, partitionId)
+        val partitionResponse = topicError match {
+          case Some(error) => partitionError(topicPartition, error)
+          case None => replicaManager.activeProducerState(topicPartition)
+        }
+        topicResponse.partitions.add(partitionResponse)
+      }
+
+      if (!topicResponse.partitions.isEmpty) {
+        response.topics.add(topicResponse)
+      }

Review comment: Hmm... It's been a while since I wrote this. I agree it looks a little strange. I guess there's probably no harm echoing back the same structure that was sent.
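The control flow under review computes one optional topic-level error and then echoes it into every requested partition. Stripped of the Kafka types, the shape is roughly as follows (illustrative names only, not the handler itself):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative reduction of the handler above: a topic-level failure
// (authorization or unknown topic) is fanned out into each requested
// partition's response; otherwise a per-partition lookup would run where
// "NONE" stands in for the replica-manager query.
public class PartitionErrorFanOut {
    static Map<Integer, String> partitionResponses(List<Integer> partitionIndexes, Optional<String> topicError) {
        Map<Integer, String> responses = new LinkedHashMap<>();
        for (int partition : partitionIndexes) {
            // Same topic-level error for every partition, or the placeholder.
            responses.put(partition, topicError.orElse("NONE"));
        }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(partitionResponses(List.of(0, 1), Optional.of("TOPIC_AUTHORIZATION_FAILED")));
        System.out.println(partitionResponses(List.of(0), Optional.empty()));
    }
}
```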
[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
jolshan commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r565575532

## File path: core/src/main/scala/kafka/server/KafkaApis.scala

@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         Set.empty[MetadataResponseTopic]
       else
         unauthorizedForDescribeTopics.map(topic =>
-          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
+          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment: So will we never reach this code path when using topic IDs? I think we are using topics to decide authorization. So in the case where we use ids and the name exists, then we will expose the name and return a zero ID? Might be useful to create an authorizer integration test with topic IDs to ensure correctness.
[jira] [Assigned] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael G. Noll reassigned KAFKA-8930: -- Assignee: Michael G. Noll (was: Ryanne Dolan)
[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664
hachikuji commented on a change in pull request #9979: URL: https://github.com/apache/kafka/pull/9979#discussion_r565519812

## File path: clients/src/main/resources/common/message/DescribeProducersResponse.json

@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 61,
+  "type": "response",
+  "name": "DescribeProducersResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
+      "about": "Each topic in the response.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name" },
+      { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
+        "about": "Each partition in the response.", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The partition error code, or 0 if there was no error." },

Review comment: I don't feel strongly about it. Perhaps it's better to have it than not. I will add it.
[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator
vvcephei commented on a change in pull request #9107: URL: https://github.com/apache/kafka/pull/9107#discussion_r565519912 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.util.Map; + +/** + * Branches the records in the original stream based on the predicates supplied for the branch definitions. + * + * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or + * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates + * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate + * evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch, + * or dropped if no default branch is created. + * + * Each branch (which is a {@link KStream} instance) then can be processed either by + * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched} + * parameter. 
If certain conditions are met, it also can be accessed from the {@link Map} returned by
+ * {@link BranchedKStream#defaultBranch(Branched)} or {@link BranchedKStream#noDefaultBranch()}
+ * (see usage examples).
+ *
+ * The branching happens on first-match: A record in the original stream is assigned to the corresponding result

Review comment: Yes, I agree, unless you want to add a noun:
```suggestion
 * The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
```
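The first-match semantics described in the javadoc under review can be illustrated without any Streams machinery: each record is tested against the branch predicates in declaration order and lands in the first branch that accepts it. This is a plain-Java analogy, not the KStream implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java analogy of BranchedKStream's first-match routing: the first
// branch whose predicate accepts the record wins, later matches are ignored,
// and non-matching records fall through to the default branch.
public class FirstMatchRouting {
    static String route(int record, List<Map.Entry<String, Predicate<Integer>>> branches, String defaultBranch) {
        for (Map.Entry<String, Predicate<Integer>> branch : branches) {
            if (branch.getValue().test(record)) {
                return branch.getKey(); // first match wins
            }
        }
        return defaultBranch; // no predicate matched
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Predicate<Integer>>> branches = List.of(
            Map.entry("even", (Predicate<Integer>) r -> r % 2 == 0),
            Map.entry("small", (Predicate<Integer>) r -> r < 10));
        System.out.println(route(4, branches, "default"));  // even (4 is also small, but "even" is declared first)
        System.out.println(route(3, branches, "default"));  // small
        System.out.println(route(11, branches, "default")); // default
    }
}
```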
[GitHub] [kafka] junrao commented on a change in pull request #9872: KAFKA-10759: ARM support for Kafka
junrao commented on a change in pull request #9872: URL: https://github.com/apache/kafka/pull/9872#discussion_r565473406

## File path: Jenkinsfile

@@ -160,5 +160,23 @@ pipeline {
       }
     }
   }
+    stage("Arm Build") {
+      agent { label 'arm4' }
+      options {
+        timeout(time: 8, unit: 'HOURS')
+        timestamps()
+      }
+      environment {

Review comment: Do we need to specify the jdk version since scala 2.12 only works on jdk 8?
[GitHub] [kafka] mdespriee commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors
mdespriee commented on a change in pull request #9738: URL: https://github.com/apache/kafka/pull/9738#discussion_r565497044

## File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala

@@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
     testDriver.close()
   }
+
+  "setting a name on a filter processor" should "pass the name to the topology" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+    table
+      .filter((key, value) => key.equals("a") && value == 1, Named.as("my-name"))
+      .toStream
+      .to(sinkTopic)
+
+    import scala.jdk.CollectionConverters._
+
+    val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
+    filterNode.name() shouldBe "my-name"
+  }
+
+  "setting a name on a count processor" should "pass the name to the topology" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count(Named.as("my-name"))
+    table.toStream.to(sinkTopic)
+
+    import scala.jdk.CollectionConverters._
+
+    val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
+    filterNode.name() shouldBe "my-name"

Review comment: bad copy-paste of variable name.
[GitHub] [kafka] mjsax commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671
mjsax commented on a change in pull request #9969: URL: https://github.com/apache/kafka/pull/9969#discussion_r565515280 ## File path: docs/streams/upgrade-guide.html ## @@ -91,6 +91,29 @@ Streams API We extended StreamJoined to include the options withLoggingEnabled() and withLoggingDisabled() in https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs;>KIP-689. + +We added two new methods to Kafka Streams, namely addThread() and removeThread() in +https://cwiki.apache.org/confluence/x/FDd4CQ;>KIP-663. +These enabled adding a removing StreamThreads to running KafkaStreams client. + + +We deprecated setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) +in favor of setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) +in https://cwiki.apache.org/confluence/x/lkN4CQ;>KIP-671. +The default handler will close the client and the client will transit to state ERROR. +If you implement a custom handler, the new interface allows you to return a StreamThreadExceptionResponse, +which will determine how the application will respond to a thread failure. + + +Changes in https://cwiki.apache.org/confluence/x/FDd4CQ;>KIP-663 necessitated the KafkaStreams client +state machine to update, which was done in https://cwiki.apache.org/confluence/x/lCvZCQ;>KIP-696.. +The ERROR state is now terminal with PENDING_ERROR being a transitional state where the resources are closing. +The ERROR state indicates that there something wrong and should not be blindly restarted without classifying Review comment: `that there something` -> `that there [is] something` ? `and should not be blindly restarted` -> `and you should not restarted KafkaStreams blindly` ? (or `and [KafkaStreams] should not be blindly restarted`) ?
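The KIP-671 handler described in the upgrade-guide text above is easier to follow with its shape spelled out. The interface and enum below are local stand-ins imitating `StreamsUncaughtExceptionHandler` and `StreamThreadExceptionResponse`, so the example stays self-contained instead of depending on kafka-streams:

```java
// Self-contained imitation of the KIP-671 handler contract: the handler
// inspects the uncaught exception and returns a response telling the runtime
// whether to replace the failed thread or shut down. These types are
// stand-ins, not the actual kafka-streams API.
public class ExceptionHandlerSketch {
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    interface UncaughtHandler {
        Response onUncaughtException(Throwable error);
    }

    static Response handle(Throwable error, UncaughtHandler handler) {
        return handler.onUncaughtException(error);
    }

    public static void main(String[] args) {
        // Example policy: shut down on invalid state, otherwise replace the thread.
        UncaughtHandler handler = error ->
            error instanceof IllegalStateException ? Response.SHUTDOWN_CLIENT : Response.REPLACE_THREAD;
        System.out.println(handle(new RuntimeException("transient"), handler)); // REPLACE_THREAD
    }
}
```

The point of returning an enum rather than just logging is exactly what the doc text says: the application, not the runtime, classifies the failure and chooses the reaction.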
[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
mjsax commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-768478775 @inponomarev the failing tests seem to be due to a known issue that was fixed via https://github.com/apache/kafka/pull/9768 Can you rebase your PR to pick up the fix so we can get a green build?
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273075#comment-17273075 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck merged pull request #326: URL: https://github.com/apache/kafka-site/pull/326
[jira] [Commented] (KAFKA-8930) MM2 documentation
[ https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273074#comment-17273074 ] ASF GitHub Bot commented on KAFKA-8930: --- bbejeck edited a comment on pull request #326: URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852 > I didn't test this PR locally myself to ensure proper HTML rendering etc. FWIW I rendered it locally and it seemed fine
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883

## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala

@@ -34,6 +34,7 @@ import scala.collection.mutable

 object KafkaNetworkChannel {
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment: Yea, good catch. The AddressSpec makes sense.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491

## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java

@@ -36,7 +36,9 @@
     public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
     public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
         "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " +
-        "For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+        "For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+        "If the voter endpoints are not known at startup, a non-routable address can be provided instead." +

Review comment: Fair enough. I can move it there.
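The `{id}@{host}:{port}` format documented above, together with the non-routable `0.0.0.0:0` placeholder discussed in the review, can be sketched as a tiny parser. Helper and field names here are illustrative, not RaftConfig's actual parsing code:

```java
// Illustrative parser for one `{id}@{host}:{port}` voter entry, including
// detection of the non-routable 0.0.0.0:0 placeholder used when endpoints
// are not known at startup. Not the actual RaftConfig implementation.
public class VoterEntry {
    final int id;
    final String host;
    final int port;

    VoterEntry(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    static VoterEntry parse(String entry) {
        int at = entry.indexOf('@');
        int colon = entry.lastIndexOf(':'); // lastIndexOf: the port follows the final ':'
        int id = Integer.parseInt(entry.substring(0, at));
        String host = entry.substring(at + 1, colon);
        int port = Integer.parseInt(entry.substring(colon + 1));
        return new VoterEntry(id, host, port);
    }

    boolean isNonRoutablePlaceholder() {
        return host.equals("0.0.0.0") && port == 0;
    }
}
```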
[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte Review comment: typo thte ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by
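The tiered-snapshot design described in the quoted javadoc (current tier plus one tier per snapshot, with displaced values copied into tiers on mutation) can be sketched as follows. This is a hypothetical, simplified illustration of the idea only — the class and method names are invented and this is not the actual Kafka timeline code, which uses BaseHashTable, SnapshotRegistry, and deletion sentinels:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of a tiered snapshottable map: reads at epoch E consult
// only two tiers, the snapshot tier for E and the current tier.
class TieredSnapshotMap<K, V> {
    private final Map<K, V> current = new HashMap<>();
    // epoch -> tier holding values that were overwritten or removed after that snapshot
    private final TreeMap<Long, Map<K, V>> snapshotTiers = new TreeMap<>();

    void createSnapshot(long epoch) {
        snapshotTiers.put(epoch, new HashMap<>());
    }

    void put(K key, V value) {
        V displaced = current.put(key, value);
        // copy the displaced value into every snapshot tier that hasn't
        // already recorded an older value for this key
        for (Map<K, V> tier : snapshotTiers.values()) {
            if (!tier.containsKey(key)) {
                tier.put(key, displaced); // null marks "did not exist at snapshot time"
            }
        }
    }

    V get(K key) {
        return current.get(key);
    }

    V getAtEpoch(K key, long epoch) {
        Map<K, V> tier = snapshotTiers.get(epoch);
        if (tier != null && tier.containsKey(key)) {
            return tier.get(key); // value as of the snapshot (possibly null = absent)
        }
        return current.get(key); // unchanged since the snapshot
    }
}
```

As the javadoc notes, writes pay an extra copy per snapshot tier, but snapshot reads stay O(1) and dropping a snapshot is just removing its tier.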
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565720574 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { Review comment: We generally don't explicitly make this part of the API, and just inform users through the javadocs as you've done ```suggestion public Optional removeStreamThread(final Duration timeout) { ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. 
+ * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time Review comment: ```suggestion * @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -88,9 +91,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.errors.TimeoutException; Review comment: nit: move the import to the other `o.a.k.*` imports ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if 
(streamThread.getGroupInstanceID().isPresent()) { +final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get()); +final Collection membersToRemove = Collections.singletonList(memberToRemove); +final RemoveMembersFromConsumerGroupResult
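The review thread above is about the shape of a timeout-aware `removeStreamThread(Duration)` overload: since Kafka's `org.apache.kafka.common.errors.TimeoutException` is unchecked, the reviewer suggests documenting it in the javadoc rather than declaring it in the signature. A self-contained, hypothetical sketch of that wait-with-deadline pattern (invented class names; a plain `Thread` stands in for `StreamThread` and its shutdown/wait-on-state machinery):

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch of the pattern under review, not the KafkaStreams code.
class StreamThreadRemovalSketch {
    /** Unchecked stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class TimeoutException extends RuntimeException {
        TimeoutException(String msg) { super(msg); }
    }

    /**
     * Asks {@code worker} to stop and waits up to {@code timeout} for it to die.
     *
     * @return the name of the stopped thread
     * @throws TimeoutException if the thread does not stop in time (unchecked,
     *         so it is documented here rather than declared in the signature)
     */
    static Optional<String> removeThread(Thread worker, Duration timeout) {
        worker.interrupt(); // stand-in for streamThread.shutdown()
        try {
            // join(0) would wait forever, so clamp to at least 1 ms
            worker.join(Math.max(1, timeout.toMillis()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (worker.isAlive()) {
            throw new TimeoutException(
                "Thread " + worker.getName() + " did not stop in the allotted time");
        }
        return Optional.of(worker.getName());
    }
}
```

A caller that wants the old blocking behavior can simply pass a very large duration, which mirrors how the no-argument overload in the diff delegates with `Long.MAX_VALUE`.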
[GitHub] [kafka] satishd commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.
satishd commented on a change in pull request #9980: URL: https://github.com/apache/kafka/pull/9980#discussion_r565730154 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -63,7 +63,7 @@ private[log] object ProducerStateEntry { private[log] val NumBatchesToRetain = 5 def empty(producerId: Long) = new ProducerStateEntry(producerId, -batchMetadata = mutable.Queue[BatchMetadata](), +batchMetadata = new mutable.Queue[BatchMetadata](5), Review comment: minor: you may want to have it as `new mutable.Queue[BatchMetadata](NumBatchesToRetain)` instead of hardcoding the value directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
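The review point above — size the queue with the named constant `NumBatchesToRetain` rather than a magic `5`, since the queue only ever retains the last N batches — can be illustrated with a small Java sketch (the class name and structure are hypothetical; the actual Kafka code is Scala in `ProducerStateManager.scala`):

```java
import java.util.ArrayDeque;

// Hypothetical sketch: a fixed-size retention window over batch metadata,
// sized by the named constant instead of a hardcoded literal.
class BatchMetadataWindow<T> {
    static final int NUM_BATCHES_TO_RETAIN = 5; // mirrors ProducerStateEntry.NumBatchesToRetain

    private final ArrayDeque<T> batches = new ArrayDeque<>(NUM_BATCHES_TO_RETAIN);

    void append(T metadata) {
        if (batches.size() == NUM_BATCHES_TO_RETAIN) {
            batches.pollFirst(); // evict the oldest retained batch
        }
        batches.addLast(metadata);
    }

    int size() {
        return batches.size();
    }

    T oldest() {
        return batches.peekFirst();
    }
}
```

Using the constant in both the capacity hint and the eviction check keeps the two from silently drifting apart if the retention count ever changes.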
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565731661 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -319,9 +319,9 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin StreamThread.State.PARTITIONS_ASSIGNED); return null; }).anyTimes(); + EasyMock.expect(thread.getGroupInstanceID()).andReturn(Optional.empty()).anyTimes(); Review comment: ```suggestion EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty()); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: It's a bit weird to test this by passing in a negative timeout but I don't have any good ideas for forcing it to exceed the timeout ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { Review comment: ```suggestion public void shouldNotRemoveStreamThreadWithinTimeout() throws Exception { ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are 
alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the 
allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Hm actually now that I think about it, we should probably continue with the cleanup to leave the
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + +this.internalTopologyBuilder = internalTopologyBuilder; +internalTopologyBuilder.rewriteTopology(config); + +// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception +taskTopology = internalTopologyBuilder.buildTopology(); +globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + +final boolean hasGlobalTopology = globalTaskTopology != null; +final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || +(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + +try { +stateDirectory = new StateDirectory(config, time, hasPersistentStores); +processId = stateDirectory.getProcessId(); Review comment: This is the only real change in the constructor, but I had to move a few things around and tried to organize them as I went This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
skaundinya15 commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r565772802 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java ## @@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) { public List clusterPairs() { List pairs = new ArrayList<>(); Set clusters = clusters(); +Map originalStrings = originalsStrings(); +boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT; +if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + for (String source : clusters) { for (String target : clusters) { -SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target); if (!source.equals(target)) { -pairs.add(sourceAndTarget); +String clusterPairConfigPrefix = source + "->" + target + "."; +boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false")); +boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled; +if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + +if (clusterPairEnabled || clusterPairHeartbeatsEnabled) { Review comment: ```suggestion // By default, all source->target Herder combinations are created even if `x->y.enabled=false` // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false` // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required : // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health) // A->B for the MirrorSourceConnector (actual replication flow) if (clusterPairEnabled || 
clusterPairHeartbeatsEnabled) { ``` Looks good to me, just had a small tweak for the `B->A` comment. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
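The rule being reviewed above — create a source->target herder if the pair is explicitly enabled OR heartbeats are enabled for it (per pair, falling back to the global setting), because the reverse B->A herder must exist to emit heartbeats for an A->B flow — can be sketched as a standalone method. This is a simplified illustration with invented names, not the actual `MirrorMakerConfig.clusterPairs()` implementation; the `true` global default for `emit.heartbeats.enabled` is an assumption based on MM2's documented defaults:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the pair-selection logic discussed in the review.
class ClusterPairSelector {
    static List<String> clusterPairs(Set<String> clusters, Map<String, String> props) {
        // assumed default: MM2 emits heartbeats unless explicitly disabled
        boolean globalHeartbeats = Boolean.parseBoolean(
            props.getOrDefault("emit.heartbeats.enabled", "true"));
        List<String> pairs = new ArrayList<>();
        for (String source : clusters) {
            for (String target : clusters) {
                if (source.equals(target)) {
                    continue;
                }
                String prefix = source + "->" + target + ".";
                boolean pairEnabled = Boolean.parseBoolean(
                    props.getOrDefault(prefix + "enabled", "false"));
                // per-pair heartbeat setting overrides the global one
                boolean pairHeartbeats = Boolean.parseBoolean(
                    props.getOrDefault(prefix + "emit.heartbeats.enabled",
                                       String.valueOf(globalHeartbeats)));
                if (pairEnabled || pairHeartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }
}
```

With heartbeats globally disabled, only explicitly enabled flows get herders; turning heartbeats back on for one direction brings its (otherwise disabled) herder back, which is exactly the behavior the comment in the suggestion documents.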
[GitHub] [kafka] gardnervickers commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.
gardnervickers commented on a change in pull request #9980: URL: https://github.com/apache/kafka/pull/9980#discussion_r565781027 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -63,7 +63,7 @@ private[log] object ProducerStateEntry { private[log] val NumBatchesToRetain = 5 def empty(producerId: Long) = new ProducerStateEntry(producerId, -batchMetadata = mutable.Queue[BatchMetadata](), +batchMetadata = new mutable.Queue[BatchMetadata](5), Review comment: Yes, good suggestion :)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Hm. That is interesting. I am not sure. If the thread hasn't been removed then we don't want to resize the cache, so would removing the thread and then throwing an exception be the right way of doing it? The timeout is essentially saying that removing the thread failed, so is it right to then remove it anyways?
[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500
chia7712 commented on a change in pull request #9967: URL: https://github.com/apache/kafka/pull/9967#discussion_r565786393 ## File path: core/src/main/scala/kafka/server/Server.scala ## @@ -46,6 +46,22 @@ object Server { new Metrics(metricConfig, reporters, time, true, metricsContext) } + def initializeMetrics( +config: KafkaConfig, +time: Time, +metaProps: MetaProperties Review comment: It seems to me the properties in ```MetaProperties``` duplicate those in ```KafkaConfig``` in this case. Is there any reason that we need to pass ```MetaProperties```?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565797234 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: It does seem like kind of a gray area. Still, the TimeoutException isn't necessarily saying that it failed, just that we didn't wait long enough for it to finish the shutdown. But we have at least definitely initiated the shutdown -- besides, if the thread really is stuck in its shutdown then it's probably a benefit to go ahead with the `removeMembersFromConsumerGroup` call to get it kicked out all the sooner. But, in the end, we really make no guarantees about the application should a user choose to ignore the TimeoutException (though they absolutely can). I can imagine that some users might choose to just swallow it and decide that they don't care if the shutdown is taking a long time. It's hard to say This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901 ] feyman edited comment on KAFKA-9689 at 1/28/21, 3:54 AM: - The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. The code change mainly contains 3 parts: 1) StreamThread should know if itself is leader in the consumer group, if yes, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). Current implementation leverages the assignment. 2.1 Each streamThread put the feature metadata(EOS feature version) in the SubscriptionInfo when subscribe 2.2 Upon receiving the JoinGroupResp, the leader will know the current feature version in the broker side, it can put the current broker side feature version(if updated) in the assignment as suggested feature version 2.3 when the follower receive the assignment in the SyncGroupResp, it will find the new broker side latest feature version 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that I'm implementing the code as the sequence above, currently on 2, but need to discuss if step 2 make sense, haven't start step 3 yet. Questions to [~bchen225242] : A) 2.1 Might need to add a new field in the SubscriptionInfoData to include the client side feature metadata, it seems ok to me since SubscriptionInfoData is the stream-specific and doesn't seem to need a KIP for it, thoughts ? was (Author: feyman): The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. 
The code change mainly contains 3 parts: 1) StreamThread should know if itself is leader in the consumer group, if yes, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). Current implementation levrages the assignment. 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that > Automatic broker version detection to initialize stream client > -- > > Key: KAFKA-9689 > URL: https://issues.apache.org/jira/browse/KAFKA-9689 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > Eventually we shall deprecate the flag to suppress EOS thread producer > feature, instead we take version detection approach on broker to decide which > semantic to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -54,14 +61,27 @@ private static final Logger log = LoggerFactory.getLogger(StateDirectory.class); static final String LOCK_FILE_NAME = ".lock"; +/* The process file is used to persist the process id across restarts. + * The version 0 schema consists only of the version number and UUID + * + * If you need to store additional metadata of the process you can bump the version number and append new fields. + * For compatibility reasons you should only ever add fields, and only by appending them to the end + */ +private static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata"; +private static final int PROCESS_FILE_VERSION = 0; Review comment: No idea if we'll ever want to add anything else to this file, but better to be safe and forward compatible than sad.
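The append-only versioning scheme described in the quoted javadoc (version number first, then the UUID; new fields may only ever be appended) can be sketched as a small codec. This is a hypothetical illustration of the forward-compatibility idea, not the actual `StateDirectory` serialization code:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical sketch: version-prefixed, append-only process metadata layout.
class ProcessMetadataCodec {
    static final int KNOWN_VERSION = 0; // mirrors PROCESS_FILE_VERSION

    static byte[] encode(UUID processId) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 2 * Long.BYTES);
        buf.putInt(KNOWN_VERSION);
        buf.putLong(processId.getMostSignificantBits());
        buf.putLong(processId.getLeastSignificantBits());
        return buf.array();
    }

    static UUID decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int version = buf.getInt();
        if (version < KNOWN_VERSION) {
            throw new IllegalArgumentException("Unsupported version " + version);
        }
        // Read only the fields this version understands. Because newer versions
        // may only append fields after these, any trailing bytes are safely ignored,
        // which is what makes the format forward compatible.
        return new UUID(buf.getLong(), buf.getLong());
    }
}
```

A reader built against version 0 can thus still recover the process id from a file written by a (hypothetical) version 1 that appended extra metadata after the UUID.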
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception { logPrefix(), dirName, id), exception ); -throw exception; Review comment: IDE was giving me a warning
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ## @@ -112,6 +118,19 @@ public void createTopics() throws Exception { CLUSTER.createTopic(outputTopic, 1, 3); } +@After +public void cleanUp() { +if (streamInstanceOne != null) { +streamInstanceOne.close(); +} +if (streamInstanceTwo != null) { +streamInstanceTwo.close(); +} +if (streamInstanceOneRecovery != null) { +streamInstanceOneRecovery.close(); +} Review comment: There are no logical changes to this test, I just had to refactor it a bit because we were creating two copies of the same KafkaStreams at the same time (with the same app.dir & state.dir), even though one of them wasn't started until much later. Since we do the state initialization inside the KafkaStreams constructor, this was no good This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565823251 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The length of time to wait for the thread to shut down + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Okay I buy it I'll delay the exception
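The Duration overload in the diff above funnels into a millisecond count via validateMillisecondDuration before the timed wait begins. A stand-alone sketch of that validate-and-convert step; the helper below is a hypothetical stand-in, not the actual Kafka Streams utility:

```java
import java.time.Duration;

public class TimeoutValidationDemo {
    // Hypothetical stand-in for a validateMillisecondDuration-style helper:
    // reject null or overflowing values and convert to a millisecond count.
    static long validateMillisecondDuration(Duration timeout, String name) {
        if (timeout == null) {
            throw new IllegalArgumentException(name + " should not be null");
        }
        try {
            return timeout.toMillis(); // throws ArithmeticException on overflow
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(name + " is too large to express in milliseconds", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(validateMillisecondDuration(Duration.ofSeconds(5), "timeout")); // 5000
        // A negative timeout passes conversion; whether it is acceptable is the
        // caller's decision (the quoted test expects a TimeoutException for it).
        System.out.println(validateMillisecondDuration(Duration.ZERO.minusSeconds(1), "timeout")); // -1000
    }
}
```

Converting once at the public API boundary keeps the private implementation working in a single primitive unit, which is the pattern the diff follows.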
[GitHub] [kafka] dengziming commented on pull request #9982: MINOR: remove some explicit type argument in generator
dengziming commented on pull request #9982: URL: https://github.com/apache/kafka/pull/9982#issuecomment-768802003 @chia7712 @cmccabe Hello, PTAL.
[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
chia7712 commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r565826783 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -1004,10 +998,6 @@ public void testWithRecords(Args args) { } Review comment: @g1geordie I filed a patch for the aforementioned idea. Please take a look at https://github.com/chia7712/kafka/pull/1/files. It uses explicit asserts (exception or expected value) for all parameters instead of just ignoring them.
[jira] [Updated] (KAFKA-10658) ErrantRecordReporter.report always return completed future even though the record is not sent to DLQ topic yet
[ https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10658: --- Affects Version/s: 2.6.0 > ErrantRecordReporter.report always return completed future even though the > record is not sent to DLQ topic yet > --- > > Key: KAFKA-10658 > URL: https://issues.apache.org/jira/browse/KAFKA-10658 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > This issue happens when both DLQ and error log are enabled. There is an > incorrect filter in the handling of multiple reports, and it results in the > uncompleted future being filtered out. Hence, users always receive a completed > future even though the record is still in the producer buffer. --
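The bug described above, a filter that keeps only completed futures so the caller is never tied to the pending DLQ write, can be reproduced with plain CompletableFutures. All names below are illustrative, not the Connect implementation:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ReportFutureDemo {
    // Buggy combination: keeping only completed futures means the returned
    // future completes even while the DLQ write is still pending.
    static CompletableFuture<Void> combineBuggy(List<CompletableFuture<Void>> reports) {
        List<CompletableFuture<Void>> done = reports.stream()
            .filter(CompletableFuture::isDone)   // wrong: drops the pending future
            .collect(Collectors.toList());
        return CompletableFuture.allOf(done.toArray(new CompletableFuture[0]));
    }

    // Fixed combination: wait on every report, completed or not.
    static CompletableFuture<Void> combineFixed(List<CompletableFuture<Void>> reports) {
        return CompletableFuture.allOf(reports.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> logged = CompletableFuture.completedFuture(null); // error log, synchronous
        CompletableFuture<Void> dlq = new CompletableFuture<>();                  // record still in producer buffer
        List<CompletableFuture<Void>> reports = List.of(logged, dlq);

        System.out.println(combineBuggy(reports).isDone()); // true: caller believes the record reached the DLQ
        System.out.println(combineFixed(reports).isDone()); // false: still waiting on the producer send
    }
}
```

With both the error log and the DLQ enabled, only the fixed combination gives the caller a future that actually tracks the slower of the two reports.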
[jira] [Commented] (KAFKA-12220) Replace PowerMock by Mockito
[ https://issues.apache.org/jira/browse/KAFKA-12220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273244#comment-17273244 ] Chia-Ping Tsai commented on KAFKA-12220: [~ijuma] How about splitting the PR by package?
||package||classes||
|org.apache.kafka.connect.runtime.standalone|1|
|org.apache.kafka.connect.runtime.distributed|3|
|org.apache.kafka.connect.runtime.errors|2|
|org.apache.kafka.connect.runtime.rest|3|
|org.apache.kafka.connect.util|3|
|org.apache.kafka.connect.storage|4|
|org.apache.kafka.connect.runtime|9|
> Replace PowerMock by Mockito > > > Key: KAFKA-12220 > URL: https://issues.apache.org/jira/browse/KAFKA-12220 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > We are migrating the project from JUnit 4 to JUnit 5 (KAFKA-7339). PowerMock, > however, does not fully support JUnit 5 > (https://github.com/powermock/powermock/issues/830). Hence, we ought to > replace PowerMock with Mockito before migrating to JUnit 5, since rewriting all > tests that depend on PowerMock brings a large number of changes. --
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
wcarlson5 commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565815347 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { +log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); return true; } -if (state == State.PENDING_ERROR) { -log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); -if (waitOnState(State.ERROR, timeoutMs)) { +if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) { +log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); +if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { log.info("Streams client stopped to ERROR completely"); return true; +} else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { +log.info("Streams client stopped to NOT_RUNNING completely"); +return true; } else { -log.info("Streams client cannot transition to ERROR completely within the timeout"); +log.warn("Streams client cannot transition to {}} completely within the timeout", state); Review comment: the state here doesn't make the log make sense. 
If the state is `PENDING_ERROR` then the log should say ERROR ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { Review comment: since it doesn't seem that we need to be very thrifty with space for this file would it make sense to write it in a more friendly format that would be easier to maintain? i.e. json or something, we are giving it a version number... ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { Review comment: I think this change makes a lot of sense. I don't think it changes the final behavior besides avoiding extra state change rejections from the logs, but it looks like they are replaced. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); Review comment: Is there any case where we might want to release the lock of this state directory? It looks like we just hold it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
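The lockStateDirectory pattern quoted above (open the .lock file, then tryLock) can be sketched stand-alone. It also bears on the reviewer's question about releasing: the OS drops a file lock when the process exits, which is why holding it for the process lifetime is workable. Names below are illustrative, not the StateDirectory code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockDemo {
    // Minimal version of the locking pattern under review: open the lock file
    // (creating it if absent) and try to take an exclusive, non-blocking lock.
    static FileLock tryLockStateDir(Path lockFile) throws IOException {
        FileChannel channel = FileChannel.open(lockFile,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            return channel.tryLock(); // null if another process holds the lock
        } catch (OverlappingFileLockException e) {
            // Same JVM already holds it; treat it like a failed lock attempt.
            channel.close();
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempDirectory("state-dir").resolve(".lock");
        FileLock lock = tryLockStateDir(lockFile);
        System.out.println(lock != null && lock.isValid()); // true: first caller wins
        System.out.println(tryLockStateDir(lockFile) == null); // true: second attempt in the same JVM fails
        lock.release();
        lock.channel().close();
    }
}
```

Note that tryLock returns null only for a lock held by another process; a lock already held within the same JVM surfaces as OverlappingFileLockException, so both outcomes have to be mapped to "lock not acquired".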
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) +handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], +leaderEpoch = epoch, +origin = AppendOrigin.Coordinator + ) +) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { +appendInfo.firstOffset match { + case Some(firstOffset) => +if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() +} +new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => +throw new KafkaException(s"Append failed unexpectedly: $appendInfo") +} } override def lastFetchedEpoch: Int = { -log.latestEpoch.getOrElse(0) +log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => +val logEndOffset = endOffset().offset +if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the 
epoch of the snapshot when the log is empty + snapshotId.epoch +} else { + throw new KafkaException( +s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + +s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + +s"and the log start offset ($startOffset)" + ) +} + }.orElse(0) +} } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && +offsetAndEpoch.offset == oldestSnapshotId.get().offset && +offsetAndEpoch.leaderEpoch == leaderEpoch) { Review comment: First, thanks a lot for thinking through this code and providing such a detailed comment. This code is important to get right. > the requested epoch is larger than any known epoch. For this case I decided to throw an exception because the Fetch request handling code already checks for this condition and returns an error Fetch response. The leader returns an error Fetch response when this invariant is violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other words, based on the current implementation, I think it is a bug if `endOffsetForEpoch` returns `Optional.empty()`. 1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954 2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621 > the requested epoch is less than any known epoch we have When thinking through this case I convinced myself that the leader can determine if it should send a snapshot simply by comparing "fetch offset" and "last fetched epoch" against the `oldestSnapshotId`. The `oldestSnapshotId` is the snapshot with an end offset equal to the log start offset. 
> The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the offset corresponding to the oldest snapshot, which should be the same as the log start offset. Right so far? Correct. My comment here assumes that the fetch offset is between the log start offset and log end offset, and that sending a snapshot is not required. When thinking through
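The lastFetchedEpoch fallback quoted in the diff above can be restated as a small self-contained sketch: when the log itself has no epoch entries, an empty log whose bounds match the latest snapshot reports the snapshot's epoch, and any other shape is an invariant violation. The Java below is a hypothetical translation of the quoted Scala, not the real KafkaMetadataLog:

```java
import java.util.Optional;
import java.util.OptionalInt;

public class LastFetchedEpochDemo {
    static class SnapshotId {
        final long offset;
        final int epoch;
        SnapshotId(long offset, int epoch) { this.offset = offset; this.epoch = epoch; }
    }

    // Mirrors the quoted fallback: prefer the log's latest epoch; otherwise the
    // snapshot's epoch is valid only for an empty log truncated up to it.
    static int lastFetchedEpoch(OptionalInt latestLogEpoch,
                                Optional<SnapshotId> latestSnapshotId,
                                long logStartOffset, long logEndOffset) {
        if (latestLogEpoch.isPresent()) {
            return latestLogEpoch.getAsInt();
        }
        return latestSnapshotId.map(snapshotId -> {
            if (snapshotId.offset == logStartOffset && snapshotId.offset == logEndOffset) {
                return snapshotId.epoch; // log is empty; the snapshot covers everything
            }
            throw new IllegalStateException("Log has no last fetched epoch but the snapshot at offset "
                + snapshotId.offset + " does not match log bounds [" + logStartOffset + ", " + logEndOffset + ")");
        }).orElse(0);
    }

    public static void main(String[] args) {
        // Empty log fully truncated up to a snapshot at offset 100, epoch 3.
        System.out.println(lastFetchedEpoch(OptionalInt.empty(),
            Optional.of(new SnapshotId(100, 3)), 100, 100)); // 3
        // Fresh log with no epochs and no snapshot defaults to epoch 0.
        System.out.println(lastFetchedEpoch(OptionalInt.empty(), Optional.empty(), 0, 0)); // 0
    }
}
```

The exception branch encodes the invariant the reviewer is discussing: a snapshot may only stand in for the log's epoch history when its end offset equals both the log start offset and the log end offset.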
[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
g1geordie commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r565834861 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -1004,10 +998,6 @@ public void testWithRecords(Args args) { } Review comment: It sounds like you changed `assume(condition)` to `if (condition) ... else ...` in all methods
[GitHub] [kafka] chia7712 merged pull request #9981: MINOR: Upgrade to Scala 2.12.13
chia7712 merged pull request #9981: URL: https://github.com/apache/kafka/pull/9981
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565783921 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1843,6 +1843,8 @@ class KafkaApis(val requestChannel: RequestChannel, .setNumPartitions(-1) .setReplicationFactor(-1) .setTopicConfigErrorCode(Errors.NONE.code) + } else { + result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(), Uuid.ZERO_UUID)) Review comment: I've added something like this to ZkAdminManager. Let me know if it makes sense.
[GitHub] [kafka] inponomarev commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
inponomarev commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-768769254 > @inponomarev the failing tests seems to be due to a known issue that was fixed via #9768 > > Can you rebase your PR to pickup the fix so we can get a green build? Done rebasing, expect the fixes according to your latest review soon!
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { Review comment: Something I noticed during testing: I feel it makes sense for the handling of ERROR and NOT_RUNNING to be parallel (same for the PENDING_ flavors). This is a slight change in behavior; now if a user calls `close()` while the instance is already closing, it will wait for the ongoing shutdown to complete before returning (with timeout).
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565802847 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: But it isn't consistent, because if the thread removes itself then the timeout isn't started
[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling
vvcephei commented on pull request #9840: URL: https://github.com/apache/kafka/pull/9840#issuecomment-768782409 Hmm, the Java 8 build appears to have hung after an hour and 58 minutes. It's been running for 3 hours and 30 minutes now. This is now the 16th build, and there have been multiple Java 8 successes to date, so I think it's environmental. I'll go ahead with the merge.
[GitHub] [kafka] vvcephei merged pull request #9840: KAFKA-10867: Improved task idling
vvcephei merged pull request #9840: URL: https://github.com/apache/kafka/pull/9840
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) 
+.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { + if (shouldForwardRequest(request)) { +forwardingManager.sendInterBrokerRequest( + getCreateTopicsRequest(Seq(internalTopicName)), + _ => ()) + } else { +val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + +val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName)) +adminManager.createTopics( + config.requestTimeoutMs, + validateOnly = false, + topicConfigs, + Map.empty, + controllerMutationQuota, + _ => ()) + } } -val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) -} else { - val coordinatorEndpoint = topicMetadata.partitions.asScala -.find(_.partitionIndex == partition) -.filter(_.leaderId != MetadataResponse.NO_LEADER_ID) -.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) -.flatMap(_.getNode(request.context.listenerName)) -.filterNot(_.isEmpty) - - coordinatorEndpoint match { -case Some(endpoint) => - createFindCoordinatorResponse(Errors.NONE, endpoint) -case _ => - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( + Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) + } else { +def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = if 
(topicMetadata.head.errorCode != Errors.NONE.code) { +
[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13
chia7712 commented on pull request #9981: URL: https://github.com/apache/kafka/pull/9981#issuecomment-768681359 build pass This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
mjsax commented on pull request #9708: URL: https://github.com/apache/kafka/pull/9708#issuecomment-768685944 Thanks @lct45! For reference: https://github.com/apache/kafka/pull/9951
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273249#comment-17273249 ] Matthias J. Sax commented on KAFKA-6520: I am wondering if https://issues.apache.org/jira/browse/KAFKA-10866 (merged recently) is something we could exploit to implement a DISCONNECT state? The new metadata contains a `receivedTimestamp` field and thus we could track the time difference between "now" and the last received fetch response. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. 
> * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicating the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian Jira (v8.3.4#803005)
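The idea floated above, deriving a DISCONNECTED signal from the gap between "now" and the `receivedTimestamp` of the last fetch response, can be sketched in a few lines. This is a hypothetical, dependency-free illustration, not Kafka Streams code; the class name and threshold are assumptions:

```java
// Hypothetical sketch only: NOT the Kafka Streams implementation. It tracks
// the receivedTimestamp of the last fetch response and reports "disconnected"
// once no response has been seen for longer than a configured quiet period.
import java.time.Duration;

final class DisconnectTracker {
    private final Duration maxQuietPeriod; // assumed threshold, e.g. 30 seconds
    private long lastFetchResponseMs;

    DisconnectTracker(final Duration maxQuietPeriod, final long nowMs) {
        this.maxQuietPeriod = maxQuietPeriod;
        this.lastFetchResponseMs = nowMs;
    }

    // Called with the receivedTimestamp of each fetch response.
    void onFetchResponse(final long receivedTimestampMs) {
        lastFetchResponseMs = Math.max(lastFetchResponseMs, receivedTimestampMs);
    }

    // True once the quiet period since the last fetch response is exceeded.
    boolean isDisconnected(final long nowMs) {
        return nowMs - lastFetchResponseMs > maxQuietPeriod.toMillis();
    }
}
```

As the follow-up comment notes, the real question is at which layer (ClusterConnectionStates, Consumer, or Streams) such a tracker would live.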
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565785613 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: I don't either...
[GitHub] [kafka] chia7712 commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
chia7712 commented on pull request #9906: URL: https://github.com/apache/kafka/pull/9906#issuecomment-768769711 For another, ```testWriteControlBatchNotAllowedMagicV0``` and ```testWriteControlBatchNotAllowedMagicV1``` are almost the same. Could we merge them into a single test case?
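The merge suggested above boils down to parameterizing one test body by the magic value, since control records require message format v2 or newer. A dependency-free sketch of that idea; in the real suite this would likely be a JUnit 5 `@ParameterizedTest` over the magic values, and the helper name and exception type here are illustrative assumptions:

```java
// Illustrative sketch, not the actual MemoryRecordsBuilderTest code: one
// parameterized check replaces the two near-identical tests for magic v0/v1.
final class ControlBatchCheck {
    static final byte MAGIC_V0 = 0, MAGIC_V1 = 1, MAGIC_V2 = 2;

    // Control records were introduced with message format v2.
    static void ensureControlBatchAllowed(final byte magic) {
        if (magic < MAGIC_V2) {
            throw new IllegalArgumentException(
                "Magic v" + magic + " does not support control records");
        }
    }

    // The shared test body: does writing a control batch fail for this magic?
    static boolean writingControlBatchFails(final byte magic) {
        try {
            ensureControlBatchAllowed(magic);
            return false;
        } catch (final IllegalArgumentException expected) {
            return true;
        }
    }
}
```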
[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3745: -- Assignee: Bill Bejeck > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Bill Bejeck >Priority: Minor > Labels: api, kip > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. > KIP-149: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]
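The extension requested in this issue amounts to widening the joiner signature to also receive the (read-only) join key. A standalone sketch of such an interface, illustrative only and not the actual Kafka Streams API:

```java
// Hypothetical joiner interface that also receives the join key, so the
// joined value can use the key even when it is not embedded in either input.
@FunctionalInterface
interface ValueJoinerWithKeySketch<K, V1, V2, VR> {
    VR apply(K readOnlyKey, V1 value1, V2 value2);
}
```

With this shape a joiner can thread the key through directly, e.g. `(key, left, right) -> key + ":" + (left + right)`, instead of copying the key into the upstream values as the workaround in the issue describes.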
[GitHub] [kafka] mjsax commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
mjsax commented on pull request #9420: URL: https://github.com/apache/kafka/pull/9420#issuecomment-768709672 @dongjinleekr -- the PR shows merge conflicts. Can you rebase once more? Sorry about that.
[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-3745: -- Assignee: (was: Bill Bejeck) > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Priority: Minor > Labels: api, kip > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. > KIP-149: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]
[GitHub] [kafka] hachikuji commented on pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500
hachikuji commented on pull request #9967: URL: https://github.com/apache/kafka/pull/9967#issuecomment-768742967 @chia7712 @ijuma Thanks for the comments thus far. This is ready for another look when you have time.
[GitHub] [kafka] hachikuji commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
hachikuji commented on pull request #9589: URL: https://github.com/apache/kafka/pull/9589#issuecomment-768756102 @twobeeb Before I merge, would you mind updating the PR description? Also, I will leave it to you to add the doc suggestion from @skaundinya15.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin return streamThread; } +private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) { +final MetricConfig metricConfig = new MetricConfig() +.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); +final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); +final JmxReporter jmxReporter = new JmxReporter(); +jmxReporter.configure(config.originals()); +reporters.add(jmxReporter); +final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); +return new Metrics(metricConfig, reporters, time, metricsContext); +} + +private int getNumStreamThreads(final boolean hasGlobalTopology) { +final int numStreamThreads; +if (internalTopologyBuilder.hasNoNonGlobalTopology()) { +log.info("Overriding number of StreamThreads to zero for global-only topology"); +numStreamThreads = 0; +} else { +numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); +} + +if (numStreamThreads == 0 && !hasGlobalTopology) { +log.error("Topology with no input topics will create no stream threads and no global thread."); +throw new TopologyException("Topology has no stream threads and no global threads, " + +"must subscribe to at least one source topic or global table."); +} +return numStreamThreads; 
Review comment: Just tried to factor some of the self-contained logic into helper methods, since I found it incredibly difficult to get oriented within the super-long KafkaStreams constructor
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + +this.internalTopologyBuilder = internalTopologyBuilder; +internalTopologyBuilder.rewriteTopology(config); + +// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception +taskTopology = internalTopologyBuilder.buildTopology(); +globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + +final boolean hasGlobalTopology = globalTaskTopology != null; +final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || +(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + +try { +stateDirectory = new StateDirectory(config, time, hasPersistentStores); +processId = stateDirectory.initializeProcessId(); Review comment: this is the only logical change in the KafkaStreams constructor: the rest of the diff is due to moving things around in order to get everything initialized in the proper order
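The `stateDirectory.initializeProcessId()` call highlighted above is the crux of KAFKA-10716: the process id becomes stable across restarts because it is read back from a file in the state directory rather than generated fresh each run. A minimal sketch of that idea, assuming a hypothetical file name and ignoring the real StateDirectory's locking and error handling:

```java
// Illustrative sketch, not the actual StateDirectory code: reuse a persisted
// process id from the state directory if present, otherwise create one.
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class ProcessIdFile {
    static UUID initializeProcessId(final Path stateDir) {
        // File name is an assumption for illustration.
        final Path idFile = stateDir.resolve("kafka-streams-process-metadata");
        try {
            if (Files.exists(idFile)) {
                // A previous run persisted an id: reuse it for a stable processId.
                return UUID.fromString(Files.readString(idFile).trim());
            }
            final UUID processId = UUID.randomUUID();
            Files.createDirectories(stateDir);
            Files.writeString(idFile, processId.toString());
            return processId;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```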
[GitHub] [kafka] ableegoldman commented on pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on pull request #9978: URL: https://github.com/apache/kafka/pull/9978#issuecomment-768786220 Not done with the tests, but I'd appreciate some feedback on the non-testing code and general idea -- any takers for review? @cadonna @vvcephei @guozhangwang @wcarlson5 @lct45
[GitHub] [kafka] vvcephei commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
vvcephei commented on pull request #9420: URL: https://github.com/apache/kafka/pull/9420#issuecomment-768789634 Hey @dongjinleekr , Sorry for the force-push, but I had to rebase this and resolve a conflict before merging. Note that the conflict was from 462c89e0b436abd56864bea8bbcaf1ab70b7f66e, which re-organized the boolean conditions in the StateDirectory constructor, specifically where we warn if the state dir is a temp dir. After resolving the conflict, I noticed there's no test for that warning, so I added one to be sure it works. It also looked like the temp dir check could actually be a bit simpler, so I just tweaked it rather than leaving a new comment for you to address. I hope this is all ok. I'll let the tests run and merge in the morning, unless you have any objections. Thanks! -John
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565820783 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } Review comment: yes
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) +handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], +leaderEpoch = epoch, +origin = AppendOrigin.Coordinator + ) +) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { +appendInfo.firstOffset match { + case Some(firstOffset) => +if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() +} +new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => +throw new KafkaException(s"Append failed unexpectedly: $appendInfo") +} } override def lastFetchedEpoch: Int = { -log.latestEpoch.getOrElse(0) +log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => +val logEndOffset = endOffset().offset +if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the 
epoch of the snapshot when the log is empty + snapshotId.epoch +} else { + throw new KafkaException( +s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + +s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + +s"and the log start offset ($startOffset)" + ) +} + }.orElse(0) +} } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && +offsetAndEpoch.offset == oldestSnapshotId.get().offset && +offsetAndEpoch.leaderEpoch == leaderEpoch) { Review comment: First, thanks a lot for thinking through this code and providing such a detailed comment. This code is important to get right. > the requested epoch is larger than any known epoch. For this case I decided to throw an exception because the Fetch request handling code already checks for this condition and returns an error Fetch response. The leader returns an error Fetch response when this invariant is violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other words, based on the current implementation, I think it is a bug if `endOffsetForEpoch` returns `Optional.empty()`. 1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954 2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621 > the requested epoch is less than any known epoch we have When thinking through this case, I convinced myself that the leader can determine if it should send a snapshot simply by comparing "fetch offset" and "last fetched epoch" against the `oldestSnapshotId`. The `oldestSnapshotId` is the snapshot with an end offset equal to the log start offset. 
> The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the offset corresponding to the oldest snapshot, which should be the same as the log start offset. Right so far? Correct. My comment here assumes that the fetch offset is between the log start offset and log end offset, and that sending a snapshot is not required. When thinking through
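One plausible reading of the rule described in the exchange above, as a hedged sketch rather than the actual KafkaRaftClient logic: the leader needs to send a snapshot when the follower's fetch position falls behind the oldest snapshot id (the snapshot whose end offset equals the log start offset). Types and names below are illustrative:

```java
// Illustrative sketch of the snapshot decision discussed above; NOT the
// actual raft implementation. The oldest snapshot id's end offset equals
// the log start offset, so anything older can only be served via snapshot.
final class SnapshotDecision {
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(final long offset, final int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // True if the follower's fetch offset or last fetched epoch is older
    // than the oldest snapshot, i.e. the requested log range was truncated.
    static boolean needsSnapshot(final long fetchOffset,
                                 final int lastFetchedEpoch,
                                 final OffsetAndEpoch oldestSnapshotId) {
        return fetchOffset < oldestSnapshotId.offset
            || lastFetchedEpoch < oldestSnapshotId.epoch;
    }
}
```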
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273328#comment-17273328 ] Boyang Chen commented on KAFKA-12169: - In general, the leader should be able to detect metadata discrepancy between its remembered topic metadata and broker side metadata. I don't think we have any test case to cover both the topic partition change and leader rejoin at the same time, so it's possible and needs some verification. > Consumer can not know paritions chage when client leader restart with static > membership protocol > > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > > Background: > Kafka consumer services run with static membership and cooperative rebalance > protocol on kubernetes, and services often restart because of operations. When > we added partitions from 1000 to 2000 for the topic, the client leader restarted > with an unknown member id at the same time, and we found the consumers did not > trigger a rebalance and still consumed 1000 partitions
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; } -public void waitOnThreadState(final StreamThread.State targetState) { +public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) { +if (timeoutMs < 0) { Review comment: for the non-timeout uses ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID(){ +return mainConsumer.groupMetadata().groupInstanceId().orElse(""); Review comment: It seems easier to get it from here than from the config. It looked like I might have to manipulate strings in that case ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); Review comment: I ended up getting the `group.instance.id` from the streamThread
[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
cadonna commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565615976 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID() { Review comment: Why not an `Optional`?