[jira] [Updated] (KAFKA-15410) Add basic functionality integration test with tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-15410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15410: - Description: Add the below basic functionality integration tests with tiered storage: # PartitionsExpandTest # DeleteTopicWithSecondaryStorageTest # DeleteSegmentsByRetentionSizeTest # DeleteSegmentsByRetentionTimeTest # DeleteSegmentsDueToLogStartOffsetBreachTest # EnableRemoteLogOnTopicTest # ReassignReplicaExpandTest # ReassignReplicaMoveTest # ReassignReplicaShrinkTest and # TransactionsTestWithTieredStore was: Add the below basic functionality integration tests with tiered storage: # PartitionsExpandTest # DeleteTopicWithSecondaryStorageTest # DeleteSegmentsByRetentionSizeTest # DeleteSegmentsByRetentionTimeTest # DeleteSegmentsDueToLogStartOffsetBreachTest # EnableRemoteLogOnTopicTest # ListOffsetsTest # ReassignReplicaExpandTest # ReassignReplicaMoveTest # ReassignReplicaShrinkTest and # TransactionsTestWithTieredStore > Add basic functionality integration test with tiered storage > > > Key: KAFKA-15410 > URL: https://issues.apache.org/jira/browse/KAFKA-15410 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Blocker > Fix For: 3.6.0 > > > Add the below basic functionality integration tests with tiered storage: > # PartitionsExpandTest > # DeleteTopicWithSecondaryStorageTest > # DeleteSegmentsByRetentionSizeTest > # DeleteSegmentsByRetentionTimeTest > # DeleteSegmentsDueToLogStartOffsetBreachTest > # EnableRemoteLogOnTopicTest > # ReassignReplicaExpandTest > # ReassignReplicaMoveTest > # ReassignReplicaShrinkTest and > # TransactionsTestWithTieredStore -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15410) Add basic functionality integration test with tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-15410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15410: - Description: Add the below basic functionality integration tests with tiered storage: # PartitionsExpandTest # DeleteTopicTest # DeleteSegmentsByRetentionSizeTest # DeleteSegmentsByRetentionTimeTest # DeleteSegmentsDueToLogStartOffsetBreachTest # EnableRemoteLogOnTopicTest # ReassignReplicaExpandTest # ReassignReplicaMoveTest # ReassignReplicaShrinkTest and # TransactionsTestWithTieredStore was: Add the below basic functionality integration tests with tiered storage: # PartitionsExpandTest # DeleteTopicWithSecondaryStorageTest # DeleteSegmentsByRetentionSizeTest # DeleteSegmentsByRetentionTimeTest # DeleteSegmentsDueToLogStartOffsetBreachTest # EnableRemoteLogOnTopicTest # ReassignReplicaExpandTest # ReassignReplicaMoveTest # ReassignReplicaShrinkTest and # TransactionsTestWithTieredStore > Add basic functionality integration test with tiered storage > > > Key: KAFKA-15410 > URL: https://issues.apache.org/jira/browse/KAFKA-15410 > Project: Kafka > Issue Type: Sub-task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Blocker > Fix For: 3.6.0 > > > Add the below basic functionality integration tests with tiered storage: > # PartitionsExpandTest > # DeleteTopicTest > # DeleteSegmentsByRetentionSizeTest > # DeleteSegmentsByRetentionTimeTest > # DeleteSegmentsDueToLogStartOffsetBreachTest > # EnableRemoteLogOnTopicTest > # ReassignReplicaExpandTest > # ReassignReplicaMoveTest > # ReassignReplicaShrinkTest and > # TransactionsTestWithTieredStore -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on a diff in pull request #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage
satishd commented on code in PR #14340: URL: https://github.com/apache/kafka/pull/14340#discussion_r1316738717 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1387,6 +1390,112 @@ public void testStopPartitionsWithDeletion() throws RemoteStorageException { verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any()); } +/** + * This test asserts that the newly elected leader for a partition is able to find the log-start-offset. + * Note that the case tested here is that the previous leader deleted the log segments up-to offset 500. And, the + * log-start-offset didn't propagate to the replicas before the leader-election. + */ +@Test +public void testFindLogStartOffset() throws RemoteStorageException, IOException { +List epochEntries = new ArrayList<>(); +epochEntries.add(new EpochEntry(0, 0L)); +epochEntries.add(new EpochEntry(1, 250L)); +epochEntries.add(new EpochEntry(2, 550L)); +checkpoint.write(epochEntries); + +LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint); +when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + +long timestamp = time.milliseconds(); +int segmentSize = 1024; +List segmentMetadataList = Arrays.asList( +new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), +500, 599, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 500L, 599L)), +new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), Review Comment: This segment contains records with epoch 2 instead of 1 but we are returning this for epoch 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy merged pull request #14339: KAFKA-15422: Update documenttion for delegation tokens when working with Kafka with KRaft
omkreddy merged PR #14339: URL: https://github.com/apache/kafka/pull/14339 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm opened a new pull request, #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm opened a new pull request, #14346: URL: https://github.com/apache/kafka/pull/14346 Implementation of the required functionality for resetting and validating positions in the new async consumer. This PR includes: - New application events `ResetPositionsApplicationEvent` and `ValidatePositionsApplicationEvent`, both handled by the same `OfffsetsRequestManager`. - Integration of the reset/validate functionality in the new async consumer, to update fetch positions. - Minor refactoring to extract functionality that is reused from both consumer implementations (moving logic without changes from `OffsetFetcher` into `OffsetFetchUtils`, and from `OffsetsForLeaderEpochClient` into `OffsetsForLeaderEpochUtils`) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15115: --- Description: Introduce support for resetting positions in the new OffsetsRequestManager. This task will include a new event for the resetPositions calls performed from the new consumer, and the logic for handling such events in the OffsetRequestManager. The reset positions implementation will keep the same behaviour as the one in the old consumer, but adapted to the new threading model. So it is based in a RESET_POSITIONS events that is submitted to the background thread, and then processed by the ApplicationEventProcessor. The processing itself is done by the OffsetRequestManager given that this will require a LIST_OFFSETS request for the partitions awaiting reset. was: Introduce support for resetting positions in the new ListOffsetsRequestManager. This task will include a new event for the resetPositions calls performed from the new consumer, and the logic for handling such events in the ListOffsetRequestManager. The reset positions implementation will keep the same behaviour as the one in the old consumer, but adapted to the new threading model. So it is based in a RESET_POSITIONS events that is submitted to the background thread, and the processed by the ApplicationEventProcessor. The processing itself is done by the ListOffsetRequestManager given that this will require a LIST_OFFSETS request for the partitions awaiting reset. > Implement resetPositions functionality in OffsetsRequestManager > --- > > Key: KAFKA-15115 > URL: https://issues.apache.org/jira/browse/KAFKA-15115 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > Introduce support for resetting positions in the new OffsetsRequestManager. > This task will include a new event for the resetPositions calls performed > from the new consumer, and the logic for handling such events in the > OffsetRequestManager. > The reset positions implementation will keep the same behaviour as the one in > the old consumer, but adapted to the new threading model. So it is based in a > RESET_POSITIONS events that is submitted to the background thread, and then > processed by the ApplicationEventProcessor. The processing itself is done by > the OffsetRequestManager given that this will require a LIST_OFFSETS request > for the partitions awaiting reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15115: --- Summary: Implement resetPositions functionality in OffsetsRequestManager (was: Implement resetPositions functionality in ListOffsetRequestManager) > Implement resetPositions functionality in OffsetsRequestManager > --- > > Key: KAFKA-15115 > URL: https://issues.apache.org/jira/browse/KAFKA-15115 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > Introduce support for resetting positions in the new > ListOffsetsRequestManager. This task will include a new event for the > resetPositions calls performed from the new consumer, and the logic for > handling such events in the ListOffsetRequestManager. > The reset positions implementation will keep the same behaviour as the one in > the old consumer, but adapted to the new threading model. So it is based in a > RESET_POSITIONS events that is submitted to the background thread, and the > processed by the ApplicationEventProcessor. The processing itself is done by > the ListOffsetRequestManager given that this will require a LIST_OFFSETS > request for the partitions awaiting reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15438) Review exception caching logic used for reset/validate positions in async consumer
Lianet Magrans created KAFKA-15438: -- Summary: Review exception caching logic used for reset/validate positions in async consumer Key: KAFKA-15438 URL: https://issues.apache.org/jira/browse/KAFKA-15438 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans The refactored async consumer reuses part of the core logic required for resetting and validating positions. That currently works on the principle of async requests, that reset/validate positions when responses are received. If the responses include errors, or if a validation verification fails (ex. log truncation detected), exceptions are saved in-memory, to be thrown on the next call to the reset/validate. Note that these functionalities are periodically called as part of the poll loop to update fetch positions before fetching records. As an initial implementation, the async consumer reuses this same caching logic, as it has the asyn nature required. This task aims at reviewing the processing of `ResetApplicationEvent `and `ValidatePositionsApplicationEvent` to evaluate if they should rely on event completion instead, to propagate the errors found. It would align with how other application events manage async requests and responses/errors for the new async consumer (based on CompletableFutures), but with the trade-off of heavily changing a caching logic that is currently reused by the legacy and the new consumer in the OffsetFetcherUtils. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
showuon commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1316586591 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3487,8 +3487,12 @@ class ReplicaManagerTest { assertTrue(replicaManager.logManager.getLog(tp0).isDefined) } if (enableRemoteStorage) { -verify(mockRemoteLogManager, times(1)) - .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), ArgumentMatchers.eq(false), any()) +if (throwIOException) { + verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any()) Review Comment: Good catch. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
rreddy-22 commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316576861 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -306,6 +308,22 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a LeaveGroup request. + * + * @param context The request context. Review Comment: The formatting with tab spaces between context and the description seems different in this javadoc and the next one, do we wanna uniformly space them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central
[ https://issues.apache.org/jira/browse/KAFKA-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762237#comment-17762237 ] Satish Duggana commented on KAFKA-15249: [~ChrisEgerton] I thought there is some automation planned in release scripts. Thanks for the clarification on that. Please update 3.6.0 release plan with this note and close this JIRA. > Verify Connect test-plugins artifact is published to Maven Central > -- > > Key: KAFKA-15249 > URL: https://issues.apache.org/jira/browse/KAFKA-15249 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.6.0 > > > In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store > all testing-only Connect plugins and removed those plugins from existing > Connect modules. > These testing-only plugins are intentionally excluded from the project's > release file (which can be generated with {{{}./gradlew releaseTarGz{}}}) > however, some users may still be relying on them for testing environments. > Although we should refrain from distributing these testing-only plugins with > our out-of-the-box distribution of Connect, we should still ensure that > they're available on an opt-in basis to users who would like to continue > using them. This can be accomplished by publishing them to [Maven > Central|https://search.maven.org/], like we do with our other modules. > This will probably happen automatically during the next release (3.6.0) with > no further action required. This ticket is just here as a reminder to verify > that the artifacts are present in the staging Maven repo when release > candidates are published for voting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd merged pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
satishd merged PR #14329: URL: https://github.com/apache/kafka/pull/14329 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)
satishd commented on PR #14329: URL: https://github.com/apache/kafka/pull/14329#issuecomment-1707469208 Merging it to trunk and 3.6 to unblock as it received 3 +1s. A couple of test failures unrelated to this change. Merging it to trunk and 3.6. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on code in PR #14314: URL: https://github.com/apache/kafka/pull/14314#discussion_r1316539813 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java: ## @@ -72,6 +79,12 @@ private void handleTopicError( ) { switch (topicError) { case UNKNOWN_TOPIC_OR_PARTITION: +if (!tolerateUnknownTopics) { Review Comment: > am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. Asking because my understanding is the only case where we would be concerned about topic and partition level errors is when the topic level error is unknown topic or partition but somehow the partitions for that topic have a different error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15435) KRaft migration record counts in log message are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15435: --- Fix Version/s: 3.6.0 > KRaft migration record counts in log message are incorrect > -- > > Key: KAFKA-15435 > URL: https://issues.apache.org/jira/browse/KAFKA-15435 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > Fix For: 3.6.0 > > > The counting logic in MigrationManifest is incorrect and produces invalid > output. This information is critical for users wanting to validate the result > of a migration. > > {code} > Completed migration of metadata from ZooKeeper to KRaft. 7117 records were > generated in 54253 ms across 1629 batches. The record types were > {TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, > ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. > {code} > Due to the logic bug, the counts will never exceed 2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316531927 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } +/** + * Handle a generic group LeaveGroup request. + * + * @param contextThe request context. + * @param requestThe actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ +public CoordinatorResult genericGroupLeave( +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); +if (group.isInState(DEAD)) { +return new CoordinatorResult<>(Collections.emptyList(), +new LeaveGroupResponseData() +.setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) +); +} + +CoordinatorResult coordinatorResult = EMPTY_RESULT; +List memberResponses = new ArrayList<>(); + +for (MemberIdentity member : request.members()) { +// The LeaveGroup API allows administrative removal of members by GroupInstanceId +// in which case we expect the MemberId to be undefined. +if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { +if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { +coordinatorResult = removeCurrentMemberFromGenericGroup( +group, +group.staticMemberId(member.groupInstanceId()), +member.reason() +); +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) +); +} +} else if (group.isPendingMember(member.memberId())) { +coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); +timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); +log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", +member.memberId(), group.groupId()); + +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +try { +group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); +coordinatorResult = removeCurrentMemberFromGenericGroup( Review Comment: Added test cases as suggested below (`testJoinedMemberPendingMemberBatchLeaveGroup` and `testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember`). Even though the group becomes empty, as there still is a pending member `hasAllMembersJoined()` returns false and does not generate records. Only after the last pending member leaves the group really becomes empty. So, the leave group request can only produce records in the last valid member. Invalid members will not overwrite the coordinatorResult. I understand that this looks unsafe, should we append the coordinatorResults and merge them instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316531927 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } +/** + * Handle a generic group LeaveGroup request. + * + * @param contextThe request context. + * @param requestThe actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ +public CoordinatorResult genericGroupLeave( +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); +if (group.isInState(DEAD)) { +return new CoordinatorResult<>(Collections.emptyList(), +new LeaveGroupResponseData() +.setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) +); +} + +CoordinatorResult coordinatorResult = EMPTY_RESULT; +List memberResponses = new ArrayList<>(); + +for (MemberIdentity member : request.members()) { +// The LeaveGroup API allows administrative removal of members by GroupInstanceId +// in which case we expect the MemberId to be undefined. +if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { +if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { +coordinatorResult = removeCurrentMemberFromGenericGroup( +group, +group.staticMemberId(member.groupInstanceId()), +member.reason() +); +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) +); +} +} else if (group.isPendingMember(member.memberId())) { +coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); +timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); +log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", +member.memberId(), group.groupId()); + +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +try { +group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); +coordinatorResult = removeCurrentMemberFromGenericGroup( Review Comment: Added a test case as suggested below (`testJoinedMemberPendingMemberBatchLeaveGroup` and `testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember`). Even though the group becomes empty, as there still is a pending member `hasAllMembersJoined()` returns false and does not generate records. Only after the last pending member leaves the group really becomes empty. So, the leave group request can only produce records in the last valid member. I understand that this looks unsafe, should we append the coordinatorResults and merge them instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 opened a new pull request, #14345: MINOR: Range assignor changes
rreddy-22 opened a new pull request, #14345: URL: https://github.com/apache/kafka/pull/14345 Minor changes to make the range assignor coherent with the other assignors -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters
gharris1727 commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1316480601 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -380,6 +387,110 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +private ConfigInfos validateConverterConfig( +Map connectorConfig, +ConfigValue converterConfigValue, +Class converterInterface, +Function configDefAccessor, +String converterName, +String converterProperty, +ConverterType converterType Review Comment: I believe this is the only thing which makes this a converter-specific function. All of the other logic appears generic across all plugin types. It could be replaced with a `Consumer>` function which mutates the config before it is passed to the plugin, or a `Function, Map>` if you wanted to be pure. Then all of the variable names can just become `plugin` instead of `converter`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters
gharris1727 commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-1707372782 > I honestly find the EnrichablePlugin class pretty hard to read, and prefer the merge style when it can be used. Sure, I'll buy that. I'm fine with migrating away from EnrichablePlugin to something else as long as it is a common abstraction. My concern here was just that we were adding a distinct third style of validating configurations when there appeared to be a lot of common functionality that could be shared. > If the stylistic suggestions are not blockers for review, but blockers for merging, do you think we could establish the ideal user-facing behavior here and then use a separate PR for a refactoring? I can target this branch with that PR (which would allow review to take place on it without having to merge this one to trunk), or target trunk (if we feel comfortable merging this without blocking on a refactor). I'm fine with reviewing this as-is and merging to trunk, and then refactoring the other two strategies in a follow-up. I think using lambdas is more appropriate than anonymous classes which are constructed for just one method call and then discarded. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters
gharris1727 commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1316465707 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -380,6 +387,110 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +private ConfigInfos validateConverterConfig( +Map connectorConfig, +ConfigValue converterConfigValue, +Class converterInterface, +Function configDefAccessor, +String converterName, +String converterProperty, +ConverterType converterType +) { +String converterClass = connectorConfig.get(converterProperty); + +if (converterClass == null +|| converterConfigValue == null +|| !converterConfigValue.errorMessages().isEmpty() +) { +// Either no custom converter was specified, or one was specified but there's a problem with it. +// No need to proceed any further. +return null; +} + +T converterInstance; +try { +converterInstance = Utils.newInstance(converterClass, converterInterface); +} catch (ClassNotFoundException | RuntimeException e) { +log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", converterName, converterClass, e); +converterConfigValue.addErrorMessage("Failed to load class " + converterClass + (e.getMessage() != null ? ": " + e.getMessage() : "")); +return null; +} + +try (Utils.UncheckedCloseable close = () -> Utils.maybeCloseQuietly(converterInstance, converterName + " " + converterClass);) { Review Comment: > It felt slightly more readable to use a try-with-resources block than a finally block (especially since we don't have any catch blocks). I suppose there's some subjectivity involved here, since I found the UncheckedClosable and explicit lambda to be a bit hard to read initially, but understood after some inspection. Using try-finally without any catch clauses is a pretty normal arrangement, and I think more developers would be used to it as compared to using a lambda along with our special UncheckedClosable. AFAIU the try-with-resources construct was added to help with cleaning up AutoClosable resources which can throw exceptions both during opening and closing, where it becomes tedious to set up the `finally` to perform the cleanup correctly. In this specific situation, the `newInstance` (open) errors are handled by a separate `try`, and the close errors are handled by `closeQuietly`, so none of the value-add of the try-with-resources is apparent. I See what you mean though, as we _do_ have exceptions from open and close, and we have somewhat tedious error handling surrounding them. But since the objects we're instantiating are "sometimes AutoClosable", the try-with-resources type checking is going to get in the way. Using try-with-resources to handle open and close together, you could have a wrapper class `MaybeClosable implements UncheckedClosable, Supplier` along with a method `static MaybeClosable quiet(T, String)` that you would call like this: ``` try (MaybeClosable wrapper = MaybeClosable.quiet(Utils.newInstance(...), "converter (for validation)") { Converter converter = wrapper.get(); // do stuff } catch (RuntimeException e) { // exceptions from newInstance and do stuff } // exceptions from close are logged instead of propagated/suppressed ``` I think that would type-check but I haven't tried it out myself. Everything is just a rough suggestion, so please iterate on the names or ergonomics if you like the idea. Without the closeQuietly semantics, it would look like this: ``` try (MaybeClosable wrapper = MaybeClosable.propagate(Utils.newInstance(...)) { Converter converter = wrapper.get(); // do stuff } catch (RuntimeException e) { // exceptions from newInstance and do stuff } // exceptions from close are not checked, but propagated or suppressed. ``` > Is this correct? The interface was introduced in https://github.com/apache/kafka/pull/8618, where it was used on the left-hand side of a try-with-resources assignment that captured a method reference which did not throw a checked exception. When I said "exception throwing operation" I didn't mean "method that throws a checked exception", because I was thinking about how methods can throw RuntimeExceptions whether or not they have checked exceptions in the signature. I probably should have said "method that throws unchecked exceptions" to be unambiguous. Yes this PR and the linked PR both did not have checked exceptions, but they differ because one throws RuntimeExceptions and one does
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316456003 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * Assigns partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: + * balance > rack matching (when applicable) > stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final Set subscriptionIds; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetAnExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +this.potentiallyUnfilledMembers = new HashMap<>(); +this.unfilledMembers = new HashMap<>(); +this.newAssignment = new HashMap<>(); +this.useRackAwareStrategy = rackInfo.useRackStrategy; +// Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary +// as all sticky partitions are retained until a member meets its quota. +this.currentPartitionOwners = useRackAwareStrategy ? new HashMap<>() : Collections.emptyMap(); +} + +/** + * Here's the step-by-step breakdown of the assignment process: + * + * Compute the quotas of partitions for each member based on the total partitions and member count. + * For existing assignments, retain partitions based on the determined quota and member's rack compatibility. + * If a partition's rack mismatches with its owner, track it for future use. + * Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions. + * Derive the unassigned partitions by taking the difference between total partitions
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316439651 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * Assigns partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: + * balance > rack matching (when applicable) > stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final Set subscriptionIds; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetAnExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +this.potentiallyUnfilledMembers = new HashMap<>(); +this.unfilledMembers = new HashMap<>(); +this.newAssignment = new HashMap<>(); +this.useRackAwareStrategy = rackInfo.useRackStrategy; Review Comment: yep makes sense I removed it, I thought in the future if we wanted to have a flag in the assignor to switch it on or off -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316429235 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * Assigns partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: + * balance > rack matching (when applicable) > stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on code in PR #14314: URL: https://github.com/apache/kafka/pull/14314#discussion_r1316416748 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -5634,16 +5637,31 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { final TopicPartition tp1 = new TopicPartition("foo", 0); -try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { -env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +Map> responsesAndFailures = new HashMap<>(); Review Comment: extremely minor nit (we could do in a followup and not a blocker bug) but could we parameterize this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on code in PR #14314: URL: https://github.com/apache/kafka/pull/14314#discussion_r1316414818 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java: ## @@ -72,6 +79,12 @@ private void handleTopicError( ) { switch (topicError) { case UNKNOWN_TOPIC_OR_PARTITION: +if (!tolerateUnknownTopics) { Review Comment: > the operation would be retried if any metadata error was reported for any individual topic partition, even if an error was also reported for the entire topic. With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions. This change only changes the behavior for unknown topic or partition errors right? Leader/broker not available continue to be retriable, topic auth, invalid topic, and other errors continue to fail the request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316408646 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ## @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * Assigns partitions to members of a consumer group ensuring a balanced distribution with + * considerations for sticky assignments and rack-awareness. + * The order of priority of properties during the assignment will be: + * balance > rack matching (when applicable) > stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +private final AssignmentSpec assignmentSpec; +private final SubscribedTopicDescriber subscribedTopicDescriber; +// List of topics subscribed to by all members. +private final Set subscriptionIds; +private final RackInfo rackInfo; +// Count of members to receive an extra partition beyond the minimum quota, +// to account for the distribution of the remaining partitions. +private int remainingMembersToGetAnExtraPartition; +// Map of members to the remaining number of partitions needed to meet the minimum quota, +// including members eligible for an extra partition. +private final Map potentiallyUnfilledMembers; +// Members mapped to the remaining number of partitions needed to meet the full quota. +// Full quota = minQuota + one extra partition (if applicable). +private Map unfilledMembers; +private List unassignedPartitions; +private final Map newAssignment; +// Tracks the current owner of each partition when using rack-aware strategy. +// Current refers to the existing assignment. +private final Map currentPartitionOwners; +// Indicates if a rack aware assignment can be done. +// True if racks are defined for both members and partitions. +boolean useRackAwareStrategy; + +OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.assignmentSpec = assignmentSpec; +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); Review Comment: yeah correct I added a check in the assign method so its not possible anymore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316406386 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GeneralUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { +private static final Logger log = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); + +@Override +protected GroupAssignment buildAssignment() { +return null; + Review Comment: ack will remove it but we can ignore this file for now since the implementation isn't in yet, just needed the initial template for now to get the conditional implementation of the specific assignment builder based on the subscriptions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #14305: KAFKA-14274: [1/7] basic refactoring
junrao commented on code in PR #14305: URL: https://github.com/apache/kafka/pull/14305#discussion_r1316404620 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -363,6 +365,20 @@ private CompletableFuture> chainFuture(fi } }); } + +@Override +public String toString() { +return "OffsetFetchRequestState{" + +"requestedPartitions=" + requestedPartitions + +", requestedGeneration=" + requestedGeneration + +", future=" + future + +", exponentialBackoff=" + exponentialBackoff + +", lastSentMs=" + lastSentMs + +", lastReceivedMs=" + lastReceivedMs + +", numAttempts=" + numAttempts + +", backoffMs=" + backoffMs + Review Comment: Yes, I was referring to the `toString` method. It seems that every subclass of `RequestState` directly gets every field in `RequestState` to build its own string. This creates duplicated code and can be a bit error prone. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on PR #14314: URL: https://github.com/apache/kafka/pull/14314#issuecomment-1707306276 Sorry for the delay @C0urante and thanks for the explanation. It makes sense. Given that we moved the retries to `TopicAdmin::retryEndOffsets` does this mean there is not any known wide usage or need for this retry? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15437) Add metrics about open iterators
Matthias J. Sax created KAFKA-15437: --- Summary: Add metrics about open iterators Key: KAFKA-15437 URL: https://issues.apache.org/jira/browse/KAFKA-15437 Project: Kafka Issue Type: New Feature Components: streams Reporter: Matthias J. Sax Kafka Streams allows to create iterators over state stores. Those iterator must get closed to free up resources (especially for RocksDB). – We regularly get user reports of "resource leaks" that can be pinned down to leaking (ie not-closed) iterators. To simplify monitoring, it would be helpful to add a metric about open iterators to allow users to alert and pin-point the issue directly (and before the actually resource leak is observed). We might want to have a DEBUG level per-store metric (to allow identifying the store in question quickly), but an already rolled up INFO level metric for the whole application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316396258 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -49,6 +51,8 @@ * */ public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); Review Comment: removing this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe commented on code in PR #14306: URL: https://github.com/apache/kafka/pull/14306#discussion_r1316395025 ## clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.message.ControllerRegistrationResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ControllerRegistrationRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +private final ControllerRegistrationRequestData data; + +public Builder(ControllerRegistrationRequestData data) { +super(ApiKeys.BROKER_HEARTBEAT); Review Comment: Yes, nice catch. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe commented on code in PR #14306: URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393751 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() { } } +public short registerControllerRecordVersion() { +if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) { Review Comment: Sorry, this was left over from when the feature was in `IBP_3_6_IV2` previously. Fixed now. ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() { } } +public short registerControllerRecordVersion() { +if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) { Review Comment: Sorry, this was left over from when the feature was in `IBP_3_6_IV2` previously. Fixed now to be `IBP_3_7_IV0` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe commented on code in PR #14306: URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393088 ## clients/src/main/resources/common/message/ControllerRegistrationRequest.json: ## @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 70, Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316390836 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdPartition.java: ## @@ -0,0 +1,74 @@ +/* Review Comment: Its not the exact same, that one has topicId mapped to topicPartition here its just the partition number -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
gharris1727 commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316388557 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensure that the class is concrete (i.e., not abstract). If it is, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param cls the class to check; may not be null + * @param name the name of the type of class to use in the error message; e.g., "Transform", + * "Interceptor", or even just "Class"; may be null + * @throws ConfigException if the class is not concrete + */ +public static void ensureConcrete(Class cls, String name) { +Objects.requireNonNull(cls); +if (isBlank(name)) +name = "Class"; +if (Modifier.isAbstract(cls.getModifiers())) { +String childClassNames = Stream.of(cls.getClasses()) +.filter(cls::isAssignableFrom) Review Comment: > I guess we could add that check as well and have something like Utils::ensureConcreteSubclass--how does that sound? Yeah I imagine a `ensureConcreteSubclass(Class cls, Class baseClass)` that: 1. asserts that cls is concrete and baseClass.isAssignableFrom(cls) is true 2. if not valid, suggests child classes that are concrete and`baseClass::isAssignableFrom` is true. I don't think that it would compose well with the current `ensureConcrete` method though, so it would probably replace what is there already. > I'm on the fence about it potentially doing too much for a utility method, but it does provide user-facing advantages. I thought that about the child class suggestion logic, but relented for the same reason. It significantly complicates this method which could otherwise be a single if condition, but that complication could help someone quickly resolve an obvious typo. I don't think that asserting `baseClass.isAssignableFrom(cls)` is too much for this utility method, because we should always have a baseClass in mind when defining a `CLASS` configuration. Users of the `AbstractConfig::getConfiguredInstance` methods already need to provide a baseClass for subclass checking and type safety at runtime, this utility method would help users to remember the same check at validation time. And worst-case, someone can just specify Object as the baseClass, and effectively disable the baseClass filtering. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters
C0urante commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-1707264425 > I wonder if we can unify these approaches, and maybe even use the "enrich" patten for producer/consumer/admin instead of the "merge" style. I honestly find the `EnrichablePlugin` class pretty hard to read, and prefer the merge style when it can be used. The extra logic involved in supporting aliases and overriding the base `ConfigDef` is already fairly complex, and we'd also have to expand the class to allow plugins to optionally return null `ConfigDef` objects. If the stylistic suggestions are not blockers for review, but blockers for merging, do you think we could establish the ideal user-facing behavior here and then use a separate PR for a refactoring? I can target this branch with that PR (which would allow review to take place on it without having to merge this one to trunk), or target trunk (if we feel comfortable merging this without blocking on a refactor). And of course, if the stylistic suggestions are blockers for review, let me know and I can take a stab at that without doing anything fancy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
gharris1727 commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362502 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensure that the class is concrete (i.e., not abstract). If it is, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param cls the class to check; may not be null + * @param name the name of the type of class to use in the error message; e.g., "Transform", + * "Interceptor", or even just "Class"; may be null + * @throws ConfigException if the class is not concrete + */ +public static void ensureConcrete(Class cls, String name) { +Objects.requireNonNull(cls); +if (isBlank(name)) +name = "Class"; +if (Modifier.isAbstract(cls.getModifiers())) { +String childClassNames = Stream.of(cls.getClasses()) +.filter(cls::isAssignableFrom) +.filter(c -> !Modifier.isAbstract(c.getModifiers())) +.filter(c -> Modifier.isPublic(c.getModifiers())) +.map(Class::getName) +.collect(Collectors.joining(", ")); +String message = Utils.isBlank(childClassNames) ? +name + " is abstract and cannot be created." : +name + " is abstract and cannot be created. Did you mean " + childClassNames + "?"; +throw new ConfigException(name, cls.getName(), message); Review Comment: I like this simplification, and it makes sense to generalize the error message as part of promoting this to the Utils class. `parseForValidate` will still attribute the error to the correct key/value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
gharris1727 commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362060 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends AutoCloseable { void close(); } +/** + * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface, + * and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + * + * {@code closeQuietly(task::stop, "source task");} + * + * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. + */ +public static void maybeCloseQuietly(Object maybeCloseable, String name) { Review Comment: I found more use-cases for this today, so on adding this helper. ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends AutoCloseable { void close(); } +/** + * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface, + * and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + * + * {@code closeQuietly(task::stop, "source task");} + * + * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method + * reference from a null object will result in a {@link NullPointerException}. In the example code above, + * it would be the caller's responsibility to ensure that {@code task} was non-null before attempting to + * use a method reference from it. + */ +public static void maybeCloseQuietly(Object maybeCloseable, String name) { Review Comment: I found more use-cases for this today, so on adding this helper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters
C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1316338908 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -380,6 +387,110 @@ protected Map validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } +private ConfigInfos validateConverterConfig( +Map connectorConfig, +ConfigValue converterConfigValue, +Class converterInterface, +Function configDefAccessor, +String converterName, +String converterProperty, +ConverterType converterType +) { +String converterClass = connectorConfig.get(converterProperty); + +if (converterClass == null +|| converterConfigValue == null +|| !converterConfigValue.errorMessages().isEmpty() +) { +// Either no custom converter was specified, or one was specified but there's a problem with it. +// No need to proceed any further. +return null; +} + +T converterInstance; +try { +converterInstance = Utils.newInstance(converterClass, converterInterface); +} catch (ClassNotFoundException | RuntimeException e) { +log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", converterName, converterClass, e); +converterConfigValue.addErrorMessage("Failed to load class " + converterClass + (e.getMessage() != null ? ": " + e.getMessage() : "")); +return null; +} + +try (Utils.UncheckedCloseable close = () -> Utils.maybeCloseQuietly(converterInstance, converterName + " " + converterClass);) { Review Comment: It felt slightly more readable to use a try-with-resources block than a finally block (especially since we don't have any catch blocks). You're correct that `Utils::maybeCloseQuietly` doesn't throw any checked exceptions, but I had to provide a left-hand type (that extended the `AutoCloseable` interface) to prove that to the compiler, which ruled out `AutoCloseable` and `Closeable` since both of those interface's `close` methods throw checked exceptions. Also, regarding this comment: > The unchecked closable should be for exception-throwing operations that should have their exceptions suppressed, but maybeCloseQuietly should never throw an exception. Is this correct? The interface was introduced in https://github.com/apache/kafka/pull/8618, where it was used on the left-hand side of a try-with-resources assignment that captured a method reference which did not throw a checked exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
philipnee commented on code in PR #14313: URL: https://github.com/apache/kafka/pull/14313#discussion_r1316332331 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -286,11 +286,16 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); +this(adminConfig, Admin.create(adminConfig)); } -public TopicAdmin(Object bootstrapServers, Admin adminClient) { -this(bootstrapServers, adminClient, true); +public TopicAdmin(Map adminConfig, Admin adminClient) { +this(bootstrapServers(adminConfig), adminClient, true); +} + +// visible for testing +TopicAdmin(Admin adminClient) { +this(null, adminClient, true); Review Comment: sorry for being a type police : But this logic annoys me ``` public TopicAdmin(Object bootstrapServers, Admin adminClient) { this(bootstrapServers, adminClient, true); } // visible for testing TopicAdmin(Object bootstrapServers, Admin adminClient, boolean logCreation) { this.admin = adminClient; this.bootstrapServers = bootstrapServers != null ? bootstrapServers.toString() : ""; this.logCreation = logCreation; } ``` Can we just do map.getOrDefault() and perform an inline type cast? In general passing nulls around confuses me... ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -286,11 +286,16 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); +this(adminConfig, Admin.create(adminConfig)); } -public TopicAdmin(Object bootstrapServers, Admin adminClient) { -this(bootstrapServers, adminClient, true); +public TopicAdmin(Map adminConfig, Admin adminClient) { +this(bootstrapServers(adminConfig), adminClient, true); +} + +// visible for testing +TopicAdmin(Admin adminClient) { +this(null, adminClient, true); Review Comment: also understood this is not your code ... So I can submit a minor patch for this if you don't want to modify your PR. ## connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java: ## @@ -286,11 +286,16 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link Admin} */ public TopicAdmin(Map adminConfig) { -this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig)); +this(adminConfig, Admin.create(adminConfig)); } -public TopicAdmin(Object bootstrapServers, Admin adminClient) { -this(bootstrapServers, adminClient, true); +public TopicAdmin(Map adminConfig, Admin adminClient) { +this(bootstrapServers(adminConfig), adminClient, true); +} + +// visible for testing +TopicAdmin(Admin adminClient) { +this(null, adminClient, true); Review Comment: sorry for being a type police : But this logic annoys me ``` public TopicAdmin(Object bootstrapServers, Admin adminClient) { this(bootstrapServers, adminClient, true); } // visible for testing TopicAdmin(Object bootstrapServers, Admin adminClient, boolean logCreation) { this.admin = adminClient; this.bootstrapServers = bootstrapServers != null ? bootstrapServers.toString() : ""; this.logCreation = logCreation; } ``` Can we just do map.getOrDefault() and perform an inline type cast? In general passing nulls around confuses me... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15435) KRaft migration record counts in log message are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur reassigned KAFKA-15435: Assignee: David Arthur > KRaft migration record counts in log message are incorrect > -- > > Key: KAFKA-15435 > URL: https://issues.apache.org/jira/browse/KAFKA-15435 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > > The counting logic in MigrationManifest is incorrect and produces invalid > output. This information is critical for users wanting to validate the result > of a migration. > > {code} > Completed migration of metadata from ZooKeeper to KRaft. 7117 records were > generated in 54253 ms across 1629 batches. The record types were > {TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, > ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. > {code} > Due to the logic bug, the counts will never exceed 2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
C0urante commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316327673 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +public class InstantiableClassValidator implements ConfigDef.Validator { + +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +// The value will be null if the class couldn't be found; no point in performing follow-up validation +return; +} + +Class cls = (Class) value; +try { +Object o = cls.getDeclaredConstructor().newInstance(); +Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation"); +} catch (NoSuchMethodException e) { +throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : "")); +} catch (ReflectiveOperationException | RuntimeException e) { +throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : "")); +} Review Comment: Good call, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
C0urante commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316327532 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensure that the class is concrete (i.e., not abstract). If it is, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param cls the class to check; may not be null + * @param name the name of the type of class to use in the error message; e.g., "Transform", + * "Interceptor", or even just "Class"; may be null + * @throws ConfigException if the class is not concrete + */ +public static void ensureConcrete(Class cls, String name) { +Objects.requireNonNull(cls); +if (isBlank(name)) +name = "Class"; +if (Modifier.isAbstract(cls.getModifiers())) { +String childClassNames = Stream.of(cls.getClasses()) +.filter(cls::isAssignableFrom) +.filter(c -> !Modifier.isAbstract(c.getModifiers())) +.filter(c -> Modifier.isPublic(c.getModifiers())) +.map(Class::getName) +.collect(Collectors.joining(", ")); +String message = Utils.isBlank(childClassNames) ? +name + " is abstract and cannot be created." : +name + " is abstract and cannot be created. Did you mean " + childClassNames + "?"; +throw new ConfigException(name, cls.getName(), message); Review Comment: 臘 Thanks, good catch. In pursuit of simplicity I've taken a stab at using the single-arg `ConfigDef` constructor, and using a one-size-fits-all error message that just says "This class" instead of, e.g., "Transform". Since all current uses for this code path involve (directly or indirectly) hitting a `ConfigDef.Validator` as part of config validation, I think the user-facing impact should only go as far as the small wording change. Let me know if you think it's better to preserve existing behavior, though. ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensure that the class is concrete (i.e., not abstract). If it is, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param cls the class to check; may not be null + * @param name the name of the type of class to use in the error message; e.g., "Transform", + * "Interceptor", or even just "Class"; may be null + * @throws ConfigException if the class is not concrete + */ +public static void ensureConcrete(Class cls, String name) { +Objects.requireNonNull(cls); +if (isBlank(name)) +name = "Class"; +if (Modifier.isAbstract(cls.getModifiers())) { +String childClassNames = Stream.of(cls.getClasses()) +.filter(cls::isAssignableFrom) +.filter(c -> !Modifier.isAbstract(c.getModifiers())) +.filter(c -> Modifier.isPublic(c.getModifiers())) +.map(Class::getName) +.collect(Collectors.joining(", ")); +String message = Utils.isBlank(childClassNames) ? +name + " is abstract and cannot be created." : +name + " is abstract and cannot be created. Did you mean " + childClassNames + "?"; +throw new ConfigException(name, cls.getName(), message); Review Comment: 臘 Thanks, good catch. In pursuit of simplicity I've taken a stab at using the single-arg `ConfigDef` constructor, and using a one-size-fits-all error message that just says "This class" instead of, e.g., "Transform". Since all current uses for this code path involve (directly or indirectly) hitting a `ConfigDef.Validator` as part of config validation, I think the user-facing impact should only go as far as the small wording change. Let me know if you think it's better to preserve existing behavior, though. ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensure that the class is concrete (i.e., not abstract). If it is, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param cls the class to check; may not be null + * @param name the name of the type of class to use in the
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316322730 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } +/** + * Handle a generic group LeaveGroup request. + * + * @param contextThe request context. + * @param requestThe actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ +public CoordinatorResult genericGroupLeave( +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); +if (group.isInState(DEAD)) { +return new CoordinatorResult<>(Collections.emptyList(), +new LeaveGroupResponseData() +.setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) +); +} + +CoordinatorResult coordinatorResult = EMPTY_RESULT; +List memberResponses = new ArrayList<>(); + +for (MemberIdentity member : request.members()) { +// The LeaveGroup API allows administrative removal of members by GroupInstanceId +// in which case we expect the MemberId to be undefined. +if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { +if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { +coordinatorResult = removeCurrentMemberFromGenericGroup( +group, +group.staticMemberId(member.groupInstanceId()), +member.reason() +); +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) +); +} +} else if (group.isPendingMember(member.memberId())) { +coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); +timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); +log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", +member.memberId(), group.groupId()); + +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} else { +try { +group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); +coordinatorResult = removeCurrentMemberFromGenericGroup( +group, +member.memberId(), +member.reason() +); +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +); +} catch (KafkaException e) { +memberResponses.add( +new MemberResponse() +.setMemberId(member.memberId()) +.setGroupInstanceId(member.groupInstanceId()) +.setErrorCode(Errors.forException(e).code()) +); +} +} +} +return new CoordinatorResult<>( +coordinatorResult.records(), +new LeaveGroupResponseData() +.setMembers(memberResponses), +coordinatorResult.appendFuture() +); +} + +/** + * Remove a member from the group. Cancel member's heartbeat, and prepare rebalance + * or complete the join phase if necessary. + * + * @param group The generic group. + * @param memberId The member id. + * @param
[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes
C0urante commented on code in PR #14304: URL: https://github.com/apache/kafka/pull/14304#discussion_r1316306495 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends AutoCloseable { void close(); } +/** + * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface, + * and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + * + * {@code closeQuietly(task::stop, "source task");} + * + * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method Review Comment: 臘 Thanks, I had a feeling I was missing something. Considering the utility of this method is that it's type-blind, I'm not sure we need to worry about it being invoked with a method reference. What are the odds that a method reference returns something that a dev, at compile time, isn't sure is `AutoCloseable` or not? I've tentatively removed all of the language around method references from this section. If you think it's better to err on the side of safety, I can restore it with your suggestion applied. ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends AutoCloseable { void close(); } +/** + * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface, + * and if an exception is thrown, it is logged at the WARN level. + * Be cautious when passing method references as an argument. For example: + * + * {@code closeQuietly(task::stop, "source task");} + * + * Although this method gracefully handles null {@link AutoCloseable} objects, attempts to take a method Review Comment: 臘 Thanks, I had a feeling I was missing something. Considering the utility of this method is that it's type-blind, I'm not sure we need to worry about it being invoked with a method reference. What are the odds that a method reference returns something that a dev, at compile time, isn't sure is `AutoCloseable` or not? I've tentatively removed all of the language around method references from this section. If you think it's better to err on the side of safety, I can restore it with your suggestion applied. ## connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.lang.reflect.Modifier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ConcreteSubClassValidator implements ConfigDef.Validator { +private final Class expectedSuperClass; + +private ConcreteSubClassValidator(Class expectedSuperClass) { +this.expectedSuperClass = expectedSuperClass; +} + +public static ConcreteSubClassValidator forSuperClass(Class expectedSuperClass) { +return new ConcreteSubClassValidator(expectedSuperClass); +} + +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +// The value will be null if the class couldn't be found; no point in performing follow-up validation +return; +} Review Comment: Cool, filed https://issues.apache.org/jira/browse/KAFKA-15436 to track ## connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except
[jira] [Created] (KAFKA-15436) Custom ConfigDef validators are invoked with null when user-provided value does not match type
Chris Egerton created KAFKA-15436: - Summary: Custom ConfigDef validators are invoked with null when user-provided value does not match type Key: KAFKA-15436 URL: https://issues.apache.org/jira/browse/KAFKA-15436 Project: Kafka Issue Type: Bug Reporter: Chris Egerton Filed in response to [discussion on a tangentially-related PR|https://github.com/apache/kafka/pull/14304#discussion_r1310039190]. h3. Background The [ConfigDef.Validator interface|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html] can be used to add custom per-property validation logic to a [ConfigDef|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html] instance. This can serve many uses, including but not limited to: * Ensuring that the value for a string property matches the name of a Java enum type * Ensuring that the value for an integer property falls within the range of valid port numbers * Ensuring that the value for a class property has a public, no-args constructor and/or implements a certain interface This validation logic can be invoked directly via [ConfigDef::validate|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validate(java.util.Map)] or [ConfigDef::validateAll|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validateAll(java.util.Map)], or indirectly when instantiating an [AbstractConfig|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/AbstractConfig.html]. When a value is validated by a {{ConfigDef}} instance, the {{ConfigDef}} first verifies that the value adheres to the expected type. For example, if the "raw" value is the string {{"345"}} and the property is defined with the [INT type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#INT], then the value is valid (it is parsed as the integer {{{}345{}}}). However, if the same raw value is used for a property defined with the [BOOLEAN type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN], then the value is invalid (it cannot be parsed as a boolean). h3. Problem When a raw value is invalid for the type of the property it is used for (e.g., {{"345"}} is used for a property defined with the [BOOLEAN type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN]), custom validators for the property are still invoked, with a value of {{{}null{}}}. This can lead to some counterintuitive behavior, and may necessitate that implementers of the {{ConfigDef.Validator}} interface catch cases where the value is {{null}} and choose not to report any errors (with the assumption that an error will already be reported by the {{ConfigDef}} regarding its failure to parse the raw value with the expected type). We may consider skipping custom validation altogether when the raw value for a property cannot be parsed with the expected type. On the other hand, it's unclear if there are compatibility concerns about this kind of change. If we decide to change this behavior, we should try to assess which code paths may lead to custom validators being invoked, which use cases correspond to which of these code paths, and whether this behavioral change has a chance to negatively impact these use cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tylerbertrand opened a new pull request, #14344: Resolve checkstyle cache miss
tylerbertrand opened a new pull request, #14344: URL: https://github.com/apache/kafka/pull/14344 Resolves cache misses in `checkstyle` tasks due to absolute paths in `configProperties`. Sets `configDirectory` extension property, which is made available by the `checkstyle` plugin as `${config_loc}` in the `checkstyle` xml files, as shown in the Checkstyle Gradle docs [here](https://docs.gradle.org/current/userguide/checkstyle_plugin.html#sec:checkstyle_built_in_variables). The absolute paths set in `configProperties` are then replaced by relative paths from `configDirectory`. Because the header and suppression config file names are static and only referenced once, these were removed from `configProperties` and the file names are given directly in `checkstyle.xml` [Task input comparison showing input differences between builds causing cache miss](https://ge.solutions-team.gradle.com/c/udumk6sv5uwbw/an5e6pclyiblm/task-inputs?cacheability=cacheable,overlapping-outputs,validation-failure) [Task input comparison with fix applied - no differences between builds](https://ge.solutions-team.gradle.com/c/ojlblvuooowjo/rrubheuf5ex5g/task-inputs?cacheability=cacheable,overlapping-outputs,validation-failure) ### Tests Manually verified suppressions are working as expected: * [checkStyleMain FAILS when calling `System.exit()` in `MessageGenerator.java` with that suppression DISABLED](https://ge.solutions-team.gradle.com/s/34amuqemmyili/failure#1) * [checkStyleMain PASSES when calling `System.exit()` in `MessageGenerator.java` with that suppression ENABLED](https://ge.solutions-team.gradle.com/s/s62zwn5gfsu54/failure) Manually verified header check is working as expected: * [checkStyleMain FAILS when java file is missing license header](https://ge.solutions-team.gradle.com/s/rjjqrqrmam6oq/failure#1) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-9800. Fix Version/s: 3.7.0 Resolution: Fixed Merged [https://github.com/apache/kafka/pull/14111] to trunk. > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Andrew Schofield >Priority: Major > Labels: KIP-580, client > Fix For: 3.7.0 > > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for retry backoff ms. > # Async retries. In each polling, the retries do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, I already wrapped the exponential backoff/timeout util class in > my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the number of attempts and returns the backoff/timeout value at > the corresponding level. Thus, we can add a new class property to those > classes containing retriable data in order to record the number of failed > attempts. > > Changes: > KafkaProducer: > # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each > ProducerBatch in Accumulator, which already has an attribute attempts > recording the number of failed attempts. So we can let the Accumulator > calculate the new retry backoff for each bach when it enqueues them, to avoid > instantiate the util class multiple times. > # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new > class property of type `Long` to record the number of attempts. > KafkaConsumer: > # Some synchronous retry use cases. Record the failed attempts in the > blocking loop. > # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). > Though the actual requests are packed for each node, the current > implementation is applying backoff to each topic partition, where the backoff > value is kept by TopicPartitionState. Thus, TopicPartitionState will have the > new property recording the number of attempts. > Metadata: > # Metadata lives as a singleton in many clients. Add a new property > recording the number of attempts > AdminClient: > # AdminClient has its own request abstraction Call. The failed attempts are > already kept by the abstraction. So probably clean the Call class logic a bit. > Existing tests: > # If the tests are testing the retry backoff, add a delta to the assertion, > considering the existence of the jitter. > # If the tests are testing other functionality, we can specify the same > value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make > the retry backoff static. We can use this trick to make the existing tests > compatible with the changes. > There're other common usages look like client.poll(timeout), where the > timeout passed in is the retry backoff value. We won't change these usages > since its underlying logic is nioSelector.select(timeout) and > nioSelector.selectNow(), which means if no interested op exists, the client > will block retry backoff milliseconds. This is an optimization when there's > no request that needs to be sent but the client is waiting for responses. > Specifically, if the client fails the inflight requests before the retry > backoff milliseconds passed, it still needs to wait until that amount of time > passed, unless there's a new request need to be sent. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] junrao commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
junrao commented on PR #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-1707163099 This probably should be closed now that https://github.com/apache/kafka/pull/14111 is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
C0urante commented on PR #14313: URL: https://github.com/apache/kafka/pull/14313#issuecomment-1707161908 @philipnee Let me know if you're still looking into this. Happy to wait on your analysis if you are, and if not, we can merge as-is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao merged PR #14111: URL: https://github.com/apache/kafka/pull/14111 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316274795 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } +/** + * Handle a generic group LeaveGroup request. + * + * @param contextThe request context. + * @param requestThe actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ +public CoordinatorResult genericGroupLeave( +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); +if (group.isInState(DEAD)) { +return new CoordinatorResult<>(Collections.emptyList(), +new LeaveGroupResponseData() +.setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) +); +} + +CoordinatorResult coordinatorResult = EMPTY_RESULT; +List memberResponses = new ArrayList<>(); + +for (MemberIdentity member : request.members()) { +// The LeaveGroup API allows administrative removal of members by GroupInstanceId +// in which case we expect the MemberId to be undefined. +if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { +if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { Review Comment: thanks for the catch. will use existing `hasStaticMember` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
C0urante commented on PR #14336: URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707142324 BTW, same thought about backporting--I'm going to cherry-pick these changes on the `apache/kafka` repo onto the 3.5 and 3.6 branches, and for them to show up immediately, feel free to file a PR on `apache/kafka-site` to update the docs for 3.5. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1316264712 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -306,6 +308,22 @@ public CoordinatorResult commitOffset( return offsetMetadataManager.commitOffset(context, request); } +/** + * Handles a OffsetCommit request. + * + * @param context The request context. + * @param request The actual OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponse response and Review Comment: fixed the javadocs to LeaveGroup. The other APIs also are using the actual API name (SyncGroup/JoinGroup...) so I will keep LeaveGroup as is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
C0urante merged PR #14336: URL: https://github.com/apache/kafka/pull/14336 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
yashmayya commented on PR #14337: URL: https://github.com/apache/kafka/pull/14337#issuecomment-1707112259 Thanks for the review and suggestion Chris! I've raised https://github.com/apache/kafka-site/pull/538 to port over these changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest
mumrah commented on code in PR #14342: URL: https://github.com/apache/kafka/pull/14342#discussion_r1316249748 ## metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java: ## @@ -60,7 +61,8 @@ public MigrationManifest build() { if (endTimeNanos == 0) { endTimeNanos = time.nanoseconds(); } -return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts); +Map orderedCounts = new TreeMap<>(counts); Review Comment: Putting into a treemap will order the map according to the keys. The effect of this is that the log message becomes deterministic, which is useful for testing. It's arguably also a bit nicer for end users if the output of the message is consistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest
mumrah commented on code in PR #14342: URL: https://github.com/apache/kafka/pull/14342#discussion_r1316249748 ## metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java: ## @@ -60,7 +61,8 @@ public MigrationManifest build() { if (endTimeNanos == 0) { endTimeNanos = time.nanoseconds(); } -return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts); +Map orderedCounts = new TreeMap<>(counts); Review Comment: Putting into a treemap will order the map according to the keys. The effect of this is that the log message becomes deterministic, which is useful for testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762186#comment-17762186 ] Jimmy Wang commented on KAFKA-14517: [~dajac] Got it. I refactored the previous code and remove the client relative part. For sever part implement, added an extra arguments TopicsImage into maybeUpdateSubscribedTopicNames(). When GroupMetadataKey/Value is replayed, the regex will not be considered for the reason that when computeSubscriptionMetadata() is invoked, the topics which match the regex will be added into subscribedTopicCount. > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Jimmy Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lzyLuke commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest
lzyLuke commented on code in PR #14342: URL: https://github.com/apache/kafka/pull/14342#discussion_r1316244101 ## metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java: ## @@ -60,7 +61,8 @@ public MigrationManifest build() { if (endTimeNanos == 0) { endTimeNanos = time.nanoseconds(); } -return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts); +Map orderedCounts = new TreeMap<>(counts); Review Comment: qq: why we need a treemap here? Is the sorted order help here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
yashmayya commented on PR #14336: URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707093218 Ah whoops, that was not intentional. Good catch, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 opened a new pull request, #14343: Minor fix trailing white spaces on reviewers.py
rreddy-22 opened a new pull request, #14343: URL: https://github.com/apache/kafka/pull/14343 Minor: Fixing trailing white spaces on reviewers.py -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
C0urante commented on PR #14336: URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707076130 The new paragraph looks great, but was the `STOPPED` state summary in the section beginning with "Connectors and their tasks publish status updates" intentionally removed? That part looked good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
C0urante commented on PR #14337: URL: https://github.com/apache/kafka/pull/14337#issuecomment-1707068710 I've backported this to the 3.5 and 3.6 branches; it should be reflected once a new release is put out on those. @yashmayya if you'd like, feel free to file a PR against https://github.com/apache/kafka-site to port these changes over to 3.5, which will cause them to show up immediately on our current docs page (which uses 3.5, since that's the latest release). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
C0urante merged PR #14337: URL: https://github.com/apache/kafka/pull/14337 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request, #14342: KAFKA-15435 Fix counts in MigrationManifest
mumrah opened a new pull request, #14342: URL: https://github.com/apache/kafka/pull/14342 This patch fixes an issue in MigrationManifest where we weren't computing the record counts properly. A unit test was added to verify the fix. This patch also changes the counters in MigrationManifest to use an ordered map so we'll get deterministic output. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +
[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
dajac commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1316210535 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() { .setPartitionIndexes(Collections.singletonList(0)) ); +// Fetch offsets cases. +assertThrows(UnknownMemberIdException.class, +() -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE)); Review Comment: Yeah, I changed the member id to be nullable and null by default. I think that it makes more sense. So now, an empty string raises an unknown member id error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
jolshan commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1316209008 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() { .setPartitionIndexes(Collections.singletonList(0)) ); +// Fetch offsets cases. +assertThrows(UnknownMemberIdException.class, +() -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE)); Review Comment: Or is this part of the latest commit that made null the no member ID indicator? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
jolshan commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1316207292 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() { .setPartitionIndexes(Collections.singletonList(0)) ); +// Fetch offsets cases. +assertThrows(UnknownMemberIdException.class, +() -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE)); Review Comment: Is this testing that the empty member string will fail (ie if no member, it must be null?) I'm trying to understand how this test is different than the one below. I also noticed that the test changed the default values from empty string to null. Was that an error before? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
jolshan commented on code in PR #14321: URL: https://github.com/apache/kafka/pull/14321#discussion_r1316197710 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -181,10 +181,28 @@ public List fetchOffsets( String groupId, List topics, long committedOffset +) { Review Comment: It could be useful to add a small comment in case folks wanted to build upon these tests. But up to you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
yashmayya commented on code in PR #14337: URL: https://github.com/apache/kafka/pull/14337#discussion_r1316183113 ## docs/toc.html: ## @@ -206,6 +206,14 @@ Plugin Discovery 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Connect Configuration Validation Review Comment: Yeah, it's definitely confusing - in fact I initially thought that I had messed it up somehow. I elected to keep it as is when I noticed that there are other headings with the same issue too (in other sections) but `Configuration Validation` sounds good to me so I've made that change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
yashmayya commented on code in PR #14336: URL: https://github.com/apache/kafka/pull/14336#discussion_r1316175988 ## docs/connect.html: ## @@ -1078,7 +1079,7 @@ Kafka Connect -It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. The pause state is persistent, so even if you restart the cluster, the connector will not begin message processing again until the task has been resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted. +It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers pause / stop / resume APIs. While a source connector is paused or stopped, Connect will stop polling it for additional records. While a sink connector is paused or stopped, Connect will stop pushing new messages to it. The paused / stopped state is persistent, so even if you restart the cluster, the connector will not begin message processing again until it has been resumed. For a paused connector, any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. When a connector is stopped, however, its tasks are shut down and any resources claimed by its tasks are deallocated. This is more efficient from a resource u sage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted. Review Comment: Thanks, that sounds good to me ## docs/connect.html: ## @@ -1078,7 +1079,7 @@ Kafka Connect -It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. The pause state is persistent, so even if you restart the cluster, the connector will not begin message processing again until the task has been resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted. +It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers pause / stop / resume APIs. While a source connector is paused or stopped, Connect will stop polling it for additional records. While a sink connector is paused or stopped, Connect will stop pushing new messages to it. The paused / stopped state is persistent, so even if you restart the cluster, the connector will not begin message processing again until it has been resumed. For a paused connector, any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. When a connector is stopped, however, its tasks are shut down and any resources claimed by its tasks are deallocated. This is more efficient from a resource u sage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them
[GitHub] [kafka] C0urante commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
C0urante commented on code in PR #14337: URL: https://github.com/apache/kafka/pull/14337#discussion_r1316159088 ## docs/toc.html: ## @@ -206,6 +206,14 @@ Plugin Discovery 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Connect Configuration Validation Review Comment: This is getting split across two lines when it's rendered. As a quick fix, maybe we can rename this section to just `Configuration Validation`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect
C0urante commented on code in PR #14337: URL: https://github.com/apache/kafka/pull/14337#discussion_r1316159088 ## docs/toc.html: ## @@ -206,6 +206,14 @@ Plugin Discovery 8.3 Connector Development Guide + +Core Concepts and APIs +Developing a Simple Connector +Dynamic Input/Output Streams +Connect Configuration Validation Review Comment: This is getting split across two lines when it's rendered, which is a little confusing as it looks like there are different sections for `Connect Configuration` and `Validation`. As a quick fix, maybe we can rename this section to just `Configuration Validation`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Cerchie opened a new pull request, #14341: KAFKA-15307: Removes non-existent configs
Cerchie opened a new pull request, #14341: URL: https://github.com/apache/kafka/pull/14341 addresses 1/3 of https://issues.apache.org/jira/browse/KAFKA-15307 Removes pieces of configuration from the [developer guide to configuring streams.](https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html). These were the pieces of configuration not found in [StreamsConfig.java](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java). They include: - `partition.grouper` - `rack.aware.assignment.tag` - `rocksdb.config.setter` - `state.dir` - `topology.optimization` Note: `acks` and `replication.factor`, although not found in 'StreamsConfig.java', are listed in the developer guide to configuring streams. I am assuming they are defined somewhere else. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section
C0urante commented on code in PR #14336: URL: https://github.com/apache/kafka/pull/14336#discussion_r1316150923 ## docs/connect.html: ## @@ -1078,7 +1079,7 @@ Kafka Connect -It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers a pause/resume API. While a source connector is paused, Connect will stop polling it for additional records. While a sink connector is paused, Connect will stop pushing new messages to it. The pause state is persistent, so even if you restart the cluster, the connector will not begin message processing again until the task has been resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted. +It's sometimes useful to temporarily stop the message processing of a connector. For example, if the remote system is undergoing maintenance, it would be preferable for source connectors to stop polling it for new data instead of filling logs with exception spam. For this use case, Connect offers pause / stop / resume APIs. While a source connector is paused or stopped, Connect will stop polling it for additional records. While a sink connector is paused or stopped, Connect will stop pushing new messages to it. The paused / stopped state is persistent, so even if you restart the cluster, the connector will not begin message processing again until it has been resumed. For a paused connector, any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed. When a connector is stopped, however, its tasks are shut down and any resources claimed by its tasks are deallocated. This is more efficient from a resource u sage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that there may be a delay before all of a connector's tasks have transitioned to the PAUSED state since it may take time for them to finish whatever processing they were in the middle of when being paused. Additionally, failed tasks will not transition to the PAUSED state until they have been restarted. Review Comment: It feels like this muddies the waters a bit for the existing pause/resume API... what do you think about leaving this paragraph as it was, and adding a separate paragraph afterward that covers the `STOPPED` state, explaining that it goes beyond the `PAUSED` state by completely shutting down tasks instead of leaving them idling? We can also include information on offset management even if we intend to backport this PR; I can just make sure to either remove it or clarify that it refers to a feature available in later versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Cerchie commented on pull request #14341: KAFKA-15307: Removes non-existent configs
Cerchie commented on PR #14341: URL: https://github.com/apache/kafka/pull/14341#issuecomment-1706973281 @mjsax for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15293) Update metrics doc to add tiered storage metrics
[ https://issues.apache.org/jira/browse/KAFKA-15293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana resolved KAFKA-15293. Resolution: Fixed > Update metrics doc to add tiered storage metrics > > > Key: KAFKA-15293 > URL: https://issues.apache.org/jira/browse/KAFKA-15293 > Project: Kafka > Issue Type: Sub-task > Components: documentation >Reporter: Abhijeet Kumar >Assignee: Abhijeet Kumar >Priority: Critical > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd merged pull request #14331: KAFKA-15293 Add documentation for tiered storage metrics
satishd merged PR #14331: URL: https://github.com/apache/kafka/pull/14331 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #14331: KAFKA-15293 Add documentation for tiered storage metrics
satishd commented on PR #14331: URL: https://github.com/apache/kafka/pull/14331#issuecomment-1706953908 There are a few failed tests that are unrelated to this PR. Merging it to trunk and 3.6. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #14335: MINOR: update comment in consumeAction
satishd commented on PR #14335: URL: https://github.com/apache/kafka/pull/14335#issuecomment-1706906829 There are a few failed tests unrelated to this PR. Merging it to trunk and 3.6. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd merged pull request #14335: MINOR: update comment in consumeAction
satishd merged PR #14335: URL: https://github.com/apache/kafka/pull/14335 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] riedelmax commented on pull request #14124: KAFKA-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes.
riedelmax commented on PR #14124: URL: https://github.com/apache/kafka/pull/14124#issuecomment-1706895064 @dajac Thank you for the review. I added unit tests and corrected the nits -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15435) KRaft migration record counts in log message are incorrect
David Arthur created KAFKA-15435: Summary: KRaft migration record counts in log message are incorrect Key: KAFKA-15435 URL: https://issues.apache.org/jira/browse/KAFKA-15435 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.6.0 Reporter: David Arthur The counting logic in MigrationManifest is incorrect and produces invalid output. This information is critical for users wanting to validate the result of a migration. {code} Completed migration of metadata from ZooKeeper to KRaft. 7117 records were generated in 54253 ms across 1629 batches. The record types were {TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. {code} Due to the logic bug, the counts will never exceed 2. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor
rreddy-22 commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1316096433 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -89,16 +93,17 @@ public MemberWithRemainingAssignments(String memberId, int remaining) { private Map> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) { Map> membersPerTopic = new HashMap<>(); Map membersData = assignmentSpec.members(); - +// Only add topic Ids to the map if they are present in the topic metadata. membersData.forEach((memberId, memberMetadata) -> { Collection topics = memberMetadata.subscribedTopicIds(); for (Uuid topicId : topics) { if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); +log.warn("Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."); Review Comment: Yep I'll raise a different PR for range assignor changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #14317: KAFKA-13973: Fix inflated block cache metrics
cadonna commented on PR #14317: URL: https://github.com/apache/kafka/pull/14317#issuecomment-1706873185 @nicktelford Yes, you are right about the existing tests! Sorry I was in a hurry and only skimmed them and missed that fact. I would like to have some tests that verify the behavior with one single column family (non-timestamped RocksDB) and multiple column families (timestamped RocksDB) so that we get a signal when something changes on RocksDB side and to kind of document the behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15414) remote logs get deleted after partition reassignment
[ https://issues.apache.org/jira/browse/KAFKA-15414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762138#comment-17762138 ] Kamal Chandraprakash commented on KAFKA-15414: -- [~fvisconte] Could you please take the latest trunk and try it out? Reopen the ticket if it doesn't work. Thanks! > remote logs get deleted after partition reassignment > > > Key: KAFKA-15414 > URL: https://issues.apache.org/jira/browse/KAFKA-15414 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Kamal Chandraprakash >Priority: Blocker > Fix For: 3.6.0 > > Attachments: image-2023-08-29-11-12-58-875.png > > > it seems I'm reaching that codepath when running reassignments on my cluster > and segment are deleted from remote store despite a huge retention (topic > created a few hours ago with 1000h retention). > It seems to happen consistently on some partitions when reassigning but not > all partitions. > My test: > I have a test topic with 30 partition configured with 1000h global retention > and 2 minutes local retention > I have a load tester producing to all partitions evenly > I have consumer load tester consuming that topic > I regularly reset offsets to earliest on my consumer to test backfilling from > tiered storage. > My consumer was catching up consuming the backlog and I wanted to upscale my > cluster to speed up recovery: I upscaled my cluster from 3 to 12 brokers and > reassigned my test topic to all available brokers to have an even > leader/follower count per broker. > When I triggered the reassignment, the consumer lag dropped on some of my > topic partitions: > !image-2023-08-29-11-12-58-875.png|width=800,height=79! Screenshot 2023-08-28 > at 20 57 09 > Later I tried to reassign back my topic to 3 brokers and the issue happened > again. > Both times in my logs, I've seen a bunch of logs like: > [RemoteLogManager=10005 partition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17] > Deleted remote log segment RemoteLogSegmentId > {topicIdPartition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17, > id=Mk0chBQrTyKETTawIulQog} > due to leader epoch cache truncation. Current earliest epoch: > EpochEntry(epoch=14, startOffset=46776780), segmentEndOffset: 46437796 and > segmentEpochs: [10] > Looking at my s3 bucket. The segments prior to my reassignment have been > indeed deleted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on pull request #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage
kamalcph commented on PR #14340: URL: https://github.com/apache/kafka/pull/14340#issuecomment-1706829016 @clolov @divijvaidya @showuon @abhijeetk88 @satishd Call for review. Please take a look! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph opened a new pull request, #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage
kamalcph opened a new pull request, #14340: URL: https://github.com/apache/kafka/pull/14340 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Chuckame commented on pull request #14157: KAFKA-15303: Avoid unnecessary re-serialization in FK-join
Chuckame commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1706826894 Hello @mjsax, do you know if there is potential advancement about this fix ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15221) Potential race condition between requests from rebooted followers
[ https://issues.apache.org/jira/browse/KAFKA-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762134#comment-17762134 ] Calvin Liu commented on KAFKA-15221: [~satish.duggana] The PR is pending review, it targets 3.6.0 and other 3.5.* versions depending on when we can finish the code review. cc [~dajac], [~david.mao] > Potential race condition between requests from rebooted followers > - > > Key: KAFKA-15221 > URL: https://issues.apache.org/jira/browse/KAFKA-15221 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Blocker > Fix For: 3.6.0, 3.5.2 > > > When the leader processes the fetch request, it does not acquire locks when > updating the replica fetch state. Then there can be a race between the fetch > requests from a rebooted follower. > T0, broker 1 sends a fetch to broker 0(leader). At the moment, broker 1 is > not in ISR. > T1, broker 1 crashes. > T2 broker 1 is back online and receives a new broker epoch. Also, it sends a > new Fetch request. > T3 broker 0 receives the old fetch requests and decides to expand the ISR. > T4 Right before broker 0 starts to fill the AlterPartitoin request, the new > fetch request comes in and overwrites the fetch state. Then broker 0 uses the > new broker epoch on the AlterPartition request. > In this way, the AlterPartition request can get around KIP-903 and wrongly > update the ISR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15430) Kafla create replca partition on controller node
[ https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Vysotskiy updated KAFKA-15430: - Issue Type: Bug (was: Test) > Kafla create replca partition on controller node > > > Key: KAFKA-15430 > URL: https://issues.apache.org/jira/browse/KAFKA-15430 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.1 >Reporter: Andrii Vysotskiy >Priority: Major > > I have configuration 5 nodes (KRAFT mode), with next roles: 4 > broker+controller and 1 controller. Create topic with replication factor 5, > and it is created, and describe show that topic partition have 5 replicas. > > {{/opt/kafka/latest/bin/kafka-topics.sh --create > --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 > --partitions 1 --topic test5}} > > /opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 > --bootstrap-server=dc1-prod-kafka-001-vs:9092 > Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 > ReplicationFactor: 5 Configs: segment.bytes=1073741824 > Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2}} > > Replicas 5 and ISR 4. Why does kafka initially allow you to create a replica > on the controller node, although in reality the replica is not created on the > controller node and there are no topic files in the log directory. > Is this expected behavior or not? Thanks. > I want to understand whether such behavior is the norm for Kafka > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Cerchie commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression
Cerchie commented on code in PR #14322: URL: https://github.com/apache/kafka/pull/14322#discussion_r1316003070 ## docs/design.html: ## @@ -136,8 +136,10 @@ -Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will -remain compressed in the log and will only be decompressed by the consumer. +Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For +example, it validates that the number of records in the batch is same as what batch header states. The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out Review Comment: thank you! sounds good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano opened a new pull request, #14339: KAFKA-15422: Update documenttion for delegation tokens when working with Kafka with KRaft
pprovenzano opened a new pull request, #14339: URL: https://github.com/apache/kafka/pull/14339 Minor changes to documentation to reflect that Delegation Tokens now works with Kafka with KRaft. Cherry-Pick of https://github.com/apache/kafka/pull/14318 Reviewers: Manikumar Reddy -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano closed pull request #14338: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft
pprovenzano closed pull request #14338: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft URL: https://github.com/apache/kafka/pull/14338 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano opened a new pull request, #14338: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft
pprovenzano opened a new pull request, #14338: URL: https://github.com/apache/kafka/pull/14338 Minor changes to documentation to reflect that Delegation Tokens now works with Kafka with KRaft. This update was already merged to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nicktelford commented on pull request #14317: KAFKA-13973: Fix inflated block cache metrics
nicktelford commented on PR #14317: URL: https://github.com/apache/kafka/pull/14317#issuecomment-1706697411 > @nicktelford Thanks for the update! > > Could you also look why we did not catch this bug in `RocksDBMetricsIntegrationTest`or other metrics integration tests and add tests if needed? @cadonna Looks like that test doesn't verify the value of those metrics; it only checks the number of metrics registered under each name (i.e. that the expected metrics are registered and available, but not what they are). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org