[GitHub] [kafka] Vaibhav-Nazare commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage

2023-06-22 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1603698336

   Hi @divijvaidya, any further updates for us? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1239320095


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1072,1338 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+    /**
+     * Replays GroupMetadataKey/Value to update the soft state of
+     * the generic group.
+     *
+     * @param key   A GroupMetadataKey key.
+     * @param value A GroupMetadataValue record.
+     */
+    public void replay(
+        GroupMetadataKey key,
+        GroupMetadataValue value,
+        short version
+    ) {
+        String groupId = key.group();
+
+        if (value == null) {
+            // Tombstone. Group should not be added.
+            // TODO: this needs to be checked in conjunction with empty group offsets.
+            //if (groups.containsKey(groupId)) {
+            //    throw new IllegalStateException("Unexpected unload of active group " + groupId +
+            //        "while loading partition " + topicPartition);
+            //}
+        } else {
+            List<GenericGroupMember> loadedMembers = new ArrayList<>();
+            for (GroupMetadataValue.MemberMetadata member : value.members()) {
+                int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout();
+
+                JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+                supportedProtocols.add(new JoinGroupRequestProtocol()
+                    .setName(value.protocol())
+                    .setMetadata(member.subscription()));
+
+                GenericGroupMember loadedMember = new GenericGroupMember(
+                    member.memberId(),
+                    Optional.ofNullable(member.groupInstanceId()),
+                    member.clientId(),
+                    member.clientHost(),
+                    rebalanceTimeout,
+                    member.sessionTimeout(),
+                    value.protocolType(),
+                    supportedProtocols,
+                    member.assignment()
+                );
+
+                loadedMembers.add(loadedMember);
+            }
+
+            String protocolType = value.protocolType();
+
+            GenericGroup genericGroup = new GenericGroup(
+                this.logContext,
+                groupId,
+                loadedMembers.isEmpty() ? EMPTY : STABLE,
+                time,
+                value.generation(),
+                protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
+                Optional.ofNullable(value.protocol()),
+                Optional.ofNullable(value.leader()),
+                value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+            );
+
+            loadedMembers.forEach(member -> {
+                genericGroup.add(member, null);
+                log.info("Loaded member {} in group {} with generation {}.",
+                    member.memberId(), groupId, genericGroup.generationId());
+            });
+
+            genericGroup.setSubscribedTopics(
+                genericGroup.computeSubscribedTopics()
+            );
+        }
+    }
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is UNKNOWN. If member
+            // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            try {
+                group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1239318379


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+    // Below stores all methods to handle generic group APIs.
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+            sessionTimeoutMs > groupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is UNKNOWN. If member
+            // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            try {
+                group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return EMPTY_RESULT;
+            }
+
+            String joinReason = request.reason();
+            if (joinReason == null || joinReason.isEmpty()) {
+                joinReason = "not provided";
+            }
+
+            if (!acceptJoiningMember(group, memberId)) {
+                group.remove(memberId);
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(UNKNOWN_MEMBER_ID)
+                    .setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+                );
+
+            } else if (isUnknownMember) {
+                result = genericGroupJoinNewMember(
+                    context,
+                    request,
+                    group,
+                    joinReason,
+                    responseFuture
+                );
+            } else {
+                result = genericGroupJoinExistingMember(
+                    context,
+                    request,
+                    group,
+                    joinReason,
+                    responseFuture
+                );
+            }
+
+            // Attempt to complete join group phase. We do not complete
+            // the join group phase if this is the initial rebalance.
+            if (group.isInState(PREPARING_REBALANCE) &&
+                group.hasAllMembersJoined() &&
+                group.generationId() != 0
+            ) {
+                completeGenericGroupJoin(group);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Handle a new member generic group join.
+     *
+     * @param context The request context.
+     * @param request The join group request.
+     * @param group   The group to add the member.
+     * @param joinReason  The client reason for the join request.
+     * @param responseFuture  The response future to complete.
+     *
+     * @return The coordinator result that will be appended to the log.
+     */
+    private CoordinatorResult<Void, Record> genericGroupJoinNewMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        String joinReason,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        List<Protocol> protocols = new ArrayList<>();
+        request.protocols().forEach(protocol -> protocols.add(new Protocol(protocol.name(), protocol.metadata())));
+        if (group.isInState(DEAD)) {
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; it is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // finding the correct coordinator and rejoin.
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Erro

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1239316537


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1045,1265 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+    // Below stores all methods to handle generic group APIs.
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < groupMinSessionTimeoutMs ||
+            sessionTimeoutMs > groupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is UNKNOWN. If member
+            // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            try {
+                group = (GenericGroup) getOrMaybeCreateGroup(groupId, GENERIC, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return EMPTY_RESULT;
+            }
+
+            String joinReason = request.reason();
+            if (joinReason == null || joinReason.isEmpty()) {
+                joinReason = "not provided";
+            }
+
+            if (!acceptJoiningMember(group, memberId)) {
+                group.remove(memberId);
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(UNKNOWN_MEMBER_ID)
+                    .setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+                );
+
+            } else if (isUnknownMember) {
+                result = genericGroupJoinNewMember(
+                    context,
+                    request,
+                    group,
+                    joinReason,
+                    responseFuture
+                );
+            } else {
+                result = genericGroupJoinExistingMember(
+                    context,
+                    request,
+                    group,
+                    joinReason,
+                    responseFuture
+                );
+            }
+
+            // Attempt to complete join group phase. We do not complete
+            // the join group phase if this is the initial rebalance.
+            if (group.isInState(PREPARING_REBALANCE) &&
+                group.hasAllMembersJoined() &&
+                group.generationId() != 0
+            ) {
+                completeGenericGroupJoin(group);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Handle a new member generic group join.
+     *
+     * @param context The request context.
+     * @param request The join group request.
+     * @param group   The group to add the member.
+     * @param joinReason  The client reason for the join request.
+     * @param responseFuture  The response future to complete.
+     *
+     * @return The coordinator result that will be appended to the log.
+     */
+    private CoordinatorResult<Void, Record> genericGroupJoinNewMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        String joinReason,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        List<Protocol> protocols = new ArrayList<>();
+        request.protocols().forEach(protocol -> protocols.add(new Protocol(protocol.name(), protocol.metadata())));
+        if (group.isInState(DEAD)) {
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; it is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // finding the correct coordinator and rejoin.
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Erro

[GitHub] [kafka] github-actions[bot] commented on pull request #12922: KAFKA-14397; Don't reset producer sequence number after delivery timeout

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #12922:
URL: https://github.com/apache/kafka/pull/12922#issuecomment-1603639700

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #12940: MINOR: Remove lock contention while adding sensors

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #12940:
URL: https://github.com/apache/kafka/pull/12940#issuecomment-1603639683

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #12967: MINOR: Replace ArrayBuffer with ListBuffer for better performance

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #12967:
URL: https://github.com/apache/kafka/pull/12967#issuecomment-1603639665

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #12988: KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1603639642

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #12997: KAFKA-14492: Extract a method to create LogManager, in order to be overrided by subclass of KafkaServer

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #12997:
URL: https://github.com/apache/kafka/pull/12997#issuecomment-1603639624

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #13003: MINOR: only allow certain operations when in KRaft premigration

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #13003:
URL: https://github.com/apache/kafka/pull/13003#issuecomment-1603639603

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #13004: [WIP] KAFKA-14498: investigate flaky

2023-06-22 Thread via GitHub


github-actions[bot] commented on PR #13004:
URL: https://github.com/apache/kafka/pull/13004#issuecomment-1603639585

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hikaru1232 closed pull request #13907: Sync ak3.5 to ccs3.5 and update ccs kafka version to 7.5.1-0-ccs

2023-06-22 Thread via GitHub


hikaru1232 closed pull request #13907: Sync ak3.5 to ccs3.5 and update ccs 
kafka version to 7.5.1-0-ccs
URL: https://github.com/apache/kafka/pull/13907


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hikaru1232 opened a new pull request, #13907: Sync ak3.5 to ccs3.5 and update ccs kafka version to 7.5.1-0-ccs

2023-06-22 Thread via GitHub


hikaru1232 opened a new pull request, #13907:
URL: https://github.com/apache/kafka/pull/13907

   The Jenkins job fails on the auto merge of AK 3.5 to CCS 3.5:
   https://jenkins.confluent.io/job/apache-kafka-test/job/3.5/8/console
   
   ```
   14:08:02  + ./kafka-test/merge.sh https://github.com/apache/kafka.git 3.5 
https://github.com/confluentinc/kafka.git 3.5
   14:08:02  => Push changes from the  branch of Apache Kafka to the  branch of 
Confluent Kafka.
   14:08:02  => Merge https://github.com/confluentinc/kafka.git:3.5 with 
https://github.com/apache/kafka.git:3.5.
   14:08:02  => Clone Apache Kafka...
   14:08:02  Cloning into 
'/home/jenkins/workspace/apache-kafka-test_3.5/kafka'...
   14:08:20  => Pull latest commits
   14:08:20  Already up to date.
   14:08:20  Switched to a new branch '3.5'
   14:08:20  Branch '3.5' set up to track remote branch '3.5' from 'origin'.
   14:08:20  => Add Confluent Kafka as a remote repo.
   14:08:20  confluent  https://github.com/confluentinc/kafka.git (fetch)
   14:08:20  confluent  https://github.com/confluentinc/kafka.git (push)
   14:08:20  origin https://github.com/apache/kafka.git (fetch)
   14:08:20  origin https://github.com/apache/kafka.git (push)
   14:08:20  => Merge confluent/3.5
   14:08:21  From https://github.com/confluentinc/kafka
   14:08:21   * branch  3.5-> FETCH_HEAD
   14:08:21   * [new branch]3.5-> confluent/3.5
   14:08:21  Auto-merging tests/kafkatest/version.py
   14:08:21  CONFLICT (content): Merge conflict in tests/kafkatest/version.py
   14:08:21  Auto-merging tests/kafkatest/__init__.py
   14:08:21  CONFLICT (content): Merge conflict in tests/kafkatest/__init__.py
   14:08:21  Auto-merging streams/quickstart/pom.xml
   14:08:21  CONFLICT (content): Merge conflict in streams/quickstart/pom.xml
   14:08:21  Auto-merging 
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
   14:08:21  CONFLICT (content): Merge conflict in 
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
   14:08:21  Auto-merging streams/quickstart/java/pom.xml
   14:08:21  CONFLICT (content): Merge conflict in 
streams/quickstart/java/pom.xml
   14:08:21  Auto-merging gradle/dependencies.gradle
   14:08:21  Auto-merging gradle.properties
   14:08:21  CONFLICT (content): Merge conflict in gradle.properties
   14:08:21  Automatic merge failed; fix conflicts and then commit the result.
   14:08:21  => Pull 'Jenkinsfile' from confluent/3.5
   14:08:21  Updated 0 paths from 48a6c3170d
   14:08:21  => Continue with confluent/3.5 merge
   14:08:21  error: Committing is not possible because you have unmerged files.
   14:08:21  hint: Fix them up in the work tree, and then use 'git add/rm <file>'
   14:08:21  hint: as appropriate to mark resolution and make a commit.
   14:08:21  fatal: Exiting because of an unresolved conflict.
   14:08:21  U  gradle.properties
   14:08:21  U  streams/quickstart/java/pom.xml
   14:08:21  U  
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
   14:08:21  U  streams/quickstart/pom.xml
   14:08:21  U  tests/kafkatest/__init__.py
   14:08:21  U  tests/kafkatest/version.py
   ```
   
   Manually resolve the conflicts. Update the Kafka version in ccs from 7.5.0-0-ccs to 7.5.1-0-ccs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2023-06-22 Thread via GitHub


yeralin commented on PR #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-1603491428

   @venkatesh010 could you please provide a code snippet?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lukestephenson-zendesk commented on a diff in pull request #13447: MINOR: Change ordering of checks to prevent log spam on metadata updates

2023-06-22 Thread via GitHub


lukestephenson-zendesk commented on code in PR #13447:
URL: https://github.com/apache/kafka/pull/13447#discussion_r1239157410


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -399,8 +399,13 @@ private Optional<MetadataResponse.PartitionMetadata> updateLatestMetadata(
                 // Between the time that a topic is deleted and re-created, the client may lose track of the
                 // corresponding topicId (i.e. `oldTopicId` will be null). In this case, when we discover the new
                 // topicId, we allow the corresponding leader epoch to override the last seen value.
-                log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
-                        tp, newEpoch, oldTopicId, topicId);
+                if (oldTopicId != null) {
+                    log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
+                            tp, newEpoch, oldTopicId, topicId);
+                } else {
+                    log.debug("Resetting the last seen epoch of partition {} to {} since the associated topicId was undefined but is now set to {}",

Review Comment:
   nope. Not creating new topics, restarting or moving partitions. Looking at 
our organisation logs across all services, it appears to affect a lot of 
producers across different teams.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-22 Thread via GitHub


vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239142854


##
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##
@@ -189,7 +221,22 @@ public Joined withOtherValueSerde(final 
Serde otherValueSerde) {
  */
 @Override
 public Joined withName(final String name) {
-return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+}
+
+/**
+ * Set the grace period on the stream side of the join. Records will enter 
a buffer before being processed. Out of order records in the grace period will 
be processed in timestamp order. Late records, out of the grace period, will be 
executed right as they come in, if it is past the table history retention this 
could result in joins on the wrong version or a null join. Long gaps in stream 
side arriving records will cause records to be delayed in processing, even 
resulting in be processed out of the grace period window.

Review Comment:
   > if it is past the table history retention this could result in joins on 
the wrong version or a null join
   
   I think this should be "if it is past the table history retention this could 
result in a null join" instead? I don't think it's possible to join with the 
wrong version, the only possible issue is that it joins with a null instead.
   
   > even resulting in be processed out of the grace period window
   
   What does this mean?
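   For readers following along, here is a minimal sketch of how the grace period under discussion is meant to be used from the DSL, modelled on the test further down in this review. It assumes the `Joined.with` overload that takes a grace period (added by this PR) together with a versioned table store; topic names, store names, and the joiner are illustrative only, not taken from the PR:

   ```java
   import java.time.Duration;
   import java.util.Properties;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.Joined;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.kstream.Produced;
   import org.apache.kafka.streams.state.Stores;

   public class StreamTableJoinWithGraceSketch {
       public static void main(String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();

           // Stream side of the join.
           final KStream<Integer, String> stream =
               builder.stream("stream-topic", Consumed.with(Serdes.Integer(), Serdes.String()));

           // The table side must be versioned for a grace period to be allowed; its history
           // retention should cover at least the grace period.
           final KTable<Integer, String> table = builder.table(
               "table-topic",
               Consumed.with(Serdes.Integer(), Serdes.String()),
               Materialized.as(Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofMinutes(5))));

           // Stream records are buffered for up to one minute so that out-of-order records
           // within the grace period are joined in timestamp order.
           stream.join(
                   table,
                   (streamValue, tableValue) -> streamValue + "," + tableValue,
                   Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "grace-join", Duration.ofMinutes(1)))
               .to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));

           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-with-grace");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           new KafkaStreams(builder.build(), props).start();
       }
   }
   ```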



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) {
+        final Record record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   Can you unset the record context (i.e., set it back to the original context) 
after finishing this step, in order to avoid confusion about what context is 
being used downstream? That's what we do with the suppress buffer, for example: 
https://github.com/apache/kafka/blob/1dbcb7da9e3625ec2078a82f84542a3127730fef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java#L207-L215



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
 }
 }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.In

[GitHub] [kafka] vvcephei commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-06-22 Thread via GitHub


vvcephei commented on code in PR #13842:
URL: https://github.com/apache/kafka/pull/13842#discussion_r1239120109


##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()

Review Comment:
   I'm worried about taking the top ten contributors by _lifetime_ commits, 
rather than by commits from the last year (as in `git shortlog --email 
--numbered --summary --since=2022-04-28`), since we have some prolific 
contributors in the past who are no longer active.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13900: MINOR: some minor cleanups for process startup

2023-06-22 Thread via GitHub


cmccabe commented on code in PR #13900:
URL: https://github.com/apache/kafka/pull/13900#discussion_r1239118404


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -75,6 +76,8 @@ class BrokerServer(
   // Get raftManager from SharedServer. It will be initialized during startup.
  def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

Review Comment:
   Yes, we should remove the registration from KafkaBroker. I'll do that now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13890: KAFKA-15109 Don't skip leader epoch bump while in migration mode

2023-06-22 Thread via GitHub


cmccabe commented on code in PR #13890:
URL: https://github.com/apache/kafka/pull/13890#discussion_r1239115751


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -140,6 +144,11 @@ public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState t
         return this;
     }
 
+    public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) {
+        this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+        return this;
+    }

Review Comment:
   Oops, the intention here was to only call this API when ZK migration was 
enabled.
   
   To fix this, I’d suggest removing setBumpLeaderEpochOnIsrShrink and changing 
it to enableBumpLeaderEpochOnIsrShrink. Basically, the API can be used to 
enable the bump, but it cannot be used to disable. This should avoid this kind 
of problem in the future.
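   For illustration, the enable-only shape being suggested could look roughly like the sketch below (hypothetical code, not the change as merged; the default value would still come from the metadata version in the constructor):

   ```java
   public class PartitionChangeBuilder {
       // Defaulted from the metadata version in the constructor: newer MVs may skip the bump.
       private boolean bumpLeaderEpochOnIsrShrink;

       /**
        * Force a leader epoch bump when the ISR shrinks, e.g. while a ZK migration is in
        * progress. Callers can only turn the bump on; there is intentionally no way to turn
        * it back off, which is what makes the API hard to misuse.
        */
       public PartitionChangeBuilder enableBumpLeaderEpochOnIsrShrink() {
           this.bumpLeaderEpochOnIsrShrink = true;
           return this;
       }
   }
   ```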



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13890: KAFKA-15109 Don't skip leader epoch bump while in migration mode

2023-06-22 Thread via GitHub


cmccabe commented on code in PR #13890:
URL: https://github.com/apache/kafka/pull/13890#discussion_r1239113984


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -140,6 +144,11 @@ public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState t
         return this;
     }
 
+    public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) {
+        this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+        return this;
+    }

Review Comment:
   > Hmm. Doesn't this ignore the MV?
   
   No. The MV is consulted in the constructor.
   
   > Checking the MV is important to make sure that all of the brokers support 
this feature before the controller can skip leader epoch bumps
   
   This function is only used to enable bumping leader epoch, which is always 
safe to do, even on older MVs.
   
   If you are really concerned about foolproofing this you could make a PR that 
changes this function from `setBumpLeaderEpochOnIsrShrink` to 
`enableBumpLeaderEpochOnIsrShrink` (i.e. remove the option to disable this way)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

2023-06-22 Thread via GitHub


kirktrue commented on PR #13898:
URL: https://github.com/apache/kafka/pull/13898#issuecomment-1603388253

   > @philipnee @vvcephei Can you tag this as `ctr`, please?
   
   Thanks @dajac!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13890: KAFKA-15109 Don't skip leader epoch bump while in migration mode

2023-06-22 Thread via GitHub


jsancio commented on code in PR #13890:
URL: https://github.com/apache/kafka/pull/13890#discussion_r1239082472


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -140,6 +144,11 @@ public PartitionChangeBuilder setTargetLeaderRecoveryState(LeaderRecoveryState t
         return this;
     }
 
+    public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) {
+        this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
+        return this;
+    }

Review Comment:
   Hmm. Doesn't this ignore the MV? Checking the MV is important to make sure 
that all of the brokers support this feature before the controller can skip 
leader epoch bumps. I think you want the following code:
   ```java
   public PartitionChangeBuilder setBumpLeaderEpochOnIsrShrink(boolean bumpLeaderEpochOnIsrShrink) {
       // If the MV doesn't support skipping leader epoch bump the caller cannot override it.
       if (metadataVersion.isSkipLeaderEpochBumpSupported()) {
           this.bumpLeaderEpochOnIsrShrink = bumpLeaderEpochOnIsrShrink;
       }
       return this;
   }
   ```
   
   Because of this, it is not clear to me whether this is even the right API. You probably want `setInZKMigrationMode` to make all of this more explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-22 Thread via GitHub


wcarlson5 commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1603313016

   The next PR is the restore logic https://github.com/wcarlson5/kafka/pull/2
   
   I have it targeted to this branch for easy reading and will retarget it when 
this is merged. 
   
   @vcrfxia @cadonna 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 closed pull request #13906: KAFKA-14936: Add restore logic (3/N)

2023-06-22 Thread via GitHub


wcarlson5 closed pull request #13906: KAFKA-14936: Add restore logic (3/N)
URL: https://github.com/apache/kafka/pull/13906


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15109.
--
Resolution: Fixed

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.6.0
>
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15109) ISR shrink/expand issues on ZK brokers during migration

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15109:
-
Fix Version/s: 3.6.0

> ISR shrink/expand issues on ZK brokers during migration
> ---
>
> Key: KAFKA-15109
> URL: https://issues.apache.org/jira/browse/KAFKA-15109
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, replication
>Affects Versions: 3.6.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.6.0
>
>
> KAFKA-15021 introduced a new controller behavior that avoids increasing the 
> leader epoch during the controlled shutdown scenario. This prevents some 
> unnecessary thrashing of metadata and threads on the brokers and clients. 
> While a cluster is in a KIP-866 migration and has a KRaft controller with ZK 
> brokers, we cannot employ this leader epoch bump avoidance. The ZK brokers 
> must have the leader epoch bump in order for ReplicaManager to react to the 
> LeaderAndIsrRequest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-15098:
-
Fix Version/s: 3.6.0
   3.5.1

> KRaft migration does not proceed and broker dies if authorizer.class.name is 
> set
> 
>
> Key: KAFKA-15098
> URL: https://issues.apache.org/jira/browse/KAFKA-15098
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Ron Dagostino
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.1
>
>
> [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to 
> fatal exception
> java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
> does not yet support authorizers. Remove authorizer.class.name before 
> performing a migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set

2023-06-22 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-15098.
--
Resolution: Fixed

> KRaft migration does not proceed and broker dies if authorizer.class.name is 
> set
> 
>
> Key: KAFKA-15098
> URL: https://issues.apache.org/jira/browse/KAFKA-15098
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Ron Dagostino
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.6.0, 3.5.1
>
>
> [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to 
> fatal exception
> java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
> does not yet support authorizers. Remove authorizer.class.name before 
> performing a migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 opened a new pull request, #13906: Grace get write batches

2023-06-22 Thread via GitHub


wcarlson5 opened a new pull request, #13906:
URL: https://github.com/apache/kafka/pull/13906

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] venkatesh010 commented on pull request #6592: KAFKA-8326: Introduce List Serde

2023-06-22 Thread via GitHub


venkatesh010 commented on PR #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-1603284146

   This is while using List<E>, where E is a class.
   The Serde used for the inner type is a JsonSerde of type E.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13900: MINOR: some minor cleanups for process startup

2023-06-22 Thread via GitHub


divijvaidya commented on code in PR #13900:
URL: https://github.com/apache/kafka/pull/13900#discussion_r1238998513


##
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala:
##
@@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
   false
 }
   }
+
+  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {
+def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
+  val metricName = KafkaYammerMetrics.getMetricName(
+"kafka.server",

Review Comment:
   Could we use the constant `Server.MetricsPrefix` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13900: MINOR: some minor cleanups for process startup

2023-06-22 Thread via GitHub


divijvaidya commented on code in PR #13900:
URL: https://github.com/apache/kafka/pull/13900#discussion_r1238997925


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -75,6 +76,8 @@ class BrokerServer(
   // Get raftManager from SharedServer. It will be initialized during startup.
  def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

Review Comment:
   ok. I asked this question because I didn't see any change to 
`KafkaBroker.scala` in this PR and assumed that we are adding new metrics. I 
apologize if I am missing something very obvious but I am confused here :(. 
Shouldn't we remove the registration from KafkaBroker since we are now 
registering them here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] venkatesh010 commented on pull request #6592: KAFKA-8326: Introduce List Serde

2023-06-22 Thread via GitHub


venkatesh010 commented on PR #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-1603267651

   Hey @yeralin @mjsax @ableegoldman, I'm getting a SerializationException from this serde:
   SerializationException: Invalid serialization strategy flag value
   
   The flag value is derived from the bytes size (which is coming back > 2, i.e. more than the number of enum values), hence it's breaking.
   This seems to be a bug, please check.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15115) Implement resetPositions functionality in ListOffsetRequestManager

2023-06-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15115:
--

 Summary: Implement resetPositions functionality in 
ListOffsetRequestManager
 Key: KAFKA-15115
 URL: https://issues.apache.org/jira/browse/KAFKA-15115
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Introduce support for resetting positions in the new ListOffsetsRequestManager. 
This task will include a new event for the resetPositions calls performed from 
the new consumer, and the logic for handling such events in the 
ListOffsetRequestManager.

The reset positions implementation will keep the same behaviour as the one in the old consumer, but adapted to the new threading model. It is based on a RESET_POSITIONS event that is submitted to the background thread and then processed by the ApplicationEventProcessor. The processing itself is done by the ListOffsetRequestManager, given that this will require a LIST_OFFSETS request for the partitions awaiting reset.
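As a rough illustration of the event flow described above, a minimal sketch of the threading model follows. All names here (event type, queue, request-manager interface) are hypothetical placeholders, not the actual consumer classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the event flow only; none of these names match the real classes.
public class ResetPositionsFlowSketch {

    enum EventType { RESET_POSITIONS }

    record ApplicationEvent(EventType type) { }

    // Queue shared between the application thread and the background thread.
    private final BlockingQueue<ApplicationEvent> eventQueue = new LinkedBlockingQueue<>();

    // Application thread: resetting positions only enqueues an event and returns.
    public void resetPositions() {
        eventQueue.add(new ApplicationEvent(EventType.RESET_POSITIONS));
    }

    // Background thread: the event processor dispatches the event to the request manager,
    // which builds a LIST_OFFSETS request for the partitions awaiting reset.
    public void processEvents(ListOffsetsRequestManagerSketch manager) throws InterruptedException {
        ApplicationEvent event = eventQueue.take();
        if (event.type() == EventType.RESET_POSITIONS) {
            manager.resetPositionsIfNeeded();
        }
    }

    interface ListOffsetsRequestManagerSketch {
        void resetPositionsIfNeeded();
    }
}
```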



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-22 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736201#comment-17736201
 ] 

Bo Gao edited comment on KAFKA-15053 at 6/22/23 6:36 PM:
-

Thanks [~ChrisEgerton]! It would be great if we can backport this to 3.3.0! 
Also, if we do so, is there anything that the client needs to do to consume 
this change or the change would be there automatically?


was (Author: JIRAUSER300429):
Thanks [~ChrisEgerton]! If we backport this to 3.3.0, is there anything that 
the client needs to do to consume this change or the change would be there 
automatically?

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Assignee: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, config 
> {{security.protocol}} now only allows upper case values such as PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like 
> sasl_ssl, ssl are also supported, there's even a case insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle the lower case values.
> I think we should treat this as a regression bug since we don't support lower 
> case values anymore since 3.3.0. For versions later than 3.3.0, we are 
> getting error like this when using lower case value sasl_ssl
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
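
For illustration, the difference between the case-insensitive lookup mentioned above and the strict config validation can be sketched as follows (a self-contained example with a local enum; it is not the actual SecurityProtocol or validator code):

```java
import java.util.Locale;

public class SecurityProtocolLookupSketch {

    enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    // Mirrors the case-insensitive handling referenced above: "sasl_ssl" and
    // "SASL_SSL" both resolve to the same constant.
    static SecurityProtocol forName(String name) {
        return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
    }

    // The validation added with the config validators instead compares the raw string
    // against the upper-case names, so a lower-case value is rejected before any
    // case-insensitive lookup runs.
    static boolean strictValidate(String value) {
        for (SecurityProtocol p : SecurityProtocol.values()) {
            if (p.name().equals(value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(forName("sasl_ssl"));        // SASL_SSL
        System.out.println(strictValidate("sasl_ssl")); // false -> the reported regression
    }
}
```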



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

2023-06-22 Thread via GitHub


kirktrue commented on PR #13898:
URL: https://github.com/apache/kafka/pull/13898#issuecomment-1603067666

   @philipnee @vvcephei Can you tag this as `ctr`, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-22 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1603056522

   I've updated the PR, but I'm seeing a thread leak in the tests, not sure if 
that's just from trunk or my change. Will investigate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14694) RPCProducerIdManager should not wait for a new block

2023-06-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14694.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> RPCProducerIdManager should not wait for a new block
> 
>
> Key: KAFKA-14694
> URL: https://issues.apache.org/jira/browse/KAFKA-14694
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.6.0
>
>
> RPCProducerIdManager initiates an async request to the controller to grab a 
> block of producer IDs and then blocks waiting for a response from the 
> controller.
> This is done in the request handler threads while holding a global lock. This 
> means that if many producers are requesting producer IDs and the controller 
> is slow to respond, many threads can get stuck waiting for the lock.
> This may also be a deadlock concern under the following scenario:
> if the controller has 1 request handler thread (1 chosen for simplicity) and 
> receives an InitProducerId request, it may deadlock.
> basically any time the controller has N InitProducerId requests where N >= # 
> of request handler threads has the potential to deadlock.
> consider this:
> 1. the request handler thread tries to handle an InitProducerId request to 
> the controller by forwarding an AllocateProducerIds request.
> 2. the request handler thread then waits on the controller response (timed 
> poll on nextProducerIdBlock)
> 3. the controller's request handler threads need to pick this request up, and 
> handle it, but the controller's request handler threads are blocked waiting 
> for the forwarded AllocateProducerIds response.
>  
> We should not block while waiting for a new block and instead return 
> immediately to free the request handler threads.
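
As a rough sketch of the non-blocking shape described in the last paragraph (hypothetical names and structure, not the actual RPCProducerIdManager code): the handler thread kicks off the block request and returns a retriable error instead of waiting on the controller.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: return to the caller immediately instead of blocking a request
// handler thread until the controller answers with a new producer-id block.
public class ProducerIdManagerSketch {

    record ProducerIdsBlock(long firstId, int size) { }

    static class CoordinatorLoadInProgressException extends RuntimeException { }

    private final AtomicReference<ProducerIdsBlock> currentBlock = new AtomicReference<>();
    private final AtomicBoolean requestInFlight = new AtomicBoolean(false);

    long generateProducerId() {
        ProducerIdsBlock block = currentBlock.get();
        if (block == null) {
            // Kick off the async AllocateProducerIds request (not shown) and fail fast;
            // the client retries instead of a handler thread waiting on the controller.
            maybeRequestNextBlock();
            throw new CoordinatorLoadInProgressException();
        }
        // Hand out ids from the local block here (bookkeeping omitted for brevity),
        // prefetching the next block before this one is exhausted.
        return block.firstId();
    }

    private void maybeRequestNextBlock() {
        if (requestInFlight.compareAndSet(false, true)) {
            // controllerChannel.sendAllocateProducerIds(response -> {
            //     currentBlock.set(new ProducerIdsBlock(response.firstId(), response.size()));
            //     requestInFlight.set(false);
            // });
        }
    }
}
```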



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-22 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1238801056


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,60 @@ public void init(final ProcessorContext context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)

Review Comment:
   Yeah we can remove



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1256,10 +1261,25 @@ private  KStream doStreamTableJoin(final 
KTable table,
 final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
 final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+Optional> buffer = Optional.empty();
+
+if (joined.gracePeriod() != null) {

Review Comment:
   This seems to be the main sticking point of the PR. 
   
   The thought was that we would need some way to stop using the 
grace period without losing records, but you bring up a good point.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -122,12 +122,12 @@ public Maybe> 
priorValueForBuffered(final K key) {
 }
 
 @Override
-public void put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {
+public boolean put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {

Review Comment:
   added `shouldReturnIfRecordWasAdded`



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.

[GitHub] [kafka] lihaosky commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

2023-06-22 Thread via GitHub


lihaosky commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1238821866


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1375,6 +1375,18 @@ public void shouldReturnDefaultClientSupplier() {
 assertTrue(supplier instanceof DefaultKafkaClientSupplier);
 }
 
+@Test
+public void shouldReturnDefaultRackAwareAssignmentConfig() {
+final String strategy = 
streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY);
+assertEquals(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, 
strategy);
+}
+

Review Comment:
   Makes sense. I can use raw values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

2023-06-22 Thread via GitHub


lihaosky commented on PR #13846:
URL: https://github.com/apache/kafka/pull/13846#issuecomment-1603047119

   > Thanks @lihaosky for the PR!
   > 
   > Are you sure you want to start with adding the config?
   > 
   > I would have added the config as the last step in the implementation. If 
we add it now, the config will show up in the docs but it will not or only 
partially work until the implementation is finished.
   
   @cadonna, thanks for reviewing. I can move this to the last PR. I thought 
people would use the flags when the feature is announced in some release...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-06-22 Thread via GitHub


hachikuji merged PR #13267:
URL: https://github.com/apache/kafka/pull/13267


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-22 Thread via GitHub


cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1238656502


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -122,12 +122,12 @@ public Maybe> 
priorValueForBuffered(final K key) {
 }
 
 @Override
-public void put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {
+public boolean put(final long time, final Record record, final 
ProcessorRecordContext recordContext) {

Review Comment:
   Could you please add unit tests for this change?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+private final static KeyValueTimestamp[] EMPTY = new 
KeyValueTimestamp[0];
+
+private final String streamTopic = "streamTopic";
+private final String tableTopic = "tableTopic";
+private TestInputTopic inputStreamTopic;
+private TestInputTopic inputTableTopic;
+private final int[] expectedKeys = {0, 1, 2, 3};
+
+private MockApiProcessor processor;
+private TopologyTestDriver driver;
+private StreamsBuilder builder;
+private final MockApiProcessorSupplier 
supplier = new MockApiProcessorSupplier<>();
+
+@BeforeEach
+public void setUp() {
+builder = new StreamsBuilder();
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+
+}
+
+private void makeJoin(final Duration grace) {
+final KStream stream;
+final KTable table;
+
+final Consumed consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+stream = builder.stream(streamTopic, consumed);
+table = builder.table(tableTopic, consumed, Materialized.as(
+Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+stream.join(table,
+MockValueJoiner.TOSTRING_JOINER,
+Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+).process(supplier);
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+inputStreamTopic = driver.createInputTopic(streamTopic, new 
Integ

[jira] [Commented] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-22 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736201#comment-17736201
 ] 

Bo Gao commented on KAFKA-15053:


Thanks [~ChrisEgerton]! If we backport this to 3.3.0, is there anything that 
the client needs to do to consume this change, or will the change be there 
automatically?

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Assignee: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, config 
> {{security.protocol}} now only allows upper case values such as PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like 
> sasl_ssl and ssl were also supported; there is even case-insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle the lower case values.
> I think we should treat this as a regression bug, since lower case values are 
> no longer supported as of 3.3.0. For versions 3.3.0 and later, we get an error 
> like this when using the lower case value sasl_ssl:
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}
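
For illustration only (a sketch of the idea, not necessarily how the fix is implemented),
restoring the old behavior amounts to normalizing the configured value before validating
it, mirroring the case-insensitive lookup that SecurityProtocol already performs:

```java
import java.util.Arrays;
import java.util.Locale;

// Sketch: accept "sasl_ssl" as well as "SASL_SSL" by normalizing before validation.
final class SecurityProtocolValidation {
    private static final String[] ALLOWED = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"};

    static String validate(String configured) {
        String normalized = configured.toUpperCase(Locale.ROOT);
        if (Arrays.stream(ALLOWED).noneMatch(normalized::equals)) {
            throw new IllegalArgumentException("Invalid value " + configured
                + " for configuration security.protocol: String must be one of: "
                + String.join(", ", ALLOWED));
        }
        return normalized;   // same spirit as SecurityProtocol's case-insensitive lookup
    }
}
```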



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-22 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238782864


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+.timeoutMs((int) timer.remainingMs());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+.get(timer.remainingMs(), 
TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+timer.update();
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting offsets for sink connector {}", connName, e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting offsets for sink connector " + connName, e), 
null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
-
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+log.error("Failed to modify offsets for connector {} 
because it doesn't support external modification of offsets",
+connName, e);
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
 }
+updateTimerAndCheckExpiry(timer

[GitHub] [kafka] blacktooth opened a new pull request, #13905: Fix MM2 not consuming from latest when "auto.offset.reset=latest

2023-06-22 Thread via GitHub


blacktooth opened a new pull request, #13905:
URL: https://github.com/apache/kafka/pull/13905

   …=latest" is set.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13900: MINOR: some minor cleanups for process startup

2023-06-22 Thread via GitHub


cmccabe commented on code in PR #13900:
URL: https://github.com/apache/kafka/pull/13900#discussion_r1238763103


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -75,6 +76,8 @@ class BrokerServer(
   // Get raftManager from SharedServer. It will be initialized during startup.
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = 
sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

Review Comment:
   This is not a new change. The metrics are currently registered in 
`KafkaBroker.scala`. This just moves where they are registered. Here is the 
existing registration code:
   
   ```
   private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", 
Time.SYSTEM, logger.underlying)
   
   if (linuxIoMetricsCollector.usable()) {
 metricsGroup.newGauge("linux-disk-read-bytes", () => 
linuxIoMetricsCollector.readBytes())
 metricsGroup.newGauge("linux-disk-write-bytes", () => 
linuxIoMetricsCollector.writeBytes())
   }
   ```
   
   These metrics are described in KIP-551, which you can read here: 
https://cwiki.apache.org/confluence/x/sotSC



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-22 Thread via GitHub


C0urante commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238757251


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   Awesome, thanks!



-- 
This is an aut

[GitHub] [kafka] C0urante commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-06-22 Thread via GitHub


C0urante commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1602941862

   Thanks for the detailed analysis, everyone!
   
   ## Root cause
   
   I believe @sudeshwasnik's latest theory is correct: the Connect runtime 
invokes `SourceTask::commitRecord` even for records in aborted transactions, 
which causes `ConnectorHandle::awaitCommits` to return before the expected 
number of (non-aborted) records has been produced to Kafka.
   
   ## Consume-all implementation
   
   I haven't been able to find any issues with 
`EmbeddedKafkaCluster::consumeAll`. The use of the `read_uncommitted` isolation 
level for fetching end offsets and the `read_committed` isolation level for 
consuming is intentional, and mirrors logic we use elsewhere in the Connect 
runtime (see 
[TopicAdmin::endOffsets](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L711),
 which is used by the `KafkaBasedLog` class when reading to the end of a topic, 
even if the log's consumer is configured with the `read_committed` isolation 
level). This ensures that, when reading to the end of a topic, we reach the end 
of any in-progress transactions on the topic.
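
   For reference, the pattern is roughly the following (a sketch only, not the actual 
`EmbeddedKafkaCluster::consumeAll` code; it assumes `baseProps` already contains 
bootstrap servers and byte-array deserializers):
   
   ```java
   // Sketch: fetch end offsets with read_uncommitted so open transactions are included,
   // then consume with read_committed until the committed reads catch up to those offsets.
   static List<ConsumerRecord<byte[], byte[]>> consumeAllCommitted(
           Map<String, Object> baseProps, Set<TopicPartition> partitions) {
       Map<String, Object> uncommittedProps = new HashMap<>(baseProps);
       uncommittedProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
       final Map<TopicPartition, Long> endOffsets;
       try (Consumer<byte[], byte[]> offsetReader = new KafkaConsumer<>(uncommittedProps)) {
           // read_uncommitted end offsets extend past any in-progress transactions
           endOffsets = offsetReader.endOffsets(partitions);
       }
   
       Map<String, Object> committedProps = new HashMap<>(baseProps);
       committedProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
       List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
       try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(committedProps)) {
           consumer.assign(partitions);
           consumer.seekToBeginning(partitions);
           // Poll until every partition's committed position reaches the snapshot end offset,
           // i.e. until transactions open at snapshot time have been committed or aborted.
           while (partitions.stream().anyMatch(tp -> consumer.position(tp) < endOffsets.get(tp))) {
               consumer.poll(Duration.ofMillis(500)).forEach(records::add);
           }
       }
       return records;
   }
   ```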
   
   ## Consume vs. consume all
   
   With regards to the change in the PR--the concern with consuming a fixed 
number of records from the topic is that we can potentially see a gap in 
sequence numbers if the topic has multiple partitions, since we wouldn't be 
guaranteed to consume records in the same order they were produced (which is 
why I implemented and used `EmbeddedKafkaCluster::consumeAll` when writing 
these tests initially; you can find the discussion 
[here](https://github.com/apache/kafka/pull/11782#discussion_r912235126)).
   
   Could we stick with using `consumeAll` and instead bump the number of 
expected records/commits? I drafted this change locally and it seemed to work 
well:
   
   ```java
   // the connector aborts some transactions, which causes records that 
it has emitted (and for which
   // SourceTask::commitRecord has been invoked) to be invisible to 
consumers; we expect the task to
   // emit at most 233 records in total before 100 records have been 
emitted as part of one or more
   // committed transactions
   connectorHandle.expectedRecords(233);
   connectorHandle.expectedCommits(233);
   ```
   
   (This would replace the existing code 
[here](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L397-L399).)
   
   ## Transaction size logging
   
   Also, as an aside--it was really helpful to know how many records were in 
each aborted/committed transaction while investigating this test. I tweaked 
`MonitorableSourceConnector.MonitorableSourceTask::maybeDefineTransactionBoundary`
 to provide this info; if you agree that it'd be useful, feel free to add it to 
this PR (regardless of which fix we use):
   
   ```java
   private void maybeDefineTransactionBoundary(SourceRecord record) {
       if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
           return;
       }
       long transactionSize = nextTransactionBoundary - priorTransactionBoundary;
       // If the transaction boundary ends on an even-numbered offset, abort it
       // Otherwise, commit
       boolean abort = nextTransactionBoundary % 2 == 0;
       calculateNextBoundary();
       if (abort) {
           log.info("Aborting transaction of {} records", transactionSize);
           context.transactionContext().abortTransaction(record);
       } else {
           log.info("Committing transaction of {} records", transactionSize);
           context.transactionContext().commitTransaction(record);
       }
   }
   ```
   
   And in fact, if we believe this would be useful for all connectors, we could 
even add this kind of logging to the `ExactlyOnceWorkerSourceTask` class. But 
that should be done in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2023-06-22 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736180#comment-17736180
 ] 

Andrew Schofield commented on KAFKA-9800:
-

[~showuon] I wonder what the current status of this is. I'd be interested in 
helping to get it delivered in a forthcoming release.

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Luke Chen
>Priority: Major
>  Labels: KIP-580, client
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff ms.
>  # Async retries. In each poll, retries that have not yet met the backoff will 
> be filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e. a set contains only one 
> initial request and its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each bach when it enqueues them, to avoid 
> instantiate the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation is applying backoff to each topic partition, where the backoff 
> value is kept by TopicPartitionState. Thus, TopicPartitionState will have the 
> new property recording the number of attempts.
> Metadata:
>  #  Metadata lives as a singleton in many clients. Add a new property 
> recording the number of attempts
>  AdminClient:
>  # AdminClient has its own request abstraction Call. The failed attempts are 
> already kept by the abstraction. So probably clean the Call class logic a bit.
> Existing tests:
>  # If the tests are testing the retry backoff, add a delta to the assertion, 
> considering the existence of the jitter.
>  # If the tests are testing other functionality, we can specify the same 
> value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make 
> the retry backoff static. We can use this trick to make the existing tests 
> compatible with the changes.
> There are other common usages that look like client.poll(timeout), where the 
> timeout passed in is the retry backoff value. We won't change these usages 
> since their underlying logic is nioSelector.select(timeout) and 
> nioSelector.selectNow(), which means if no interested op exists, the client 
> will block retry backoff milliseconds. This is an optimization when there's 
> no request that needs to be sent but the client is waiting for responses. 
> Specifically, if the client fails the inflight requests before the retry 
> backoff milliseconds passed, it still needs to wait until that amount of time 
> passed, unless there's a new request need to be sent.
>  
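
For illustration, a minimal sketch of the attempt-based exponential backoff with jitter
described above (the names, constants, and jitter handling here are illustrative, not the
actual util class; the bookkeeping of failed attempts would live in the per-request data
classes as the design describes):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: derive the backoff from the number of failed attempts, capped at
// retry.backoff.max.ms, with +/- jitter so retries from many clients do not synchronize.
final class ExponentialBackoffSketch {
    private final long initialMs;   // e.g. retry.backoff.ms
    private final long maxMs;       // e.g. retry.backoff.max.ms
    private final double multiplier;
    private final double jitter;    // e.g. 0.2 for +/- 20%

    ExponentialBackoffSketch(long initialMs, long maxMs, double multiplier, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.jitter = jitter;
    }

    long backoffMs(long failedAttempts) {
        // Setting retry.backoff.ms == retry.backoff.max.ms makes the backoff static,
        // the trick mentioned above for keeping existing tests deterministic.
        if (maxMs <= initialMs) {
            return initialMs;
        }
        double exp = initialMs * Math.pow(multiplier, Math.max(0, failedAttempts - 1));
        double bounded = Math.min(exp, maxMs);
        double randomFactor = 1.0 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1.0);
        return (long) (bounded * randomFactor);
    }
}
```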



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238725344


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -266,9 +295,21 @@ public CompletableFuture joinGroup(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
+if (!isValidGroupId(request.groupId(), 
ApiKeys.forId(request.apiKey( {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+
+return responseFuture;
+}
+
+runtime.scheduleGenericGroupOperation("generic-group-join",
+topicPartitionFor(request.groupId()),
+coordinator -> coordinator.genericGroupJoin(context, request, 
responseFuture));
+
+return responseFuture;

Review Comment:
   We probably need to convert some of the exceptions like I did for the 
consumer group heartbeat request.
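
   For illustration, the conversion could look roughly like this (a sketch only, not the 
suggested change itself; it assumes the error mapping goes through `Errors.forException`):
   
   ```java
   // Illustrative helper only: translate an exception from the coordinator runtime
   // into a JoinGroup response, unwrapping CompletionException first.
   private static JoinGroupResponseData joinGroupErrorResponse(
       JoinGroupRequestData request,
       Throwable t
   ) {
       Throwable cause = t instanceof CompletionException ? t.getCause() : t;
       return new JoinGroupResponseData()
           .setMemberId(request.memberId())
           .setErrorCode(Errors.forException(cause).code());
   }
   ```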



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-22 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1238720569


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -291,7 +291,7 @@ public long skip(long toSkip) throws IOException {
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
 long bytesSkipped = (avail < remaining) ? avail : remaining;
-pos += bytesSkipped;
+pos += (int) bytesSkipped;

Review Comment:
   I tweaked this a bit to make it more obvious why it's safe to cast. Please 
take a look.
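
   For reference, the reason the narrowing is safe (a sketch of the bound, not necessarily 
the exact shape of the tweak): the skipped count is capped by `avail`, which is an `int`.
   
   ```java
   // bytesSkipped is min(avail, remaining); since avail is an int,
   // the value always fits in an int and the narrowing cast is lossless.
   int avail = count - pos;                              // bytes left in the intermediate buffer
   int bytesSkipped = (int) Math.min(avail, remaining);
   pos += bytesSkipped;
   ```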



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736177#comment-17736177
 ] 

Chris Egerton edited comment on KAFKA-15113 at 6/22/23 3:35 PM:


[~yash.mayya] Thanks for filing this!

One thing I'm struggling with is that, as far as I've been able to tell, 
there's no realistic use case for setting different bootstrap servers for, 
e.g., a sink connector's consumer and admin client, or a source connector's 
producer and admin client, or a sink connector's consumer and producer (which 
can be set up if the connector uses a DLQ topic).

If we don't want to support this use case, I think our effort might be better 
spent trying to make it easier for users to do the right thing instead of 
trying to gracefully recover when they've done the wrong thing.

One idea I've been toying with is adding support for declaring single 
properties in connector configurations that affect all Kafka clients spun up by 
the Connect runtime. For example, instead of specifying 
{{{}consumer.override.bootstrap.servers{}}}, 
{{{}producer.override.bootstrap.servers{}}}, and 
{{admin.override.bootstrap.servers}} in a connector config, we could allow 
users to simply declare {{{}kafka.clients.override.bootstrap.servers{}}}.

If we wanted to get fancier about this and avoid some of the compatibility 
constraints of adding framework-level properties to connector configurations 
(which always run the risk of conflicting with connector-defined properties), 
we might even expand the structure of connector configurations by separating 
configs that apply to the connector from the ones that apply to its 
runtime-constructed Kafka clients, its key/value/header converters, etc. That 
could look something like this (assuming the request is issued against the 
{{POST /connectors}} endpoint):

{
    "name": "reddit-source",
    "connector.config": {
        "connector.class": "RedditSource",
        "tasks.max": "1",
        "posts.subreddits": "CatsStandingUp",
        "posts.topic": "reddit"
    },
    "kafka.clients.config": {
        "bootstrap.servers": "localhost:9093",
        "security.protocol": "PLAINTEXT"
    },
    "producer.config": {
        "buffer.memory": "4194304"
    }
}

Both of these would come with the advantage that, if users start actually using 
the feature, it'd be harder to screw up connector configurations. Of course, we 
would still have to decide if/how to handle misconfiguration by the user, but 
it might allow us to pursue more opinionated options, like failing requests 
(and even rejecting connector configurations), which IMO is a fine option as 
long as we provide a clear error message with easy-to-follow instructions on 
how to correct the connector configuration.

 

TL;DR: We should start rejecting connector configurations (and failing offset 
modification requests for connectors) that have mismatched bootstrap servers 
across Kafka clients, but we should also make it easier for users to correctly 
configure a connector with overridden client bootstrap servers, which will 
almost certainly require a KIP.


was (Author: chrisegerton):
[~yash.mayya] Thanks for filing this!

One thing I'm struggling with is that, as far as I've been able to tell, 
there's no realistic use case for setting different bootstrap servers for, 
e.g., a sink connector's consumer and admin client, or a source connector's 
producer and admin client, or a sink connector's consumer and producer (which 
can be set up if the connector uses a DLQ topic).

If we don't want to support this use case, I think our effort might be better 
spent trying to make it easier for users to do the right thing instead of 
trying to gracefully recover when they've done the wrong thing.

One idea I've been toying with is adding support for declaring single 
properties in connector configurations that affect all Kafka clients spun up by 
the Connect runtime. For example, instead of specifying 
{{{}consumer.override.bootstrap.servers{}}}, 
{{{}producer.override.bootstrap.servers{}}}, and 
{{admin.override.bootstrap.servers}} in a connector config, we could allow 
users to simply declare {{{}kafka.clients.override.bootstrap.servers{}}}.

If we wanted to get fancier about this and avoid some of the compatibility 
constraints of adding framework-level properties to connector configurations 
(which always run the risk of conflicting with connector-defined properties), 
we might even expand the structure of connector configurations by separating 
configs that apply to the connector from the ones that apply to its 
runtime-constructed Kafka clients, its key/value/header converters, etc. That 
could look something like this (assuming the request is issued against the 
{{POST /connectors}} endpoint):

{{{}}

{{    "name": "reddit-source",}}


[jira] [Commented] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736177#comment-17736177
 ] 

Chris Egerton commented on KAFKA-15113:
---

[~yash.mayya] Thanks for filing this!

One thing I'm struggling with is that, as far as I've been able to tell, 
there's no realistic use case for setting different bootstrap servers for, 
e.g., a sink connector's consumer and admin client, or a source connector's 
producer and admin client, or a sink connector's consumer and producer (which 
can be set up if the connector uses a DLQ topic).

If we don't want to support this use case, I think our effort might be better 
spent trying to make it easier for users to do the right thing instead of 
trying to gracefully recover when they've done the wrong thing.

One idea I've been toying with is adding support for declaring single 
properties in connector configurations that affect all Kafka clients spun up by 
the Connect runtime. For example, instead of specifying 
{{{}consumer.override.bootstrap.servers{}}}, 
{{{}producer.override.bootstrap.servers{}}}, and 
{{admin.override.bootstrap.servers}} in a connector config, we could allow 
users to simply declare {{{}kafka.clients.override.bootstrap.servers{}}}.

If we wanted to get fancier about this and avoid some of the compatibility 
constraints of adding framework-level properties to connector configurations 
(which always run the risk of conflicting with connector-defined properties), 
we might even expand the structure of connector configurations by separating 
configs that apply to the connector from the ones that apply to its 
runtime-constructed Kafka clients, its key/value/header converters, etc. That 
could look something like this (assuming the request is issued against the 
{{POST /connectors}} endpoint):

{
    "name": "reddit-source",
    "connector.config": {
        "connector.class": "RedditSource",
        "tasks.max": "1",
        "posts.subreddits": "CatsStandingUp",
        "posts.topic": "reddit"
    },
    "kafka.clients.config": {
        "bootstrap.servers": "localhost:9093",
        "security.protocol": "PLAINTEXT"
    },
    "producer.config": {
        "buffer.memory": "4194304"
    }
}

Both of these would come with the advantage that, if users start actually using 
the feature, it'd be harder to screw up connector configurations. Of course, we 
would still have to decide if/how to handle misconfiguration by the user, but 
it might allow us to pursue more opinionated options, like failing requests 
(and even rejecting connector configurations), which IMO is a fine option as 
long as we provide a clear error message with easy-to-follow instructions on 
how to correct the connector configuration.

 

TL;DR: We should start rejecting connector configurations (and failing offset 
modification requests for connectors) that have mismatched bootstrap servers 
across Kafka clients, but we should also make it easier for users to correctly 
configure a connector with overridden client bootstrap servers, which will 
almost certainly require a KIP.

> Gracefully handle cases where a sink connector's admin and consumer client 
> config overrides target different Kafka clusters
> ---
>
> Key: KAFKA-15113
> URL: https://issues.apache.org/jira/browse/KAFKA-15113
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Priority: Minor
>
> Background reading -
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  
>  
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to 
> create the DLQ topic if required. So it seems like it could be technically 
> possible for a sink connector's consumer client overrides to target a 
> different Kafka cluster from its producer and admin client overrides. Such a 
> setup won't work with this implementation of the get offsets API as it is 
> using an admin client to get a sink connector's consumer group offsets. 
> However, I'm not sure we want to use a consumer client to retrieve the 
> offsets either as we shouldn't be disrupting the existing sink tasks' 
> consumer group just to fetch offsets. Leveraging a sink task's consumer also 
> isn't an option because fetching offsets for a stopped sink connector (where 
> all the tasks will be stopped) should be allowed. I'm wondering if we should 
> document that a connector's various 

[GitHub] [kafka] ijuma merged pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-06-22 Thread via GitHub


ijuma merged PR #13582:
URL: https://github.com/apache/kafka/pull/13582


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13582: MINOR: Fix lossy conversions flagged by Java 20

2023-06-22 Thread via GitHub


ijuma commented on PR #13582:
URL: https://github.com/apache/kafka/pull/13582#issuecomment-1602803318

   After several test runs, it looks like the failures in the last run are 
flakes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-06-22 Thread via GitHub


pprovenzano commented on PR #13374:
URL: https://github.com/apache/kafka/pull/13374#issuecomment-1602781584

   PR to fix the issue is out for review:
   https://github.com/apache/kafka/pull/13904
   --Proven
   
   On Wed, Jun 21, 2023 at 12:16 PM Proven Provenzano ***@***.***>
   wrote:
   
   >
   > The code is correct and the example should be updated to 'name=' .
   > Sorry for the error in the documentation.
   > --Proven
   >
   > On Wed, Jun 21, 2023 at 4:38 AM Mike Lothian ***@***.***>
   > wrote:
   >
   >> The examples say:
   >>
   >>   --add-scram ADD_SCRAM, -S ADD_SCRAM
   >>  A SCRAM_CREDENTIAL to add to the 
__cluster_metadata log e.g.
   >>  
'SCRAM-SHA-256=[user=alice,password=alice-secret]'
   >>  
'SCRAM-SHA-512=[user=alice,iterations=8192,salt="N3E=",saltedpassword="YCE="]'
   >>
   >> But the code errors unless name= is passed, should user be name in the
   >> example or is the check wrong
   >> 
https://github.com/apache/kafka/blame/49c1697ab08189c9707d04a6078aa9d5b69ed3aa/core/src/main/scala/kafka/tools/StorageTool.scala#L174
   >>
   >> —
   >> Reply to this email directly, view it on GitHub
   >> , or
   >> unsubscribe
   >> 

   >> .
   >> You are receiving this because you authored the thread.Message ID:
   >> ***@***.***>
   >>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano opened a new pull request, #13904: KAFKA-15114: Update help in StorageTool for creating SCRAM credentials for KRaft bootstrap.

2023-06-22 Thread via GitHub


pprovenzano opened a new pull request, #13904:
URL: https://github.com/apache/kafka/pull/13904

   The choice of using name vs. user as a parameter is because internally the 
record uses name, all tests using the StorageTool use name as a parameter, 
KafkaPrincipals are created with name, and creating SCRAM credentials is 
done with --entity-name.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15114) StorageTool help specifies user as parameter not name

2023-06-22 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-15114:
-

 Summary: StorageTool help specifies user as parameter not name
 Key: KAFKA-15114
 URL: https://issues.apache.org/jira/browse/KAFKA-15114
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.5.0
Reporter: Proven Provenzano
Assignee: Proven Provenzano
 Fix For: 3.5.1


The StorageTool help message currently specifies setting a {{user}} parameter when 
creating a SCRAM record for bootstrap.

The StorageTool parses and only accepts the parameter as {{name}} and so the 
help message is wrong.

The choice of using {{name}} vs. {{user}} as a parameter is because internally 
the record uses name, all tests using the StorageTool use name as a parameter, 
KafkaPrincipals are created with {{name}}, and creating SCRAM 
credentials is done with {{--entity-name}}.

I will change the help to specify {{name}} instead of {{user}}.


 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15113:
--
Priority: Minor  (was: Major)

> Gracefully handle cases where a sink connector's admin and consumer client 
> config overrides target different Kafka clusters
> ---
>
> Key: KAFKA-15113
> URL: https://issues.apache.org/jira/browse/KAFKA-15113
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Priority: Minor
>
> Background reading -
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
>  
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>  
>  
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to 
> create the DLQ topic if required. So it seems like it could be technically 
> possible for a sink connector's consumer client overrides to target a 
> different Kafka cluster from its producer and admin client overrides. Such a 
> setup won't work with this implementation of the get offsets API as it is 
> using an admin client to get a sink connector's consumer group offsets. 
> However, I'm not sure we want to use a consumer client to retrieve the 
> offsets either as we shouldn't be disrupting the existing sink tasks' 
> consumer group just to fetch offsets. Leveraging a sink task's consumer also 
> isn't an option because fetching offsets for a stopped sink connector (where 
> all the tasks will be stopped) should be allowed. I'm wondering if we should 
> document that a connector's various client config override policies shouldn't 
> target different Kafka clusters (side note - looks like we don't [currently 
> document|https://kafka.apache.org/documentation/#connect] client config 
> overrides for Connect beyond just the worker property 
> {{{}connector.client.config.override.policy{}}}).
> {quote}
>  
> {quote}I don't think we need to worry too much about this. I cannot imagine a 
> sane use case that involves overriding a connector's Kafka clients with 
> different Kafka clusters (not just bootstrap servers, but actually different 
> clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
> docs that that kind of setup isn't supported but I really, really hope that 
> it's not necessary and nobody's trying to do that in the first place. I also 
> suspect that there are other places where this might cause issues, like with 
> exactly-once source support or automatic topic creation for source connectors.
> That said, there is a different case we may want to consider: someone may 
> have configured consumer overrides for a sink connector, but not admin 
> overrides. This may happen if they don't use a DLQ topic. I don't know if we 
> absolutely need to handle this now and we may consider filing a follow-up 
> ticket to look into this, but one quick-and-dirty thought I've had is to 
> configure the admin client used here with a combination of the configurations 
> for the connector's admin client and its consumer, giving precedent to the 
> latter.
> {quote}
>  
> Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] 
> -
> {quote}We will have undesirable behavior if the connector is targeting a 
> Kafka cluster different from the Connect cluster's backing Kafka cluster and 
> the user has configured the consumer overrides appropriately for their 
> connector, but not the admin overrides (something we also discussed 
> previously 
> [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).
> In the above case, if a user attempts to reset their sink connector's offsets 
> via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
> will occur:
>  # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
> which returns an empty partition offsets map for the sink connector's 
> consumer group ID (it exists on a different Kafka cluster to the one that the 
> admin client is connecting to).
>  # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
> could cause the sink connector to propagate the offsets reset related changes 
> to the sink system.
>  # We attempt to delete the consumer group via 
> {{Admin::deleteConsumerGroups}} which returns {{GroupIdNotFoundException}} 
> which we essentially swallow in order to keep offsets reset operations 
> idempotent and return a success message to the user (even though the real 
> consumer group for the sink connector on the other Kafka cluster hasn't been 
> deleted).
> This will occur i
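
For illustration only, a minimal sketch of the list-then-delete sequence described
above, using the public Admin API. The class name and the surrounding worker wiring
(including how SinkConnector::alterOffsets is invoked) are assumptions, not taken
from the actual implementation:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;

public class SinkOffsetResetSketch {

    static void resetSinkOffsets(Admin admin, String groupId) throws Exception {
        // 1. List the sink connector's consumer group offsets. If the admin
        //    overrides point at a different cluster than the consumer overrides,
        //    this map comes back empty.
        Map<TopicPartition, OffsetAndMetadata> offsets = admin
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get();

        // 2. SinkConnector::alterOffsets would be invoked here with a map whose
        //    values are all null, built from `offsets` (omitted in this sketch).

        // 3. Delete the consumer group; a missing group is swallowed so the reset
        //    stays idempotent, which also hides the cluster mismatch from the user.
        try {
            admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof GroupIdNotFoundException)) {
                throw e;
            }
        }
    }
}
```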

[GitHub] [kafka] jeffkbkim commented on pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-06-22 Thread via GitHub


jeffkbkim commented on PR #13267:
URL: https://github.com/apache/kafka/pull/13267#issuecomment-1602740242

   failure in `testBumpTransactionalEpoch(String).quorum=kraft – 
kafka.api.TransactionsTest` again
   
   ```
   org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
6ms while awaiting InitProducerId
   ```
   
   There's this in the logs:
   ```
   [2023-06-22 00:21:19,219] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition 
__transaction_state-1. This error may be returned transiently when the 
partition is being created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70)
   [2023-06-22 00:21:19,329] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=0] Partition topic1-1 marked as failed 
(kafka.server.ReplicaFetcherThread:70)
   [2023-06-22 00:21:19,330] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=0] Partition topic2-1 marked as failed 
(kafka.server.ReplicaFetcherThread:70)
   ```
   
   Running the test locally 50 times passes for both kraft and zk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-22 Thread via GitHub


machi1990 commented on PR #13903:
URL: https://github.com/apache/kafka/pull/13903#issuecomment-1602701557

   > Thank you for the change. We have had an automated PR #13743 open for it for 
quite some time now. 
   
   Right, I wasn't aware of that PR. I should have checked before opening a 
similar one. 
   
   > Can you please explain how you tested this change?
   
   I am following the guide 
https://github.com/apache/kafka/blob/50c05cea8e9f24ebb88e342d3ba5d6dc062c09c5/tests/README.md#running-tests-using-docker
 
   
   I opened a PR after running a few specific tests and they were passing. 
   Currently I am running the whole test suite - this takes time, I'll report 
back once it finishes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-22 Thread via GitHub


divijvaidya commented on PR #13903:
URL: https://github.com/apache/kafka/pull/13903#issuecomment-1602677237

   Thank you for the change. We have had an automated PR 
https://github.com/apache/kafka/pull/13743 open for it for quite some time now. Can 
you please explain how you tested this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13895: KAFKA-15098 Allow authorizers to be configured in ZK migration

2023-06-22 Thread via GitHub


mumrah commented on PR #13895:
URL: https://github.com/apache/kafka/pull/13895#issuecomment-1602650945

   Test failures are the same as on trunk (left over thread)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13895: KAFKA-15098 Allow authorizers to be configured in ZK migration

2023-06-22 Thread via GitHub


mumrah merged PR #13895:
URL: https://github.com/apache/kafka/pull/13895


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13903: MINOR: Bump requests from 2.24.0 to 2.31.0 in /tests

2023-06-22 Thread via GitHub


machi1990 opened a new pull request, #13903:
URL: https://github.com/apache/kafka/pull/13903

   Update "requests" lib used in system tests to version "2.31.0" to fix 
CVE-2023-32681: Unintended leak of Proxy-Authorization header in requests
   
   The update is done as a best practice after I noticed it while setting up & 
running the system tests locally
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 opened a new pull request, #13902: MINOR: fix flaky ZkMigrationIntegrationTest.testNewAndChangedTopicsIn…

2023-06-22 Thread via GitHub


chia7712 opened a new pull request, #13902:
URL: https://github.com/apache/kafka/pull/13902

   ```
   org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: 
Unhandled error in MetadataChangeEvent: Session expired either before or while 
waiting for connection
at 
app//kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:266)
at 
app//kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:248)
at 
app//kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:248)
at 
app//kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:2082)
at 
app//kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1921)
at 
app//kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1916)
at 
app//kafka.zk.KafkaZkClient.updateMigrationState(KafkaZkClient.scala:1707)
at 
app//kafka.zk.ZkMigrationClient.$anonfun$setMigrationRecoveryState$1(ZkMigrationClient.scala:112)
at 
app//kafka.zk.ZkMigrationClient.setMigrationRecoveryState(ZkMigrationClient.scala:67)
at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$2(KRaftMigrationDriver.java:703)
at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:233)
at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$800(KRaftMigrationDriver.java:63)
at 
app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:702)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base@17.0.7/java.lang.Thread.run(Thread.java:833)
   Caused by: kafka.zookeeper.ZooKeeperClientExpiredException: Session expired 
either before or while waiting for connection
... 17 more
   ```
   
   the fix is copied from #13758
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13859: KAFKA-15093: Add 3.4 and 3.5 to core upgrade and compatibility tests

2023-06-22 Thread via GitHub


mimaison commented on code in PR #13859:
URL: https://github.com/apache/kafka/pull/13859#discussion_r1238507645


##
tests/kafkatest/tests/core/upgrade_test.py:
##
@@ -94,6 +94,12 @@ def perform_upgrade(self, from_kafka_version, 
to_message_format_version=None):
 self.wait_until_rejoin()
 
 @cluster(num_nodes=6)
+@parametrize(from_kafka_version=str(LATEST_3_5), 
to_message_format_version=None, compression_types=["none"])
+@parametrize(from_kafka_version=str(LATEST_3_5), 
to_message_format_version=None, compression_types=["lz4"])
+@parametrize(from_kafka_version=str(LATEST_3_5), 
to_message_format_version=None, compression_types=["snappy"])

Review Comment:
   To be honest I don't know. At this point, we've copied the same lines as for the 
previous releases. Maybe this could be improved. If so, I'd rather separate it from 
this PR and handle it outside of a release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13849: Add 3.5.0 and 3.4.1 to system tests

2023-06-22 Thread via GitHub


mimaison commented on PR #13849:
URL: https://github.com/apache/kafka/pull/13849#issuecomment-1602609810

   I still don't see the packages in the S3 bucket, so re-pinging @mjsax 
@vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru closed pull request #13725: Backport KAFKA-14172 to 3.2

2023-06-22 Thread via GitHub


lucasbru closed pull request #13725: Backport KAFKA-14172 to 3.2
URL: https://github.com/apache/kafka/pull/13725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13900: MINOR: some minor cleanups for process startup

2023-06-22 Thread via GitHub


divijvaidya commented on code in PR #13900:
URL: https://github.com/apache/kafka/pull/13900#discussion_r1238424518


##
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala:
##
@@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: 
Time, val logger: Logg
   false
 }
   }
+
+  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {

Review Comment:
   We should have a similar method to unregister these metrics, which would be 
called on shutdown.
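   
   A rough Java-flavoured sketch of what that cleanup could look like (the real 
collector is Scala, and the group/type/gauge names below are placeholders, not the 
ones in this PR; the point is only the symmetry with registration):
   
   ```java
   import com.yammer.metrics.core.MetricName;
   import com.yammer.metrics.core.MetricsRegistry;
   
   public class LinuxIoMetricsCleanupSketch {
       // Placeholder names; must match whatever was used when registering.
       private static final String[] GAUGE_NAMES = {
           "linux-disk-read-bytes", "linux-disk-write-bytes"
       };
   
       static void maybeRemoveMetrics(MetricsRegistry registry) {
           for (String name : GAUGE_NAMES) {
               // Remove each gauge using the same group/type/name it was created with.
               registry.removeMetric(new MetricName("kafka.server", "KafkaServer", name));
           }
       }
   }
   ```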



##
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala:
##
@@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: 
Time, val logger: Logg
   false
 }
   }
+
+  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {
+def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
+  val metricName = KafkaYammerMetrics.getMetricName(
+"kafka.server",

Review Comment:
   please correct me if I am wrong here, this is a change in metric name. 
Earlier when we were using `metricsGroup` in `ControllerServer` the name of the 
metric would have included ControllerServer but now it includes KafkaServer.
   Isn't this a breaking change?



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -75,6 +76,8 @@ class BrokerServer(
   // Get raftManager from SharedServer. It will be initialized during startup.
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = 
sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)

Review Comment:
   This is a new change. Earlier brokers didn't have this metric. Please update 
the PR title and description to reflect this or start a new PR for this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238418966


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should 
be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(
 String groupId,
+Group.GroupType groupType,
 boolean createIfNotExists
-) throws GroupIdNotFoundException {
+) throws InvalidGroupIdException, GroupIdNotFoundException {
+if (groupId == null || groupId.isEmpty()) {
+throw new InvalidGroupIdException(String.format("Group id %s is 
invalid.", groupId));
+}

Review Comment:
   i think that static validation could be done in the group coordina

[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-06-22 Thread via GitHub


dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1238417804


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -171,70 +260,152 @@ GroupMetadataManager build() {
 /**
  * The maximum number of members allowed in a single consumer group.
  */
-private final int consumerGroupMaxSize;
+private final int groupMaxSize;
 
 /**
  * The heartbeat interval for consumer groups.
  */
 private final int consumerGroupHeartbeatIntervalMs;
 
 /**
- * The topics metadata (or image).
+ * The metadata image.
+ */
+private MetadataImage metadataImage;
+
+// Rest of the fields are used for the generic group APIs.
+
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult, Record> 
EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));
+
+/**
+ * Initial rebalance delay for members joining a generic group.
+ */
+private final int initialRebalanceDelayMs;
+
+/**
+ * The timeout used to wait for a new member in milliseconds.
+ */
+private final int newMemberJoinTimeoutMs;
+
+/**
+ * The group minimum session timeout.
+ */
+private final int groupMinSessionTimeoutMs;
+
+/**
+ * The group maximum session timeout.
+ */
+private final int groupMaxSessionTimeoutMs;
+
+/**
+ * The timer to add and cancel group operations.
  */
-private TopicsImage topicsImage;
+private final Timer, Record> timer;
+
+/**
+ * The time.
+ */
+private final Time time;
 
 private GroupMetadataManager(
 SnapshotRegistry snapshotRegistry,
 LogContext logContext,
 List assignors,
-TopicsImage topicsImage,
-int consumerGroupMaxSize,
-int consumerGroupHeartbeatIntervalMs
+MetadataImage metadataImage,
+TopicPartition topicPartition,
+int groupMaxSize,
+int consumerGroupHeartbeatIntervalMs,
+int initialRebalanceDelayMs,
+int newMemberJoinTimeoutMs,
+int groupMinSessionTimeoutMs,
+int groupMaxSessionTimeoutMs,
+Timer, Record> timer,
+Time time
 ) {
+this.logContext = logContext;
 this.log = logContext.logger(GroupMetadataManager.class);
 this.snapshotRegistry = snapshotRegistry;
-this.topicsImage = topicsImage;
+this.metadataImage = metadataImage;
 this.assignors = 
assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, 
Function.identity()));
+this.topicPartition = topicPartition;
 this.defaultAssignor = assignors.get(0);
 this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
-this.consumerGroupMaxSize = consumerGroupMaxSize;
+this.groupMaxSize = groupMaxSize;
 this.consumerGroupHeartbeatIntervalMs = 
consumerGroupHeartbeatIntervalMs;
+this.initialRebalanceDelayMs = initialRebalanceDelayMs;
+this.newMemberJoinTimeoutMs = newMemberJoinTimeoutMs;
+this.groupMinSessionTimeoutMs = groupMinSessionTimeoutMs;
+this.groupMaxSessionTimeoutMs = groupMaxSessionTimeoutMs;
+this.timer = timer;
+this.time = time;
+}
+
+/**
+ * When a new metadata image is pushed.
+ *
+ * @param metadataImage The new metadata image.
+ */
+public void onNewMetadataImage(MetadataImage metadataImage) {
+this.metadataImage = metadataImage;
 }
 
 /**
  * Gets or maybe creates a consumer group.
  *
  * @param groupId   The group id.
+ * @param groupType The group type (generic or consumer).
  * @param createIfNotExists A boolean indicating whether the group should 
be
  *  created if it does not exist.
  *
  * @return A ConsumerGroup.
+ * @throws InvalidGroupIdException  if the group id is invalid.
  * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
  *  if the group is not a consumer group.
  *
  * Package private for testing.
  */
-ConsumerGroup getOrMaybeCreateConsumerGroup(
+// Package private for testing.
+Group getOrMaybeCreateGroup(

Review Comment:
   I am not convinced. The downside is that it would be harder to guarantee the 
uniqueness of the group id. It also means that we would have to check both maps 
for all other operations (e.g. list, delete, etc.). I think that it would be 
better to keep them in a single map.
   
   For this particular case, we could just have two methods: 
`getOrMaybeCreateConsumerGroup` and `getOrMaybeCreateGenericGroup`.
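   
   To make the single-map idea concrete, here is a self-contained toy sketch; the 
types and error choices are simplified stand-ins, not the PR's actual classes:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class GroupRegistrySketch {
       enum GroupType { CONSUMER, GENERIC }
   
       interface Group { GroupType type(); }
       static final class ConsumerGroup implements Group {
           public GroupType type() { return GroupType.CONSUMER; }
       }
       static final class GenericGroup implements Group {
           public GroupType type() { return GroupType.GENERIC; }
       }
   
       // A single map guarantees that a group id is never used by two group types.
       private final Map<String, Group> groups = new HashMap<>();
   
       GenericGroup getOrMaybeCreateGenericGroup(String groupId, boolean createIfNotExists) {
           Group group = groups.get(groupId);
           if (group == null) {
               if (!createIfNotExists) {
                   throw new IllegalArgumentException("Group " + groupId + " not found.");
               }
               GenericGroup created = new GenericGroup();
               groups.put(groupId, created);
               return created;
           }
           if (group.type() != GroupType.GENERIC) {
               // The id is already taken by a consumer group.
               throw new IllegalStateException("Group " + groupId + " is not a generic group.");
           }
           return (GenericGroup) group;
       }
   }
   ```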



-- 
This is an automate

[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-22 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1238403670


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerDe.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerDe implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short keyVersion = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(keyVersion);
+readMessage(keyMessage, keyBuffer, keyVersion, "key");
+
+if (valueBuffer == null) {
+return new Record(
+new ApiMessageAndVersion(keyMessage, keyVersion),
+null
+);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(keyVersion);
+final short valueVersion = readVersion(valueBuffer, "value");
+readMessage(valueMessage, valueBuffer, valueVersion, "value");
+
+return new Record(
+new ApiMessageAndVersion(keyMessage, keyVersion),
+new ApiMessageAndVersion(valueMessage, valueVersion)
+);
+}
+
+private short readVersion(ByteBuffer buffer, String name) throws 
RuntimeException {
+try {
+return buffer.getShort();
+} catch (BufferUnderflowException ex) {
+throw new RuntimeException(String.format("Could not read version 
from %s's buffer.", name));

Review Comment:
   Actually, ApiMessage#apiKey is define

[GitHub] [kafka] cadonna commented on a diff in pull request #13846: KAFKA-15022: [1/N] add configs to control rack aware assignment

2023-06-22 Thread via GitHub


cadonna commented on code in PR #13846:
URL: https://github.com/apache/kafka/pull/13846#discussion_r1238377947


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +755,18 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+public static final String 
RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY = 
"MIN_TRAFFIC_BALANCE_SUBTOPOLOGY";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
   public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of topic partitions into account when assigning"
   ```



##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1375,6 +1375,18 @@ public void shouldReturnDefaultClientSupplier() {
 assertTrue(supplier instanceof DefaultKafkaClientSupplier);
 }
 
+@Test
+public void shouldReturnDefaultRackAwareAssignmentConfig() {
+final String strategy = 
streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY);
+assertEquals(StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, 
strategy);
+}
+

Review Comment:
   I think you also need to test setting the three valid values. In the end they 
are part of the public API, and there should be a test that verifies that they 
are accepted. 
   
   The test should be like this:
   
https://github.com/apache/kafka/blob/474053d2973b8790e50ccfe1bb0699694b0de1c7/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java#L615
   
   Do not use constants `RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC, or 
RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY` because you 
want to ensure that the test fails if the content of those constants changes. 
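   
   A rough sketch of such a test using raw strings (the config name and values are 
taken from the diff above; the class name is illustrative, and the PR under review 
must be applied for this to compile):
   
   ```java
   import static org.junit.Assert.assertEquals;
   
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   import org.junit.Test;
   
   public class RackAwareStrategyConfigSketchTest {
       @Test
       public void shouldAcceptAllValidRackAwareAssignmentStrategies() {
           for (final String strategy : new String[] {
                   "NONE", "MIN_TRAFFIC", "MIN_TRAFFIC_BALANCE_SUBTOPOLOGY"}) {
               final Properties props = new Properties();
               props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
               // Raw strings so the test fails if the constants' contents ever change.
               props.put("rack.aware.assignment.strategy", strategy);
               final StreamsConfig config = new StreamsConfig(props);
               assertEquals(strategy, config.getString("rack.aware.assignment.strategy"));
           }
       }
   }
   ```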
   



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,6 +902,12 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY,
+Type.STRING,
+RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC, 
RACK_AWARE_ASSSIGNMENT_STRATEGY_MIN_TRAFFIC_BALANCE_SUBTOPOLOGY),

Review Comment:
   nit: I think you could also use an enum like here: 
https://github.com/apache/kafka/blob/c958d8719dc2588bd27958b54a65dea514808796/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L907
 
   
   But that is not needed, I just wanted to give you an alternative.
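   
   For reference, a small sketch of that enum alternative (names are illustrative 
only, not a suggestion for the final public API):
   
   ```java
   import java.util.Arrays;
   
   public enum RackAwareAssignmentStrategy {
       NONE, MIN_TRAFFIC, MIN_TRAFFIC_BALANCE_SUBTOPOLOGY;
   
       // The strings that would be fed into ConfigDef's ValidString.in(...)
       // and compared against the configured value.
       public static String[] names() {
           return Arrays.stream(values()).map(Enum::name).toArray(String[]::new);
       }
   }
   ```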



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1602461968

   Thanks @yashmayya, I addressed your comments and added a response to one of the 
questions. Let me know if that makes sense. Regarding 
https://github.com/apache/kafka/pull/13801#discussion_r1233934686, I am still 
undecided, but I feel it's reasonable to keep it in `OffsetStorageWriterTest` since 
that's the interface the tasks actually interact with. We can get to this later on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1238381420


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when
+// tombstone records fail to be written to the secondary 
store. Note that while commitTransaction
+// already waits for all records to be sent and ack'ed, in 
this case we do need to add an explicit
+// blocking call. In case of ALOS, we wait for the same 
duration as `offset.commit.timeout.ms`
+// and throw that exception which would allow the offset 
commit to fail.
+if (isEOSEnabled) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (InterruptedException e) {
+log.warn("{} Flush of tombstone offsets to secondary store 
interrupted, cancelling", this);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone offsets to secondary store 
threw an unexpected exception: ", this, e);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (TimeoutException e) {
+log.error("{} Timed out waiting to flush offsets with 
tombstones to secondary storage ", this);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush tombstone 
offsets to secondary storage", this);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+}
+Throwable writeError = secondaryStoreTombstoneWriteError.get();
+if (writeError != null) {
+FutureCallback failedWriteCallback = new 
FutureCallback<>(callback);
+failedWriteCallback.onCompletion(writeError, null);
+return failedWriteCallback;
+}
+}
+
 return primaryStore.set(values, (primaryWriteError, ignored) -> {
-if (secondaryStore != null) {
+// Secondary store writes have already happened for tombstone 
records
+if (secondaryStore != null && !containsTombstones) {
 if (primaryWriteError != null) {
-log.trace("Skipping offsets write to secondary store 
because primary write has failed", primaryWriteError);
+log.trace("Skipping offsets write to secondary store for 
non tombstone offsets because primary write has failed", primaryWriteError);

Review Comment:
   yeah, removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1238380902


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when
+// tombstone records fail to be written to the secondary 
store. Note that while commitTransaction
+// already waits for all records to be sent and ack'ed, in 
this case we do need to add an explicit
+// blocking call. In case of ALOS, we wait for the same 
duration as `offset.commit.timeout.ms`
+// and throw that exception which would allow the offset 
commit to fail.
+if (isEOSEnabled) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (InterruptedException e) {
+log.warn("{} Flush of tombstone offsets to secondary store 
interrupted, cancelling", this);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone offsets to secondary store 
threw an unexpected exception: ", this, e);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (TimeoutException e) {
+log.error("{} Timed out waiting to flush offsets with 
tombstones to secondary storage ", this);
+secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush tombstone 
offsets to secondary storage", this);

Review Comment:
   done



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:

[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-06-22 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736073#comment-17736073
 ] 

Luke Chen commented on KAFKA-15083:
---

[~satishd], I had a try and I think we still need to pass the 
remote.log.metadata.* configs into RLMM, for configs like 
`remote.log.metadata.topic.replication.factor`, 
`remote.log.metadata.topic.num.partitions`, etc. I'll work on the 
implementation.

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> propoerties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific to 
> {color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.{color}
> {color:#00}Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).{color}
> {color:#00}For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.{color}|
>  
> This is missed from current implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-06-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15083:
-

Assignee: Luke Chen  (was: Satish Duggana)

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_remote.log.metadata.*_|Default RLMM implementation creates producer and 
> consumer instances. Common client properties can be configured with the 
> `remote.log.metadata.common.client.` prefix. Users can also pass properties 
> specific to the producer/consumer with the `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with the `remote.log.metadata.common.client.` prefix.
> Any other properties should be prefixed with "remote.log.metadata." and these 
> will be passed to RemoteLogMetadataManager#configure(Map props).
> For example: security configuration to connect to the local broker for the 
> configured listener name is passed with props.|
>  
> This is missed from current implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1238371990


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");

Review Comment:
   yeah makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1238370253


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when
+// tombstone records fail to be written to the secondary 
store. Note that while commitTransaction
+// already waits for all records to be sent and ack'ed, in 
this case we do need to add an explicit
+// blocking call. In case of ALOS, we wait for the same 
duration as `offset.commit.timeout.ms`

Review Comment:
   ack, makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-22 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1238369128


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+Future secondaryWriteFuture = secondaryStore.set(values, 
(secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});

Review Comment:
   I see what you are saying. I think what you mean is that the callback passed to 
the `set` method itself can be handed to `KafkaOffsetBackingStore#set`, which will 
take care of completing it when the `onComplete` of `SetCallback` fires. That makes 
sense, but the reason I did it this way is that I can't always return the `Future` 
created by invoking `KafkaOffsetBackingStore#set`, because if that operation 
succeeds we still need to write to the primary store. That's why I created this 
extra callback, which gets executed upon the `onComplete` of `SetCallback`; with it 
I can selectively return a `FutureCallback` object wrapping the 
original callback. Let me know if that makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics

2023-06-22 Thread via GitHub


hudeqi commented on PR #13852:
URL: https://github.com/apache/kafka/pull/13852#issuecomment-1602409354

   Thank you for your reply, @C0urante! Your points are all very much worth 
thinking about. Below are my actual results and thoughts on each question.
   1. I agree with you: if we don't forcibly overwrite a user-defined value, then 
we should probably log a warning for that case.
   2. The reason I ran into this case is that the MM cluster synchronizes too many 
topics and partitions (which is very common and may be unavoidable), and the 
frequent updates to offset information make the internal offset topic too large. 
The internal config and status topics are unlikely to grow that much, and I haven't 
seen it happen yet, so following the "principle of least change" we probably don't 
need to adjust those two internal topics.
   3. This is fine here if we follow the point in the first answer.
   4. I've probably been calling it "MM2" inappropriately. In fact, I implemented 
topic replication by running MirrorSourceConnector through DistributedHerder. 
You're right that this may benefit Connect as a whole; I have no experience with 
other, non-topic-replication connectors, so I'll leave that to you to decide. But 
if a Connect cluster is used for topic replication the way I use it, I think this 
problem also needs to be solved.
   5. This is what I did, as you say: since the Connect cluster already exists, I 
directly adjusted the log segment size of the internal offset topic through 
"kafka-topic.sh", which is a bit hacky (a rough Admin-API version of this 
adjustment is sketched below). Although I suggested 100MB in the PR, in practice I 
think that may still be a bit large. I set it to 50MB, and the startup time was 
shortened to 30s (the topic had been compacted down to only about 700MB in total), 
which may not be the "worst case" you described (every partition's active segment 
is full and the prior segment is also full). Maybe we could also increase the 
number of consumer threads that read offsets; what do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-22 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+.timeoutMs((int) timer.remainingMs());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+.get(timer.remainingMs(), 
TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+timer.update();
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting offsets for sink connector {}", connName, e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting offsets for sink connector " + connName, e), 
null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
-
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+log.error("Failed to modify offsets for connector {} 
because it doesn't support external modification of offsets",
+connName, e);
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
 }
+updateTimerAndCheckExpiry(time

[GitHub] [kafka] showuon commented on pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-22 Thread via GitHub


showuon commented on PR #13760:
URL: https://github.com/apache/kafka/pull/13760#issuecomment-1602323587

   @tinaselenge , it looks like some tests failed: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13760/6/
   Please take a look as well. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-22 Thread via GitHub


showuon commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1238266317


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched<TopicPartition, DeletedRecords> {
+
+    private final Map<TopicPartition, RecordsToDelete> recordsToDelete;
+    private final Logger log;
+    private final AdminApiLookupStrategy<TopicPartition> lookupStrategy;
+
+    public DeleteRecordsHandler(
+        Map<TopicPartition, RecordsToDelete> recordsToDelete,
+        LogContext logContext
+    ) {
+        this.recordsToDelete = recordsToDelete;
+        this.log = logContext.logger(DeleteRecordsHandler.class);
+        this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteRecords";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {
+        return this.lookupStrategy;
+    }
+
+    public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture(
+        Collection<TopicPartition> topicPartitions
+    ) {
+        return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+    }
+
+    @Override
+    public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
+        Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>();
+        for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.computeIfAbsent(
+                topicPartition.topic(),
+                key -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
+            );
+            deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition()
+                .setPartitionIndex(topicPartition.partition())
+                .setOffset(entry.getValue().beforeOffset()));
+        }
+
+        DeleteRecordsRequestData data = new DeleteRecordsRequestData()
+            .setTopics(new ArrayList<>(deletionsForTopic.values()));

Review Comment:
   No, I don't think they are the same thing. The invokeDriver timeout is set 
on the client side so the client can time out the request. But the `timeoutMs` 
field in `DeleteRecordsRequest` is sent to the server side to tell the server 
about the timeout.
   
   REF: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DeleteRecordsRequest.json#L39






[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-22 Thread via GitHub


showuon commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1238257723


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2359,31 +2334,22 @@ public void testDeleteRecords() throws Exception {
             assertTrue(e0.getCause() instanceof OffsetOutOfRangeException);
         }
 
-        // "leader not available" failure on metadata request for partition 2
+        // not authorized to delete records for partition 2
         KafkaFuture<DeletedRecords> myTopicPartition2Result = values.get(myTopicPartition2);
         try {
             myTopicPartition2Result.get();
             fail("get() should throw ExecutionException");
         } catch (ExecutionException e1) {
-            assertTrue(e1.getCause() instanceof LeaderNotAvailableException);
+            assertTrue(e1.getCause() instanceof TopicAuthorizationException);
         }

Review Comment:
   Ah, I see. Thanks for the explanation.






[GitHub] [kafka] dajac opened a new pull request, #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-22 Thread via GitHub


dajac opened a new pull request, #13901:
URL: https://github.com/apache/kafka/pull/13901

   This patch adds (1) the logic to propagate a new MetadataImage to the 
running coordinators; and (2) the logic to ensure that all consumer groups 
subscribed to changed topics refresh their subscription metadata on the next 
heartbeat. It also ensures that freshly loaded consumer groups refresh their 
subscription metadata on the next heartbeat.
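   As a rough illustration of the second part, here is a hypothetical sketch of 
the refresh-marking pattern; the names and types below are illustrative stand-ins, 
not the actual coordinator code: groups subscribed to a changed topic lose their 
record of the last image they refreshed against, so the next heartbeat triggers 
a refresh, and freshly loaded groups start with no record at all.
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch of "refresh subscription metadata on the next heartbeat".
   class SubscriptionRefreshSketch {
       // Epoch of the metadata image each group last refreshed its subscription metadata against.
       private final Map<String, Long> lastRefreshedImageEpoch = new HashMap<>();
       // Reverse index from topic name to the ids of the consumer groups subscribed to it.
       private final Map<String, Set<String>> groupsBySubscribedTopic = new HashMap<>();
       private long currentImageEpoch = 0L;

       // Called when a new metadata image is propagated to the coordinator.
       void onNewMetadataImage(long newImageEpoch, Set<String> changedOrDeletedTopics) {
           currentImageEpoch = newImageEpoch;
           for (String topic : changedOrDeletedTopics) {
               for (String groupId : groupsBySubscribedTopic.getOrDefault(topic, Collections.emptySet())) {
                   // Invalidate the group so it recomputes its subscription metadata.
                   lastRefreshedImageEpoch.remove(groupId);
               }
           }
       }

       // Checked on every heartbeat; freshly loaded groups have no entry and therefore refresh too.
       boolean mustRefreshSubscriptionMetadata(String groupId) {
           return lastRefreshedImageEpoch.getOrDefault(groupId, -1L) < currentImageEpoch;
       }

       void markRefreshed(String groupId) {
           lastRefreshedImageEpoch.put(groupId, currentImageEpoch);
       }
   }
   ```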
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-15113) Gracefully handle cases where a sink connector's admin and consumer client config overrides target different Kafka clusters

2023-06-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15113:
--

 Summary: Gracefully handle cases where a sink connector's admin 
and consumer client config overrides target different Kafka clusters
 Key: KAFKA-15113
 URL: https://issues.apache.org/jira/browse/KAFKA-15113
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Yash Mayya


Background reading -
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
 

 

From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
{quote}Currently, admin clients are only instantiated for sink connectors to 
create the DLQ topic if required. So it seems like it could be technically 
possible for a sink connector's consumer client overrides to target a different 
Kafka cluster from its producer and admin client overrides. Such a setup won't 
work with this implementation of the get offsets API as it is using an admin 
client to get a sink connector's consumer group offsets. However, I'm not sure 
we want to use a consumer client to retrieve the offsets either as we shouldn't 
be disrupting the existing sink tasks' consumer group just to fetch offsets. 
Leveraging a sink task's consumer also isn't an option because fetching offsets 
for a stopped sink connector (where all the tasks will be stopped) should be 
allowed. I'm wondering if we should document that a connector's various client 
config override policies shouldn't target different Kafka clusters (side note - 
looks like we don't [currently 
document|https://kafka.apache.org/documentation/#connect] client config 
overrides for Connect beyond just the worker property 
{{connector.client.config.override.policy}}).
{quote}
 
{quote}I don't think we need to worry too much about this. I cannot imagine a 
sane use case that involves overriding a connector's Kafka clients with 
different Kafka clusters (not just bootstrap servers, but actually different 
clusters) for producer/consumer/admin. I'd be fine with adding a note to our 
docs that that kind of setup isn't supported but I really, really hope that 
it's not necessary and nobody's trying to do that in the first place. I also 
suspect that there are other places where this might cause issues, like with 
exactly-once source support or automatic topic creation for source connectors.

That said, there is a different case we may want to consider: someone may have 
configured consumer overrides for a sink connector, but not admin overrides. 
This may happen if they don't use a DLQ topic. I don't know if we absolutely 
need to handle this now and we may consider filing a follow-up ticket to look 
into this, but one quick-and-dirty thought I've had is to configure the admin 
client used here with a combination of the configurations for the connector's 
admin client and its consumer, giving precedent to the latter.
{quote}
 

Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
{quote}We will have undesirable behavior if the connector is targeting a Kafka 
cluster different from the Connect cluster's backing Kafka cluster and the user 
has configured the consumer overrides appropriately for their connector, but 
not the admin overrides (something we also discussed previously 
[here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).

In the above case, if a user attempts to reset their sink connector's offsets 
via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following 
will occur:
 # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}} 
which returns an empty partition offsets map for the sink connector's consumer 
group ID (it exists on a different Kafka cluster to the one that the admin 
client is connecting to).
 # We call {{SinkConnector::alterOffsets}} with an empty offsets map which 
could cause the sink connector to propagate the offsets reset related changes 
to the sink system.
 # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}} 
which returns {{GroupIdNotFoundException}} which we essentially swallow in 
order to keep offsets reset operations idempotent and return a success message 
to the user (even though the real consumer group for the sink connector on the 
other Kafka cluster hasn't been deleted).

This will occur if the connector's admin overrides are missing OR the admin 
overrides are deliberately configured to target a Kafka cluster different from 
the consumer overrides (although like you pointed out in the other linked 
thread, this doesn't seem like a valid use case that we'd even want to support).

I guess we'd want to pursue the approach you suggested where we'd configure the 
admin client with a combination of the connector's admin overrides and its 
consumer overrides, giving precedence to the latter.
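
A minimal sketch of that merge, with illustrative names rather than Connect's 
actual helpers: the worker-level client properties are taken as the base, the 
connector's admin overrides are applied on top, and its consumer overrides win 
on any conflict.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the merge order gives the consumer overrides precedence.
final class AdminConfigMergeSketch {
    static Map<String, Object> adminConfigWithConsumerFallback(
            Map<String, Object> workerBaseClientProps,
            Map<String, Object> connectorAdminOverrides,
            Map<String, Object> connectorConsumerOverrides) {
        Map<String, Object> merged = new HashMap<>(workerBaseClientProps);
        merged.putAll(connectorAdminOverrides);
        merged.putAll(connectorConsumerOverrides); // consumer overrides take precedence
        return merged;
    }
}
{code}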

[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-22 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1238107526


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))

Review Comment:
   The state machine for the partition is defined here: 
https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java#L137.
 The runtime checks whether the coordinator is in the active state before 
servicing requests; otherwise it throws the relevant exception.
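   For illustration, a hypothetical sketch of that check (not the actual 
CoordinatorRuntime code): requests are only serviced while the partition's 
coordinator state machine is active, and the other states map to the relevant 
error; the exception classes are existing Kafka error types.
   ```java
   import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
   import org.apache.kafka.common.errors.NotCoordinatorException;

   // Hypothetical sketch of "only service requests when active".
   final class CoordinatorStateCheckSketch {
       enum State { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

       static void ensureActive(State state) {
           switch (state) {
               case ACTIVE:
                   break; // safe to service the request
               case LOADING:
                   throw new CoordinatorLoadInProgressException("The coordinator is still loading.");
               default:
                   throw new NotCoordinatorException("This broker is not the coordinator for the partition.");
           }
       }
   }
   ```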






[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-22 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1238104197


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](

Review Comment:
   correct.






[GitHub] [kafka] dajac merged pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-22 Thread via GitHub


dajac merged PR #13812:
URL: https://github.com/apache/kafka/pull/13812





[GitHub] [kafka] dajac commented on pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-22 Thread via GitHub


dajac commented on PR #13812:
URL: https://github.com/apache/kafka/pull/13812#issuecomment-1602115656

   The failed tests are not related to this change:
   ```
   Build / JDK 17 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   1m 50s
   Build / JDK 8 and Scala 2.12 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
   1m 14s
   Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   ```

