[jira] [Updated] (KAFKA-15410) Add basic functionality integration test with tiered storage

2023-09-05 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15410:
-
Description: 
Add the below basic functionality integration tests with tiered storage:
 # PartitionsExpandTest
 # DeleteTopicWithSecondaryStorageTest
 # DeleteSegmentsByRetentionSizeTest
 # DeleteSegmentsByRetentionTimeTest
 # DeleteSegmentsDueToLogStartOffsetBreachTest
 # EnableRemoteLogOnTopicTest
 # ReassignReplicaExpandTest
 # ReassignReplicaMoveTest
 # ReassignReplicaShrinkTest and
 # TransactionsTestWithTieredStore

  was:
Add the below basic functionality integration tests with tiered storage:
 # PartitionsExpandTest
 # DeleteTopicWithSecondaryStorageTest
 # DeleteSegmentsByRetentionSizeTest
 # DeleteSegmentsByRetentionTimeTest
 # DeleteSegmentsDueToLogStartOffsetBreachTest
 # EnableRemoteLogOnTopicTest
 # ListOffsetsTest
 # ReassignReplicaExpandTest
 # ReassignReplicaMoveTest
 # ReassignReplicaShrinkTest and
 # TransactionsTestWithTieredStore


> Add basic functionality integration test with tiered storage
> 
>
> Key: KAFKA-15410
> URL: https://issues.apache.org/jira/browse/KAFKA-15410
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Add the below basic functionality integration tests with tiered storage:
>  # PartitionsExpandTest
>  # DeleteTopicWithSecondaryStorageTest
>  # DeleteSegmentsByRetentionSizeTest
>  # DeleteSegmentsByRetentionTimeTest
>  # DeleteSegmentsDueToLogStartOffsetBreachTest
>  # EnableRemoteLogOnTopicTest
>  # ReassignReplicaExpandTest
>  # ReassignReplicaMoveTest
>  # ReassignReplicaShrinkTest and
>  # TransactionsTestWithTieredStore
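
For context, a minimal sketch of the setup these tests exercise — creating a topic with tiered storage enabled (a sketch only; the topic name and sizes are illustrative, not from this ticket):
{code}
import java.util.*;
import org.apache.kafka.clients.admin.*;

public class TieredTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("remote.storage.enable", "true");     // tier this topic's segments
            configs.put("segment.bytes", "1048576");          // small segments tier quickly
            configs.put("local.retention.bytes", "1048576");  // keep little data locally
            NewTopic topic = new NewTopic("tiered-topic", 3, (short) 2).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}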



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15410) Add basic functionality integration test with tiered storage

2023-09-05 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15410:
-
Description: 
Add the below basic functionality integration tests with tiered storage:
 # PartitionsExpandTest
 # DeleteTopicTest
 # DeleteSegmentsByRetentionSizeTest
 # DeleteSegmentsByRetentionTimeTest
 # DeleteSegmentsDueToLogStartOffsetBreachTest
 # EnableRemoteLogOnTopicTest
 # ReassignReplicaExpandTest
 # ReassignReplicaMoveTest
 # ReassignReplicaShrinkTest and
 # TransactionsTestWithTieredStore

  was:
Add the below basic functionality integration tests with tiered storage:
 # PartitionsExpandTest
 # DeleteTopicWithSecondaryStorageTest
 # DeleteSegmentsByRetentionSizeTest
 # DeleteSegmentsByRetentionTimeTest
 # DeleteSegmentsDueToLogStartOffsetBreachTest
 # EnableRemoteLogOnTopicTest
 # ReassignReplicaExpandTest
 # ReassignReplicaMoveTest
 # ReassignReplicaShrinkTest and
 # TransactionsTestWithTieredStore


> Add basic functionality integration test with tiered storage
> 
>
> Key: KAFKA-15410
> URL: https://issues.apache.org/jira/browse/KAFKA-15410
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Add the below basic functionality integration tests with tiered storage:
>  # PartitionsExpandTest
>  # DeleteTopicTest
>  # DeleteSegmentsByRetentionSizeTest
>  # DeleteSegmentsByRetentionTimeTest
>  # DeleteSegmentsDueToLogStartOffsetBreachTest
>  # EnableRemoteLogOnTopicTest
>  # ReassignReplicaExpandTest
>  # ReassignReplicaMoveTest
>  # ReassignReplicaShrinkTest and
>  # TransactionsTestWithTieredStore



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage

2023-09-05 Thread via GitHub


satishd commented on code in PR #14340:
URL: https://github.com/apache/kafka/pull/14340#discussion_r1316738717


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1387,6 +1390,112 @@ public void testStopPartitionsWithDeletion() throws RemoteStorageException {
         verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any());
     }
 
+    /**
+     * This test asserts that the newly elected leader for a partition is able to find the log-start-offset.
+     * Note that the case tested here is that the previous leader deleted the log segments up-to offset 500. And, the
+     * log-start-offset didn't propagate to the replicas before the leader-election.
+     */
+    @Test
+    public void testFindLogStartOffset() throws RemoteStorageException, IOException {
+        List<EpochEntry> epochEntries = new ArrayList<>();
+        epochEntries.add(new EpochEntry(0, 0L));
+        epochEntries.add(new EpochEntry(1, 250L));
+        epochEntries.add(new EpochEntry(2, 550L));
+        checkpoint.write(epochEntries);
+
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        long timestamp = time.milliseconds();
+        int segmentSize = 1024;
+        List<RemoteLogSegmentMetadata> segmentMetadataList = Arrays.asList(
+            new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                500, 599, timestamp, brokerId, timestamp, segmentSize, truncateAndGetLeaderEpochs(epochEntries, 500L, 599L)),
+            new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
Review Comment:
   This segment contains records with epoch 2 instead of 1, but we are returning it for epoch 1.
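
   For context, the lookup under test can be sketched like this (simplified; `listRemoteLogSegments` is the real RemoteLogMetadataManager API, the rest is schematic):
   ```
   // Walk leader epochs from the earliest; the first remote segment found
   // bounds the log-start-offset for the newly elected leader.
   long findLogStartOffset(TopicIdPartition partition,
                           List<Integer> epochsFromEarliest,
                           RemoteLogMetadataManager rlmm) throws RemoteStorageException {
       for (int epoch : epochsFromEarliest) {
           Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(partition, epoch);
           if (segments.hasNext()) {
               return segments.next().startOffset();
           }
       }
       return 0L; // no remote segments: fall back to the local log-start-offset
   }
   ```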



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy merged pull request #14339: KAFKA-15422: Update documentation for delegation tokens when working with Kafka with KRaft

2023-09-05 Thread via GitHub


omkreddy merged PR #14339:
URL: https://github.com/apache/kafka/pull/14339


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm opened a new pull request, #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-05 Thread via GitHub


lianetm opened a new pull request, #14346:
URL: https://github.com/apache/kafka/pull/14346

   Implementation of the required functionality for resetting and validating positions in the new async consumer.
   
   This PR includes:
   - New application events `ResetPositionsApplicationEvent` and `ValidatePositionsApplicationEvent`, both handled by the same `OffsetsRequestManager`.
   - Integration of the reset/validate functionality in the new async consumer, to update fetch positions.
   - Minor refactoring to extract functionality that is reused by both consumer implementations (moving logic without changes from `OffsetFetcher` into `OffsetFetcherUtils`, and from `OffsetsForLeaderEpochClient` into `OffsetsForLeaderEpochUtils`).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager

2023-09-05 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15115:
---
Description: 
Introduce support for resetting positions in the new OffsetsRequestManager. 
This task will include a new event for the resetPositions calls performed from 
the new consumer, and the logic for handling such events in the 
OffsetsRequestManager.

The reset positions implementation will keep the same behaviour as the one in 
the old consumer, but adapted to the new threading model. So it is based on a 
RESET_POSITIONS event that is submitted to the background thread and then 
processed by the ApplicationEventProcessor. The processing itself is done by 
the OffsetsRequestManager, given that this will require a LIST_OFFSETS request 
for the partitions awaiting reset.

  was:
Introduce support for resetting positions in the new ListOffsetsRequestManager. 
This task will include a new event for the resetPositions calls performed from 
the new consumer, and the logic for handling such events in the 
ListOffsetRequestManager.

The reset positions implementation will keep the same behaviour as the one in 
the old consumer, but adapted to the new threading model. So it is based in a 
RESET_POSITIONS events that is submitted to the background thread, and the 
processed by the ApplicationEventProcessor. The processing itself is done by 
the ListOffsetRequestManager given that this will require a LIST_OFFSETS 
request for the partitions awaiting reset.


> Implement resetPositions functionality in OffsetsRequestManager
> ---
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Introduce support for resetting positions in the new OffsetsRequestManager. 
> This task will include a new event for the resetPositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetsRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. So it is based on a 
> RESET_POSITIONS event that is submitted to the background thread and then 
> processed by the ApplicationEventProcessor. The processing itself is done by 
> the OffsetsRequestManager, given that this will require a LIST_OFFSETS request 
> for the partitions awaiting reset.
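
To illustrate the threading model described above, a heavily simplified, self-contained sketch (only the ApplicationEventProcessor and OffsetsRequestManager names come from this ticket; everything else is hypothetical):
{code}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ResetFlowSketch {
    interface ApplicationEvent {}
    static class ResetPositionsApplicationEvent implements ApplicationEvent {}

    static class OffsetsRequestManager {
        void resetPositionsIfNeeded() {
            // Would build a LIST_OFFSETS request for the partitions awaiting reset.
            System.out.println("building LIST_OFFSETS request");
        }
    }

    static class ApplicationEventProcessor {
        private final OffsetsRequestManager manager = new OffsetsRequestManager();
        void process(ApplicationEvent event) {
            if (event instanceof ResetPositionsApplicationEvent)
                manager.resetPositionsIfNeeded();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ApplicationEvent> queue = new LinkedBlockingQueue<>();
        ApplicationEventProcessor processor = new ApplicationEventProcessor();
        queue.add(new ResetPositionsApplicationEvent()); // application thread submits
        processor.process(queue.take());                 // background thread processes
    }
}
{code}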



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager

2023-09-05 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15115:
---
Summary: Implement resetPositions functionality in OffsetsRequestManager  
(was: Implement resetPositions functionality in ListOffsetRequestManager)

> Implement resetPositions functionality in OffsetsRequestManager
> ---
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Introduce support for resetting positions in the new 
> ListOffsetsRequestManager. This task will include a new event for the 
> resetPositions calls performed from the new consumer, and the logic for 
> handling such events in the ListOffsetRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. So it is based in a 
> RESET_POSITIONS events that is submitted to the background thread, and the 
> processed by the ApplicationEventProcessor. The processing itself is done by 
> the ListOffsetRequestManager given that this will require a LIST_OFFSETS 
> request for the partitions awaiting reset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15438) Review exception caching logic used for reset/validate positions in async consumer

2023-09-05 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15438:
--

 Summary: Review exception caching logic used for reset/validate 
positions in async consumer
 Key: KAFKA-15438
 URL: https://issues.apache.org/jira/browse/KAFKA-15438
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


The refactored async consumer reuses part of the core logic required for 
resetting and validating positions. That logic currently works on the principle 
of async requests that reset/validate positions when responses are received. If 
the responses include errors, or if a validation check fails (e.g. log 
truncation detected), exceptions are saved in memory, to be thrown on the next 
call to reset/validate. Note that these functionalities are periodically 
called as part of the poll loop to update fetch positions before fetching 
records.

As an initial implementation, the async consumer reuses this same caching 
logic, as it has the async nature required. This task aims at reviewing the 
processing of `ResetPositionsApplicationEvent` and `ValidatePositionsApplicationEvent` 
to evaluate if they should rely on event completion instead to propagate the 
errors found. That would align with how other application events manage async 
requests and responses/errors for the new async consumer (based on 
CompletableFutures), but with the trade-off of heavily changing caching logic 
that is currently shared by the legacy and the new consumer in 
OffsetFetcherUtils.
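
To make the two error-propagation styles concrete, a toy comparison (all names hypothetical):
{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ErrorPropagationSketch {
    // Style 1 (current): cache the failure, throw it on the next call.
    static class CachedStyle {
        private final AtomicReference<RuntimeException> cached = new AtomicReference<>();
        void onResponseError(RuntimeException e) { cached.set(e); }
        void resetPositionsIfNeeded() {
            RuntimeException e = cached.getAndSet(null);
            if (e != null) throw e; // surfaces on a *later* poll iteration
        }
    }

    // Style 2 (event completion): complete the event's future exceptionally.
    static class EventStyle {
        CompletableFuture<Void> resetPositions() {
            CompletableFuture<Void> result = new CompletableFuture<>();
            // on response error: result.completeExceptionally(e);
            return result; // the caller decides when/how to surface the error
        }
    }
}
{code}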



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-05 Thread via GitHub


showuon commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1316586591


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3487,8 +3487,12 @@ class ReplicaManagerTest {
         assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
       }
       if (enableRemoteStorage) {
-        verify(mockRemoteLogManager, times(1))
-          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), ArgumentMatchers.eq(false), any())
+        if (throwIOException) {
+          verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())

Review Comment:
   Good catch. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316576861


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -306,6 +308,22 @@ public CoordinatorResult commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a LeaveGroup request.
+     *
+     * @param context The request context.
Review Comment:
   The formatting with tab spaces between the parameter and the description seems different in this javadoc and the next one; do we want to space them uniformly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15249) Verify Connect test-plugins artifact is published to Maven Central

2023-09-05 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762237#comment-17762237
 ] 

Satish Duggana commented on KAFKA-15249:


[~ChrisEgerton] I thought there was some automation planned in the release scripts. 
Thanks for the clarification on that. Please update the 3.6.0 release plan with 
this note and close this JIRA.

> Verify Connect test-plugins artifact is published to Maven Central
> --
>
> Key: KAFKA-15249
> URL: https://issues.apache.org/jira/browse/KAFKA-15249
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.6.0
>
>
> In KAFKA-14759 we created a separate {{connect/test-plugins}} module to store 
> all testing-only Connect plugins and removed those plugins from existing 
> Connect modules.
> These testing-only plugins are intentionally excluded from the project's 
> release file (which can be generated with {{./gradlew releaseTarGz}}); 
> however, some users may still be relying on them in testing environments.
> Although we should refrain from distributing these testing-only plugins with 
> our out-of-the-box distribution of Connect, we should still ensure that 
> they're available on an opt-in basis to users who would like to continue 
> using them. This can be accomplished by publishing them to [Maven 
> Central|https://search.maven.org/], like we do with our other modules.
> This will probably happen automatically during the next release (3.6.0) with 
> no further action required. This ticket is just here as a reminder to verify 
> that the artifacts are present in the staging Maven repo when release 
> candidates are published for voting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd merged pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-05 Thread via GitHub


satishd merged PR #14329:
URL: https://github.com/apache/kafka/pull/14329


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14329: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4)

2023-09-05 Thread via GitHub


satishd commented on PR #14329:
URL: https://github.com/apache/kafka/pull/14329#issuecomment-1707469208

   Merging it to trunk and 3.6 to unblock dependent work, as it received 3 +1s. The couple of test failures are unrelated to this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14314:
URL: https://github.com/apache/kafka/pull/14314#discussion_r1316539813


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java:
##########
@@ -72,6 +79,12 @@ private void handleTopicError(
 ) {
     switch (topicError) {
         case UNKNOWN_TOPIC_OR_PARTITION:
+            if (!tolerateUnknownTopics) {

Review Comment:
   > I am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. 
   
   Asking because my understanding is that the only case where we would be concerned about topic- and partition-level errors is when the topic-level error is UNKNOWN_TOPIC_OR_PARTITION but somehow the partitions for that topic have a different error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15435) KRaft migration record counts in log message are incorrect

2023-09-05 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15435:
---
Fix Version/s: 3.6.0

> KRaft migration record counts in log message are incorrect
> --
>
> Key: KAFKA-15435
> URL: https://issues.apache.org/jira/browse/KAFKA-15435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0
>
>
> The counting logic in MigrationManifest is incorrect and produces invalid 
> output. This information is critical for users wanting to validate the result 
> of a migration.
>  
> {code}
> Completed migration of metadata from ZooKeeper to KRaft. 7117 records were 
> generated in 54253 ms across 1629 batches. The record types were 
> {TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, 
> ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. 
> {code}
> Due to the logic bug, the counts will never exceed 2.
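
For illustration, one hypothetical way a counting bug caps every count at 2 is a `Map.merge` remapping function that ignores the accumulated value (a reconstruction of the bug class, not the actual MigrationManifest code):
{code}
import java.util.HashMap;
import java.util.Map;

public class CountBugSketch {
    public static void main(String[] args) {
        Map<String, Integer> buggy = new HashMap<>();
        Map<String, Integer> fixed = new HashMap<>();
        for (int i = 0; i < 5; i++) {
            // Bug: the remapping function ignores the old value, so every
            // merge after the first yields newValue + 1 = 2.
            buggy.merge("TOPIC_RECORD", 1, (oldV, newV) -> newV + 1);
            // Fix: actually accumulate.
            fixed.merge("TOPIC_RECORD", 1, Integer::sum);
        }
        System.out.println(buggy); // {TOPIC_RECORD=2}
        System.out.println(fixed); // {TOPIC_RECORD=5}
    }
}
{code}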



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator

2023-09-05 Thread via GitHub


jeffkbkim commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316531927


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a generic group LeaveGroup request.
+     *
+     * @param context    The request context.
+     * @param request    The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the GroupMetadata record to append if the group
+     *         no longer has any member.
+     */
+    public CoordinatorResult genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false);
+        if (group.isInState(DEAD)) {
+            return new CoordinatorResult<>(Collections.emptyList(),
+                new LeaveGroupResponseData()
+                    .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        CoordinatorResult coordinatorResult = EMPTY_RESULT;
+        List<MemberResponse> memberResponses = new ArrayList<>();
+
+        for (MemberIdentity member : request.members()) {
+            // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+            // in which case we expect the MemberId to be undefined.
+            if (member.memberId().equals(UNKNOWN_MEMBER_ID)) {
+                if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) {
+                    coordinatorResult = removeCurrentMemberFromGenericGroup(
+                        group,
+                        group.staticMemberId(member.groupInstanceId()),
+                        member.reason()
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } else {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    );
+                }
+            } else if (group.isPendingMember(member.memberId())) {
+                coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId());
+                timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId()));
+                log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.",
+                    member.memberId(), group.groupId());
+
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(member.memberId())
+                        .setGroupInstanceId(member.groupInstanceId())
+                );
+            } else {
+                try {
+                    group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group");
+                    coordinatorResult = removeCurrentMemberFromGenericGroup(

Review Comment:
   Added test cases as suggested below (`testJoinedMemberPendingMemberBatchLeaveGroup` and `testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember`).
   
   Even though the group becomes empty, as there is still a pending member, `hasAllMembersJoined()` returns false and does not generate records. Only after the last pending member leaves does the group really become empty.
   
   So, the leave group request can only produce records for the last valid member. Invalid members will not overwrite the coordinatorResult.
   
   I understand that this looks unsafe; should we append the coordinatorResults and merge them instead?
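
   For illustration, the append-and-merge idea could look roughly like this (`Result` below is a self-contained stand-in for `CoordinatorResult`, not the real class):
   ```
   import java.util.ArrayList;
   import java.util.List;

   // Sketch of "append and merge": accumulate the records of each per-member
   // result instead of letting the last valid member's result win.
   public class MergeSketch {
       record Result<R>(List<R> records) {}

       static <R> Result<R> merge(List<Result<R>> partials) {
           List<R> all = new ArrayList<>();
           for (Result<R> partial : partials) {
               all.addAll(partial.records()); // merge instead of overwrite
           }
           return new Result<>(all);
       }
   }
   ```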



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 opened a new pull request, #14345: MINOR: Range assignor changes

2023-09-05 Thread via GitHub


rreddy-22 opened a new pull request, #14345:
URL: https://github.com/apache/kafka/pull/14345

   Minor changes to make the range assignor consistent with the other assignors


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1316480601


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -380,6 +387,110 @@ protected Map validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+        Map<String, String> connectorConfig,
+        ConfigValue converterConfigValue,
+        Class<T> converterInterface,
+        Function<T, ConfigDef> configDefAccessor,
+        String converterName,
+        String converterProperty,
+        ConverterType converterType

Review Comment:
   I believe this is the only thing which makes this a converter-specific function. All of the other logic appears generic across all plugin types. It could be replaced with a `Consumer<Map<String, String>>` function which mutates the config before it is passed to the plugin, or a `Function<Map<String, String>, Map<String, String>>` if you wanted to be pure.
   
   Then all of the variable names can just become `plugin` instead of `converter`.
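
   To make that concrete, the generified shape could look roughly like this (a sketch with hypothetical names; `ConfigDef` is stubbed so the snippet stands alone):
   ```
   import java.util.Map;
   import java.util.function.Function;

   // Sketch: the only converter-specific piece becomes a config-transforming
   // function, so the same validation works for any plugin type.
   public class GenericValidationSketch {
       interface ConfigDef {}

       static <T> Map<String, String> preparePluginConfig(
           Map<String, String> connectorConfig,
           Class<T> pluginInterface,
           Function<T, ConfigDef> configDefAccessor,
           String pluginProperty,
           // The "pure" variant: derive the plugin config from the connector config.
           Function<Map<String, String>, Map<String, String>> configTransformer
       ) {
           // Instantiation and validation against configDefAccessor would follow,
           // exactly as validateConverterConfig does today.
           return configTransformer.apply(connectorConfig);
       }
   }
   ```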



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters

2023-09-05 Thread via GitHub


gharris1727 commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1707372782

   > I honestly find the EnrichablePlugin class pretty hard to read, and prefer 
the merge style when it can be used.
   
   Sure, I'll buy that. I'm fine with migrating away from EnrichablePlugin to 
something else as long as it is a common abstraction. My concern here was just 
that we were adding a distinct third style of validating configurations when 
there appeared to be a lot of common functionality that could be shared.
   
   > If the stylistic suggestions are not blockers for review, but blockers for 
merging, do you think we could establish the ideal user-facing behavior here 
and then use a separate PR for a refactoring? I can target this branch with 
that PR (which would allow review to take place on it without having to merge 
this one to trunk), or target trunk (if we feel comfortable merging this 
without blocking on a refactor).
   
   I'm fine with reviewing this as-is and merging to trunk, and then 
refactoring the other two strategies in a follow-up. I think using lambdas is 
more appropriate than anonymous classes which are constructed for just one 
method call and then discarded.
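
   For reference, the pattern in question (illustrative only):
   ```
   import java.util.function.Function;

   public class LambdaVsAnonymous {
       // Anonymous class constructed for a single call and then discarded:
       Function<String, Integer> anon = new Function<String, Integer>() {
           @Override
           public Integer apply(String s) {
               return s.length();
           }
       };

       // The equivalent lambda:
       Function<String, Integer> lambda = String::length;
   }
   ```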
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1316465707


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -380,6 +387,110 @@ protected Map validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+        Map<String, String> connectorConfig,
+        ConfigValue converterConfigValue,
+        Class<T> converterInterface,
+        Function<T, ConfigDef> configDefAccessor,
+        String converterName,
+        String converterProperty,
+        ConverterType converterType
+    ) {
+        String converterClass = connectorConfig.get(converterProperty);
+
+        if (converterClass == null
+            || converterConfigValue == null
+            || !converterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T converterInstance;
+        try {
+            converterInstance = Utils.newInstance(converterClass, converterInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", converterName, converterClass, e);
+            converterConfigValue.addErrorMessage("Failed to load class " + converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try (Utils.UncheckedCloseable close = () -> Utils.maybeCloseQuietly(converterInstance, converterName + " " + converterClass);) {

Review Comment:
   > It felt slightly more readable to use a try-with-resources block than a finally block (especially since we don't have any catch blocks). 
   
   I suppose there's some subjectivity involved here, since I found the UncheckedCloseable and explicit lambda to be a bit hard to read initially, but understood after some inspection. Using try-finally without any catch clauses is a pretty normal arrangement, and I think more developers would be used to it as compared to using a lambda along with our special UncheckedCloseable.
   
   AFAIU the try-with-resources construct was added to help with cleaning up AutoCloseable resources which can throw exceptions both during opening and closing, where it becomes tedious to set up the `finally` to perform the cleanup correctly. In this specific situation, the `newInstance` (open) errors are handled by a separate `try`, and the close errors are handled by `closeQuietly`, so none of the value-add of the try-with-resources is apparent.
   
   I see what you mean though, as we _do_ have exceptions from open and close, and we have somewhat tedious error handling surrounding them. But since the objects we're instantiating are "sometimes AutoCloseable", the try-with-resources type checking is going to get in the way.
   
   Using try-with-resources to handle open and close together, you could have a wrapper class `MaybeCloseable<T> implements UncheckedCloseable, Supplier<T>` along with a method `static <T> MaybeCloseable<T> quiet(T, String)` that you would call like this:
   ```
   try (MaybeCloseable<Converter> wrapper = MaybeCloseable.quiet(Utils.newInstance(...), "converter (for validation)")) {
       Converter converter = wrapper.get();
       // do stuff
   } catch (RuntimeException e) {
       // exceptions from newInstance and do stuff
   }
   // exceptions from close are logged instead of propagated/suppressed
   ```
   I think that would type-check but I haven't tried it out myself. Everything is just a rough suggestion, so please iterate on the names or ergonomics if you like the idea.
   
   Without the closeQuietly semantics, it would look like this:
   ```
   try (MaybeCloseable<Converter> wrapper = MaybeCloseable.propagate(Utils.newInstance(...))) {
       Converter converter = wrapper.get();
       // do stuff
   } catch (RuntimeException e) {
       // exceptions from newInstance and do stuff
   }
   // exceptions from close are not checked, but propagated or suppressed.
   ```
   
   > Is this correct? The interface was introduced in https://github.com/apache/kafka/pull/8618, where it was used on the left-hand side of a try-with-resources assignment that captured a method reference which did not throw a checked exception.
   
   When I said "exception throwing operation" I didn't mean "method that throws a checked exception", because I was thinking about how methods can throw RuntimeExceptions whether or not they have checked exceptions in the signature. I probably should have said "method that throws unchecked exceptions" to be unambiguous. Yes, this PR and the linked PR both did not have checked exceptions, but they differ because one throws RuntimeExceptions and one does not.
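
   A rough cut of that wrapper could look like the following (hypothetical and untested; it substitutes plain `AutoCloseable` for Kafka's `Utils.UncheckedCloseable`, and real code would log instead of rethrowing blindly):
   ```
   import java.util.function.Supplier;

   // Hypothetical sketch of the MaybeCloseable wrapper suggested above.
   public final class MaybeCloseable<T> implements AutoCloseable, Supplier<T> {
       private final T delegate;
       private final boolean quiet;
       private final String name;

       private MaybeCloseable(T delegate, boolean quiet, String name) {
           this.delegate = delegate;
           this.quiet = quiet;
           this.name = name;
       }

       // Close errors are swallowed (quietly logged in real code).
       public static <T> MaybeCloseable<T> quiet(T delegate, String name) {
           return new MaybeCloseable<>(delegate, true, name);
       }

       // Close errors propagate to the caller.
       public static <T> MaybeCloseable<T> propagate(T delegate) {
           return new MaybeCloseable<>(delegate, false, null);
       }

       @Override
       public T get() {
           return delegate;
       }

       @Override
       public void close() {
           if (!(delegate instanceof AutoCloseable)) {
               return; // "sometimes AutoCloseable": nothing to close otherwise
           }
           try {
               ((AutoCloseable) delegate).close();
           } catch (Exception e) {
               if (!quiet) {
                   throw new RuntimeException("Failed to close " + name, e);
               }
               // quiet mode: swallow (a real implementation would log with `name`)
           }
       }
   }
   ```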

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316456003


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns partitions to members of a consumer group ensuring a balanced distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be:
+ * balance > rack matching (when applicable) > stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    private final AssignmentSpec assignmentSpec;
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    // List of topics subscribed to by all members.
+    private final Set<Uuid> subscriptionIds;
+    private final RackInfo rackInfo;
+    // Count of members to receive an extra partition beyond the minimum quota,
+    // to account for the distribution of the remaining partitions.
+    private int remainingMembersToGetAnExtraPartition;
+    // Map of members to the remaining number of partitions needed to meet the minimum quota,
+    // including members eligible for an extra partition.
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    // Members mapped to the remaining number of partitions needed to meet the full quota.
+    // Full quota = minQuota + one extra partition (if applicable).
+    private Map<String, Integer> unfilledMembers;
+    private List<TopicIdPartition> unassignedPartitions;
+    private final Map<String, MemberAssignment> newAssignment;
+    // Tracks the current owner of each partition when using rack-aware strategy.
+    // Current refers to the existing assignment.
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+    // Indicates if a rack aware assignment can be done.
+    // True if racks are defined for both members and partitions.
+    boolean useRackAwareStrategy;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds);
+        this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unfilledMembers = new HashMap<>();
+        this.newAssignment = new HashMap<>();
+        this.useRackAwareStrategy = rackInfo.useRackStrategy;
+        // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary
+        // as all sticky partitions are retained until a member meets its quota.
+        this.currentPartitionOwners = useRackAwareStrategy ? new HashMap<>() : Collections.emptyMap();
+    }
+
+    /**
+     * Here's the step-by-step breakdown of the assignment process:
+     *
+     * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
+     * <li> For existing assignments, retain partitions based on the determined quota and member's rack compatibility.</li>
+     * <li> If a partition's rack mismatches with its owner, track it for future use.</li>
+     * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
+     * <li> Derive the unassigned partitions by taking the difference between total partitions

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316439651


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns partitions to members of a consumer group ensuring a balanced distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be:
+ * balance > rack matching (when applicable) > stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    private final AssignmentSpec assignmentSpec;
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    // List of topics subscribed to by all members.
+    private final Set<Uuid> subscriptionIds;
+    private final RackInfo rackInfo;
+    // Count of members to receive an extra partition beyond the minimum quota,
+    // to account for the distribution of the remaining partitions.
+    private int remainingMembersToGetAnExtraPartition;
+    // Map of members to the remaining number of partitions needed to meet the minimum quota,
+    // including members eligible for an extra partition.
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    // Members mapped to the remaining number of partitions needed to meet the full quota.
+    // Full quota = minQuota + one extra partition (if applicable).
+    private Map<String, Integer> unfilledMembers;
+    private List<TopicIdPartition> unassignedPartitions;
+    private final Map<String, MemberAssignment> newAssignment;
+    // Tracks the current owner of each partition when using rack-aware strategy.
+    // Current refers to the existing assignment.
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+    // Indicates if a rack aware assignment can be done.
+    // True if racks are defined for both members and partitions.
+    boolean useRackAwareStrategy;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds);
+        this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unfilledMembers = new HashMap<>();
+        this.newAssignment = new HashMap<>();
+        this.useRackAwareStrategy = rackInfo.useRackStrategy;

Review Comment:
   Yep, makes sense; I removed it. I thought that in the future we might want a flag in the assignor to switch it on or off.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316429235


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns partitions to members of a consumer group ensuring a balanced distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be:
+ * balance > rack matching (when applicable) > stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    private final AssignmentSpec assignmentSpec;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14314:
URL: https://github.com/apache/kafka/pull/14314#discussion_r1316416748


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -5634,16 +5637,31 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
 
         final TopicPartition tp1 = new TopicPartition("foo", 0);
 
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+        Map> responsesAndFailures = new HashMap<>();

Review Comment:
   extremely minor nit (we could do it in a follow-up, and it's not a blocker for the bug fix) but could we parameterize this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14314:
URL: https://github.com/apache/kafka/pull/14314#discussion_r1316414818


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java:
##########
@@ -72,6 +79,12 @@ private void handleTopicError(
 ) {
     switch (topicError) {
         case UNKNOWN_TOPIC_OR_PARTITION:
+            if (!tolerateUnknownTopics) {

Review Comment:
   > the operation would be retried if any metadata error was reported for any 
individual topic partition, even if an error was also reported for the entire 
topic. With this change, the operation always fails if an error is reported for 
the entire topic, even if an error is also reported for one or more individual 
topic partitions.
   
   This change only alters the behavior for unknown-topic-or-partition errors, right? Leader/broker-not-available errors continue to be retriable, while topic authorization, invalid topic, and other errors continue to fail the request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316408646


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * Assigns partitions to members of a consumer group ensuring a balanced distribution with
+ * considerations for sticky assignments and rack-awareness.
+ * The order of priority of properties during the assignment will be:
+ * balance > rack matching (when applicable) > stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger log = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    private final AssignmentSpec assignmentSpec;
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    // List of topics subscribed to by all members.
+    private final Set<Uuid> subscriptionIds;
+    private final RackInfo rackInfo;
+    // Count of members to receive an extra partition beyond the minimum quota,
+    // to account for the distribution of the remaining partitions.
+    private int remainingMembersToGetAnExtraPartition;
+    // Map of members to the remaining number of partitions needed to meet the minimum quota,
+    // including members eligible for an extra partition.
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    // Members mapped to the remaining number of partitions needed to meet the full quota.
+    // Full quota = minQuota + one extra partition (if applicable).
+    private Map<String, Integer> unfilledMembers;
+    private List<TopicIdPartition> unassignedPartitions;
+    private final Map<String, MemberAssignment> newAssignment;
+    // Tracks the current owner of each partition when using rack-aware strategy.
+    // Current refers to the existing assignment.
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+    // Indicates if a rack aware assignment can be done.
+    // True if racks are defined for both members and partitions.
+    boolean useRackAwareStrategy;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());

Review Comment:
   Yeah, correct. I added a check in the assign method so it's not possible anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316406386


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GeneralUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+private static final Logger log = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
+
+@Override
+protected GroupAssignment buildAssignment() {
+return null;
+

Review Comment:
   Ack, will remove it. We can ignore this file for now since the implementation 
isn't in yet; I just needed the initial template to get the conditional 
selection of the specific assignment builder based on the subscriptions (see 
the sketch below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14305: KAFKA-14274: [1/7] basic refactoring

2023-09-05 Thread via GitHub


junrao commented on code in PR #14305:
URL: https://github.com/apache/kafka/pull/14305#discussion_r1316404620


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -363,6 +365,20 @@ private CompletableFuture> chainFuture(fi
 }
 });
 }
+
+@Override
+public String toString() {
+return "OffsetFetchRequestState{" +
+"requestedPartitions=" + requestedPartitions +
+", requestedGeneration=" + requestedGeneration +
+", future=" + future +
+", exponentialBackoff=" + exponentialBackoff +
+", lastSentMs=" + lastSentMs +
+", lastReceivedMs=" + lastReceivedMs +
+", numAttempts=" + numAttempts +
+", backoffMs=" + backoffMs +

Review Comment:
   Yes, I was referring to the `toString` method. It seems that every subclass 
of `RequestState` directly re-renders every field in `RequestState` to build 
its own string. This creates duplicated code and can be a bit error-prone.
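   
   One way to remove that duplication, as a minimal sketch with hypothetical 
field names rather than the code in this PR:
   
   ```java
   // The base class renders its shared fields exactly once...
   abstract class RequestState {
       protected long lastSentMs, lastReceivedMs, backoffMs;
       protected int numAttempts;
   
       protected String toStringBase() {
           return "lastSentMs=" + lastSentMs
               + ", lastReceivedMs=" + lastReceivedMs
               + ", numAttempts=" + numAttempts
               + ", backoffMs=" + backoffMs;
       }
   }
   
   // ...and each subclass appends only the fields it declares itself.
   class OffsetFetchRequestState extends RequestState {
       private final java.util.Set<String> requestedPartitions = new java.util.HashSet<>();
   
       @Override
       public String toString() {
           return "OffsetFetchRequestState{requestedPartitions=" + requestedPartitions
               + ", " + toStringBase() + '}';
       }
   }
   ```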



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-05 Thread via GitHub


jolshan commented on PR #14314:
URL: https://github.com/apache/kafka/pull/14314#issuecomment-1707306276

   Sorry for the delay @C0urante, and thanks for the explanation. It makes sense.
   
   Given that we moved the retries to `TopicAdmin::retryEndOffsets`, does this 
mean there isn't any known wide usage of, or need for, this retry?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15437) Add metrics about open iterators

2023-09-05 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15437:
---

 Summary: Add metrics about open iterators
 Key: KAFKA-15437
 URL: https://issues.apache.org/jira/browse/KAFKA-15437
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams allows users to create iterators over state stores. Those 
iterators must be closed to free up resources (especially for RocksDB). We 
regularly get user reports of "resource leaks" that can be pinned down to 
leaking (i.e., not-closed) iterators.

To simplify monitoring, it would be helpful to add a metric for open iterators, 
so users can alert on it and pinpoint the issue directly (and before the actual 
resource leak is observed).

We might want to have a DEBUG-level per-store metric (to allow identifying the 
store in question quickly), but also a rolled-up INFO-level metric for the 
whole application.
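
As an illustration of how such a metric could be fed (a sketch, not the actual 
Streams implementation), each iterator handed out by a store could be wrapped 
so that a gauge reports the current count:

{code:java}
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a gauge metric would simply report OPEN_ITERATORS.get().
class CountingIterator<T> implements Iterator<T>, AutoCloseable {
    static final AtomicLong OPEN_ITERATORS = new AtomicLong();

    private final Iterator<T> delegate;
    private final Runnable closeDelegate;

    CountingIterator(Iterator<T> delegate, Runnable closeDelegate) {
        this.delegate = delegate;
        this.closeDelegate = closeDelegate;
        OPEN_ITERATORS.incrementAndGet();
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }

    @Override
    public void close() {
        closeDelegate.run();
        OPEN_ITERATORS.decrementAndGet();
    }
}
{code}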



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316396258


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -49,6 +51,8 @@
  * 
  */
 public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(RangeAssignor.class);

Review Comment:
   Removing this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316395025


##
clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationRequest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ControllerRegistrationRequestData;
+import org.apache.kafka.common.message.ControllerRegistrationResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ControllerRegistrationRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder<ControllerRegistrationRequest> {
+private final ControllerRegistrationRequestData data;
+
+public Builder(ControllerRegistrationRequestData data) {
+super(ApiKeys.BROKER_HEARTBEAT);

Review Comment:
   Yes, nice catch. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393751


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -320,6 +323,19 @@ public short registerBrokerRecordVersion() {
 }
 }
 
+public short registerControllerRecordVersion() {
+if (isAtLeast(MetadataVersion.IBP_3_6_IV2)) {

Review Comment:
   Sorry, this was left over from when the feature was in `IBP_3_6_IV2` 
previously. Fixed now to be `IBP_3_7_IV0`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-05 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1316393088


##
clients/src/main/resources/common/message/ControllerRegistrationRequest.json:
##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316390836


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdPartition.java:
##
@@ -0,0 +1,74 @@
+/*

Review Comment:
   It's not exactly the same: that one has the topicId mapped to a 
topicPartition, whereas here it's just the partition number.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316388557


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class<?> cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)

Review Comment:
   > I guess we could add that check as well and have something like 
Utils::ensureConcreteSubclass--how does that sound?
   
   Yeah, I imagine an `ensureConcreteSubclass(Class<?> cls, Class<?> baseClass)` 
that:
   1. asserts that cls is concrete and baseClass.isAssignableFrom(cls) is true
   2. if not valid, suggests child classes that are concrete and for which 
`baseClass::isAssignableFrom` is true.
   
   I don't think that it would compose well with the current `ensureConcrete` 
method though, so it would probably replace what is there already.
   
   > I'm on the fence about it potentially doing too much for a utility method, 
but it does provide user-facing advantages.
   
   I thought that about the child class suggestion logic, but relented for the 
same reason. It significantly complicates this method which could otherwise be 
a single if condition, but that complication could help someone quickly resolve 
an obvious typo.
   
   I don't think that asserting `baseClass.isAssignableFrom(cls)` is too much 
for this utility method, because we should always have a baseClass in mind when 
defining a `CLASS` configuration. Users of the 
`AbstractConfig::getConfiguredInstance` methods already need to provide a 
baseClass for subclass checking and type safety at runtime, this utility method 
would help users to remember the same check at validation time.
   
   And worst-case, someone can just specify Object as the baseClass, and 
effectively disable the baseClass filtering.
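   
   For illustration, the combined check might look like the following sketch 
(built from the `ensureConcrete` shown above; the signature and wording are 
hypothetical, not the merged API):
   
   ```java
   public static void ensureConcreteSubclass(Class<?> cls, Class<?> baseClass) {
       Objects.requireNonNull(cls);
       Objects.requireNonNull(baseClass);
       if (!Modifier.isAbstract(cls.getModifiers()) && baseClass.isAssignableFrom(cls))
           return;
       // Suggest public, concrete nested classes of baseClass that implement it.
       String suggestions = Stream.of(baseClass.getClasses())
           .filter(baseClass::isAssignableFrom)
           .filter(c -> !Modifier.isAbstract(c.getModifiers()))
           .filter(c -> Modifier.isPublic(c.getModifiers()))
           .map(Class::getName)
           .collect(Collectors.joining(", "));
       String message = cls.getName() + " must be a concrete subclass of "
           + baseClass.getName()
           + (suggestions.isEmpty() ? "." : ". Did you mean " + suggestions + "?");
       throw new ConfigException(message);
   }
   ```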
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters

2023-09-05 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1707264425

   > I wonder if we can unify these approaches, and maybe even use the "enrich" 
pattern for producer/consumer/admin instead of the "merge" style.
   
   I honestly find the `EnrichablePlugin` class pretty hard to read, and prefer 
the merge style when it can be used. The extra logic involved in supporting 
aliases and overriding the base `ConfigDef` is already fairly complex, and we'd 
also have to expand the class to allow plugins to optionally return null 
`ConfigDef` objects.
   
   If the stylistic suggestions are not blockers for review, but blockers for 
merging, do you think we could establish the ideal user-facing behavior here 
and then use a separate PR for a refactoring? I can target this branch with 
that PR (which would allow review to take place on it without having to merge 
this one to trunk), or target trunk (if we feel comfortable merging this 
without blocking on a refactor).
   
   And of course, if the stylistic suggestions are blockers for review, let me 
know and I can take a stab at that without doing anything fancy 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362502


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class<?> cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)
+.filter(c -> !Modifier.isAbstract(c.getModifiers()))
+.filter(c -> Modifier.isPublic(c.getModifiers()))
+.map(Class::getName)
+.collect(Collectors.joining(", "));
+String message = Utils.isBlank(childClassNames) ?
+name + " is abstract and cannot be created." :
+name + " is abstract and cannot be created. Did you mean " 
+ childClassNames + "?";
+throw new ConfigException(name, cls.getName(), message);

Review Comment:
   I like this simplification, and it makes sense to generalize the error 
message as part of promoting this to the Utils class. `parseForValidate` will 
still attribute the error to the correct key/value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362060


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends 
AutoCloseable {
 void close();
 }
 
+/**
+ * Closes {@code maybeCloseable} if it implements the {@link 
AutoCloseable} interface,
+ * and if an exception is thrown, it is logged at the WARN level.
+ * Be cautious when passing method references as an argument. For 
example:
+ * 
+ * {@code closeQuietly(task::stop, "source task");}
+ * 
+ * Although this method gracefully handles null {@link AutoCloseable} 
objects, attempts to take a method
+ * reference from a null object will result in a {@link 
NullPointerException}. In the example code above,
+ * it would be the caller's responsibility to ensure that {@code task} was 
non-null before attempting to
+ * use a method reference from it.
+ */
+public static void maybeCloseQuietly(Object maybeCloseable, String name) {

Review Comment:
   I found more use-cases for this today, so +1 on adding this helper.
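   
   For readers following along, the helper under discussion plausibly amounts to 
something like this sketch (the logging call is an assumption, not quoted from 
the PR):
   
   ```java
   public static void maybeCloseQuietly(Object maybeCloseable, String name) {
       if (maybeCloseable instanceof AutoCloseable) {
           try {
               ((AutoCloseable) maybeCloseable).close();
           } catch (Throwable t) {
               // Swallow and log, as the "quietly" in the name suggests.
               log.warn("Failed to close {} quietly", name, t);
           }
       }
   }
   ```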






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14309: KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1316338908


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -380,6 +387,110 @@ protected Map 
validateSourceConnectorConfig(SourceConnector
 return configDef.validateAll(config);
 }
 
+private <T> ConfigInfos validateConverterConfig(
+Map<String, String> connectorConfig,
+ConfigValue converterConfigValue,
+Class<T> converterInterface,
+Function<T, ConfigDef> configDefAccessor,
+String converterName,
+String converterProperty,
+ConverterType converterType
+) {
+String converterClass = connectorConfig.get(converterProperty);
+
+if (converterClass == null
+|| converterConfigValue == null
+|| !converterConfigValue.errorMessages().isEmpty()
+) {
+// Either no custom converter was specified, or one was specified 
but there's a problem with it.
+// No need to proceed any further.
+return null;
+}
+
+T converterInstance;
+try {
+converterInstance = Utils.newInstance(converterClass, 
converterInterface);
+} catch (ClassNotFoundException | RuntimeException e) {
+log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", converterName, converterClass, e);
+converterConfigValue.addErrorMessage("Failed to load class " + 
converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+return null;
+}
+
+try (Utils.UncheckedCloseable close = () -> 
Utils.maybeCloseQuietly(converterInstance, converterName + " " + 
converterClass);) {

Review Comment:
   It felt slightly more readable to use a try-with-resources block than a 
finally block (especially since we don't have any catch blocks). You're correct 
that `Utils::maybeCloseQuietly` doesn't throw any checked exceptions, but I had 
to provide a left-hand type (that extended the `AutoCloseable` interface) to 
prove that to the compiler, which ruled out `AutoCloseable` and `Closeable` 
since both of those interfaces' `close` methods throw checked exceptions.
   
   Also, regarding this comment:
   > The unchecked closable should be for exception-throwing operations that 
should have their exceptions suppressed, but maybeCloseQuietly should never 
throw an exception.
   
   Is this correct? The interface was introduced in 
https://github.com/apache/kafka/pull/8618, where it was used on the left-hand 
side of a try-with-resources assignment that captured a method reference which 
did not throw a checked exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-05 Thread via GitHub


philipnee commented on code in PR #14313:
URL: https://github.com/apache/kafka/pull/14313#discussion_r1316332331


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -286,11 +286,16 @@ public static NewTopicBuilder defineTopic(String 
topicName) {
  * @param adminConfig the configuration for the {@link Admin}
  */
 public TopicAdmin(Map<String, Object> adminConfig) {
-this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), 
Admin.create(adminConfig));
+this(adminConfig, Admin.create(adminConfig));
 }
 
-public TopicAdmin(Object bootstrapServers, Admin adminClient) {
-this(bootstrapServers, adminClient, true);
+public TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
+this(bootstrapServers(adminConfig), adminClient, true);
+}
+
+// visible for testing
+TopicAdmin(Admin adminClient) {
+this(null, adminClient, true);

Review Comment:
   Sorry for being the type police, but this logic annoys me:
   
   ```
   public TopicAdmin(Object bootstrapServers, Admin adminClient) {
   this(bootstrapServers, adminClient, true);
   }
   
   // visible for testing
   TopicAdmin(Object bootstrapServers, Admin adminClient, boolean 
logCreation) {
   this.admin = adminClient;
   this.bootstrapServers = bootstrapServers != null ? 
bootstrapServers.toString() : "";
   this.logCreation = logCreation;
   }
   ```
   
   Can we just do map.getOrDefault() and perform an inline type cast (roughly 
as sketched below)? In general, passing nulls around confuses me...
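   
   Roughly this, as a sketch (field handling assumed from the snippet above, not 
taken from the PR):
   
   ```java
   public TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
       this.admin = adminClient;
       // Derive the display string from the config directly, so no nullable
       // Object parameter has to be threaded through the constructors.
       this.bootstrapServers = Objects.toString(
           adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), "");
       this.logCreation = true;
   }
   ```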



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java:
##
@@ -286,11 +286,16 @@ public static NewTopicBuilder defineTopic(String 
topicName) {
  * @param adminConfig the configuration for the {@link Admin}
  */
 public TopicAdmin(Map<String, Object> adminConfig) {
-this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), 
Admin.create(adminConfig));
+this(adminConfig, Admin.create(adminConfig));
 }
 
-public TopicAdmin(Object bootstrapServers, Admin adminClient) {
-this(bootstrapServers, adminClient, true);
+public TopicAdmin(Map<String, Object> adminConfig, Admin adminClient) {
+this(bootstrapServers(adminConfig), adminClient, true);
+}
+
+// visible for testing
+TopicAdmin(Admin adminClient) {
+this(null, adminClient, true);

Review Comment:
   Also, understood this is not your code, so I can submit a minor patch for 
this if you don't want to modify your PR.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15435) KRaft migration record counts in log message are incorrect

2023-09-05 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-15435:


Assignee: David Arthur

> KRaft migration record counts in log message are incorrect
> --
>
> Key: KAFKA-15435
> URL: https://issues.apache.org/jira/browse/KAFKA-15435
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
>
> The counting logic in MigrationManifest is incorrect and produces invalid 
> output. This information is critical for users wanting to validate the result 
> of a migration.
>  
> {code}
> Completed migration of metadata from ZooKeeper to KRaft. 7117 records were 
> generated in 54253 ms across 1629 batches. The record types were 
> {TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, 
> ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. 
> {code}
> Due to the logic bug, the counts will never exceed 2.
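
(The usual shape of this kind of counting bug is an assignment that overwrites 
instead of accumulating. Purely for illustration, and without claiming this is 
the actual MigrationManifest code, a correct per-type tally looks like:)

{code:java}
// merge() accumulates, so counts are not stuck at a small constant.
Map<String, Integer> counts = new HashMap<>();
for (String recordType : recordTypes) {
    counts.merge(recordType, 1, Integer::sum);
}
{code}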



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316327673


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+// The value will be null if the class couldn't be found; no point 
in performing follow-up validation
+return;
+}
+
+Class<?> cls = (Class<?>) value;
+try {
+Object o = cls.getDeclaredConstructor().newInstance();
+Utils.maybeCloseQuietly(o, o + " (instantiated for preflight 
validation");
+} catch (NoSuchMethodException e) {
+throw new ConfigException(name, cls.getName(), "Could not find a 
public no-argument constructor for class" + (e.getMessage() != null ? ": " + 
e.getMessage() : ""));
+} catch (ReflectiveOperationException | RuntimeException e) {
+throw new ConfigException(name, cls.getName(), "Could not 
instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+}

Review Comment:
   Good call, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316327532


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class<?> cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)
+.filter(c -> !Modifier.isAbstract(c.getModifiers()))
+.filter(c -> Modifier.isPublic(c.getModifiers()))
+.map(Class::getName)
+.collect(Collectors.joining(", "));
+String message = Utils.isBlank(childClassNames) ?
+name + " is abstract and cannot be created." :
+name + " is abstract and cannot be created. Did you mean " 
+ childClassNames + "?";
+throw new ConfigException(name, cls.getName(), message);

Review Comment:
   🤦 Thanks, good catch.
   
   In pursuit of simplicity I've taken a stab at using the single-arg 
`ConfigException` constructor, and using a one-size-fits-all error message that just 
says "This class" instead of, e.g., "Transform". Since all current uses for 
this code path involve (directly or indirectly) hitting a `ConfigDef.Validator` 
as part of config validation, I think the user-facing impact should only go as 
far as the small wording change. Let me know if you think it's better to 
preserve existing behavior, though.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator

2023-09-05 Thread via GitHub


jeffkbkim commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316322730


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a generic group LeaveGroup request.
+ *
+ * @param contextThe request context.
+ * @param requestThe actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
+ * no longer has any member.
+ */
+public CoordinatorResult genericGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), 
false);
+if (group.isInState(DEAD)) {
+return new CoordinatorResult<>(Collections.emptyList(),
+new LeaveGroupResponseData()
+.setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+);
+}
+
+CoordinatorResult coordinatorResult = EMPTY_RESULT;
+List<MemberResponse> memberResponses = new ArrayList<>();
+
+for (MemberIdentity member : request.members()) {
+// The LeaveGroup API allows administrative removal of members 
by GroupInstanceId
+// in which case we expect the MemberId to be undefined.
+if (member.memberId().equals(UNKNOWN_MEMBER_ID)) {
+if (member.groupInstanceId() != null && 
group.staticMemberId(member.groupInstanceId()) != null) {
+coordinatorResult = 
removeCurrentMemberFromGenericGroup(
+group,
+group.staticMemberId(member.groupInstanceId()),
+member.reason()
+);
+memberResponses.add(
+new MemberResponse()
+.setMemberId(member.memberId())
+.setGroupInstanceId(member.groupInstanceId())
+);
+} else {
+memberResponses.add(
+new MemberResponse()
+.setMemberId(member.memberId())
+.setGroupInstanceId(member.groupInstanceId())
+.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+);
+}
+} else if (group.isPendingMember(member.memberId())) {
+coordinatorResult = 
removePendingMemberAndUpdateGenericGroup(group, member.memberId());
+timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+log.info("Pending member {} has left group {} through 
explicit `LeaveGroup` request.",
+member.memberId(), group.groupId());
+
+memberResponses.add(
+new MemberResponse()
+.setMemberId(member.memberId())
+.setGroupInstanceId(member.groupInstanceId())
+);
+} else {
+try {
+group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
+coordinatorResult = 
removeCurrentMemberFromGenericGroup(
+group,
+member.memberId(),
+member.reason()
+);
+memberResponses.add(
+new MemberResponse()
+.setMemberId(member.memberId())
+.setGroupInstanceId(member.groupInstanceId())
+);
+} catch (KafkaException e) {
+memberResponses.add(
+new MemberResponse()
+.setMemberId(member.memberId())
+.setGroupInstanceId(member.groupInstanceId())
+.setErrorCode(Errors.forException(e).code())
+);
+}
+}
+}
+return new CoordinatorResult<>(
+coordinatorResult.records(),
+new LeaveGroupResponseData()
+.setMembers(memberResponses),
+coordinatorResult.appendFuture()
+);
+}
+
+/**
+ * Remove a member from the group. Cancel member's heartbeat, and prepare 
rebalance
+ * or complete the join phase if necessary.
+ * 
+ * @param group The generic group.
+ * @param memberId  The member id.
+ * @param 

[GitHub] [kafka] C0urante commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316306495


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends 
AutoCloseable {
 void close();
 }
 
+/**
+ * Closes {@code maybeCloseable} if it implements the {@link 
AutoCloseable} interface,
+ * and if an exception is thrown, it is logged at the WARN level.
+ * Be cautious when passing method references as an argument. For 
example:
+ * 
+ * {@code closeQuietly(task::stop, "source task");}
+ * 
+ * Although this method gracefully handles null {@link AutoCloseable} 
objects, attempts to take a method

Review Comment:
   🤦 Thanks, I had a feeling I was missing something.
   
   Considering the utility of this method is that it's type-blind, I'm not sure 
we need to worry about it being invoked with a method reference. What are the 
odds that a method reference returns something that a dev, at compile time, 
isn't sure is `AutoCloseable` or not?
   
   I've tentatively removed all of the language around method references from 
this section. If you think it's better to err on the side of safety, I can 
restore it with your suggestion applied.






##
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.lang.reflect.Modifier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConcreteSubClassValidator implements ConfigDef.Validator {
+private final Class<?> expectedSuperClass;
+
+private ConcreteSubClassValidator(Class<?> expectedSuperClass) {
+this.expectedSuperClass = expectedSuperClass;
+}
+
+public static ConcreteSubClassValidator forSuperClass(Class<?> 
expectedSuperClass) {
+return new ConcreteSubClassValidator(expectedSuperClass);
+}
+
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+// The value will be null if the class couldn't be found; no point 
in performing follow-up validation
+return;
+}

Review Comment:
   Cool, filed https://issues.apache.org/jira/browse/KAFKA-15436 to track




[jira] [Created] (KAFKA-15436) Custom ConfigDef validators are invoked with null when user-provided value does not match type

2023-09-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-15436:
-

 Summary: Custom ConfigDef validators are invoked with null when 
user-provided value does not match type
 Key: KAFKA-15436
 URL: https://issues.apache.org/jira/browse/KAFKA-15436
 Project: Kafka
  Issue Type: Bug
Reporter: Chris Egerton


Filed in response to [discussion on a tangentially-related 
PR|https://github.com/apache/kafka/pull/14304#discussion_r1310039190].
h3. Background

The [ConfigDef.Validator 
interface|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Validator.html]
 can be used to add custom per-property validation logic to a 
[ConfigDef|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html]
 instance. This can serve many uses, including but not limited to:
 * Ensuring that the value for a string property matches the name of a Java 
enum type
 * Ensuring that the value for an integer property falls within the range of 
valid port numbers
 * Ensuring that the value for a class property has a public, no-args 
constructor and/or implements a certain interface

This validation logic can be invoked directly via 
[ConfigDef::validate|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validate(java.util.Map)]
 or 
[ConfigDef::validateAll|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.html#validateAll(java.util.Map)],
 or indirectly when instantiating an 
[AbstractConfig|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/AbstractConfig.html].

When a value is validated by a {{ConfigDef}} instance, the {{ConfigDef}} first 
verifies that the value adheres to the expected type. For example, if the "raw" 
value is the string {{"345"}} and the property is defined with the [INT 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#INT],
then the value is valid (it is parsed as the integer {{345}}). However, if 
the same raw value is used for a property defined with the [BOOLEAN 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN],
 then the value is invalid (it cannot be parsed as a boolean).
h3. Problem

When a raw value is invalid for the type of the property it is used for (e.g., 
{{"345"}} is used for a property defined with the [BOOLEAN 
type|https://kafka.apache.org/35/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#BOOLEAN]),
 custom validators for the property are still invoked, with a value of 
{{null}}.

This can lead to some counterintuitive behavior, and may necessitate that 
implementers of the {{ConfigDef.Validator}} interface catch cases where the 
value is {{null}} and choose not to report any errors (with the assumption that 
an error will already be reported by the {{ConfigDef}} regarding its failure to 
parse the raw value with the expected type).

We may consider skipping custom validation altogether when the raw value for a 
property cannot be parsed with the expected type. On the other hand, it's 
unclear if there are compatibility concerns about this kind of change.

If we decide to change this behavior, we should try to assess which code paths 
may lead to custom validators being invoked, which use cases correspond to 
which of these code paths, and whether this behavioral change has a chance to 
negatively impact these use cases.
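
A concrete example of the defensive pattern described above (a hypothetical 
validator, not one from the codebase):

{code:java}
public class PortValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        // ConfigDef passes null when the raw value failed type parsing; that
        // failure is already reported, so skip any follow-up validation.
        if (value == null)
            return;
        int port = (Integer) value;
        if (port < 1 || port > 65535)
            throw new ConfigException(name, value, "must be a valid port number (1-65535)");
    }
}
{code}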



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tylerbertrand opened a new pull request, #14344: Resolve checkstyle cache miss

2023-09-05 Thread via GitHub


tylerbertrand opened a new pull request, #14344:
URL: https://github.com/apache/kafka/pull/14344

   Resolves cache misses in `checkstyle` tasks due to absolute paths in 
`configProperties`.
   
   Sets the `configDirectory` extension property, which is made available by the 
`checkstyle` plugin as `${config_loc}` in the `checkstyle` xml files, as shown 
in the Checkstyle Gradle docs 
[here](https://docs.gradle.org/current/userguide/checkstyle_plugin.html#sec:checkstyle_built_in_variables).
 The absolute paths set in `configProperties` are then replaced by relative 
paths from `configDirectory`. Because the header and suppression config file 
names are static and only referenced once, these were removed from 
`configProperties` and the file names are given directly in `checkstyle.xml` 
   
   [Task input comparison showing input differences between builds causing 
cache 
miss](https://ge.solutions-team.gradle.com/c/udumk6sv5uwbw/an5e6pclyiblm/task-inputs?cacheability=cacheable,overlapping-outputs,validation-failure)
   [Task input comparison with fix applied - no differences between 
builds](https://ge.solutions-team.gradle.com/c/ojlblvuooowjo/rrubheuf5ex5g/task-inputs?cacheability=cacheable,overlapping-outputs,validation-failure)
   
   ### Tests
   
   Manually verified suppressions are working as expected:
   * [checkStyleMain FAILS when calling `System.exit()` in 
`MessageGenerator.java` with that suppression 
DISABLED](https://ge.solutions-team.gradle.com/s/34amuqemmyili/failure#1)
   * [checkStyleMain PASSES when calling `System.exit()` in 
`MessageGenerator.java` with that suppression 
ENABLED](https://ge.solutions-team.gradle.com/s/s62zwn5gfsu54/failure)
   
   Manually verified header check is working as expected:
   * [checkStyleMain FAILS when java file is missing license 
header](https://ge.solutions-team.gradle.com/s/rjjqrqrmam6oq/failure#1)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2023-09-05 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-9800.

Fix Version/s: 3.7.0
   Resolution: Fixed

Merged [https://github.com/apache/kafka/pull/14111] to trunk.

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: KIP-580, client
> Fix For: 3.7.0
>
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for retry backoff ms.
>  # Async retries. On each poll, retries that have not yet met the backoff are 
> filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e., a set contains only one 
> initial request and its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each batch when it enqueues them, to avoid 
> instantiating the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation is applying backoff to each topic partition, where the backoff 
> value is kept by TopicPartitionState. Thus, TopicPartitionState will have the 
> new property recording the number of attempts.
> Metadata:
>  #  Metadata lives as a singleton in many clients. Add a new property 
> recording the number of attempts
>  AdminClient:
>  # AdminClient has its own request abstraction Call. The failed attempts are 
> already kept by the abstraction. So probably clean the Call class logic a bit.
> Existing tests:
>  # If the tests are testing the retry backoff, add a delta to the assertion, 
> considering the existence of the jitter.
>  # If the tests are testing other functionality, we can specify the same 
> value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make 
> the retry backoff static. We can use this trick to make the existing tests 
> compatible with the changes.
> There are other common usages that look like client.poll(timeout), where the 
> timeout passed in is the retry backoff value. We won't change these usages 
> since its underlying logic is nioSelector.select(timeout) and 
> nioSelector.selectNow(), which means if no interested op exists, the client 
> will block retry backoff milliseconds. This is an optimization when there's 
> no request that needs to be sent but the client is waiting for responses. 
> Specifically, if the client fails the inflight requests before the retry 
> backoff milliseconds passed, it still needs to wait until that amount of time 
> passed, unless there's a new request that needs to be sent.
>  
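
(For reference, the per-attempt computation described above amounts to roughly 
the following sketch; the parameter names and the 20% jitter factor are 
assumptions, not the merged code. `retry.backoff.ms` maps to baseMs and 
`retry.backoff.max.ms` to maxMs.)

{code:java}
import java.util.concurrent.ThreadLocalRandom;

static long backoffMs(long baseMs, long maxMs, int failedAttempts) {
    // Exponential growth per failed attempt, capped at the configured maximum.
    double expBackoff = Math.min(maxMs, baseMs * Math.pow(2, failedAttempts));
    // Apply +/-20% jitter so that retries from many clients do not align.
    double jitter = 1.0 + 0.2 * (2 * ThreadLocalRandom.current().nextDouble() - 1);
    return (long) Math.min(maxMs, expBackoff * jitter);
}
{code}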



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2023-09-05 Thread via GitHub


junrao commented on PR #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-1707163099

   This probably should be closed now that 
https://github.com/apache/kafka/pull/14111 is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-05 Thread via GitHub


C0urante commented on PR #14313:
URL: https://github.com/apache/kafka/pull/14313#issuecomment-1707161908

   @philipnee Let me know if you're still looking into this. Happy to wait on 
your analysis if you are, and if not, we can merge as-is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao merged pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-09-05 Thread via GitHub


junrao merged PR #14111:
URL: https://github.com/apache/kafka/pull/14111


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator

2023-09-05 Thread via GitHub


jeffkbkim commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316274795


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a generic group LeaveGroup request.
+ *
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the GroupMetadata record to append if the group
+ *         no longer has any member.
+ */
+public CoordinatorResult genericGroupLeave(
+    RequestContext context,
+    LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+    GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false);
+    if (group.isInState(DEAD)) {
+        return new CoordinatorResult<>(Collections.emptyList(),
+            new LeaveGroupResponseData()
+                .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+        );
+    }
+
+    CoordinatorResult coordinatorResult = EMPTY_RESULT;
+    List memberResponses = new ArrayList<>();
+
+    for (MemberIdentity member : request.members()) {
+        // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+        // in which case we expect the MemberId to be undefined.
+        if (member.memberId().equals(UNKNOWN_MEMBER_ID)) {
+            if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) {

Review Comment:
   thanks for the catch. will use existing `hasStaticMember`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


C0urante commented on PR #14336:
URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707142324

   BTW, same thought about backporting--I'm going to cherry-pick these changes 
on the `apache/kafka` repo onto the 3.5 and 3.6 branches; if you'd like them to 
show up immediately, feel free to file a PR on `apache/kafka-site` to update the 
docs for 3.5.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14147: KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator

2023-09-05 Thread via GitHub


jeffkbkim commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1316264712


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -306,6 +308,22 @@ public CoordinatorResult commitOffset(
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+/**
+ * Handles a OffsetCommit request.
+ *
+ * @param context The request context.
+ * @param request The actual OffsetCommit request.
+ *
+ * @return A Result containing the OffsetCommitResponse response and

Review Comment:
   Fixed the javadocs to say LeaveGroup. The other APIs are also using the 
actual API name (SyncGroup/JoinGroup...), so I will keep LeaveGroup as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


C0urante merged PR #14336:
URL: https://github.com/apache/kafka/pull/14336


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


yashmayya commented on PR #14337:
URL: https://github.com/apache/kafka/pull/14337#issuecomment-1707112259

   Thanks for the review and suggestion Chris! I've raised 
https://github.com/apache/kafka-site/pull/538 to port over these changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest

2023-09-05 Thread via GitHub


mumrah commented on code in PR #14342:
URL: https://github.com/apache/kafka/pull/14342#discussion_r1316249748


##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -60,7 +61,8 @@ public MigrationManifest build() {
         if (endTimeNanos == 0) {
             endTimeNanos = time.nanoseconds();
         }
-        return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+        Map orderedCounts = new TreeMap<>(counts);

Review Comment:
   Putting the counts into a TreeMap will order the map according to the keys. The effect 
of this is that the log message becomes deterministic, which is useful for 
testing. It's arguably also a bit nicer for end users if the output of the 
message is consistent.
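   For illustration, a tiny standalone example of the determinism point (the record-type names here are just sample strings, not the actual MigrationManifest types):

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OrderedCountsDemo {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("TOPIC_RECORD", 2);
        counts.put("CONFIG_RECORD", 5);
        counts.put("PARTITION_RECORD", 7);

        // HashMap iteration order is unspecified, so a log line built from it
        // can vary between runs; TreeMap iterates in sorted key order.
        System.out.println(counts);                // order not guaranteed
        System.out.println(new TreeMap<>(counts)); // {CONFIG_RECORD=5, PARTITION_RECORD=7, TOPIC_RECORD=2}
    }
}
{code}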



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest

2023-09-05 Thread via GitHub


mumrah commented on code in PR #14342:
URL: https://github.com/apache/kafka/pull/14342#discussion_r1316249748


##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -60,7 +61,8 @@ public MigrationManifest build() {
         if (endTimeNanos == 0) {
             endTimeNanos = time.nanoseconds();
         }
-        return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+        Map orderedCounts = new TreeMap<>(counts);

Review Comment:
   Putting the counts into a TreeMap will order the map according to the keys. The effect 
of this is that the log message becomes deterministic, which is useful for 
testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2023-09-05 Thread Jimmy Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762186#comment-17762186
 ] 

Jimmy Wang commented on KAFKA-14517:


[~dajac] 

Got it. I refactored the previous code and removed the client-related part.

For the server-side implementation, I added an extra argument, TopicsImage, to 
maybeUpdateSubscribedTopicNames().

When GroupMetadataKey/Value is replayed, the regex will not be considered, 
because when computeSubscriptionMetadata() is invoked, the topics that match 
the regex will be added into subscribedTopicCount.
 

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Jimmy Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lzyLuke commented on a diff in pull request #14342: KAFKA-15435 Fix counts in MigrationManifest

2023-09-05 Thread via GitHub


lzyLuke commented on code in PR #14342:
URL: https://github.com/apache/kafka/pull/14342#discussion_r1316244101


##
metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationManifest.java:
##
@@ -60,7 +61,8 @@ public MigrationManifest build() {
         if (endTimeNanos == 0) {
             endTimeNanos = time.nanoseconds();
         }
-        return new MigrationManifest(total, batches, endTimeNanos - startTimeNanos, counts);
+        Map orderedCounts = new TreeMap<>(counts);

Review Comment:
   qq: why do we need a treemap here? Does the sorted order help here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


yashmayya commented on PR #14336:
URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707093218

   Ah whoops, that was not intentional. Good catch, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 opened a new pull request, #14343: Minor fix trailing white spaces on reviewers.py

2023-09-05 Thread via GitHub


rreddy-22 opened a new pull request, #14343:
URL: https://github.com/apache/kafka/pull/14343

   Minor: Fixing trailing white spaces on reviewers.py


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


C0urante commented on PR #14336:
URL: https://github.com/apache/kafka/pull/14336#issuecomment-1707076130

   The new paragraph looks great, but was the `STOPPED` state summary in the 
section beginning with "Connectors and their tasks publish status updates" 
intentionally removed? That part looked good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


C0urante commented on PR #14337:
URL: https://github.com/apache/kafka/pull/14337#issuecomment-1707068710

   I've backported this to the 3.5 and 3.6 branches; it should be reflected 
once a new release is put out on those.
   
   @yashmayya if you'd like, feel free to file a PR against 
https://github.com/apache/kafka-site to port these changes over to 3.5, which 
will cause them to show up immediately on our current docs page (which uses 
3.5, since that's the latest release).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


C0urante merged PR #14337:
URL: https://github.com/apache/kafka/pull/14337


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #14342: KAFKA-15435 Fix counts in MigrationManifest

2023-09-05 Thread via GitHub


mumrah opened a new pull request, #14342:
URL: https://github.com/apache/kafka/pull/14342

   This patch fixes an issue in MigrationManifest where we weren't computing 
the record counts properly. A unit test was added to verify the fix. 
   
   This patch also changes the counters in MigrationManifest to use an ordered 
map so we'll get deterministic output.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+  

[GitHub] [kafka] dajac commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-05 Thread via GitHub


dajac commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1316210535


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
             .setPartitionIndexes(Collections.singletonList(0))
     );
 
+    // Fetch offsets cases.
+    assertThrows(UnknownMemberIdException.class,
+        () -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE));

Review Comment:
   Yeah, I changed the member id to be nullable and null by default. I think 
that it makes more sense. So now, an empty string raises an unknown member id 
error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1316209008


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
             .setPartitionIndexes(Collections.singletonList(0))
     );
 
+    // Fetch offsets cases.
+    assertThrows(UnknownMemberIdException.class,
+        () -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE));

Review Comment:
   Or is this part of the latest commit that made null the no member ID 
indicator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1316207292


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1491,10 +1526,15 @@ public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
             .setPartitionIndexes(Collections.singletonList(0))
     );
 
+    // Fetch offsets cases.
+    assertThrows(UnknownMemberIdException.class,
+        () -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE));

Review Comment:
   Is this testing that the empty member string will fail (i.e. if there is no 
member, it must be null)? I'm trying to understand how this test is different 
from the one below.
   
   I also noticed that the test changed the default values from empty string to 
null. Was that an error before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14321: KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest

2023-09-05 Thread via GitHub


jolshan commented on code in PR #14321:
URL: https://github.com/apache/kafka/pull/14321#discussion_r1316197710


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -181,10 +181,28 @@ public List fetchOffsets(
     String groupId,
     List topics,
     long committedOffset
+) {

Review Comment:
   It could be useful to add a small comment in case folks wanted to build upon 
these tests. But up to you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


yashmayya commented on code in PR #14337:
URL: https://github.com/apache/kafka/pull/14337#discussion_r1316183113


##
docs/toc.html:
##
@@ -206,6 +206,14 @@
 Plugin 
Discovery
 
 8.3 Connector Development 
Guide
+
+Core Concepts and 
APIs
+Developing a Simple 
Connector
+Dynamic Input/Output 
Streams
+Connect Configuration 
Validation

Review Comment:
   Yeah, it's definitely confusing - in fact I initially thought that I had 
messed it up somehow. I elected to keep it as is when I noticed that there are 
other headings with the same issue too (in other sections) but `Configuration 
Validation` sounds good to me so I've made that change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


yashmayya commented on code in PR #14336:
URL: https://github.com/apache/kafka/pull/14336#discussion_r1316175988


##
docs/connect.html:
##
@@ -1078,7 +1079,7 @@ Kafka Connect
 
 
 
-It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
a pause/resume API. While a source connector is paused, Connect will stop 
polling it for additional records. While a sink connector is paused, Connect 
will stop pushing new messages to it. The pause state is persistent, so even if 
you restart the cluster, the connector will not begin message processing again 
until the task has been resumed. Note that there may be a delay before all of a 
connector's tasks have transitioned to the PAUSED state since it may take time 
for them to finish whatever processing they were in the middle of when being 
paused. Additionally, failed tasks will not transition to the PAUSED state 
until they have been restarted.
+It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
pause / stop / resume APIs. While a source connector is paused or stopped, 
Connect will stop polling it for additional records. While a sink connector is 
paused or stopped, Connect will stop pushing new messages to it. The paused / 
stopped state is persistent, so even if you restart the cluster, the connector 
will not begin message processing again until it has been resumed. For a paused 
connector, any resources claimed by its tasks are left allocated, which allows 
the connector to begin processing data quickly once it is resumed. When a 
connector is stopped, however, its tasks are shut down and any resources 
claimed by its tasks are deallocated. This is more efficient from a resource 
usage standpoint than pausing the connector, but can cause it to take longer to 
begin processing data once resumed. Note that there may be a delay before all 
of a connector's tasks have transitioned to the PAUSED state since it may take 
time for them to finish whatever processing they were in the middle of when 
being paused. Additionally, failed tasks will not transition to the PAUSED 
state until they have been restarted.

Review Comment:
   Thanks, that sounds good to me  



##
docs/connect.html:
##
@@ -1078,7 +1079,7 @@ Kafka Connect
 
 
 
-It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
a pause/resume API. While a source connector is paused, Connect will stop 
polling it for additional records. While a sink connector is paused, Connect 
will stop pushing new messages to it. The pause state is persistent, so even if 
you restart the cluster, the connector will not begin message processing again 
until the task has been resumed. Note that there may be a delay before all of a 
connector's tasks have transitioned to the PAUSED state since it may take time 
for them to finish whatever processing they were in the middle of when being 
paused. Additionally, failed tasks will not transition to the PAUSED state 
until they have been restarted.
+It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
pause / stop / resume APIs. While a source connector is paused or stopped, 
Connect will stop polling it for additional records. While a sink connector is 
paused or stopped, Connect will stop pushing new messages to it. The paused / 
stopped state is persistent, so even if you restart the cluster, the connector 
will not begin message processing again until it has been resumed. For a paused 
connector, any resources claimed by its tasks are left allocated, which allows 
the connector to begin processing data quickly once it is resumed. When a 
connector is stopped, however, its tasks are shut down and any resources 
claimed by its tasks are deallocated. This is more efficient from a resource 
usage standpoint than pausing the connector, but can cause it to take longer to 
begin processing data once resumed. Note that there may be a delay before all 
of a connector's tasks have transitioned to the PAUSED state since it may take 
time for them 

[GitHub] [kafka] C0urante commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14337:
URL: https://github.com/apache/kafka/pull/14337#discussion_r1316159088


##
docs/toc.html:
##
@@ -206,6 +206,14 @@
 Plugin 
Discovery
 
 8.3 Connector Development 
Guide
+
+Core Concepts and 
APIs
+Developing a Simple 
Connector
+Dynamic Input/Output 
Streams
+Connect Configuration 
Validation

Review Comment:
   This is getting split across two lines when it's rendered. As a quick fix, 
maybe we can rename this section to just `Configuration Validation`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14337: MINOR: Update the documentation's table of contents to add missing headings for Kafka Connect

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14337:
URL: https://github.com/apache/kafka/pull/14337#discussion_r1316159088


##
docs/toc.html:
##
@@ -206,6 +206,14 @@
 Plugin 
Discovery
 
 8.3 Connector Development 
Guide
+
+Core Concepts and 
APIs
+Developing a Simple 
Connector
+Dynamic Input/Output 
Streams
+Connect Configuration 
Validation

Review Comment:
   This is getting split across two lines when it's rendered, which is a little 
confusing as it looks like there are different sections for `Connect 
Configuration` and `Validation`. As a quick fix, maybe we can rename this 
section to just `Configuration Validation`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie opened a new pull request, #14341: KAFKA-15307: Removes non-existent configs

2023-09-05 Thread via GitHub


Cerchie opened a new pull request, #14341:
URL: https://github.com/apache/kafka/pull/14341

   addresses 1/3 of https://issues.apache.org/jira/browse/KAFKA-15307 
   
   Removes pieces of configuration from the [developer guide to configuring 
streams.](https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html).
 
   
   These were the pieces of configuration not found in 
[StreamsConfig.java](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java).
 
   
   They include: 
- `partition.grouper`
- `rack.aware.assignment.tag`
- `rocksdb.config.setter`
- `state.dir`
- `topology.optimization`
   
   Note: `acks` and `replication.factor`, although not found in 
'StreamsConfig.java', are listed in the developer guide to configuring streams. 
I am assuming they are defined somewhere else. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14336: KAFKA-14876: Add stopped state to Kafka Connect Administration docs section

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14336:
URL: https://github.com/apache/kafka/pull/14336#discussion_r1316150923


##
docs/connect.html:
##
@@ -1078,7 +1079,7 @@ Kafka Connect
 
 
 
-It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
a pause/resume API. While a source connector is paused, Connect will stop 
polling it for additional records. While a sink connector is paused, Connect 
will stop pushing new messages to it. The pause state is persistent, so even if 
you restart the cluster, the connector will not begin message processing again 
until the task has been resumed. Note that there may be a delay before all of a 
connector's tasks have transitioned to the PAUSED state since it may take time 
for them to finish whatever processing they were in the middle of when being 
paused. Additionally, failed tasks will not transition to the PAUSED state 
until they have been restarted.
+It's sometimes useful to temporarily stop the message processing of a 
connector. For example, if the remote system is undergoing maintenance, it 
would be preferable for source connectors to stop polling it for new data 
instead of filling logs with exception spam. For this use case, Connect offers 
pause / stop / resume APIs. While a source connector is paused or stopped, 
Connect will stop polling it for additional records. While a sink connector is 
paused or stopped, Connect will stop pushing new messages to it. The paused / 
stopped state is persistent, so even if you restart the cluster, the connector 
will not begin message processing again until it has been resumed. For a paused 
connector, any resources claimed by its tasks are left allocated, which allows 
the connector to begin processing data quickly once it is resumed. When a 
connector is stopped, however, its tasks are shut down and any resources 
claimed by its tasks are deallocated. This is more efficient from a resource 
usage standpoint than pausing the connector, but can cause it to take longer to 
begin processing data once resumed. Note that there may be a delay before all 
of a connector's tasks have transitioned to the PAUSED state since it may take 
time for them to finish whatever processing they were in the middle of when 
being paused. Additionally, failed tasks will not transition to the PAUSED 
state until they have been restarted.

Review Comment:
   It feels like this muddies the waters a bit for the existing pause/resume 
API... what do you think about leaving this paragraph as it was, and adding a 
separate paragraph afterward that covers the `STOPPED` state, explaining that 
it goes beyond the `PAUSED` state by completely shutting down tasks instead of 
leaving them idling?
   
   We can also include information on offset management even if we intend to 
backport this PR; I can just make sure to either remove it or clarify that it 
refers to a feature available in later versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on pull request #14341: KAFKA-15307: Removes non-existent configs

2023-09-05 Thread via GitHub


Cerchie commented on PR #14341:
URL: https://github.com/apache/kafka/pull/14341#issuecomment-1706973281

   @mjsax for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15293) Update metrics doc to add tiered storage metrics

2023-09-05 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-15293.

Resolution: Fixed

> Update metrics doc to add tiered storage metrics
> 
>
> Key: KAFKA-15293
> URL: https://issues.apache.org/jira/browse/KAFKA-15293
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Reporter: Abhijeet Kumar
>Assignee: Abhijeet Kumar
>Priority: Critical
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd merged pull request #14331: KAFKA-15293 Add documentation for tiered storage metrics

2023-09-05 Thread via GitHub


satishd merged PR #14331:
URL: https://github.com/apache/kafka/pull/14331


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14331: KAFKA-15293 Add documentation for tiered storage metrics

2023-09-05 Thread via GitHub


satishd commented on PR #14331:
URL: https://github.com/apache/kafka/pull/14331#issuecomment-1706953908

   There are a few failed tests that are unrelated to this PR. Merging it to 
trunk and 3.6.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14335: MINOR: update comment in consumeAction

2023-09-05 Thread via GitHub


satishd commented on PR #14335:
URL: https://github.com/apache/kafka/pull/14335#issuecomment-1706906829

   There are a few failed tests unrelated to this PR. Merging it to trunk and 
3.6.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #14335: MINOR: update comment in consumeAction

2023-09-05 Thread via GitHub


satishd merged PR #14335:
URL: https://github.com/apache/kafka/pull/14335


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] riedelmax commented on pull request #14124: KAFKA-14509; [1/2] Define ConsumerGroupDescribe API request and response schemas and classes.

2023-09-05 Thread via GitHub


riedelmax commented on PR #14124:
URL: https://github.com/apache/kafka/pull/14124#issuecomment-1706895064

   @dajac Thank you for the review. I added unit tests and corrected the nits


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15435) KRaft migration record counts in log message are incorrect

2023-09-05 Thread David Arthur (Jira)
David Arthur created KAFKA-15435:


 Summary: KRaft migration record counts in log message are incorrect
 Key: KAFKA-15435
 URL: https://issues.apache.org/jira/browse/KAFKA-15435
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.6.0
Reporter: David Arthur


The counting logic in MigrationManifest is incorrect and produces invalid 
output. This information is critical for users wanting to validate the result 
of a migration.

 
{code}
Completed migration of metadata from ZooKeeper to KRaft. 7117 records were 
generated in 54253 ms across 1629 batches. The record types were 
{TOPIC_RECORD=2, CONFIG_RECORD=2, PARTITION_RECORD=2, 
ACCESS_CONTROL_ENTRY_RECORD=2, PRODUCER_IDS_RECORD=1}. 
{code}

Due to the logic bug, the counts will never exceed 2.
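For illustration, one hypothetical way to end up with counts capped at 2 is a Map.merge remapping function that ignores the accumulated value. This is a reconstruction of the symptom only, not necessarily the actual code:

{code}
import java.util.HashMap;
import java.util.Map;

public class CountBugDemo {
    public static void main(String[] args) {
        Map<String, Integer> buggy = new HashMap<>();
        Map<String, Integer> fixed = new HashMap<>();

        for (int i = 0; i < 5; i++) {
            // Bug: the remapping function uses the increment (always 1) instead
            // of the accumulated count, so every merge yields 1 + 1 = 2.
            buggy.merge("TOPIC_RECORD", 1, (oldValue, newValue) -> newValue + 1);
            // Fix: sum the accumulated count and the increment.
            fixed.merge("TOPIC_RECORD", 1, Integer::sum);
        }
        System.out.println(buggy); // {TOPIC_RECORD=2}
        System.out.println(fixed); // {TOPIC_RECORD=5}
    }
}
{code}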



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14182: KAFKA 14515: Optimized Uniform Rack Aware Assignor

2023-09-05 Thread via GitHub


rreddy-22 commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1316096433


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -89,16 +93,17 @@ public MemberWithRemainingAssignments(String memberId, int remaining) {
 private Map> membersPerTopic(final AssignmentSpec assignmentSpec, final SubscribedTopicDescriber subscribedTopicDescriber) {
     Map> membersPerTopic = new HashMap<>();
     Map membersData = assignmentSpec.members();
-
+    // Only add topic Ids to the map if they are present in the topic metadata.
     membersData.forEach((memberId, memberMetadata) -> {
         Collection topics = memberMetadata.subscribedTopicIds();
         for (Uuid topicId : topics) {
             if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
+                log.warn("Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata.");

Review Comment:
   Yep I'll raise a different PR for range assignor changes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #14317: KAFKA-13973: Fix inflated block cache metrics

2023-09-05 Thread via GitHub


cadonna commented on PR #14317:
URL: https://github.com/apache/kafka/pull/14317#issuecomment-1706873185

   @nicktelford Yes, you are right about the existing tests! Sorry I was in a 
hurry and only skimmed them and missed that fact.
   I would like to have some tests that verify the behavior with one single 
column family (non-timestamped RocksDB) and multiple column families 
(timestamped RocksDB) so that we get a signal when something changes on RocksDB 
side and to kind of document the behavior. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15414) remote logs get deleted after partition reassignment

2023-09-05 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762138#comment-17762138
 ] 

Kamal Chandraprakash commented on KAFKA-15414:
--

[~fvisconte] 

Could you please take the latest trunk and try it out? Reopen the ticket if it 
doesn't work. Thanks!

> remote logs get deleted after partition reassignment
> 
>
> Key: KAFKA-15414
> URL: https://issues.apache.org/jira/browse/KAFKA-15414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.6.0
>
> Attachments: image-2023-08-29-11-12-58-875.png
>
>
> it seems I'm reaching that codepath when running reassignments on my cluster 
> and segment are deleted from remote store despite a huge retention (topic 
> created a few hours ago with 1000h retention).
> It seems to happen consistently on some partitions when reassigning but not 
> all partitions.
> My test:
> I have a test topic with 30 partition configured with 1000h global retention 
> and 2 minutes local retention
> I have a load tester producing to all partitions evenly
> I have consumer load tester consuming that topic
> I regularly reset offsets to earliest on my consumer to test backfilling from 
> tiered storage.
> My consumer was catching up consuming the backlog and I wanted to upscale my 
> cluster to speed up recovery: I upscaled my cluster from 3 to 12 brokers and 
> reassigned my test topic to all available brokers to have an even 
> leader/follower count per broker.
> When I triggered the reassignment, the consumer lag dropped on some of my 
> topic partitions:
> !image-2023-08-29-11-12-58-875.png|width=800,height=79! Screenshot 2023-08-28 
> at 20 57 09
> Later I tried to reassign back my topic to 3 brokers and the issue happened 
> again.
> Both times in my logs, I've seen a bunch of logs like:
> [RemoteLogManager=10005 partition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17] 
> Deleted remote log segment RemoteLogSegmentId
> {topicIdPartition=uR3O_hk3QRqsn4mPXGFoOw:loadtest11-17, 
> id=Mk0chBQrTyKETTawIulQog}
> due to leader epoch cache truncation. Current earliest epoch: 
> EpochEntry(epoch=14, startOffset=46776780), segmentEndOffset: 46437796 and 
> segmentEpochs: [10]
> Looking at my s3 bucket. The segments prior to my reassignment have been 
> indeed deleted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on pull request #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage

2023-09-05 Thread via GitHub


kamalcph commented on PR #14340:
URL: https://github.com/apache/kafka/pull/14340#issuecomment-1706829016

   @clolov @divijvaidya @showuon @abhijeetk88 @satishd
   
   Call for review. Please take a look!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14340: KAFKA-15351: Update log-start-offset after leader election for topics enabled with remote storage

2023-09-05 Thread via GitHub


kamalcph opened a new pull request, #14340:
URL: https://github.com/apache/kafka/pull/14340

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Chuckame commented on pull request #14157: KAFKA-15303: Avoid unnecessary re-serialization in FK-join

2023-09-05 Thread via GitHub


Chuckame commented on PR #14157:
URL: https://github.com/apache/kafka/pull/14157#issuecomment-1706826894

   Hello @mjsax, do you know if there has been any progress on this fix? 
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15221) Potential race condition between requests from rebooted followers

2023-09-05 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762134#comment-17762134
 ] 

Calvin Liu commented on KAFKA-15221:


[~satish.duggana] The PR is pending review; it targets 3.6.0 and other 3.5.* 
versions, depending on when we can finish the code review.

cc [~dajac], [~david.mao] 

> Potential race condition between requests from rebooted followers
> -
>
> Key: KAFKA-15221
> URL: https://issues.apache.org/jira/browse/KAFKA-15221
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
>
> When the leader processes the fetch request, it does not acquire locks when 
> updating the replica fetch state. Then there can be a race between the fetch 
> requests from a rebooted follower.
> T0, broker 1 sends a fetch to broker 0(leader). At the moment, broker 1 is 
> not in ISR.
> T1, broker 1 crashes.
> T2 broker 1 is back online and receives a new broker epoch. Also, it sends a 
> new Fetch request.
> T3 broker 0 receives the old fetch requests and decides to expand the ISR.
> T4 Right before broker 0 starts to fill the AlterPartition request, the new 
> fetch request comes in and overwrites the fetch state. Then broker 0 uses the 
> new broker epoch on the AlterPartition request.
> In this way, the AlterPartition request can get around KIP-903 and wrongly 
> update the ISR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15430) Kafka creates replica partition on controller node

2023-09-05 Thread Andrii Vysotskiy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Vysotskiy updated KAFKA-15430:
-
Issue Type: Bug  (was: Test)

> Kafka creates replica partition on controller node
> 
>
> Key: KAFKA-15430
> URL: https://issues.apache.org/jira/browse/KAFKA-15430
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Andrii Vysotskiy
>Priority: Major
>
> I have a configuration of 5 nodes (KRaft mode) with the following roles: 4 
> broker+controller and 1 controller. I create a topic with replication factor 5, 
> and it is created, and describe shows that the topic partition has 5 replicas.
>  
> {{/opt/kafka/latest/bin/kafka-topics.sh --create 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092 --replication-factor 5 
> --partitions 1 --topic test5}}
>  
> /opt/kafka/latest/bin/kafka-topics.sh --describe --topic test5 
> --bootstrap-server=dc1-prod-kafka-001-vs:9092
> Topic: test5 TopicId: amuqr8EgRmqeKryUHZwsMA PartitionCount: 1 
> ReplicationFactor: 5 Configs: segment.bytes=1073741824
> Topic: test5 Partition: 0 Leader: 3 Replicas: 3,4,1,2,5 Isr: 3,4,1,2
>  
> Replicas: 5 and ISR: 4. Why does Kafka initially allow you to create a replica 
> on the controller node, although in reality the replica is not created on the 
> controller node and there are no topic files in the log directory?
> Is this expected behavior or not? Thanks.
> I want to understand whether such behavior is the norm for Kafka.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Cerchie commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression

2023-09-05 Thread via GitHub


Cerchie commented on code in PR #14322:
URL: https://github.com/apache/kafka/pull/14322#discussion_r1316003070


##
docs/design.html:
##
@@ -136,8 +136,10 @@ 
-Kafka supports this with an efficient batching format. A batch of messages 
can be clumped together compressed and sent to the server in this form. This 
batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages 
can be grouped together, compressed, and sent to the server in this form. The 
broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is the same as 
what the batch header states. The broker may also potentially modify the batch 
(e.g., if the topic is compacted, the broker will filter out 

Review Comment:
   thank you! sounds good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano opened a new pull request, #14339: KAFKA-15422: Update documenttion for delegation tokens when working with Kafka with KRaft

2023-09-05 Thread via GitHub


pprovenzano opened a new pull request, #14339:
URL: https://github.com/apache/kafka/pull/14339

   Minor changes to documentation to reflect that Delegation Tokens now work 
with Kafka in KRaft mode.
   Cherry-Pick of https://github.com/apache/kafka/pull/14318
   Reviewers: Manikumar Reddy 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano closed pull request #14338: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft

2023-09-05 Thread via GitHub


pprovenzano closed pull request #14338: KAFKA-15422: Update documentation for 
Delegation Tokens in Kafka with KRaft
URL: https://github.com/apache/kafka/pull/14338


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano opened a new pull request, #14338: KAFKA-15422: Update documentation for Delegation Tokens in Kafka with KRaft

2023-09-05 Thread via GitHub


pprovenzano opened a new pull request, #14338:
URL: https://github.com/apache/kafka/pull/14338

   Minor changes to documentation to reflect that Delegation Tokens now work 
with Kafka in KRaft mode.
   This update was already merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nicktelford commented on pull request #14317: KAFKA-13973: Fix inflated block cache metrics

2023-09-05 Thread via GitHub


nicktelford commented on PR #14317:
URL: https://github.com/apache/kafka/pull/14317#issuecomment-1706697411

   > @nicktelford Thanks for the update!
   > 
   > Could you also look why we did not catch this bug in 
`RocksDBMetricsIntegrationTest`or other metrics integration tests and add tests 
if needed?
   
   @cadonna Looks like that test doesn't verify the values of those metrics; it 
only checks the number of metrics registered under each name (i.e. that the 
expected metrics are registered and available, but not what their values are).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


