[GitHub] [kafka] dajac commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-11 Thread via GitHub


dajac commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1260620995


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   Yeah, we will. We have to do two things: 1) parameterise existing 
integration/system tests to run with the new group coordinator. i hope that 
this will cover a bug chunk of the new coordinator; and 2) extend 
integration/system tests where needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-11 Thread via GitHub


dajac commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1260620995


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   Yeah, we will. We have to do two things: 1) parameterise existing 
integration/system tests to run with the new group coordinator; and 2) extend 
integration/system tests where needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260618876


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);

Review Comment:
   > i'm a bit confused - if the timer advances by heartbeat interval then 
shouldn't the existing session timeout expire?
   
   The session timeout expires based on the session timeout not the heartbeat 
interval, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13999: KAFKA-15176: add tests for tiered storage metrics

2023-07-11 Thread via GitHub


showuon opened a new pull request, #13999:
URL: https://github.com/apache/kafka/pull/13999

   Added tests for metrics:
   1. RemoteLogReaderTaskQueueSize
   2. RemoteLogReaderAvgIdlePercent
   3. RemoteLogManagerTasksAvgIdlePercent
   
   Also, added tests for `OffsetOutOfRangeException` will be thrown while 
reading logs
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260533220


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {

Review Comment:
   should we add a test case for revocation/session timeouts `onLoaded()`?
   
   nvm - looks like we already have one :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260536510


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);

Review Comment:
   looking at the other test it looks like we expire when we pass the deadline 
and not at exactly the deadline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260536510


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);

Review Comment:
   looking at the other test it looks like we expire when we pass the deadline 
and not at exactly the deadline. should we assert that the timeout still exists 
and the member is still part of the group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260535581


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);

Review Comment:
   i'm a bit confused - if the timer advances by heartbeat interval then 
shouldn't the existing session timeout expire?
   
   or is the member removed from the group after this block?
   
   i think it would also be good to assertSessionTimeout after advancing the 
clock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260533220


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {

Review Comment:
   should we add a test case for revocation/session timeouts `onLoaded()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1260529472


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(

Review Comment:
   will we have a full integration test for the new group coordinator?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] github-actions[bot] commented on pull request #12980: KAFKA-14464: Make resource leaks for MM2 resources more difficult

2023-07-11 Thread via GitHub


github-actions[bot] commented on PR #12980:
URL: https://github.com/apache/kafka/pull/12980#issuecomment-1631799245

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurrs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1260523356


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val backoffDeadlineMs = new AtomicLong(NoRetry)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
-  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
-} else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+var iteration = 0
+while (result == null) {
+  currentProducerIdBlock.get.claimNextId().asScala match {
+case None =>
+  // Check the next block if current block is full
+  val block = nextProducerIdBlock.getAndSet(null)
+  if (block == null) {
+// Return COORDINATOR_LOAD_IN_PROGRESS rather than 
REQUEST_TIMED_OUT since older clients treat the error as fatal
+// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+maybeRequestNextBlock()
+result = 
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is 
full. Waiting for next block"))

Review Comment:
   hmm, i don't recall being involved in that discussion. should i create a 
jira for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


hudeqi commented on PR #13992:
URL: https://github.com/apache/kafka/pull/13992#issuecomment-1631789763

   > Thanks for the fix @hudeqi ! I was wondering if you had plans to address 
this problem/warning elsewhere in the codebase?
   
   I did a global search for the `removeAll` method of set in the codebase. 
There are indeed many `set.removeAll(list)`, but this usage may have 
performance problems only when there are many collection elements. At least in 
the connect module there is nothing extra except that mentioned in this PR.
   In other modules, there may be this 
[one](https://github.com/apache/kafka/blob/f3ee9ff90ff107942280fb0c4bd83e7ebc5c062c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L177)
 that needs to be optimized to meet these conditions, which may not be suitable 
for mentioning in this PR (belonging to the MM2 tag). @gharris1727 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


hudeqi commented on code in PR #13992:
URL: https://github.com/apache/kafka/pull/13992#discussion_r1260504405


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -321,10 +321,10 @@ void refreshTopicPartitions()
 if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || 
!missingInTarget.isEmpty()) {
 
 Set newTopicPartitions = sourceTopicPartitionsSet;
-newTopicPartitions.removeAll(knownSourceTopicPartitions);
+newTopicPartitions.removeAll(knownSourceTopicPartitionsSet);
 
 Set deletedTopicPartitions = 
knownSourceTopicPartitionsSet;
-deletedTopicPartitions.removeAll(sourceTopicPartitions);
+deletedTopicPartitions.removeAll(sourceTopicPartitionsSet);

Review Comment:
   sorry, I ignore that...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (KAFKA-13808) Mirrormaker2 stop sync data when modify topic partition in "Running a dedicated MirrorMaker cluster" mode

2023-07-11 Thread YANGLiiN (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YANGLiiN closed KAFKA-13808.


3.5.0 fixed.

> Mirrormaker2  stop sync data when modify topic partition in "Running a 
> dedicated MirrorMaker cluster" mode
> --
>
> Key: KAFKA-13808
> URL: https://issues.apache.org/jira/browse/KAFKA-13808
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: YANGLiiN
>Priority: Major
>
> When use the mirrormaker2 with the "Running a dedicated MirrorMaker cluster" 
> mode by 3 nodes, once we modify the topic partition , then the mm2 stop sync 
> data with the following ERROR :
>  
> [2022-02-18 10:26:19,410] ERROR Error forwarding REST request 
> (org.apache.kafka.connect.runtime.rest.RestClient)
> java.lang.IllegalArgumentException: Invalid URI host: null (authority: null)
>     at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:521)
>     at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:506)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:464)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453)
>     at 
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:107)
>     at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$reconfigureConnector$32(DistributedHerder.java:1607)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> [2022-02-18 10:26:19,416] ERROR [Worker clientId=connect-1, groupId=sync2022] 
> Request to leader to reconfigure connector tasks failed 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Error 
> trying to forward REST request: Invalid URI host: null (authority: null)
>     at 
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:147)
>     at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$reconfigureConnector$32(DistributedHerder.java:1607)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.IllegalArgumentException: Invalid URI host: null 
> (authority: null)
>     at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:521)
>     at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:506)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:464)
>     at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453)
>     at 
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:107)
>     ... 6 more
>  
> the root cause is the connect doesn't process the mm2 add the 'NOTUSED' url 
> on the slave node
> see 
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L122]
>  
> we should add the "NOTUSED" logic below the code 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1593-L1599]
>  
> {code:java}
> if (leaderUrl.startsWith("NOTUSED")) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> return;
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-11 Thread via GitHub


jeffkbkim commented on PR #13870:
URL: https://github.com/apache/kafka/pull/13870#issuecomment-1631769383

   @dajac thanks for the review. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1260500922


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions(
-return false;
+// First round of join requests. Generate member ids. All requests 
will be accepted
+// as the group is still Empty.
+List> responseFutures = new 
ArrayList<>();
+for (int i = 0; i < groupMaxSize + 1; i++) {
+if (i == 0) {
+responseFutures.add(context.sendGenericGroupJoin(
+request,
+requiredKnownMemberId,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+));
+} else {
+responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+}
+}
 
-return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 

[GitHub] [kafka] rreddy-22 opened a new pull request, #13998: KIP-848: Rack awareness interface changes for assignors

2023-07-11 Thread via GitHub


rreddy-22 opened a new pull request, #13998:
URL: https://github.com/apache/kafka/pull/13998

   We want to make the assignors rack aware and in order to obtain and provide 
the rack Information the following interface modifications were made:-
   1) AbstractPartitionAssignor implements the PartitionAssignor Interface : 
This contains an assign function with a metadata image argument that will be 
called from the TargetAssignmentBuilder.
   2) TopicAndClusterMetadata :- This is a new data structure that will hold 
the ClusterImage and the TopicsImage both of which will be obtained from the 
MetadataImage.
   
   The rackInfo will be passed to another assign function which will have a 
List of rackAwareTopicIdPartitions. This information will be used by the 
assignors to match partition replica racks with consumer racks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-07-11 Thread via GitHub


philipnee commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1260427182


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());

Review Comment:
   Hey @junrao - I believe we still want to commit the offset before there is a 
partition change. I think if we don't commit offset here, it is possible to 
lose some progress. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-11 Thread via GitHub


philipnee commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1631684973

   Hey @flashmouse - could you explain what do you mean by `useless loop`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue opened a new pull request, #13997: KAFKA-15180: Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-11 Thread via GitHub


kirktrue opened a new pull request, #13997:
URL: https://github.com/apache/kafka/pull/13997

   Update the integration tests to swap the use of the concrete `KafkaConsumer` 
class to the generic `Consumer` interface.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky opened a new pull request, #13996: KAFKA-15022: [2/N] introduce graph to compute min cost

2023-07-11 Thread via GitHub


lihaosky opened a new pull request, #13996:
URL: https://github.com/apache/kafka/pull/13996

   ### Description
   Introduce graph to calculate min-flow if an existing flow already input
   
   ### Test
   Unit test. Will add more unit test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15139) Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`

2023-07-11 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15139:

Fix Version/s: 3.6.0

> Optimize the performance of `Set.removeAll(List)` in 
> `MirrorCheckpointConnector`
> 
>
> Key: KAFKA-15139
> URL: https://issues.apache.org/jira/browse/KAFKA-15139
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.5.0
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
> Fix For: 3.6.0
>
>
> This is the hint of `removeAll` method in `Set`:
> _This implementation determines which is the smaller of this set and the 
> specified collection, by invoking the size method on each. If this set has 
> fewer elements, then the implementation iterates over this set, checking each 
> element returned by the iterator in turn to see if it is contained in the 
> specified collection. If it is so contained, it is removed from this set with 
> the iterator's remove method. If the specified collection has fewer elements, 
> then the implementation iterates over the specified collection, removing from 
> this set each element returned by the iterator, using this set's remove 
> method._
> That's said, assume that _M_ is the number of elements in the set and _N_ is 
> the number of elements in the List, if the type of the specified collection 
> is `List`, and {_}M<=N{_}, then the time complexity of `removeAll` is _O(MN)_ 
> (because the time complexity of searching in List is {_}O(N){_}), on the 
> contrary, if {_}N {_}O(N){_}.
> In `MirrorCheckpointConnector`, `refreshConsumerGroups` method is repeatedly 
> called in a daemon thread. There are two `removeAll` in this method. From a 
> logical point of view, when this method is called in one round, when the 
> number of groups in the source cluster simply increases or decreases, the two 
> `removeAll` execution strategies will always hit the _O(MN)_ situation 
> mentioned above. Therefore, it is better to change all the variables here to 
> Set type to avoid this "low performance".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15180:
--
Description: For the consumer threading refactor project, we're introducing 
a new implementation of the {{Consumer}} interface. However, most of the 
instances in the integration tests specifically use the concrete implementation 
{{{}KafkaConsumer{}}}. This task is to generalize those uses where possible to 
use the {{Consumer}} interface.  (was: For the consumer threading refactor 
project, we're introducing a new implementation of the `Consumer` interface. 
However, most of the instances in the integration tests specifically use the 
concrete implementation `KafkaConsumer`. This task is to generalize those uses 
where possible to the `Consumer` interface.)

> Generalize integration tests to change use of KafkaConsumer to Consumer
> ---
>
> Key: KAFKA-15180
> URL: https://issues.apache.org/jira/browse/KAFKA-15180
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> For the consumer threading refactor project, we're introducing a new 
> implementation of the {{Consumer}} interface. However, most of the instances 
> in the integration tests specifically use the concrete implementation 
> {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible 
> to use the {{Consumer}} interface.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-11 Thread Kirk True (Jira)
Kirk True created KAFKA-15180:
-

 Summary: Generalize integration tests to change use of 
KafkaConsumer to Consumer
 Key: KAFKA-15180
 URL: https://issues.apache.org/jira/browse/KAFKA-15180
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True


For the consumer threading refactor project, we're introducing a new 
implementation of the `Consumer` interface. However, most of the instances in 
the integration tests specifically use the concrete implementation 
`KafkaConsumer`. This task is to generalize those uses where possible to the 
`Consumer` interface.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on pull request #13528: KAFKA-14879: Update system tests to use latest versions

2023-07-11 Thread via GitHub


kirktrue commented on PR #13528:
URL: https://github.com/apache/kafka/pull/13528#issuecomment-1631558225

   @jolshan Can you take a look at this when you have some time? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1260298077


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions(
-return false;
+// First round of join requests. Generate member ids. All requests 
will be accepted
+// as the group is still Empty.
+List> responseFutures = new 
ArrayList<>();
+for (int i = 0; i < groupMaxSize + 1; i++) {
+if (i == 0) {
+responseFutures.add(context.sendGenericGroupJoin(
+request,
+requiredKnownMemberId,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+));
+} else {
+responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+}
+}
 
-return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-11 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1260282790


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
-private  void assertUnorderedListEquals(
-List expected,
-List actual
-) {
-assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-}
+@Test
+public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(10)
+.build();
 
-private void assertResponseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (!responseEquals(expected, actual)) {
-assertionFailure()
-.expected(expected)
-.actual(actual)
-.buildAndThrow();
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.withReason("exceed max group size")
+.build();
+
+for (int i = 0; i < 10; i++) {
+CompletableFuture responseFuture;
+if (i == 0) {
+responseFuture = context.sendGenericGroupJoin(
+request,
+false,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+);
+} else {
+responseFuture = context.sendGenericGroupJoin(request);
+}
+assertFalse(responseFuture.isDone());
 }
+CompletableFuture responseFuture = 
context.sendGenericGroupJoin(request);
+assertTrue(responseFuture.isDone());
+assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
 }
 
-private boolean responseEquals(
-ConsumerGroupHeartbeatResponseData expected,
-ConsumerGroupHeartbeatResponseData actual
-) {
-if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-if (expected.errorCode() != actual.errorCode()) return false;
-if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-if (expected.memberEpoch() != actual.memberEpoch()) return false;
-if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-// Unordered comparison of the assignments.
-return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-}
+@Test
+public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+boolean requiredKnownMemberId = true;
+int groupMaxSize = 10;
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withGenericGroupMaxSize(groupMaxSize)
+.withGenericGroupInitialRebalanceDelayMs(50)
+.build();
 
-private boolean responseAssignmentEquals(
-ConsumerGroupHeartbeatResponseData.Assignment expected,
-ConsumerGroupHeartbeatResponseData.Assignment actual
-) {
-if (expected == actual) return true;
-if (expected == null) return false;
-if (actual == null) return false;
+JoinGroupRequestData request = new JoinGroupRequestBuilder()
+.withGroupId("group-id")
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
 
-if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions(
-return false;
+// First round of join requests. Generate member ids. All requests 
will be accepted
+// as the group is still Empty.
+List> responseFutures = new 
ArrayList<>();
+for (int i = 0; i < groupMaxSize + 1; i++) {
+if (i == 0) {
+responseFutures.add(context.sendGenericGroupJoin(
+request,
+requiredKnownMemberId,
+false,
+new ExpectedGenericGroupResult(Errors.NONE, true)
+));
+} else {
+responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+}
+}
 
-return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 

[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260243479


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() {
 .setGroupId(groupId)
 .setMemberId(memberId1)
 .setMemberEpoch(1)
-.setRebalanceTimeoutMs(9)

Review Comment:
   it must be in first request or when changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260243017


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() {
 .setGroupId(groupId)
 .setMemberId(memberId1)
 .setMemberEpoch(1)
-.setRebalanceTimeoutMs(9)

Review Comment:
   it was not needed so removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260232815


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() {
 .setGroupId(groupId)
 .setMemberId(memberId1)
 .setMemberEpoch(1)
-.setRebalanceTimeoutMs(9)

Review Comment:
   why did we remove this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-11 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742162#comment-17742162
 ] 

ASF GitHub Bot commented on KAFKA-14995:


stevenbooke commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-163185

   @mimaison I have hidden the GitHub usernames of the kafka committers, they 
do not get displayed when the website is rendered. The display is now the same 
as today.




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year:
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742151#comment-17742151
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15178:


Nice catch!

> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix, patch-available
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ableegoldman commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf

2023-07-11 Thread via GitHub


ableegoldman commented on code in PR #13993:
URL: https://github.com/apache/kafka/pull/13993#discussion_r1260200818


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 private Set joinedSubscription;
 private MetadataSnapshot metadataSnapshot;
 private MetadataSnapshot assignmentSnapshot;
+private boolean metadataUpdated;

Review Comment:
   This optimization makes perfect sense but I am extremely paranoid that 
future changes will end up modifying the snapshot without remembering (or even 
knowing about) the need to update this boolean as well. Can you maybe 
encapsulate this in the `MetadataSnapshot` class or otherwise make sure it's 
impossible for this to get out of sync?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13937: Add Streams API broker compatibility table

2023-07-11 Thread via GitHub


mjsax commented on PR #13937:
URL: https://github.com/apache/kafka/pull/13937#issuecomment-1631390728

   Thanks! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13937: Add Streams API broker compatibility table

2023-07-11 Thread via GitHub


mjsax merged PR #13937:
URL: https://github.com/apache/kafka/pull/13937


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] JimGalasyn commented on pull request #13937: Add Streams API broker compatibility table

2023-07-11 Thread via GitHub


JimGalasyn commented on PR #13937:
URL: https://github.com/apache/kafka/pull/13937#issuecomment-1631385663

   Thanks for the review, @mjsax! I added the row/col for `3.5.x`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13849: Add 3.5.0 and 3.4.1 to system tests

2023-07-11 Thread via GitHub


mjsax commented on PR #13849:
URL: https://github.com/apache/kafka/pull/13849#issuecomment-1631380479

   Sorry for the delay. Uploaded. Can you verify?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260159325


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[GitHub] [kafka] mjsax commented on a diff in pull request #13937: Add Streams API broker compatibility table

2023-07-11 Thread via GitHub


mjsax commented on code in PR #13937:
URL: https://github.com/apache/kafka/pull/13937#discussion_r1260153209


##
docs/streams/upgrade-guide.html:
##
@@ -1292,6 +1294,57 @@ 
  JoinWindows has no default size anymore: 
JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) 
 
 
+Streams API broker 
compatibility
+
+The following table shows which versions of the Kafka Streams API are 
compatible with various Kafka broker versions.
+
+
+
+  
+
+Kafka Broker (columns)
+  
+
+
+  
+Kafka Streams API (rows)
+0.10.0.x
+0.10.1.x and 0.10.2.x
+0.11.0.x and1.0.x and1.1.x and2.0.x and2.1.x 
and2.2.x and2.3.x and2.4.x and2.5.x and2.6.x and2.7.x 
and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x 
and3.4.x

Review Comment:
   Seems we need to at 3.5 that was released in the mean time?



##
docs/streams/upgrade-guide.html:
##
@@ -1292,6 +1294,57 @@ 
  JoinWindows has no default size anymore: 
JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) 
 
 
+Streams API broker 
compatibility
+
+The following table shows which versions of the Kafka Streams API are 
compatible with various Kafka broker versions.
+
+
+
+  
+
+Kafka Broker (columns)
+  
+
+
+  
+Kafka Streams API (rows)
+0.10.0.x
+0.10.1.x and 0.10.2.x
+0.11.0.x and1.0.x and1.1.x and2.0.x and2.1.x 
and2.2.x and2.3.x and2.4.x and2.5.x and2.6.x and2.7.x 
and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x 
and3.4.x
+  
+  
+0.10.0.x
+compatible
+compatible
+compatible
+  
+  
+0.10.1.x and 0.10.2.x
+
+compatible
+compatible
+  
+  
+0.11.0.x
+
+compatible with exactly-once turned off(requires broker 
version 0.11.0.x or higher)
+compatible
+  
+  
+1.0.x and1.1.x and2.0.x and2.1.x and2.2.0 
and2.2.0
+
+compatible with exactly-once turned off(requires broker 
version 0.11.0.x or higher);requires message format 0.10 or 
higher;message headers are not supported(requires broker version 
0.11.0.x or higherwith message format 0.11 or higher)
+compatible; requires message format 0.10 or higher;if 
message headers are used, message format 0.11or higher required
+  
+  
+2.2.1 and2.3.x and2.4.x and2.5.x and2.6.x 
and2.7.x and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x 
and3.4.x

Review Comment:
   Same here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-11 Thread via GitHub


mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834


##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws 
Exception {
 }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
 // Kill instance, delete state to force restoration.
-assertThat("Streams instance did not close within timeout", 
streams.close(Duration.ofSeconds(60)));
+assertThat("Streams instance did not close within timeout", 
streams.get().close(Duration.ofSeconds(60)));
 IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
 .map(Path::toFile)
 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " 
could not be deleted"));
+} finally {
+streams.get().close();

Review Comment:
   Not sure why we need `AtomicReference`? Can you elaborate?
   
   Why does this not work?
   ```
   KafkaStream streams;
   try {
 stream = new KafkaStreams(...);
 ...
   ```
   
   Btw: `streams.get()` could return `null` in case we fail before calling 
`set(...)`. Need a `null`-check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-11 Thread via GitHub


satishd merged PR #13275:
URL: https://github.com/apache/kafka/pull/13275


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-11 Thread via GitHub


satishd commented on PR #13275:
URL: https://github.com/apache/kafka/pull/13275#issuecomment-1631285075

   A few tests are failed but those are not related to the changes in the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] eziosudo commented on pull request #13973: KAFKA-15148: Some integration tests are running as unit tests

2023-07-11 Thread via GitHub


eziosudo commented on PR #13973:
URL: https://github.com/apache/kafka/pull/13973#issuecomment-1631271129

   > Thank you for addressing the comments. Are the results in the description 
updated after the latest change? If not, can you please update them. My 
motivation of asking this is to ensure that we are not accidentally removing 
the tests from running in either unitTest or integTests.
   
   I agree. We shall be careful with every merge. So I run all the tests agian.
   
   These are integration tests in trunk branch after running './gradlew 
unitTest':
   https://github.com/apache/kafka/assets/54128896/80ea1f96-9aea-4779-a77a-3790a570889b;>
   
   In my branch I run './gradlew unitTest' then filter integration tests and 
got nothing, which means integration cases are excluded.
   https://github.com/apache/kafka/assets/54128896/6c4d632a-cc90-4937-ba0a-fb1ea116259a;>
   
   After run './gradle integrationTest' in my branch, I get all the integration 
tests I changed in this PR, which means they are all included.
   https://github.com/apache/kafka/assets/54128896/56837acc-999d-4c34-8ee2-9fc1d1636b77;>
   https://github.com/apache/kafka/assets/54128896/73e98e5d-0aaa-4ca7-af93-2398eb00181b;>
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260099305


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[jira] [Commented] (KAFKA-14359) Idempotent Producer continues to retry on OutOfOrderSequence error when first batch fails

2023-07-11 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742124#comment-17742124
 ] 

Justine Olshan commented on KAFKA-14359:


also https://issues.apache.org/jira/browse/KAFKA-9199 offers a way to fix.

> Idempotent Producer continues to retry on OutOfOrderSequence error when first 
> batch fails
> -
>
> Key: KAFKA-14359
> URL: https://issues.apache.org/jira/browse/KAFKA-14359
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Major
>
> When the idempotent producer does not have any state it can fall into a state 
> where the producer keeps retrying an out of order sequence. Consider the 
> following scenario where an idempotent producer has retries and delivery 
> timeout are int max (a configuration used in streams).
> 1. A producer send out several batches (up to 5) with the first one starting 
> at sequence 0.
> 2. The first batch with sequence 0 fails due to a transient error (ie, 
> NOT_LEADER_OR_FOLLOWER or a timeout error)
> 3. The second batch, say with sequence 200 comes in. Since there is no 
> previous state to invalidate it, it gets written to the log
> 4. The original batch is retried and will get an out of order sequence number
> 5. Current java client will continue to retry this batch, but it will never 
> resolve. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


gharris1727 commented on PR #13992:
URL: https://github.com/apache/kafka/pull/13992#issuecomment-1631204314

   Thanks for the fix @hudeqi ! I was wondering if you had plans to address 
this problem/warning elsewhere in the codebase?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260035984


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-07-11 Thread via GitHub


erikvanoosten commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1631194625

   @dajac Can you please post your opinion about KIP-944 to the mailing list?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-11 Thread via GitHub


philipnee commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1631191677

   Hey @divijvaidya - I think the fixes make sense.  Did you verify the thread 
leak by checking the threat dump? I think I've used lsof command before.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260017274


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[GitHub] [kafka] jolshan commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-11 Thread via GitHub


jolshan commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1631175316

   Apologies I missed the first ping. Left a comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-11 Thread via GitHub


jolshan commented on code in PR #13956:
URL: https://github.com/apache/kafka/pull/13956#discussion_r1260015211


##
core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala:
##
@@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest 
{
partitionsToAssign: 
Set[TopicPartition],
userRebalanceListener: 
ConsumerRebalanceListener)
 extends ShutdownableThread("daemon-consumer-assignment", false) {
+setDaemon(true)

Review Comment:
   It seems like one property of shutdownable thread is that we make it 
non-daemon and we need to explicitly shut it down. Were the try blocks not 
enough to fix the issue? Alternatively, can we make this a thread class that 
isn't shutdownable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-11 Thread via GitHub


philipnee commented on code in PR #13956:
URL: https://github.com/apache/kafka/pull/13956#discussion_r1260007558


##
core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala:
##
@@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest 
{
partitionsToAssign: 
Set[TopicPartition],
userRebalanceListener: 
ConsumerRebalanceListener)
 extends ShutdownableThread("daemon-consumer-assignment", false) {
+setDaemon(true)

Review Comment:
   Should we just add a parameter to ShutdownableThread to set the thread as 
daemon?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer

2023-07-11 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742100#comment-17742100
 ] 

Philip Nee commented on KAFKA-15175:


Thanks [~ijuma] - Right, it was brought up during the discussion.  
[~divijvaidya] - Is there anything you would like to chip in here?

> Assess the use of nio2 asynchronous channel for KafkaConsumer
> -
>
> Key: KAFKA-15175
> URL: https://issues.apache.org/jira/browse/KAFKA-15175
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Philip Nee
>Priority: Major
>
> We should assess if NIO2 is appropriate to replace the current nio library 
> with more performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-07-11 Thread via GitHub


wcarlson5 commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1259992272


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() {
 );
 }
 
+@Test
+public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() 
{
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+final KTable tableB = source.filter((k, v) -> true);
+streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, builder::build);
+assertThat(
+exception.getMessage(),
+is("KTable must be versioned to use a grace period in a stream 
table join.")
+);
+}
+
+@Test
+public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() 
{

Review Comment:
   good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-07-11 Thread via GitHub


wcarlson5 commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1259987822


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() {
 );
 }
 
+@Test
+public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() 
{
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+final KTable tableB = source.filter((k, v) -> true);
+streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, builder::build);
+assertThat(
+exception.getMessage(),
+is("KTable must be versioned to use a grace period in a stream 
table join.")
+);
+}
+
+@Test
+public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() 
{
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable source = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()),
+Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", 
Duration.ofMinutes(5;
+final KTable tableB = source.filter((k, v) -> true);
+streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one");
+
+//should not throw an error
+builder.build();
+}
+
+@Test
+public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   Sure, easy enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


jolshan commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259983786


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


gharris1727 commented on code in PR #13992:
URL: https://github.com/apache/kafka/pull/13992#discussion_r1259982271


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -321,10 +321,10 @@ void refreshTopicPartitions()
 if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || 
!missingInTarget.isEmpty()) {
 
 Set newTopicPartitions = sourceTopicPartitionsSet;
-newTopicPartitions.removeAll(knownSourceTopicPartitions);
+newTopicPartitions.removeAll(knownSourceTopicPartitionsSet);
 
 Set deletedTopicPartitions = 
knownSourceTopicPartitionsSet;
-deletedTopicPartitions.removeAll(sourceTopicPartitions);
+deletedTopicPartitions.removeAll(sourceTopicPartitionsSet);

Review Comment:
   At this point, the `newTopicPartitions.removeAll` will have mutated the 
`sourceTopicPartitionsSet`, leading this line to miss some removals. I think 
this results in considering topics that exist in both sets to be deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


gharris1727 commented on code in PR #13992:
URL: https://github.com/apache/kafka/pull/13992#discussion_r1259982271


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -321,10 +321,10 @@ void refreshTopicPartitions()
 if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || 
!missingInTarget.isEmpty()) {
 
 Set newTopicPartitions = sourceTopicPartitionsSet;
-newTopicPartitions.removeAll(knownSourceTopicPartitions);
+newTopicPartitions.removeAll(knownSourceTopicPartitionsSet);
 
 Set deletedTopicPartitions = 
knownSourceTopicPartitionsSet;
-deletedTopicPartitions.removeAll(sourceTopicPartitions);
+deletedTopicPartitions.removeAll(sourceTopicPartitionsSet);

Review Comment:
   At this point, the `newTopicPartitions.removeAll` will have mutated the 
`sourceTopicPartitionsSet`, leading this line to miss some removals. I think 
this results in considering topics that exist in both sets to be considered 
deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-07-11 Thread via GitHub


wcarlson5 commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1259980518


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java:
##
@@ -64,6 +70,16 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
 // Steam - KTable join only
 if (otherJoinSideNodeName != null) {
 topologyBuilder.connectProcessorAndStateStores(processorName, 
storeNames);
+if (gracePeriod != null) {
+for (final String storeName : storeNames) {
+if (!topologyBuilder.isStoreVersioned(storeName)) {
+throw new IllegalArgumentException("KTable must be 
versioned to use a grace period in a stream table join.");
+}
+if (gracePeriod.toMillis() > 
topologyBuilder.getHistoryRetention(storeName)) {
+throw new IllegalArgumentException("History history 
retention must be at least grace period, but it should be larger.");

Review Comment:
   This change is fine, just equal is enough, if not recommended



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors

2023-07-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15179:
--
Issue Type: Test  (was: Improvement)

> Add integration tests for the FileStream Sink and Source connectors
> ---
>
> Key: KAFKA-15179
> URL: https://issues.apache.org/jira/browse/KAFKA-15179
> Project: Kafka
>  Issue Type: Test
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
>
> Add integration tests for the FileStream Sink and Source connectors covering 
> various different common scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259882152


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   I'd prefer to leave the task parsing the same; less work on our part, and 
less risk of a regression in existing parts of the code base.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kkonstantine merged pull request #13453: KAFKA-12525: Ignoring stale status statuses when reading from Connect status topic

2023-07-11 Thread via GitHub


kkonstantine merged PR #13453:
URL: https://github.com/apache/kafka/pull/13453


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-11 Thread via GitHub


C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1259876757


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -581,13 +582,26 @@ void incrementalAlterConfigs(Map 
topicConfigs) {
 }));
 }
 
-private void updateTopicAcls(List bindings) {
-log.trace("Syncing {} topic ACL bindings.", bindings.size());
-targetAdminClient.createAcls(bindings).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {
-if (e != null) {
-log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
-}
-}));
+// Visible for testing
+int updateTopicAcls(List bindings) {
+Set addBindings = new HashSet<>(bindings);
+Set failedBindings = new HashSet<>();
+addBindings.removeAll(knownTopicAclBindings);
+int newBindCount = addBindings.size();
+if (!addBindings.isEmpty()) {
+log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+targetAdminClient.createAcls(addBindings).values().forEach((k, v) 
-> v.whenComplete((x, e) -> {
+if (e != null) {
+log.warn("Could not sync ACL of topic {}.", 
k.pattern().name(), e);
+failedBindings.add(k);
+}
+}));
+knownTopicAclBindings = new HashSet<>(bindings);
+knownTopicAclBindings.removeAll(failedBindings);

Review Comment:
   Isn't `failedBindings` likely to be empty at this point unless the admin 
client is able to perform the request to create the ACL bindings exceptionally 
fast (which also happens to be the scenario we're covering in the unit test)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14481) Move LogSegment/LogSegments to storage module

2023-07-11 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742051#comment-17742051
 ] 

Ismael Juma commented on KAFKA-14481:
-

[~satish.duggana] Are you still working on this?

> Move LogSegment/LogSegments to storage module
> -
>
> Key: KAFKA-14481
> URL: https://issues.apache.org/jira/browse/KAFKA-14481
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15179:
--

 Summary: Add integration tests for the FileStream Sink and Source 
connectors
 Key: KAFKA-15179
 URL: https://issues.apache.org/jira/browse/KAFKA-15179
 Project: Kafka
  Issue Type: Improvement
Reporter: Yash Mayya
Assignee: Yash Mayya


Add integration tests for the FileStream Sink and Source connectors covering 
various different common scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on PR #13945:
URL: https://github.com/apache/kafka/pull/13945#issuecomment-1630945143

   I was toying with the idea of adding ITs for these file connectors in this 
very PR earlier; but after a bit of tinkering, left it for later  
   
   I'd be happy to file a separate ticket for that and assign it to myself.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15172) Allow exact mirroring of ACLs between clusters

2023-07-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15172:
--
Labels: needs-kip  (was: )

> Allow exact mirroring of ACLs between clusters
> --
>
> Key: KAFKA-15172
> URL: https://issues.apache.org/jira/browse/KAFKA-15172
> Project: Kafka
>  Issue Type: Task
>  Components: mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>  Labels: needs-kip
>
> When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The 
> rationale to is prevent other clients to produce to remote topics. 
> However in disaster recovery scenarios, where the target cluster is not used 
> and just a "hot standby", it would be preferable to have exactly the same 
> ACLs on both clusters to speed up failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259827626


##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("other_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+assertTrue(connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectOffsetKey() {
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap("other_offset_key", 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsOffsetPositionValues() {

Review Comment:
   Nice!  



##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("other_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+assertTrue(connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectOffsetKey() {
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap("other_offset_key", 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsOffsetPositionValues() {

Review Comment:
   Nice!  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821609


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;
+}
+
+if (!offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+
+// The 'position' in the offset represents the position in the 
file's byte stream and should be a non-negative long value
+try {
+long offsetPosition = 
Long.parseLong(String.valueOf(offset.get(POSITION_FIELD)));

Review Comment:
   Ah, good catch, thanks. I think it might probably be a bit friendlier if we 
update the task class instead to do similar parsing, WDYT? I'm okay either way, 
since the most common use case would be copy pasting the output from `GET 
/offsets` and modifying it in which case users would end up using a number 
rather than a string anyway.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;

Review Comment:
   Whoops, thanks. I forgot that we're no longer doing the single partition 
offset pair validation 臘 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821859


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;

Review Comment:
   Whoops, thanks. I forgot that we're no longer doing the single partition 
offset pair validation 臘 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances

2023-07-11 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742034#comment-17742034
 ] 

Chris Egerton commented on KAFKA-15059:
---

[~divijvaidya] no; this was introduced by 
[https://github.com/apache/kafka/pull/13465]. It did not affect anything except 
trunk and never made it into a release (see the affected/fix versions fields).

> Exactly-once source tasks fail to start during pending rebalances
> -
>
> Key: KAFKA-15059
> URL: https://issues.apache.org/jira/browse/KAFKA-15059
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 3.6.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.6.0
>
>
> When asked to perform a round of zombie fencing, the distributed herder will 
> [reject the 
> request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250]
>  if a rebalance is pending, which can happen if (among other things) a config 
> for a new connector or a new set of task configs has been recently read from 
> the config topic.
> Normally this can be alleviated with a simple task restart, which isn't great 
> but isn't terrible.
> However, when running MirrorMaker 2 in dedicated mode, there is no API to 
> restart failed tasks, and it can be more common to see this kind of failure 
> on a fresh cluster because three connector configurations are written in 
> rapid succession to the config topic.
>  
> In order to provide a better experience for users of both vanilla Kafka 
> Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the 
> same exponential backoff introduced with KAFKA-14732) zombie fencing attempts 
> that fail due to a pending rebalance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-11 Thread via GitHub


clolov commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1259787673


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageHistory.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.stream;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Accumulates and retains the interactions between brokers and {@link 
LocalTieredStorage} instances.
+ * These interactions are modelled via events of type {@link 
LocalTieredStorageEvent}.
+ *
+ * Events from an instance of storage are captured by the {@link 
LocalTieredStorageHistory} after
+ * {@link LocalTieredStorageHistory#listenTo(LocalTieredStorage)} is called.
+ */
+/* @ThreadSafe */
+public final class LocalTieredStorageHistory {

Review Comment:
   This history is only in-memory, unless I am reading the code wrongly. Are 
there plans to write it somewhere for debugging? I suspect the main reason 
behind it is for assertions in tests via the LocalTieredStorageCondition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


C0urante commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259782359


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified. " +
+"This is because stdin is used for input and offsets are 
not tracked.");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset == null) {
+return true;

Review Comment:
   Shouldn't this be `continue`?



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked

Review Comment:
   We can remove this comment now, right?



##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +152,102 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("other_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+assertTrue(connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectOffsetKey() {
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap("other_offset_key", 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsOffsetPositionValues() {

Review Comment:
   Maybe 

[jira] [Assigned] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-07-11 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15177:
-

Assignee: Chris Egerton

> MirrorMaker 2 should implement the alterOffsets KIP-875 API
> ---
>
> Key: KAFKA-15177
> URL: https://issues.apache.org/jira/browse/KAFKA-15177
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect, mirrormaker
>Reporter: Yash Mayya
>Assignee: Chris Egerton
>Priority: Minor
>
> The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
> added in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
>  We could also implement the API in 
> {{MirrorCheckpointConnector}} and 
> {{MirrorHeartbeatConnector}} to prevent external modification of offsets 
> since the operation wouldn't really make sense in their case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-07-11 Thread via GitHub


fvaleri commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1259763311


##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+Pattern topicPartitionPattern = 
Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?");
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws IOException, 
ExecutionException, InterruptedException {
+GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+getOffsetShell.parseArgs(args);
+
+Map partitionOffsets = 
getOffsetShell.fetchOffsets();
+
+for (Map.Entry entry : 
partitionOffsets.entrySet()) {
+TopicPartition topic = entry.getKey();
+
+System.out.println(String.join(":", new String[]{topic.topic(), 
String.valueOf(topic.partition()), entry.getValue().toString()}));
+}
+}
+
+private OptionSet options;
+private OptionSpec topicPartitionsOpt;

Review Comment:
   Well, that's not exactly a duplicate, as mine also creates the JmxTool 
runnable for convenience. For the last part, I guess I didn't want to break 
ToolsTestUtils visibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-11 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742007#comment-17742007
 ] 

ASF GitHub Bot commented on KAFKA-14995:


mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1630858699

   Thanks for the updates. I rendered the website with this change and realized 
the Github usernames are displayed. Can we hide them to keep the same display 
as today?




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year:
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nicktelford opened a new pull request, #13994: MINOR: Remove StreamsProducer flush under EOS

2023-07-11 Thread via GitHub


nicktelford opened a new pull request, #13994:
URL: https://github.com/apache/kafka/pull/13994

   `KafkaProducer.flush` does not need to be called under EOS, because 
`KafkaProducer.commitTransaction` implicitly flushes the record buffer during 
the transaction commit.
   
   This should reduce the number of round-trips to the Kafka brokers needed by 
the Streams EOS Producer, improving throughput under EOS.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15178:
-
Labels: easyfix patch-available  (was: easyfix)

> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix, patch-available
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nicktelford opened a new pull request, #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf

2023-07-11 Thread via GitHub


nicktelford opened a new pull request, #13993:
URL: https://github.com/apache/kafka/pull/13993

   When there are many topic-partitions being consumed by the application, the 
`rejoinNeededOrPending` method can perform poorly, due to the deep equality 
comparison of `metadataSnapshot` with `assignmentSnapshot`.
   
   Since these snapshots can only diverge in `maybeUpdateSubscriptionMetadata`, 
we can use a `boolean` to cache whether they are already known to be "dirty", 
and therefore need to be compared.
   
   This should shortcut the deep comparison in the common-case that the 
snapshot hasn't been updated.
   
   Fixes KAFKA-15178
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15178:
-
Description: 
Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See the pink tree on the left of the flame graph below.  
!pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.

  was:
Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See flame graph below.  !pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.


> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-15178:


 Summary: Poor performance of ConsumerCoordinator with many 
TopicPartitions
 Key: KAFKA-15178
 URL: https://issues.apache.org/jira/browse/KAFKA-15178
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.5.0
Reporter: Nicholas Telford
Assignee: Nicholas Telford
 Attachments: pollPhase.png

Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See flame graph below.  !pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


hudeqi commented on PR #13992:
URL: https://github.com/apache/kafka/pull/13992#issuecomment-1630770168

   Please help to review this minor pr, thanks! @gharris1727 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi opened a new pull request, #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector

2023-07-11 Thread via GitHub


hudeqi opened a new pull request, #13992:
URL: https://github.com/apache/kafka/pull/13992

   similar to https://github.com/apache/kafka/pull/13946. The 
`refreshTopicPartitions` method is called periodically. When the number of 
partitions involved in replication is large, performance problems may occur.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-07-11 Thread via GitHub


ruslankrivoshein commented on code in PR #13562:
URL: https://github.com/apache/kafka/pull/13562#discussion_r1259668871


##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.PartitionFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
+import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
+import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.server.util.TopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
+import 
org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.IntFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GetOffsetShell {
+Pattern topicPartitionPattern = 
Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*?");
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws IOException, 
ExecutionException, InterruptedException {
+GetOffsetShell getOffsetShell = new GetOffsetShell();
+
+getOffsetShell.parseArgs(args);
+
+Map partitionOffsets = 
getOffsetShell.fetchOffsets();
+
+for (Map.Entry entry : 
partitionOffsets.entrySet()) {
+TopicPartition topic = entry.getKey();
+
+System.out.println(String.join(":", new String[]{topic.topic(), 
String.valueOf(topic.partition()), entry.getValue().toString()}));
+}
+}
+
+private OptionSet options;
+private OptionSpec topicPartitionsOpt;

Review Comment:
   Good point, thank you.
   Speaking about JmxTool, more precisely about JmxToolTest. There is 
[execute](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java#L354)
 method that just duplicates 
[this](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java#L33),
 I don't know why. Do you have any explanation about this your approach?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259657044


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1059,4 +1205,44 @@ public void onNewMetadataImage(MetadataImage newImage, 
MetadataDelta delta) {
 }
 });
 }
+
+/**
+ * The coordinator has been loaded. Session timeouts are registered
+ * for all members.
+ */
+public void onLoaded() {
+groups.forEach((groupId, group) -> {
+switch (group.type()) {
+case CONSUMER:
+ConsumerGroup consumerGroup = (ConsumerGroup) group;
+log.info("Loaded consumer group {} with {} members.", 
groupId, consumerGroup.members().size());
+consumerGroup.members().forEach((memberId, member) -> {
+log.debug("Loaded member {} in consumer group {}.", 
memberId, groupId);
+scheduleConsumerGroupSessionTimeout(groupId, memberId);
+if (member.state() == 
ConsumerGroupMember.MemberState.REVOKING) {
+scheduleConsumerGroupRevocationTimeout(
+groupId,
+memberId,
+member.rebalanceTimeoutMs(),
+member.memberEpoch()
+);
+}
+});
+break;
+
+case GENERIC:
+GenericGroup genericGroup = (GenericGroup) group;
+log.info("Loaded generic group {} with {} members.", 
groupId, genericGroup.allMembers().size());

Review Comment:
   I will you complete this part. I think that we need to log each members as I 
did. We also need to schedule the session timeouts here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259656449


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +764,121 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;

Review Comment:
   They don't rely on the group epoch. Members are only fenced by on their 
member epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259655653


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +764,121 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+// Cancel all the timers of the member.
+cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
member.memberId());
+
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
+member.memberEpoch() != expectedMemberEpoch) {
+log.debug("[GroupId " + groupId + "] Ignoring revocation 
timeout for " + memberId + " because the member " +
+"state does not match the expected state.");
+return Collections.emptyList();
+}
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"it failed to revoke partitions within {}ms.", 
revocationTimeoutMs);
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch 

[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259654482


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -720,19 +764,121 @@ private 
CoordinatorResult consumerGr
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
+log.info("[GroupId " + group.groupId() + "] Computed new 
subscription metadata: "
 + subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+records.add(newGroupSubscriptionMetadataRecord(group.groupId(), 
subscriptionMetadata));
 }
 
 // We bump the group epoch.
 int groupEpoch = group.groupEpoch() + 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
-.setMemberId(memberId)
-.setMemberEpoch(-1)
-);
+// Cancel all the timers of the member.
+cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
member.memberId());
+
+return records;
+}
+
+/**
+ * Schedules (or reschedules) the session timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void scheduleConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+String key = consumerGroupSessionTimeoutKey(groupId, memberId);
+timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"its session expired.");
+
+return consumerGroupFenceMember(group, member);
+} catch (GroupIdNotFoundException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the group " +
+"does not exist.");
+} catch (UnknownMemberIdException ex) {
+log.debug("[GroupId " + groupId + "] Could not fence " + 
memberId + " because the member " +
+"does not exist.");
+}
+
+return Collections.emptyList();
+});
+}
+
+/**
+ * Cancels the session timeout of the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ */
+private void cancelConsumerGroupSessionTimeout(
+String groupId,
+String memberId
+) {
+timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId));
+}
+
+/**
+ * Schedules a revocation timeout for the member.
+ *
+ * @param groupId   The group id.
+ * @param memberId  The member id.
+ * @param revocationTimeoutMs   The revocation timeout.
+ * @param expectedMemberEpoch   The expected member epoch.
+ */
+private void scheduleConsumerGroupRevocationTimeout(
+String groupId,
+String memberId,
+long revocationTimeoutMs,
+int expectedMemberEpoch
+) {
+String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
+timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+try {
+ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+
+if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
+member.memberEpoch() != expectedMemberEpoch) {
+log.debug("[GroupId " + groupId + "] Ignoring revocation 
timeout for " + memberId + " because the member " +
+"state does not match the expected state.");
+return Collections.emptyList();
+}
+
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
fenced from the group because " +
+"it failed to revoke partitions within {}ms.", 
revocationTimeoutMs);

Review Comment:
   Yeah, that makes sense. Let me update all logging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259645303


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
 assertEquals(image, context.groupMetadataManager.image());
 }
 
+@Test
+public void testSessionTimeoutLifecycle() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is rescheduled on second heartbeat.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().memberEpoch()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+Collections.emptyList(),
+context.sleep(result.response().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.
+result = context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(-1));
+assertEquals(-1, result.response().memberEpoch());
+
+// Verify that there are no timers.
+context.assertNoSessionTimeout(groupId, memberId);
+context.assertNoRevocationTimeout(groupId, memberId);
+}
+
+@Test
+public void testSessionTimeoutExpiration() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+)))
+));
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult result =
+context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(9)
+.setSubscribedTopicNames(Collections.singletonList("foo"))
+.setTopicPartitions(Collections.emptyList()));
+assertEquals(1, result.response().memberEpoch());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, 

[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts

2023-07-11 Thread via GitHub


dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1259639025


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MockCoordinatorTimer implements CoordinatorTimer {
+public static class ScheduledTimeout {
+public final String key;
+public final long deadlineMs;
+public final TimeoutOperation operation;
+
+ScheduledTimeout(
+String key,
+long deadlineMs,
+TimeoutOperation operation
+) {
+this.key = key;
+this.deadlineMs = deadlineMs;
+this.operation = operation;
+}
+}
+
+public static class ExpiredTimeout {
+public final String key;
+public final List records;
+
+ExpiredTimeout(
+String key,
+List records
+) {
+this.key = key;
+this.records = records;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+ExpiredTimeout that = (ExpiredTimeout) o;
+
+if (!Objects.equals(key, that.key)) return false;
+return Objects.equals(records, that.records);
+}
+
+@Override
+public int hashCode() {
+int result = key != null ? key.hashCode() : 0;
+result = 31 * result + (records != null ? records.hashCode() : 0);
+return result;
+}
+}
+
+private final Time time;
+
+private final Map> timeoutMap = new 
HashMap<>();
+private final PriorityQueue> timeoutQueue = new 
PriorityQueue<>(
+Comparator.comparingLong(entry -> entry.deadlineMs)
+);
+
+public MockCoordinatorTimer(Time time) {
+this.time = time;
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+boolean retry,
+TimeoutOperation operation
+) {
+cancel(key);
+
+long deadlineMs = time.milliseconds() + unit.toMillis(delay);
+ScheduledTimeout timeout = new ScheduledTimeout<>(key, deadlineMs, 
operation);
+timeoutQueue.add(timeout);
+timeoutMap.put(key, timeout);
+}
+
+@Override
+public void cancel(String key) {
+ScheduledTimeout timeout = timeoutMap.remove(key);
+if (timeout != null) {
+timeoutQueue.remove(timeout);
+}
+}
+
+public boolean contains(String key) {
+return timeoutMap.containsKey(key);
+}
+
+public ScheduledTimeout timeout(String key) {
+return timeoutMap.get(key);
+}
+
+public int size() {
+return timeoutMap.size();
+}
+
+public List> poll() {

Review Comment:
   Added some javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager

2023-07-11 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741962#comment-17741962
 ] 

Ivan Yurchenko commented on KAFKA-15166:


[~ckamal] I would comment in advance for the future KIP: not all the 
implementation may have all the necessary metadata to delete remote segments 
based on the topic name and partition number. So this probably should be either 
a checkable capability (i.e. if the RSM cannot do this, the broker doesn't call 
it) or it should receive a list of all the segment-to-be-deleted metadata.

> Add deletePartition API to the RemoteStorageManager
> ---
>
> Key: KAFKA-15166
> URL: https://issues.apache.org/jira/browse/KAFKA-15166
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> Remote Storage Manager exposes {{deleteLogSegmentData}} API to delete the 
> individual log segments.  Storage providers such as HDFS have support to 
> delete a directory. Having an {{deletePartition}} API to delete the data at 
> the partition level will enhance the topic deletion.
> This task may require a KIP as it touches the user-facing APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac opened a new pull request, #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-11 Thread via GitHub


dajac opened a new pull request, #13991:
URL: https://github.com/apache/kafka/pull/13991

   _Temporary includes d7ebb7bba5bcfe35dd2ee5e37f226a84c27ca638_
   
   This is the last patch for KAFKA-14462. It wires all the pieces together in 
BrokerServer.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-11 Thread via GitHub


hudeqi commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630624613

   I would like to ask a question by the way: Why do we not synchronize the 
write permission of `TopicAclBing` and `GroupAclBinding` in 
MirrorSourceConnector? @C0urante @gharris1727 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-11 Thread via GitHub


yashmayya commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1630582887

   Thanks @vamossagar12, I don't have any other comments other than the 
currently open ones - primarily the timeout case for when exactly-once support 
is enabled, the `ConnectorOffsetBackingStore::set` Javadoc, and the unit tests 
being in `OffsetStorageWriterTest` versus a dedicated 
`ConnectorOffsetBackingStoreTest` class (this one isn't blocking though). 
@C0urante could you please take a look at this whenever you get a chance? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-11 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259538919


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.containsValue(null);
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+FutureCallback secondaryWriteFuture = new FutureCallback<>();
+secondaryStore.set(values, secondaryWriteFuture);
+try {
+// For EOS, there is no timeout for offset commit and it is 
allowed to take as much time as needed for
+// commits. We still need to wait because we want to fail the 
offset commit for cases when

Review Comment:
   hmm this is is a bit tricky. This is what the `commitTransaction` method 
does => 
   
https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L303-L312
   
   which basically says `Blocks until all outstanding records have been sent 
and ack'd`. This is what the KIP says as well:
   
   ```
   The worker-level 
[offset.flush.timeout.ms](https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms)
 property will be ignored for exactly-once source tasks. They will be allowed 
to take as long as necessary to complete an offset commit, since the cost of 
failure at that point is to fail the source task. Currently, all source task 
offset commits take place on a single shared worker-global thread. In order to 
support source task commits without a timeout, but also prevent  laggy tasks 
from disrupting the availability of other tasks on the cluster, the worker will 
be modified to permit simultaneous source task offset commits.
   
   It may take longer than the transaction timeout for a task to flush all of 
its records to Kafka. In this case, there are some remedial actions that users 
can take to nurse their connector back to health: tune their producer 
configuration for higher throughput, increase the transaction timeout for the 
producers used by the connector, decrease the offset commit interval (if using 
interval-based transaction boundaries), or switch to the poll  value for the 
transaction.boundary  property. We will include include these steps in the 
error message for a task that fails due to producer transaction timeout.
   ```
   
   Keeping these things in mind, I had assumed it's good to wait forever. But I 
also see your point. I don't think the producer used to write to the global 
offsets topic would even be a transactional one and yet we are waiting.
   
   I think @C0urante would be the best person to answer this at this point, but 
my opinion is to block forever in this case. But I am open to suggestions.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13945:
URL: https://github.com/apache/kafka/pull/13945#discussion_r1259462328


##
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java:
##
@@ -147,4 +151,59 @@ public void testInvalidBatchSize() {
 sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, 
"abcd");
 assertThrows(ConfigException.class, () -> new 
AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
 }
+
+@Test
+public void testAlterOffsetsStdin() {
+sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+Map, Map> offsets = Collections.singletonMap(
+Collections.singletonMap(FILENAME_FIELD, FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+);
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, offsets));
+}
+
+@Test
+public void testAlterOffsetsIncorrectPartitionKey() {
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+Collections.singletonMap("invalid_partition_key", FILENAME),
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+
+// null partitions are invalid
+assertThrows(ConnectException.class, () -> 
connector.alterOffsets(sourceProperties, Collections.singletonMap(
+null,
+Collections.singletonMap(POSITION_FIELD, 0)
+)));
+}
+
+@Test
+public void testAlterOffsetsMultiplePartitions() {
+Map, Map> offsets = new HashMap<>();
+offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0));
+offsets.put(Collections.singletonMap(FILENAME_FIELD, 
"/someotherfilename"), null);
+connector.alterOffsets(sourceProperties, offsets);

Review Comment:
   Ah, good point. Done.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");

Review Comment:
   Makes sense, done.



##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) {
 : ExactlyOnceSupport.UNSUPPORTED;
 }
 
+@Override
+public boolean alterOffsets(Map connectorConfig, 
Map, Map> offsets) {
+AbstractConfig config = new AbstractConfig(CONFIG_DEF, 
connectorConfig);
+String filename = config.getString(FILE_CONFIG);
+if (filename == null || filename.isEmpty()) {
+// If the 'file' configuration is unspecified, stdin is used and 
no offsets are tracked
+throw new ConnectException("Offsets cannot be modified if the '" + 
FILE_CONFIG + "' configuration is unspecified");
+}
+
+// This connector makes use of a single source partition at a time 
which represents the file that it is configured to read from.
+// However, there could also be source partitions from previous 
configurations of the connector.
+for (Map.Entry, Map> partitionOffset : 
offsets.entrySet()) {
+Map partition = partitionOffset.getKey();
+if (partition == null) {
+throw new ConnectException("Partition objects cannot be null");
+}
+
+if (!partition.containsKey(FILENAME_FIELD)) {
+throw new ConnectException("Partition objects should contain 
the key '" + FILENAME_FIELD + "'");
+}
+
+Map offset = partitionOffset.getValue();
+// null offsets are allowed and represent a deletion of offsets 
for a partition
+if (offset != null && !offset.containsKey(POSITION_FIELD)) {
+throw new ConnectException("Offset objects should either be 
null or contain the key '" + POSITION_FIELD + "'");
+}
+}
+
+// Let the task validate the actual value for the offset position on 
startup

Review Comment:
   My initial reasoning was that we probably wouldn't want to open an input 
stream here to read from the file and verify whether the offset position is 
actually valid. But yeah, no reason we can't do basic validations 

[jira] [Created] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API

2023-07-11 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15177:
--

 Summary: MirrorMaker 2 should implement the alterOffsets KIP-875 
API
 Key: KAFKA-15177
 URL: https://issues.apache.org/jira/browse/KAFKA-15177
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect, mirrormaker
Reporter: Yash Mayya


The {{MirrorSourceConnector}} class should implement the new alterOffsets API 
added in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect].
 We could also implement the API in 
{{MirrorCheckpointConnector}} and 
{{MirrorHeartbeatConnector}} to prevent external modification of offsets since 
the operation wouldn't really make sense in their case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-11 Thread via GitHub


showuon commented on PR #13944:
URL: https://github.com/apache/kafka/pull/13944#issuecomment-1630519509

   > > @abhijeetk88 @showuon Let us have a followup JIRA/PR on adding more test 
for the metrics.
   > 
   > Sure, please open a JIRA ticket to track it. Thanks.
   
   [KAFKA-15176](https://issues.apache.org/jira/browse/KAFKA-15176) is opened.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >