[GitHub] [kafka] dajac commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
dajac commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1260620995 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(serverProperties = Array( + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( Review Comment: Yeah, we will. We have to do two things: 1) parameterise existing integration/system tests to run with the new group coordinator (I hope that this will cover a big chunk of the new coordinator); and 2) extend integration/system tests where needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260618876 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); Review Comment: > i'm a bit confused - if the timer advances by heartbeat interval then shouldn't the existing session timeout expire? The session timeout expires based on the session timeout not the heartbeat interval, right? 
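The point of this exchange can be sketched with a tiny mock timer: a session timeout scheduled for 45000 ms (the value asserted in the test above) does not fire when the clock advances by one heartbeat interval. This is only an illustrative sketch. `MockSessionTimer` and its methods are hypothetical and are not the test context used in the PR, the 5000 ms heartbeat interval is an assumed value, and the strict "past the deadline" comparison follows the reviewer's reading elsewhere in this thread rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical mock timer for illustration only; not the PR's test context.
final class MockSessionTimer {
    private long nowMs = 0L;
    private final Map<String, Long> deadlines = new HashMap<>();

    // (Re)schedules the session timeout for a member, replacing any earlier deadline.
    void schedule(String memberId, long timeoutMs) {
        deadlines.put(memberId, nowMs + timeoutMs);
    }

    boolean isScheduled(String memberId) {
        return deadlines.containsKey(memberId);
    }

    // Advances the clock and returns the members whose deadline has been passed.
    // Assumption: a timeout expires only once the clock is strictly past its deadline.
    List<String> sleep(long ms) {
        nowMs += ms;
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs > entry.getValue()) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

With a 45000 ms timeout, `sleep(5000)` returns an empty list and the timeout stays scheduled; in this sketch the member is only reported as expired once the clock moves beyond the deadline.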
[GitHub] [kafka] showuon opened a new pull request, #13999: KAFKA-15176: add tests for tiered storage metrics
showuon opened a new pull request, #13999: URL: https://github.com/apache/kafka/pull/13999 Added tests for metrics: 1. RemoteLogReaderTaskQueueSize 2. RemoteLogReaderAvgIdlePercent 3. RemoteLogManagerTasksAvgIdlePercent Also added tests verifying that `OffsetOutOfRangeException` is thrown while reading logs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jeffkbkim commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260533220 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { Review Comment: should we add a test case for revocation/session timeouts `onLoaded()`? nvm - looks like we already have one :)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jeffkbkim commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260536510 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); Review Comment: looking at the other test it looks like we expire when we pass the deadline and not at exactly the deadline. should we assert that the timeout still exists and the member is still part of the group? 
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jeffkbkim commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260535581 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); Review Comment: i'm a bit confused - if the timer advances by heartbeat interval then shouldn't the existing session timeout expire? or is the member removed from the group after this block? 
i think it would also be good to assertSessionTimeout after advancing the clock.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
jeffkbkim commented on code in PR #13991: URL: https://github.com/apache/kafka/pull/13991#discussion_r1260529472 ## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ## @@ -54,18 +58,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data) } - @ClusterTest(serverProperties = Array( + @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array( Review Comment: will we have a full integration test for the new group coordinator?
[GitHub] [kafka] github-actions[bot] commented on pull request #12980: KAFKA-14464: Make resource leaks for MM2 resources more difficult
github-actions[bot] commented on PR #12980: URL: https://github.com/apache/kafka/pull/12980#issuecomment-1631799245 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1260523356 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val backoffDeadlineMs = new AtomicLong(NoRetry) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first block -maybeRequestNextBlock() -nextProducerId = 0L - } else { -nextProducerId += 1 - -// Check if we need to fetch the next block -if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) { - maybeRequestNextBlock() -} - } + override def hasValidBlock: Boolean = { +nextProducerIdBlock.get != null + } - // If we've exhausted the current block, grab the next block (waiting if necessary) - if (nextProducerId > currentProducerIdBlock.lastProducerId) { -val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) -if (block == null) { - // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal - // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. 
- throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") -} else { - block match { -case Success(nextBlock) => - currentProducerIdBlock = nextBlock - nextProducerId = currentProducerIdBlock.firstProducerId -case Failure(t) => throw t + override def generateProducerId(): Try[Long] = { +var result: Try[Long] = null +var iteration = 0 +while (result == null) { + currentProducerIdBlock.get.claimNextId().asScala match { +case None => + // Check the next block if current block is full + val block = nextProducerIdBlock.getAndSet(null) + if (block == null) { +// Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal +// when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. +maybeRequestNextBlock() +result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block")) Review Comment: hmm, i don't recall being involved in that discussion. should i create a jira for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
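The non-blocking idea in the diff above (claim from the current block, swap in a prefetched block if one has arrived, otherwise fail immediately so the caller can retry) can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the PR's code: `IdBlock` and `NonBlockingIdManager` are hypothetical names, an empty `OptionalLong` stands in for the retriable `COORDINATOR_LOAD_IN_PROGRESS` failure, and the asynchronous request for the next block is reduced to a comment.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for a producer ID block; not Kafka's ProducerIdsBlock.
final class IdBlock {
    private final long last;       // inclusive upper bound of the block
    private final AtomicLong next; // next ID to hand out

    IdBlock(long first, int size) {
        this.last = first + size - 1;
        this.next = new AtomicLong(first);
    }

    // Claims one ID, or returns empty once the block is exhausted.
    OptionalLong claimNextId() {
        long id = next.getAndIncrement();
        return id <= last ? OptionalLong.of(id) : OptionalLong.empty();
    }
}

final class NonBlockingIdManager {
    // Starts with an empty block, so the first call finds nothing to claim.
    private final AtomicReference<IdBlock> current = new AtomicReference<>(new IdBlock(0, 0));
    private final AtomicReference<IdBlock> prefetched = new AtomicReference<>(null);

    // Would be called by the (hypothetical) asynchronous response handler.
    void onBlockReceived(IdBlock block) {
        prefetched.set(block);
    }

    // Returns an ID, or empty right away when no block is available; never waits.
    OptionalLong generate() {
        while (true) {
            OptionalLong id = current.get().claimNextId();
            if (id.isPresent()) {
                return id;
            }
            IdBlock block = prefetched.getAndSet(null);
            if (block == null) {
                // A real manager would also trigger an asynchronous fetch of the next block here.
                return OptionalLong.empty();
            }
            current.set(block);
        }
    }
}
```

The design choice illustrated is that an exhausted block never parks the calling thread: the caller gets a retriable result at once and the next block is picked up on a later attempt.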
[GitHub] [kafka] hudeqi commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
hudeqi commented on PR #13992: URL: https://github.com/apache/kafka/pull/13992#issuecomment-1631789763 > Thanks for the fix @hudeqi ! I was wondering if you had plans to address this problem/warning elsewhere in the codebase? I did a global search for the `removeAll` method of sets in the codebase. There are indeed many `set.removeAll(list)` calls, but this usage only causes performance problems when the collections have many elements. In the connect module, at least, there are no other cases besides the one fixed in this PR. In other modules, there may be this [one](https://github.com/apache/kafka/blob/f3ee9ff90ff107942280fb0c4bd83e7ebc5c062c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L177) that meets these conditions and needs to be optimized, but it is probably not suitable for this PR (which belongs to the MM2 tag). @gharris1727
[GitHub] [kafka] hudeqi commented on a diff in pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
hudeqi commented on code in PR #13992: URL: https://github.com/apache/kafka/pull/13992#discussion_r1260504405 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -321,10 +321,10 @@ void refreshTopicPartitions() if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) { Set newTopicPartitions = sourceTopicPartitionsSet; -newTopicPartitions.removeAll(knownSourceTopicPartitions); +newTopicPartitions.removeAll(knownSourceTopicPartitionsSet); Set deletedTopicPartitions = knownSourceTopicPartitionsSet; -deletedTopicPartitions.removeAll(sourceTopicPartitions); +deletedTopicPartitions.removeAll(sourceTopicPartitionsSet); Review Comment: Sorry, I overlooked that...
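For context on why `Set.removeAll(List)` can be slow: in long-standing JDK releases, `AbstractSet.removeAll` iterates over the set and calls `contains` on the argument whenever the set is not larger than the argument, and `List.contains` is a linear scan. The sketch below shows the general remedy of passing a `HashSet` instead. `RemoveAllSketch` and `difference` are hypothetical names used for illustration and are not part of MirrorSourceConnector.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not code from the PR.
final class RemoveAllSketch {
    // Returns the elements of source that are not present in known.
    static Set<String> difference(List<String> source, List<String> known) {
        Set<String> result = new HashSet<>(source);
        // Passing a HashSet keeps each membership check constant time on average,
        // whichever iteration strategy removeAll picks internally.
        result.removeAll(new HashSet<>(known));
        return result;
    }
}
```

Copying the list into a set costs one linear pass, which avoids the quadratic behaviour when both collections are large.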
[jira] [Closed] (KAFKA-13808) Mirrormaker2 stop sync data when modify topic partition in "Running a dedicated MirrorMaker cluster" mode
[ https://issues.apache.org/jira/browse/KAFKA-13808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YANGLiiN closed KAFKA-13808. 3.5.0 fixed. > Mirrormaker2 stop sync data when modify topic partition in "Running a > dedicated MirrorMaker cluster" mode > -- > > Key: KAFKA-13808 > URL: https://issues.apache.org/jira/browse/KAFKA-13808 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1 >Reporter: YANGLiiN >Priority: Major > > When use the mirrormaker2 with the "Running a dedicated MirrorMaker cluster" > mode by 3 nodes, once we modify the topic partition , then the mm2 stop sync > data with the following ERROR : > > [2022-02-18 10:26:19,410] ERROR Error forwarding REST request > (org.apache.kafka.connect.runtime.rest.RestClient) > java.lang.IllegalArgumentException: Invalid URI host: null (authority: null) > at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:521) > at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:506) > at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:464) > at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453) > at > org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:107) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$reconfigureConnector$32(DistributedHerder.java:1607) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > [2022-02-18 10:26:19,416] ERROR [Worker clientId=connect-1, groupId=sync2022] > Request to leader to reconfigure connector tasks failed > 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Error > trying to forward REST request: Invalid URI host: null (authority: null) > at > org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:147) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$reconfigureConnector$32(DistributedHerder.java:1607) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: java.lang.IllegalArgumentException: Invalid URI host: null > (authority: null) > at org.eclipse.jetty.client.HttpClient.checkHost(HttpClient.java:521) > at org.eclipse.jetty.client.HttpClient.newHttpRequest(HttpClient.java:506) > at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:464) > at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453) > at > org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:107) > ... 
6 more > > The root cause is that Connect does not handle the 'NOTUSED' URL that MM2 sets > on the slave node; > see > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L122] > > We should add the "NOTUSED" check below this code: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1593-L1599] > > {code:java} > if (leaderUrl.startsWith("NOTUSED")) { > configBackingStore.putTaskConfigs(connName, rawTaskProps); > cb.onCompletion(null, null); > return; > } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on PR #13870: URL: https://github.com/apache/kafka/pull/13870#issuecomment-1631769383 @dajac thanks for the review. I have addressed your comments.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1260500922 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if 
(!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. -return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions( -return false; +// First round of join requests. Generate member ids. All requests will be accepted +// as the group is still Empty. +List> responseFutures = new ArrayList<>(); +for (int i = 0; i < groupMaxSize + 1; i++) { +if (i == 0) { +responseFutures.add(context.sendGenericGroupJoin( +request, +requiredKnownMemberId, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +)); +} else { +responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); +} +} -return Objects.equals(fromAssignment(expected.assignedTopicPartitions()),
[GitHub] [kafka] rreddy-22 opened a new pull request, #13998: KIP-848: Rack awareness interface changes for assignors
rreddy-22 opened a new pull request, #13998: URL: https://github.com/apache/kafka/pull/13998 We want to make the assignors rack-aware. To obtain and provide the rack information, the following interface modifications were made: 1) AbstractPartitionAssignor implements the PartitionAssignor interface: it contains an assign function with a metadata image argument that will be called from the TargetAssignmentBuilder. 2) TopicAndClusterMetadata: a new data structure that holds the ClusterImage and the TopicsImage, both of which are obtained from the MetadataImage. The rackInfo will be passed to another assign function which will have a List of rackAwareTopicIdPartitions. This information will be used by the assignors to match partition replica racks with consumer racks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()
philipnee commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1260427182 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher +// fetcher.clearBufferedDataForUnassignedPartitions(partitions); + +// make sure the offsets of topic partitions the consumer is unsubscribing from +// are committed since there will be no following rebalance +commit(subscriptions.allConsumed()); Review Comment: Hey @junrao - I believe we still want to commit the offset before there is a partition change. I think if we don't commit offset here, it is possible to lose some progress. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
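The argument checks in the diff above can be read in isolation as the following minimal sketch. It is not the `PrototypeAsyncConsumer` code: `AssignValidationSketch`, `Tp` (a stand-in for `TopicPartition`) and `shouldUnsubscribe` are hypothetical names, and the blank check only approximates what `Utils.isBlank` is assumed to do (null or whitespace-only). The offset commit discussed in the review comment is left out.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class AssignValidationSketch {
    // Hypothetical stand-in for TopicPartition, so the sketch has no Kafka dependency.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Returns true when the caller should fall back to unsubscribe() (empty input),
    // false when the partitions are valid and can be assigned.
    // Throws IllegalArgumentException for a null collection or a null/blank topic.
    static boolean shouldUnsubscribe(Collection<Tp> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            return true;
        }
        for (Tp tp : partitions) {
            String topic = (tp != null) ? tp.topic : null;
            // Assumed equivalent of a blank check: null or only whitespace.
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(shouldUnsubscribe(Collections.emptyList()));
        System.out.println(shouldUnsubscribe(Arrays.asList(new Tp("orders", 0))));
    }
}
```

Validating everything before touching any state means a bad argument cannot leave the consumer half-reassigned.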
[GitHub] [kafka] philipnee commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
philipnee commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1631684973 Hey @flashmouse - could you explain what you mean by `useless loop`?
[GitHub] [kafka] kirktrue opened a new pull request, #13997: KAFKA-15180: Generalize integration tests to change use of KafkaConsumer to Consumer
kirktrue opened a new pull request, #13997: URL: https://github.com/apache/kafka/pull/13997 Update the integration tests to swap the use of the concrete `KafkaConsumer` class to the generic `Consumer` interface. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] lihaosky opened a new pull request, #13996: KAFKA-15022: [2/N] introduce graph to compute min cost
lihaosky opened a new pull request, #13996: URL: https://github.com/apache/kafka/pull/13996 ### Description Introduce a graph to calculate the min-flow when an existing flow is already given as input. ### Test Unit tests. More unit tests will be added.
[jira] [Updated] (KAFKA-15139) Optimize the performance of `Set.removeAll(List)` in `MirrorCheckpointConnector`
[ https://issues.apache.org/jira/browse/KAFKA-15139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15139: Fix Version/s: 3.6.0 > Optimize the performance of `Set.removeAll(List)` in > `MirrorCheckpointConnector` > > > Key: KAFKA-15139 > URL: https://issues.apache.org/jira/browse/KAFKA-15139 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.5.0 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > Fix For: 3.6.0 > > > This is the implementation note of the `removeAll` method in `Set`: > _This implementation determines which is the smaller of this set and the > specified collection, by invoking the size method on each. If this set has > fewer elements, then the implementation iterates over this set, checking each > element returned by the iterator in turn to see if it is contained in the > specified collection. If it is so contained, it is removed from this set with > the iterator's remove method. If the specified collection has fewer elements, > then the implementation iterates over the specified collection, removing from > this set each element returned by the iterator, using this set's remove > method._ > That said, assume that _M_ is the number of elements in the set and _N_ is > the number of elements in the List. If the type of the specified collection > is `List` and {_}M<=N{_}, then the time complexity of `removeAll` is _O(MN)_ > (because the time complexity of searching in a List is {_}O(N){_}). On the > contrary, if {_}N<M{_}, then the time complexity of `removeAll` is {_}O(N){_}. > In `MirrorCheckpointConnector`, the `refreshConsumerGroups` method is repeatedly > called in a daemon thread. There are two `removeAll` calls in this method. From a > logical point of view, in any round where the > number of groups in the source cluster simply increases or decreases, the two > `removeAll` calls will always hit the _O(MN)_ situation > mentioned above. 
Therefore, it is better to change all the variables here to > Set type to avoid this "low performance". -- This message was sent by Atlassian Jira (v8.20.10#820010)
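The cost difference described in the issue can be sketched as follows. This is a minimal illustration, not the actual `MirrorCheckpointConnector` code: the class `RemoveAllSketch` and the helper `removeAllFast` are hypothetical names, and copying the list into a `HashSet` is just one way to avoid the slow branch (the issue itself proposes changing the variables to `Set` at the source).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllSketch {
    // Hypothetical helper: removes every element of toRemove from target.
    // When the argument is not already a Set, it is copied into a HashSet first,
    // so that the branch of removeAll which probes the argument with contains()
    // costs O(1) on average per probe instead of O(N) for a List.
    static <T> void removeAllFast(Set<T> target, Collection<T> toRemove) {
        Collection<T> lookup = (toRemove instanceof Set) ? toRemove : new HashSet<>(toRemove);
        target.removeAll(lookup);
    }

    public static void main(String[] args) {
        // M = 3 elements in the set, N = 4 in the list, so M <= N:
        // with a plain List this would be the O(MN) case from the issue.
        Set<String> groups = new HashSet<>(List.of("g1", "g2", "g3"));
        removeAllFast(groups, List.of("g2", "g3", "g4", "g5"));
        System.out.println(groups); // prints [g1]
    }
}
```

The copy costs O(N) once, which is cheaper than the O(MN) scan whenever the set is not trivially small.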
[jira] [Updated] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer
[ https://issues.apache.org/jira/browse/KAFKA-15180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15180: -- Description: For the consumer threading refactor project, we're introducing a new implementation of the {{Consumer}} interface. However, most of the instances in the integration tests specifically use the concrete implementation {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible to use the {{Consumer}} interface. (was: For the consumer threading refactor project, we're introducing a new implementation of the `Consumer` interface. However, most of the instances in the integration tests specifically use the concrete implementation `KafkaConsumer`. This task is to generalize those uses where possible to the `Consumer` interface.) > Generalize integration tests to change use of KafkaConsumer to Consumer > --- > > Key: KAFKA-15180 > URL: https://issues.apache.org/jira/browse/KAFKA-15180 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > For the consumer threading refactor project, we're introducing a new > implementation of the {{Consumer}} interface. However, most of the instances > in the integration tests specifically use the concrete implementation > {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible > to use the {{Consumer}} interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer
Kirk True created KAFKA-15180: - Summary: Generalize integration tests to change use of KafkaConsumer to Consumer Key: KAFKA-15180 URL: https://issues.apache.org/jira/browse/KAFKA-15180 Project: Kafka Issue Type: Test Components: clients, consumer Reporter: Kirk True Assignee: Kirk True For the consumer threading refactor project, we're introducing a new implementation of the `Consumer` interface. However, most of the instances in the integration tests specifically use the concrete implementation `KafkaConsumer`. This task is to generalize those uses where possible to the `Consumer` interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kirktrue commented on pull request #13528: KAFKA-14879: Update system tests to use latest versions
kirktrue commented on PR #13528: URL: https://github.com/apache/kafka/pull/13528#issuecomment-1631558225 @jolshan Can you take a look at this when you have some time? Thanks
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1260298077 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if 
(!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. -return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions( -return false; +// First round of join requests. Generate member ids. All requests will be accepted +// as the group is still Empty. +List> responseFutures = new ArrayList<>(); +for (int i = 0; i < groupMaxSize + 1; i++) { +if (i == 0) { +responseFutures.add(context.sendGenericGroupJoin( +request, +requiredKnownMemberId, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +)); +} else { +responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); +} +} -return Objects.equals(fromAssignment(expected.assignedTopicPartitions()),
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1260282790 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,143 +2610,2090 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } -private void assertUnorderedListEquals( -List expected, -List actual -) { -assertEquals(new HashSet<>(expected), new HashSet<>(actual)); -} +@Test +public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(10) +.build(); -private void assertResponseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (!responseEquals(expected, actual)) { -assertionFailure() -.expected(expected) -.actual(actual) -.buildAndThrow(); +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.withReason("exceed max group size") +.build(); + +for (int i = 0; i < 10; i++) { +CompletableFuture responseFuture; +if (i == 0) { +responseFuture = context.sendGenericGroupJoin( +request, +false, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +); +} else { +responseFuture = context.sendGenericGroupJoin(request); +} +assertFalse(responseFuture.isDone()); } +CompletableFuture responseFuture = context.sendGenericGroupJoin(request); +assertTrue(responseFuture.isDone()); +assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } -private boolean responseEquals( -ConsumerGroupHeartbeatResponseData expected, -ConsumerGroupHeartbeatResponseData actual -) { -if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; -if (expected.errorCode() != actual.errorCode()) return false; -if 
(!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; -if (!Objects.equals(expected.memberId(), actual.memberId())) return false; -if (expected.memberEpoch() != actual.memberEpoch()) return false; -if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; -if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; -// Unordered comparison of the assignments. -return responseAssignmentEquals(expected.assignment(), actual.assignment()); -} +@Test +public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { +boolean requiredKnownMemberId = true; +int groupMaxSize = 10; +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withGenericGroupMaxSize(groupMaxSize) +.withGenericGroupInitialRebalanceDelayMs(50) +.build(); -private boolean responseAssignmentEquals( -ConsumerGroupHeartbeatResponseData.Assignment expected, -ConsumerGroupHeartbeatResponseData.Assignment actual -) { -if (expected == actual) return true; -if (expected == null) return false; -if (actual == null) return false; +JoinGroupRequestData request = new JoinGroupRequestBuilder() +.withGroupId("group-id") +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); -if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions( -return false; +// First round of join requests. Generate member ids. All requests will be accepted +// as the group is still Empty. +List> responseFutures = new ArrayList<>(); +for (int i = 0; i < groupMaxSize + 1; i++) { +if (i == 0) { +responseFutures.add(context.sendGenericGroupJoin( +request, +requiredKnownMemberId, +false, +new ExpectedGenericGroupResult(Errors.NONE, true) +)); +} else { +responseFutures.add(context.sendGenericGroupJoin(request, requiredKnownMemberId)); +} +} -return Objects.equals(fromAssignment(expected.assignedTopicPartitions()),
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260243479 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() { .setGroupId(groupId) .setMemberId(memberId1) .setMemberEpoch(1) -.setRebalanceTimeoutMs(9) Review Comment: It must be set in the first request or whenever it changes.
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260243017 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() { .setGroupId(groupId) .setMemberId(memberId1) .setMemberEpoch(1) -.setRebalanceTimeoutMs(9) Review Comment: It was not needed, so I removed it.
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260232815 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2798,8 +2798,6 @@ public void testRevocationTimeoutLifecycle() { .setGroupId(groupId) .setMemberId(memberId1) .setMemberEpoch(1) -.setRebalanceTimeoutMs(9) Review Comment: Why did we remove this?
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742162#comment-17742162 ] ASF GitHub Bot commented on KAFKA-14995: stevenbooke commented on PR #521: URL: https://github.com/apache/kafka-site/pull/521#issuecomment-163185 @mimaison I have hidden the GitHub usernames of the kafka committers, they do not get displayed when the website is rendered. The display is now the same as today. > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: Steven Booke >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. 
Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)] > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
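Steps 1 to 3 of the manual process above (rank authors, drop committers, keep the top 20) could be scripted roughly as below. This is a sketch under assumptions: it expects lines in the `count<TAB>Name <email>` shape that `git shortlog --email --numbered --summary` is understood to print, already sorted by commit count, and it takes the committer emails as a plain set because the issue leaves open where that mapping comes from. `CollaboratorsSketch` and `topNonCommitters` are hypothetical names, and the mapping from author to GitHub username (step 4) is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CollaboratorsSketch {
    // Assumed line shape: optional padding, commit count, whitespace, "Name <email>".
    private static final Pattern LINE =
        Pattern.compile("^\\s*(\\d+)\\s+(.*?)\\s+<([^>]+)>\\s*$");

    // Returns up to `limit` authors, in input order, whose email is not in committerEmails.
    // The input is assumed to be sorted by commit count already (the --numbered flag).
    static List<String> topNonCommitters(List<String> shortlogLines,
                                         Set<String> committerEmails,
                                         int limit) {
        List<String> result = new ArrayList<>();
        for (String line : shortlogLines) {
            if (result.size() >= limit) {
                break;
            }
            Matcher m = LINE.matcher(line);
            if (!m.matches()) {
                continue; // skip lines that do not look like shortlog output
            }
            String email = m.group(3);
            if (!committerEmails.contains(email)) {
                result.add(m.group(2) + " <" + email + ">");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example data is made up for illustration.
        List<String> lines = List.of(
            "   120\tAlice Example <alice@example.com>",
            "    80\tBob Example <bob@example.com>",
            "    15\tCarol Example <carol@example.com>");
        System.out.println(topNonCommitters(lines, Set.of("alice@example.com"), 20));
    }
}
```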
[jira] [Commented] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
[ https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742151#comment-17742151 ] A. Sophie Blee-Goldman commented on KAFKA-15178: Nice catch! > Poor performance of ConsumerCoordinator with many TopicPartitions > - > > Key: KAFKA-15178 > URL: https://issues.apache.org/jira/browse/KAFKA-15178 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: easyfix, patch-available > Attachments: pollPhase.png > > > Doing some profiling of my Kafka Streams application, I noticed that the > {{pollPhase}} suffers from a minor performance issue. > See the pink tree on the left of the flame graph below. > !pollPhase.png|width=1028,height=308! > {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which > checks the current {{metadataSnapshot}} against the > {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if > there's a large number of topic-partitions being consumed by the application, > then this comparison can perform poorly. > I suspect this can be trivially addressed with a {{boolean}} flag that > indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and > actually needs to be checked, since most of the time it should be identical > to {{{}assignmentSnapshot{}}}. > I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ableegoldman commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf
ableegoldman commented on code in PR #13993: URL: https://github.com/apache/kafka/pull/13993#discussion_r1260200818 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private Set joinedSubscription; private MetadataSnapshot metadataSnapshot; private MetadataSnapshot assignmentSnapshot; +private boolean metadataUpdated; Review Comment: This optimization makes perfect sense but I am extremely paranoid that future changes will end up modifying the snapshot without remembering (or even knowing about) the need to update this boolean as well. Can you maybe encapsulate this in the `MetadataSnapshot` class or otherwise make sure it's impossible for this to get out of sync? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
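One way to address the concern in this review comment is to keep the flag next to the data it guards, so that the only way to change a snapshot also marks it dirty. The sketch below is hypothetical and is not the `ConsumerCoordinator` or `MetadataSnapshot` code: `SnapshotTrackerSketch` and its methods are invented for illustration, and a `Map<String, Integer>` of partitions per topic stands in for the real snapshot type.

```java
import java.util.Map;

public class SnapshotTrackerSketch {
    // Stand-ins for the metadata and assignment snapshots (topic -> partition count).
    private Map<String, Integer> metadata = Map.of();
    private Map<String, Integer> assignment = Map.of();

    private boolean dirty = true;   // true until the first comparison has run
    private boolean lastResult;     // cached outcome of the last deep comparison
    int deepComparisons = 0;        // exposed only so the sketch's behaviour can be observed

    // Every mutation goes through a setter, so the flag cannot be forgotten.
    void setMetadata(Map<String, Integer> snapshot) {
        this.metadata = Map.copyOf(snapshot);
        this.dirty = true;
    }

    void setAssignment(Map<String, Integer> snapshot) {
        this.assignment = Map.copyOf(snapshot);
        this.dirty = true;
    }

    // Runs the deep-equality check only when something changed since the last call.
    boolean snapshotsMatch() {
        if (dirty) {
            lastResult = metadata.equals(assignment);
            deepComparisons++;
            dirty = false;
        }
        return lastResult;
    }

    public static void main(String[] args) {
        SnapshotTrackerSketch tracker = new SnapshotTrackerSketch();
        tracker.setMetadata(Map.of("foo", 6));
        tracker.setAssignment(Map.of("foo", 6));
        for (int i = 0; i < 1000; i++) {
            tracker.snapshotsMatch(); // only the first call compares the maps
        }
        System.out.println(tracker.deepComparisons); // prints 1
    }
}
```

Because the fields are private and copied on write, no caller can modify a snapshot without also invalidating the cached result.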
[GitHub] [kafka] mjsax commented on pull request #13937: Add Streams API broker compatibility table
mjsax commented on PR #13937: URL: https://github.com/apache/kafka/pull/13937#issuecomment-1631390728 Thanks! Merged to `trunk`.
[GitHub] [kafka] mjsax merged pull request #13937: Add Streams API broker compatibility table
mjsax merged PR #13937: URL: https://github.com/apache/kafka/pull/13937
[GitHub] [kafka] JimGalasyn commented on pull request #13937: Add Streams API broker compatibility table
JimGalasyn commented on PR #13937: URL: https://github.com/apache/kafka/pull/13937#issuecomment-1631385663 Thanks for the review, @mjsax! I added the row/col for `3.5.x`.
[GitHub] [kafka] mjsax commented on pull request #13849: Add 3.5.0 and 3.4.1 to system tests
mjsax commented on PR #13849: URL: https://github.com/apache/kafka/pull/13849#issuecomment-1631380479 Sorry for the delay. Uploaded. Can you verify?
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260159325 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[GitHub] [kafka] mjsax commented on a diff in pull request #13937: Add Streams API broker compatibility table
mjsax commented on code in PR #13937: URL: https://github.com/apache/kafka/pull/13937#discussion_r1260153209 ## docs/streams/upgrade-guide.html: ## @@ -1292,6 +1294,57 @@ JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) +Streams API broker compatibility + +The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. + + + + + +Kafka Broker (columns) + + + + +Kafka Streams API (rows) +0.10.0.x +0.10.1.x and 0.10.2.x +0.11.0.x and1.0.x and1.1.x and2.0.x and2.1.x and2.2.x and2.3.x and2.4.x and2.5.x and2.6.x and2.7.x and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x and3.4.x Review Comment: It seems we need to add 3.5, which was released in the meantime? ## docs/streams/upgrade-guide.html: ## @@ -1292,6 +1294,57 @@ JoinWindows has no default size anymore: JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) +Streams API broker compatibility + +The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. 
+ + + + + +Kafka Broker (columns) + + + + +Kafka Streams API (rows) +0.10.0.x +0.10.1.x and 0.10.2.x +0.11.0.x and1.0.x and1.1.x and2.0.x and2.1.x and2.2.x and2.3.x and2.4.x and2.5.x and2.6.x and2.7.x and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x and3.4.x + + +0.10.0.x +compatible +compatible +compatible + + +0.10.1.x and 0.10.2.x + +compatible +compatible + + +0.11.0.x + +compatible with exactly-once turned off(requires broker version 0.11.0.x or higher) +compatible + + +1.0.x and1.1.x and2.0.x and2.1.x and2.2.0 and2.2.0 + +compatible with exactly-once turned off(requires broker version 0.11.0.x or higher);requires message format 0.10 or higher;message headers are not supported(requires broker version 0.11.0.x or higherwith message format 0.11 or higher) +compatible; requires message format 0.10 or higher;if message headers are used, message format 0.11or higher required + + +2.2.1 and2.3.x and2.4.x and2.5.x and2.6.x and2.7.x and2.8.x and3.0.x and3.1.x and3.2.x and3.3.x and3.4.x Review Comment: Same here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
mjsax commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834 ## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ## @@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception { }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag."); // Kill instance, delete state to force restoration. -assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60))); +assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60))); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder()) .map(Path::toFile) .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted")); +} finally { +streams.get().close(); Review Comment: Not sure why we need `AtomicReference`? Can you elaborate? Why does this not work? ``` KafkaStreams streams; try { streams = new KafkaStreams(...); ... ``` Btw: `streams.get()` could return `null` in case we fail before calling `set(...)`. Need a `null`-check here.
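The pattern suggested in the review comment above can be sketched as follows. This is a minimal illustration, not the PR's code: `FakeStreams` and `NullSafeCloseSketch` are hypothetical stand-ins so the example runs without Kafka on the classpath. Note that the local has to be initialised to `null` for the `finally` block to compile, and that a plain local cannot be reassigned from inside a lambda, which may or may not be why the PR uses a holder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaStreams; only the lifecycle matters here.
final class FakeStreams implements AutoCloseable {
    private final List<String> events;

    FakeStreams(List<String> events, boolean failOnConstruct) {
        if (failOnConstruct) {
            throw new IllegalStateException("construction failed");
        }
        this.events = events;
        events.add("created");
    }

    @Override
    public void close() {
        events.add("closed");
    }
}

final class NullSafeCloseSketch {
    // Runs the create/use/close lifecycle and returns the recorded events.
    static List<String> run(boolean failOnConstruct) {
        List<String> events = new ArrayList<>();
        FakeStreams streams = null; // plain local instead of AtomicReference
        try {
            streams = new FakeStreams(events, failOnConstruct);
            events.add("used");
        } catch (IllegalStateException e) {
            events.add("failed");
        } finally {
            // Guard against the case where construction threw before assignment.
            if (streams != null) {
                streams.close();
            }
        }
        return events;
    }
}
```

The same `null` guard applies if a holder such as `AtomicReference` is kept: `get()` returns `null` until `set(...)` has been called.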
[GitHub] [kafka] satishd merged pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd merged PR #13275: URL: https://github.com/apache/kafka/pull/13275
[GitHub] [kafka] satishd commented on pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on PR #13275: URL: https://github.com/apache/kafka/pull/13275#issuecomment-1631285075 A few tests failed, but those failures are not related to the changes in the PR.
[GitHub] [kafka] eziosudo commented on pull request #13973: KAFKA-15148: Some integration tests are running as unit tests
eziosudo commented on PR #13973: URL: https://github.com/apache/kafka/pull/13973#issuecomment-1631271129 > Thank you for addressing the comments. Are the results in the description updated after the latest change? If not, can you please update them. My motivation of asking this is to ensure that we are not accidentally removing the tests from running in either unitTest or integTests. I agree. We should be careful with every merge, so I ran all the tests again. These are the integration tests that run on the trunk branch after running './gradlew unitTest': https://github.com/apache/kafka/assets/54128896/80ea1f96-9aea-4779-a77a-3790a570889b In my branch, I ran './gradlew unitTest', then filtered for integration tests and got nothing, which means the integration tests are excluded. https://github.com/apache/kafka/assets/54128896/6c4d632a-cc90-4937-ba0a-fb1ea116259a After running './gradlew integrationTest' in my branch, I get all the integration tests I changed in this PR, which means they are all included. https://github.com/apache/kafka/assets/54128896/56837acc-999d-4c34-8ee2-9fc1d1636b77 https://github.com/apache/kafka/assets/54128896/73e98e5d-0aaa-4ca7-af93-2398eb00181b
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260099305 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[jira] [Commented] (KAFKA-14359) Idempotent Producer continues to retry on OutOfOrderSequence error when first batch fails
[ https://issues.apache.org/jira/browse/KAFKA-14359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742124#comment-17742124 ] Justine Olshan commented on KAFKA-14359: Also, https://issues.apache.org/jira/browse/KAFKA-9199 offers a way to fix this. > Idempotent Producer continues to retry on OutOfOrderSequence error when first > batch fails > - > > Key: KAFKA-14359 > URL: https://issues.apache.org/jira/browse/KAFKA-14359 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Priority: Major > > When the idempotent producer does not have any state, it can fall into a state > where the producer keeps retrying an out-of-order sequence. Consider the > following scenario, where an idempotent producer has retries and delivery > timeout set to int max (a configuration used in Streams). > 1. A producer sends out several batches (up to 5), with the first one starting > at sequence 0. > 2. The first batch with sequence 0 fails due to a transient error (i.e., > NOT_LEADER_OR_FOLLOWER or a timeout error). > 3. The second batch, say with sequence 200, comes in. Since there is no > previous state to invalidate it, it gets written to the log. > 4. The original batch is retried and gets an out-of-order sequence error. > 5. The current Java client will continue to retry this batch, but it will never > resolve. -- This message was sent by Atlassian Jira (v8.20.10#820010)
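The scenario in the issue description can be modelled with a small sketch. This is a deliberately simplified, hypothetical model of sequence validation for one producer on one partition, based only on the five steps listed in the issue; `SequenceCheckSketch` and its methods are illustrative names and do not reflect the broker's actual implementation.

```java
import java.util.OptionalInt;

// Simplified, hypothetical model of per-producer sequence validation on one partition.
final class SequenceCheckSketch {
    // Last sequence number written to the log; empty when no state exists yet.
    private OptionalInt lastSequence = OptionalInt.empty();

    // Returns true if the batch is appended, false to stand in for an
    // out-of-order sequence error.
    boolean tryAppend(int firstSequence, int recordCount) {
        // With no previous state there is nothing to validate against,
        // so any starting sequence is accepted (step 3 of the issue).
        if (lastSequence.isPresent() && firstSequence != lastSequence.getAsInt() + 1) {
            return false;
        }
        lastSequence = OptionalInt.of(firstSequence + recordCount - 1);
        return true;
    }

    // Retries the same batch up to maxAttempts times; returns the attempt
    // number that succeeded, or -1 if the batch was never accepted.
    static int attemptsUntilAccepted(SequenceCheckSketch log, int firstSequence,
                                     int recordCount, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (log.tryAppend(firstSequence, recordCount)) {
                return attempt;
            }
        }
        return -1;
    }
}
```

In this model, once the batch starting at sequence 200 has been appended, retrying the batch that starts at sequence 0 is rejected on every attempt, because the state it is checked against never changes. That matches steps 4 and 5 of the description.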
[GitHub] [kafka] gharris1727 commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
gharris1727 commented on PR #13992: URL: https://github.com/apache/kafka/pull/13992#issuecomment-1631204314 Thanks for the fix, @hudeqi! I was wondering if you had plans to address this problem/warning elsewhere in the codebase?
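For context on the warning mentioned above: in commonly used JDK versions, `AbstractSet.removeAll(c)` iterates over the set and calls `c.contains(...)` whenever the set is not larger than `c`, and `contains` on a `List` is a linear scan. The sketch below shows one way to avoid that path. It is an illustration with a hypothetical helper name (`removeEach`), not necessarily the change made in the PR.

```java
import java.util.Set;

final class RemoveAllSketch {
    // Removes each element of toRemove from target individually. For a
    // hash-based set this is one hash lookup per element, regardless of
    // the relative sizes of the two collections.
    static <T> void removeEach(Set<T> target, Iterable<? extends T> toRemove) {
        for (T item : toRemove) {
            target.remove(item);
        }
    }
}
```

An alternative with the same effect is to copy the list into a `HashSet` first and pass that to `removeAll`, at the cost of an extra allocation.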
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260035984 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
erikvanoosten commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1631194625 @dajac Can you please post your opinion about KIP-944 to the mailing list?
[GitHub] [kafka] philipnee commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
philipnee commented on PR #13956: URL: https://github.com/apache/kafka/pull/13956#issuecomment-1631191677 Hey @divijvaidya - I think the fixes make sense. Did you verify the thread leak by checking the thread dump? I think I've used the lsof command before.
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1260017274 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[GitHub] [kafka] jolshan commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
jolshan commented on PR #13956: URL: https://github.com/apache/kafka/pull/13956#issuecomment-1631175316 Apologies, I missed the first ping. Left a comment.
[GitHub] [kafka] jolshan commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
jolshan commented on code in PR #13956: URL: https://github.com/apache/kafka/pull/13956#discussion_r1260015211 ## core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala: ## @@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest { partitionsToAssign: Set[TopicPartition], userRebalanceListener: ConsumerRebalanceListener) extends ShutdownableThread("daemon-consumer-assignment", false) { +setDaemon(true) Review Comment: It seems like one property of ShutdownableThread is that we make it non-daemon and need to shut it down explicitly. Were the try blocks not enough to fix the issue? Alternatively, can we make this a thread class that isn't shutdownable?
[GitHub] [kafka] philipnee commented on a diff in pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest
philipnee commented on code in PR #13956: URL: https://github.com/apache/kafka/pull/13956#discussion_r1260007558 ## core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala: ## @@ -346,6 +346,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest { partitionsToAssign: Set[TopicPartition], userRebalanceListener: ConsumerRebalanceListener) extends ShutdownableThread("daemon-consumer-assignment", false) { +setDaemon(true) Review Comment: Should we just add a parameter to ShutdownableThread to set the thread as daemon?
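The idea of passing the daemon flag as a constructor parameter can be sketched as below. `DaemonAwareWorker` is a hypothetical class written for illustration only; it is not Kafka's `ShutdownableThread` and makes no claim about that class's constructor signature. The only JDK facts relied on are that `Thread.setDaemon` must be called before `start()` and that daemon threads do not keep the JVM alive.

```java
// Hypothetical worker thread; not Kafka's ShutdownableThread.
final class DaemonAwareWorker extends Thread {
    private final Runnable work;
    private volatile boolean running = true;

    DaemonAwareWorker(String name, boolean daemon, Runnable work) {
        super(name);
        this.work = work;
        // The daemon flag has to be set before start() is called.
        setDaemon(daemon);
    }

    @Override
    public void run() {
        // Repeat the unit of work until shutdown() is requested.
        while (running) {
            work.run();
        }
    }

    // Requests termination and waits up to timeoutMs for the thread to exit.
    // Returns true if the thread has terminated.
    boolean shutdown(long timeoutMs) {
        running = false;
        interrupt();
        try {
            join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !isAlive();
    }
}
```

With the flag in the constructor, a test that forgets to call `shutdown` would still not block JVM exit when `daemon` is `true`, although explicit shutdown remains the cleaner way to avoid leaking threads between tests.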
[jira] [Commented] (KAFKA-15175) Assess the use of nio2 asynchronous channel for KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742100#comment-17742100 ] Philip Nee commented on KAFKA-15175: Thanks [~ijuma] - Right, it was brought up during the discussion. [~divijvaidya] - Is there anything you would like to chip in here? > Assess the use of nio2 asynchronous channel for KafkaConsumer > - > > Key: KAFKA-15175 > URL: https://issues.apache.org/jira/browse/KAFKA-15175 > Project: Kafka > Issue Type: Wish > Components: consumer >Reporter: Philip Nee >Priority: Major > > We should assess whether NIO2 is an appropriate replacement for the current NIO library and whether it offers better performance.
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
wcarlson5 commented on code in PR #13942: URL: https://github.com/apache/kafka/pull/13942#discussion_r1259992272 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ## @@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() { ); } +@Test +public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); +final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KTable source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), +Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); +final KTable tableB = source.filter((k, v) -> true); +streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + +final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); +assertThat( +exception.getMessage(), +is("KTable must be versioned to use a grace period in a stream table join.") +); +} + +@Test +public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { Review Comment: good idea
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
wcarlson5 commented on code in PR #13942: URL: https://github.com/apache/kafka/pull/13942#discussion_r1259987822 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ## @@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() { ); } +@Test +public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); +final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KTable source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), +Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); +final KTable tableB = source.filter((k, v) -> true); +streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + +final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); +assertThat( +exception.getMessage(), +is("KTable must be versioned to use a grace period in a stream table join.") +); +} + +@Test +public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { +final StreamsBuilder builder = new StreamsBuilder(); +final Properties props = new Properties(); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); +final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); +final KTable source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), +Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); +final KTable tableB = source.filter((k, v) -> true); +streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + +//should not throw an error +builder.build(); +} + +@Test +public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() { Review Comment: Sure, easy enough
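Going by the PR title and the test name `shouldFailIfGracePeriodIsLongerThanHistoryRetention`, the check being discussed compares a join's grace period against the versioned store's history retention. A minimal sketch of such a comparison is shown below. `GracePeriodCheckSketch`, its method, and its error message are hypothetical and are not taken from the Kafka Streams code base.

```java
import java.time.Duration;

final class GracePeriodCheckSketch {
    // Rejects a grace period that exceeds the store's history retention.
    // The message text is made up for this sketch.
    static void validate(Duration gracePeriod, Duration historyRetention) {
        if (gracePeriod.compareTo(historyRetention) > 0) {
            throw new IllegalArgumentException(
                "Grace period " + gracePeriod
                    + " is longer than history retention " + historyRetention);
        }
    }
}
```

With the values used in the quoted test (a grace period of `Duration.ofMillis(6)` and a history retention of `Duration.ofMinutes(5)`), this check passes; swapping in a grace period longer than five minutes makes it throw.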
[GitHub] [kafka] jolshan commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259983786 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
gharris1727 commented on code in PR #13992: URL: https://github.com/apache/kafka/pull/13992#discussion_r1259982271 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -321,10 +321,10 @@ void refreshTopicPartitions() if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) { Set newTopicPartitions = sourceTopicPartitionsSet; -newTopicPartitions.removeAll(knownSourceTopicPartitions); +newTopicPartitions.removeAll(knownSourceTopicPartitionsSet); Set deletedTopicPartitions = knownSourceTopicPartitionsSet; -deletedTopicPartitions.removeAll(sourceTopicPartitions); +deletedTopicPartitions.removeAll(sourceTopicPartitionsSet); Review Comment: At this point, the `newTopicPartitions.removeAll` will have mutated the `sourceTopicPartitionsSet`, leading this line to miss some removals. I think this results in considering topics that exist in both sets to be deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
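The aliasing problem described in this comment can be reproduced outside of MirrorMaker. The following is a minimal sketch, not the connector's code: `SetDiffSketch`, `difference`, and the sample partition names are hypothetical, and the only assumption carried over from the diff is that `Set x = y;` makes `x` an alias of `y` rather than a copy.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class SetDiffSketch {
    /** Returns the elements of a that are not in b; neither input is modified. */
    static <T> Set<T> difference(Set<T> a, Set<T> b) {
        Set<T> result = new HashSet<>(a); // defensive copy, not an alias of a
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(Arrays.asList("t-0", "t-1"));
        Set<String> source = new HashSet<>(Arrays.asList("t-1", "t-2"));

        // Aliased version, mirroring the shape of the diff under review:
        // 'added' and 'aliasedSource' are the same object.
        Set<String> aliasedSource = new HashSet<>(source);
        Set<String> added = aliasedSource;
        added.removeAll(known);           // also strips "t-1" from aliasedSource
        Set<String> deleted = new HashSet<>(known);
        deleted.removeAll(aliasedSource); // "t-1" is no longer removed here
        System.out.println("aliased deleted = " + deleted);

        // Copying version: both differences are computed from untouched inputs.
        System.out.println("new = " + difference(source, known));
        System.out.println("deleted = " + difference(known, source));
    }
}
```

In the aliased run, "t-1" exists in both sets yet still ends up in the deleted set, which matches the behaviour the reviewer predicts. Copying before each `removeAll` keeps the Set-to-Set speed-up the PR is after without mutating the inputs.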
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
wcarlson5 commented on code in PR #13942: URL: https://github.com/apache/kafka/pull/13942#discussion_r1259980518 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java: ## @@ -64,6 +70,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { // Steam - KTable join only if (otherJoinSideNodeName != null) { topologyBuilder.connectProcessorAndStateStores(processorName, storeNames); +if (gracePeriod != null) { +for (final String storeName : storeNames) { +if (!topologyBuilder.isStoreVersioned(storeName)) { +throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); +} +if (gracePeriod.toMillis() > topologyBuilder.getHistoryRetention(storeName)) { +throw new IllegalArgumentException("History history retention must be at least grace period, but it should be larger."); Review Comment: This change is fine; equal is enough, even if it is not recommended. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
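The boundary condition discussed here (a grace period equal to the history retention is accepted, a longer one is rejected) can be shown in isolation. This is a standalone sketch, not the `StreamTableJoinNode` implementation: `GracePeriodCheckSketch`, `validate`, and its parameters are hypothetical stand-ins for the topology builder lookups in the diff.

```java
import java.time.Duration;

final class GracePeriodCheckSketch {
    /**
     * Mirrors the two checks in the quoted diff for a single store.
     * A null grace period means the feature is not in use, so nothing is checked.
     */
    static void validate(boolean storeVersioned, Duration gracePeriod, long historyRetentionMs) {
        if (gracePeriod == null) {
            return;
        }
        if (!storeVersioned) {
            throw new IllegalArgumentException(
                "KTable must be versioned to use a grace period in a stream table join.");
        }
        // Strictly greater than: a grace period equal to the retention passes.
        if (gracePeriod.toMillis() > historyRetentionMs) {
            throw new IllegalArgumentException(
                "History retention must be at least the grace period.");
        }
    }

    public static void main(String[] args) {
        validate(true, Duration.ofMillis(6), 6L); // equal: accepted
        try {
            validate(true, Duration.ofMillis(7), 6L); // longer: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```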
[jira] [Updated] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors
[ https://issues.apache.org/jira/browse/KAFKA-15179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15179: -- Issue Type: Test (was: Improvement) > Add integration tests for the FileStream Sink and Source connectors > --- > > Key: KAFKA-15179 > URL: https://issues.apache.org/jira/browse/KAFKA-15179 > Project: Kafka > Issue Type: Test >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > > Add integration tests for the FileStream Sink and Source connectors covering > various different common scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259882152 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. 
+for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; +} + +if (!offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} + +// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value +try { +long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD))); Review Comment: I'd prefer to leave the task parsing the same; less work on our part, and less risk of a regression in existing parts of the code base. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
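The `Long.parseLong(String.valueOf(...))` idiom in the quoted diff accepts the position whether it arrives as a number or as a numeric string. A minimal sketch of that parsing step follows, under stated assumptions: `OffsetPositionSketch` and `parsePosition` are hypothetical names, and `IllegalArgumentException` stands in for Connect's `ConnectException` so that the block compiles without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.Map;

final class OffsetPositionSketch {
    /**
     * Parses the position stored under positionField, accepting numbers and
     * numeric strings alike, and rejects anything that is not a non-negative long.
     */
    static long parsePosition(Map<String, ?> offset, String positionField) {
        Object raw = offset.get(positionField);
        long position;
        try {
            // String.valueOf turns 10, 10L and "10" into "10"; null becomes "null" and fails below.
            position = Long.parseLong(String.valueOf(raw));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "The value for '" + positionField + "' must be a long, got: " + raw, e);
        }
        if (position < 0) {
            throw new IllegalArgumentException(
                "The value for '" + positionField + "' must be non-negative, got: " + position);
        }
        return position;
    }

    public static void main(String[] args) {
        System.out.println(parsePosition(Collections.singletonMap("position", 10), "position"));
        System.out.println(parsePosition(Collections.singletonMap("position", "10"), "position"));
    }
}
```

This leniency lives only in the validation path of the sketch; as the comment above notes, the task's own parsing is left unchanged.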
[GitHub] [kafka] kkonstantine merged pull request #13453: KAFKA-12525: Ignoring stale status statuses when reading from Connect status topic
kkonstantine merged PR #13453: URL: https://github.com/apache/kafka/pull/13453
[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
C0urante commented on code in PR #13913: URL: https://github.com/apache/kafka/pull/13913#discussion_r1259876757 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -581,13 +582,26 @@ void incrementalAlterConfigs(Map topicConfigs) { })); } -private void updateTopicAcls(List bindings) { -log.trace("Syncing {} topic ACL bindings.", bindings.size()); -targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { -if (e != null) { -log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); -} -})); +// Visible for testing +int updateTopicAcls(List bindings) { +Set addBindings = new HashSet<>(bindings); +Set failedBindings = new HashSet<>(); +addBindings.removeAll(knownTopicAclBindings); +int newBindCount = addBindings.size(); +if (!addBindings.isEmpty()) { +log.info("Syncing new found {} topic ACL bindings.", newBindCount); +targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> { +if (e != null) { +log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e); +failedBindings.add(k); +} +})); +knownTopicAclBindings = new HashSet<>(bindings); +knownTopicAclBindings.removeAll(failedBindings); Review Comment: Isn't `failedBindings` likely to be empty at this point unless the admin client is able to perform the request to create the ACL bindings exceptionally fast (which also happens to be the scenario we're covering in the unit test)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
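The timing concern raised here is that the `whenComplete` callbacks run asynchronously, so a set they populate is usually still empty when it is read on the next line. One way to make the read safe is to wait for every callback first. The sketch below illustrates that idea only: it uses `java.util.concurrent.CompletableFuture` in place of Kafka's `KafkaFuture`, plain strings in place of `AclBinding`, and the class and method names are hypothetical. Whether blocking is acceptable inside the connector is a separate question that this sketch does not settle.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AsyncFailureTrackingSketch {
    /** Returns the keys whose future completed normally, after all futures have finished. */
    static Set<String> succeeded(Map<String, CompletableFuture<Void>> results) {
        // Callbacks may run on other threads, so the failure set must be thread-safe.
        Set<String> failed = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Void>> tracked = new ArrayList<>();
        results.forEach((binding, future) -> tracked.add(
            future.whenComplete((ok, error) -> {
                if (error != null) {
                    failed.add(binding);
                }
            })));

        // Block until every callback above has run; handle() swallows the
        // aggregate exception so that join() does not rethrow it.
        CompletableFuture.allOf(tracked.toArray(new CompletableFuture[0]))
            .handle((ok, error) -> null)
            .join();

        Set<String> known = new HashSet<>(results.keySet());
        known.removeAll(failed); // safe now: 'failed' is fully populated
        return known;
    }
}
```

An alternative that avoids blocking is to update the known-bindings set from inside each callback, adding a binding only once its own future succeeds.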
[jira] [Commented] (KAFKA-14481) Move LogSegment/LogSegments to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742051#comment-17742051 ] Ismael Juma commented on KAFKA-14481: - [~satish.duggana] Are you still working on this? > Move LogSegment/LogSegments to storage module > - > > Key: KAFKA-14481 > URL: https://issues.apache.org/jira/browse/KAFKA-14481 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Satish Duggana >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15179) Add integration tests for the FileStream Sink and Source connectors
Yash Mayya created KAFKA-15179: -- Summary: Add integration tests for the FileStream Sink and Source connectors Key: KAFKA-15179 URL: https://issues.apache.org/jira/browse/KAFKA-15179 Project: Kafka Issue Type: Improvement Reporter: Yash Mayya Assignee: Yash Mayya Add integration tests for the FileStream Sink and Source connectors covering various different common scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on PR #13945: URL: https://github.com/apache/kafka/pull/13945#issuecomment-1630945143 I was toying with the idea of adding ITs for these file connectors in this very PR earlier, but after a bit of tinkering, left it for later. I'd be happy to file a separate ticket for that and assign it to myself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15172) Allow exact mirroring of ACLs between clusters
[ https://issues.apache.org/jira/browse/KAFKA-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15172: -- Labels: needs-kip (was: ) > Allow exact mirroring of ACLs between clusters > -- > > Key: KAFKA-15172 > URL: https://issues.apache.org/jira/browse/KAFKA-15172 > Project: Kafka > Issue Type: Task > Components: mirrormaker >Reporter: Mickael Maison >Priority: Major > Labels: needs-kip > > When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The > rationale is to prevent other clients from producing to remote topics. > However, in disaster recovery scenarios, where the target cluster is not used > and is just a "hot standby", it would be preferable to have exactly the same > ACLs on both clusters to speed up failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259827626 ## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ## @@ -147,4 +152,102 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + +@Test +public void testAlterOffsetsStdin() { +sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectPartitionKey() { +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +Collections.singletonMap("other_partition_key", FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +))); + +// null partitions are invalid +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +null, +Collections.singletonMap(POSITION_FIELD, 0) +))); +} + +@Test +public void testAlterOffsetsMultiplePartitions() { +Map, Map> offsets = new HashMap<>(); +offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); +offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); +assertTrue(connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectOffsetKey() { +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap("other_offset_key", 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} 
+ +@Test +public void testAlterOffsetsOffsetPositionValues() { Review Comment: Nice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821609 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. 
+for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; +} + +if (!offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} + +// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value +try { +long offsetPosition = Long.parseLong(String.valueOf(offset.get(POSITION_FIELD))); Review Comment: Ah, good catch, thanks. I think it might be a bit friendlier if we update the task class instead to do similar parsing, WDYT? I'm okay either way, since the most common use case would be copy-pasting the output from `GET /offsets` and modifying it, in which case users would end up using a number rather than a string anyway. ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. 
" + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; Review Comment: Whoops, thanks. I forgot that we're no longer doing the single partition offset pair validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259821859 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; Review Comment: Whoops, thanks. I forgot that we're no longer doing the single partition offset pair validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
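In the quoted loop, `return true` on a null offset ends validation at the first tombstone, so any later partition in the same request is never checked. A minimal sketch of the loop with `continue` instead is given below. It is not the connector's code: `AlterOffsetsLoopSketch` is a hypothetical name, the field names "filename" and "position" are assumed values for `FILENAME_FIELD` and `POSITION_FIELD`, and `IllegalArgumentException` replaces `ConnectException` to keep the block free of Kafka dependencies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class AlterOffsetsLoopSketch {
    static final String FILENAME_FIELD = "filename"; // assumed constant value
    static final String POSITION_FIELD = "position"; // assumed constant value

    /** Validates every partition/offset pair; returns true only if all of them pass. */
    static boolean validate(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            if (partition == null || !partition.containsKey(FILENAME_FIELD)) {
                throw new IllegalArgumentException(
                    "Partition objects should contain the key '" + FILENAME_FIELD + "'");
            }
            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                // A null offset deletes this partition's offsets only;
                // the remaining entries still need to be validated.
                continue;
            }
            if (!offset.containsKey(POSITION_FIELD)) {
                throw new IllegalArgumentException(
                    "Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Map<String, ?>, Map<String, ?>> offsets = new LinkedHashMap<>();
        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/old-file"), null);
        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/new-file"),
            Collections.singletonMap("other_offset_key", 0));
        try {
            validate(offsets);
        } catch (IllegalArgumentException e) {
            // With 'return true' in place of 'continue', this bad entry would slip through.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```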
[jira] [Commented] (KAFKA-15059) Exactly-once source tasks fail to start during pending rebalances
[ https://issues.apache.org/jira/browse/KAFKA-15059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742034#comment-17742034 ] Chris Egerton commented on KAFKA-15059: --- [~divijvaidya] no; this was introduced by [https://github.com/apache/kafka/pull/13465]. It did not affect anything except trunk and never made it into a release (see the affected/fix versions fields). > Exactly-once source tasks fail to start during pending rebalances > - > > Key: KAFKA-15059 > URL: https://issues.apache.org/jira/browse/KAFKA-15059 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 3.6.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 3.6.0 > > > When asked to perform a round of zombie fencing, the distributed herder will > [reject the > request|https://github.com/apache/kafka/blob/17fd30e6b457f097f6a524b516eca1a6a74a9144/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1249-L1250] > if a rebalance is pending, which can happen if (among other things) a config > for a new connector or a new set of task configs has been recently read from > the config topic. > Normally this can be alleviated with a simple task restart, which isn't great > but isn't terrible. > However, when running MirrorMaker 2 in dedicated mode, there is no API to > restart failed tasks, and it can be more common to see this kind of failure > on a fresh cluster because three connector configurations are written in > rapid succession to the config topic. > > In order to provide a better experience for users of both vanilla Kafka > Connect and dedicated MirrorMaker 2 clusters, we can retry (likely with the > same exponential backoff introduced with KAFKA-14732) zombie fencing attempts > that fail due to a pending rebalance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
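The retry approach proposed in this ticket can be sketched generically. The following is an illustration only and not the Connect runtime's implementation: `BackoffRetrySketch`, `backoffMs`, `retry`, and all delay values are hypothetical, and the parameters of the backoff introduced with KAFKA-14732 are not reproduced here. The `BooleanSupplier` stands in for a zombie fencing attempt that reports failure while a rebalance is pending.

```java
import java.util.function.BooleanSupplier;

final class BackoffRetrySketch {
    /** Delay before the retry that follows the given zero-based attempt: initial * 2^attempt, capped at maxMs. */
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    /** Runs the action until it reports success or maxAttempts is reached. */
    static boolean retry(BooleanSupplier action, int maxAttempts, long initialMs, long maxMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (action.getAsBoolean()) {
                return true;
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(backoffMs(attempt, initialMs, maxMs));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for callers
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical action that succeeds on its third invocation.
        boolean fenced = retry(() -> ++calls[0] >= 3, 5, 10L, 100L);
        System.out.println("succeeded=" + fenced + " after " + calls[0] + " attempts");
    }
}
```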
[GitHub] [kafka] clolov commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
clolov commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1259787673 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageHistory.java: ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Arrays.stream; +import static java.util.Collections.unmodifiableMap; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Accumulates and retains the interactions between brokers and {@link LocalTieredStorage} instances. + * These interactions are modelled via events of type {@link LocalTieredStorageEvent}. 
+ * + * Events from an instance of storage are captured by the {@link LocalTieredStorageHistory} after + * {@link LocalTieredStorageHistory#listenTo(LocalTieredStorage)} is called. + */ +/* @ThreadSafe */ +public final class LocalTieredStorageHistory { Review Comment: This history is only in-memory, unless I am reading the code wrongly. Are there plans to write it somewhere for debugging? I suspect the main reason behind it is for assertions in tests via the LocalTieredStorageCondition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
C0urante commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259782359 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " + +"This is because stdin is used for input and offsets are not tracked."); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. +for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset == null) { +return true; Review Comment: Shouldn't this be `continue`? 
## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,50 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked Review Comment: We can remove this comment now, right? ## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ## @@ -147,4 +152,102 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + +@Test +public void testAlterOffsetsStdin() { +sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectPartitionKey() { +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +Collections.singletonMap("other_partition_key", FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +))); + +// null partitions are invalid +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +null, +Collections.singletonMap(POSITION_FIELD, 0) +))); +} + +@Test +public void testAlterOffsetsMultiplePartitions() { +Map, Map> offsets = new HashMap<>(); +offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), 
Collections.singletonMap(POSITION_FIELD, 0)); +offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); +assertTrue(connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectOffsetKey() { +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap("other_offset_key", 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsOffsetPositionValues() { Review Comment: Maybe
[jira] [Assigned] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API
[ https://issues.apache.org/jira/browse/KAFKA-15177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15177: - Assignee: Chris Egerton > MirrorMaker 2 should implement the alterOffsets KIP-875 API > --- > > Key: KAFKA-15177 > URL: https://issues.apache.org/jira/browse/KAFKA-15177 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect, mirrormaker >Reporter: Yash Mayya >Assignee: Chris Egerton >Priority: Minor > > The {{MirrorSourceConnector}} class should implement the new alterOffsets API > added in > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]. > We could also implement the API in > {{MirrorCheckpointConnector}} and > {{MirrorHeartbeatConnector}} to prevent external modification of offsets > since the operation wouldn't really make sense in their case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] fvaleri commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
fvaleri commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1259763311 ## tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.PartitionFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter; +import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GetOffsetShell { +Pattern topicPartitionPattern = 
Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?"); + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws IOException, ExecutionException, InterruptedException { +GetOffsetShell getOffsetShell = new GetOffsetShell(); + +getOffsetShell.parseArgs(args); + +Map partitionOffsets = getOffsetShell.fetchOffsets(); + +for (Map.Entry entry : partitionOffsets.entrySet()) { +TopicPartition topic = entry.getKey(); + +System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()})); +} +} + +private OptionSet options; +private OptionSpec topicPartitionsOpt; Review Comment: Well, that's not exactly a duplicate, as mine also creates the JmxTool runnable for convenience. For the last part, I guess I didn't want to break ToolsTestUtils visibility.
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742007#comment-17742007 ]

ASF GitHub Bot commented on KAFKA-14995:

mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1630858699

Thanks for the updates. I rendered the website with this change and realized the Github usernames are displayed. Can we hide them to keep the same display as today?

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
> Issue Type: Improvement
> Reporter: John Roesler
> Assignee: Steven Booke
> Priority: Minor
> Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators:
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who are not Kafka committers. Unfortunately, it's not trivial to compute this list.
> Here is the process I followed to generate the list the first time (note that I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. Manually filter out the authors who are committers, based on [https://kafka.apache.org/committers]
> 3. Truncate the list to 20 authors
> 4. For each author:
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date: Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>
> This is pretty time consuming and is very scriptable. Two complications:
> * To do the filtering, we need to map from Git log "Author" to documented Kafka "Committer" that we can use to perform the filter. Suggestion: just update the structure of the "Committers" page to include their Git "Author" name and email ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
> * To generate the YAML lists, we need to map from Git log "Author" to Github username. There's presumably some way to do this in the Github REST API (the mapping is based on the email, IIUC), or we could also just update the Committers page to also document each committer's Github username.
>
> Ideally, we would write this script (to be stored in the Apache Kafka repo) and create a Github Action to run it every three months.
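Steps 1 to 3 of the process quoted above could be scripted along these lines. This is only a sketch under stated assumptions: the input is the already-sorted output of `git shortlog --email --numbered --summary` (one `count<TAB>Name <email>` entry per line), the committer emails are supplied as a lower-cased set, and the class and method names are hypothetical. The mapping from author to Github username (step 4) is not covered.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper; not an existing Kafka script.
final class CollaboratorListSketch {
    // Matches one shortlog summary line: commit count, author name, <email>.
    private static final Pattern SHORTLOG_LINE =
        Pattern.compile("^\\s*(\\d+)\\s+(.*?)\\s*<([^>]*)>\\s*$");

    // Returns the first `limit` authors whose email is not in committerEmails.
    // The input order (descending commit count) is preserved.
    static List<String> topNonCommitters(List<String> shortlogLines,
                                         Set<String> committerEmails,
                                         int limit) {
        return shortlogLines.stream()
            .map(SHORTLOG_LINE::matcher)
            .filter(Matcher::matches) // skip lines that are not shortlog entries
            .filter(m -> !committerEmails.contains(m.group(3).toLowerCase(Locale.ROOT)))
            .limit(limit)
            .map(m -> m.group(2) + " <" + m.group(3) + ">")
            .collect(Collectors.toList());
    }
}
```

Calling `topNonCommitters(lines, committers, 20)` would give the candidate list that step 4 then has to translate into Github usernames.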
[GitHub] [kafka] nicktelford opened a new pull request, #13994: MINOR: Remove StreamsProducer flush under EOS
nicktelford opened a new pull request, #13994: URL: https://github.com/apache/kafka/pull/13994 `KafkaProducer.flush` does not need to be called under EOS, because `KafkaProducer.commitTransaction` implicitly flushes the record buffer during the transaction commit. This should reduce the number of round-trips to the Kafka brokers needed by the Streams EOS Producer, improving throughput under EOS. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
[ https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15178: - Labels: easyfix patch-available (was: easyfix) > Poor performance of ConsumerCoordinator with many TopicPartitions > - > > Key: KAFKA-15178 > URL: https://issues.apache.org/jira/browse/KAFKA-15178 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: easyfix, patch-available > Attachments: pollPhase.png > > > Doing some profiling of my Kafka Streams application, I noticed that the > {{pollPhase}} suffers from a minor performance issue. > See the pink tree on the left of the flame graph below. > !pollPhase.png|width=1028,height=308! > {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which > checks the current {{metadataSnapshot}} against the > {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if > there's a large number of topic-partitions being consumed by the application, > then this comparison can perform poorly. > I suspect this can be trivially addressed with a {{boolean}} flag that > indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and > actually needs to be checked, since most of the time it should be identical > to {{{}assignmentSnapshot{}}}. > I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] nicktelford opened a new pull request, #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf
nicktelford opened a new pull request, #13993: URL: https://github.com/apache/kafka/pull/13993 When there are many topic-partitions being consumed by the application, the `rejoinNeededOrPending` method can perform poorly, due to the deep equality comparison of `metadataSnapshot` with `assignmentSnapshot`. Since these snapshots can only diverge in `maybeUpdateSubscriptionMetadata`, we can use a `boolean` to cache whether they are already known to be "dirty", and therefore need to be compared. This should short-circuit the deep comparison in the common case where the snapshot hasn't been updated. Fixes KAFKA-15178 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
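The dirty-flag idea described in this PR can be sketched as follows. This is a minimal illustration, not the actual `ConsumerCoordinator` change: the class `SnapshotTracker`, its methods, and the use of a plain `Map` as a snapshot are all assumptions made for the example.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of caching "snapshots may differ" in a boolean.
final class SnapshotTracker {
    private Map<String, Integer> metadataSnapshot;
    private Map<String, Integer> assignmentSnapshot;
    private boolean dirty = false;
    int deepComparisons = 0; // exposed only to show how often equals() runs

    SnapshotTracker(Map<String, Integer> initial) {
        this.metadataSnapshot = initial;
        this.assignmentSnapshot = initial;
    }

    // The only place where the two snapshots can diverge.
    void updateMetadata(Map<String, Integer> newSnapshot) {
        this.metadataSnapshot = newSnapshot;
        this.dirty = true;
    }

    // Called on every poll; cheap unless the metadata changed since last check.
    boolean rejoinNeeded() {
        if (!dirty) {
            return false; // fast path: nothing changed, skip the deep comparison
        }
        deepComparisons++;
        boolean differs = !Objects.equals(metadataSnapshot, assignmentSnapshot);
        if (!differs) {
            dirty = false; // equal again, no need to re-check until next update
        }
        return differs;
    }
}
```

In this sketch the deep comparison runs at most once per metadata update while the snapshots stay equal, rather than once per poll.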
[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
[ https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15178: - Description: Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See the pink tree on the left of the flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. was: Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. 
> Poor performance of ConsumerCoordinator with many TopicPartitions > - > > Key: KAFKA-15178 > URL: https://issues.apache.org/jira/browse/KAFKA-15178 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: easyfix > Attachments: pollPhase.png > > > Doing some profiling of my Kafka Streams application, I noticed that the > {{pollPhase}} suffers from a minor performance issue. > See the pink tree on the left of the flame graph below. > !pollPhase.png|width=1028,height=308! > {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which > checks the current {{metadataSnapshot}} against the > {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if > there's a large number of topic-partitions being consumed by the application, > then this comparison can perform poorly. > I suspect this can be trivially addressed with a {{boolean}} flag that > indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and > actually needs to be checked, since most of the time it should be identical > to {{{}assignmentSnapshot{}}}. > I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
Nicholas Telford created KAFKA-15178: Summary: Poor performance of ConsumerCoordinator with many TopicPartitions Key: KAFKA-15178 URL: https://issues.apache.org/jira/browse/KAFKA-15178 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.5.0 Reporter: Nicholas Telford Assignee: Nicholas Telford Attachments: pollPhase.png Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi commented on pull request #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
hudeqi commented on PR #13992: URL: https://github.com/apache/kafka/pull/13992#issuecomment-1630770168 Please help review this minor PR, thanks! @gharris1727
[GitHub] [kafka] hudeqi opened a new pull request, #13992: MINOR:Avoid slow Set.removeAll(List) in MirrorSourceConnector
hudeqi opened a new pull request, #13992: URL: https://github.com/apache/kafka/pull/13992 Similar to https://github.com/apache/kafka/pull/13946. The `refreshTopicPartitions` method is called periodically, so when the number of partitions involved in replication is large, the slow `Set.removeAll(List)` call can cause performance problems.
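For context on why `Set.removeAll(List)` can be slow: `AbstractSet.removeAll` iterates over the set and calls `contains` on the argument whenever the set is not larger than the argument, and `List.contains` is a linear scan, so the call can become quadratic. One common workaround is sketched below; the class and method names are hypothetical, and this is not necessarily the change made in the PR.

```java
import java.util.Collection;
import java.util.Set;

// Hypothetical helper showing a removal that never calls List.contains.
final class RemoveAllSketch {
    // Removes each element of toRemove from target via the set's own lookup,
    // so the cost is one Set.remove per element of toRemove.
    static <T> void removeAllFast(Set<T> target, Collection<? extends T> toRemove) {
        for (T item : toRemove) {
            target.remove(item);
        }
    }
}
```

An alternative with the same effect is to copy the list into a `HashSet` first and pass that to `removeAll`.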
[GitHub] [kafka] ruslankrivoshein commented on a diff in pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools
ruslankrivoshein commented on code in PR #13562: URL: https://github.com/apache/kafka/pull/13562#discussion_r1259668871 ## tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java: ## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.PartitionFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter; +import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GetOffsetShell { +Pattern topicPartitionPattern = 
Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?"); + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws IOException, ExecutionException, InterruptedException { +GetOffsetShell getOffsetShell = new GetOffsetShell(); + +getOffsetShell.parseArgs(args); + +Map partitionOffsets = getOffsetShell.fetchOffsets(); + +for (Map.Entry entry : partitionOffsets.entrySet()) { +TopicPartition topic = entry.getKey(); + +System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()})); +} +} + +private OptionSet options; +private OptionSpec topicPartitionsOpt; Review Comment: Good point, thank you. Speaking of JmxTool, or more precisely JmxToolTest: there is an [execute](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java#L354) method that just duplicates [this](https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java#L33), and I don't know why. Do you have an explanation for this approach?
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259657044 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1059,4 +1205,44 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { } }); } + +/** + * The coordinator has been loaded. Session timeouts are registered + * for all members. + */ +public void onLoaded() { +groups.forEach((groupId, group) -> { +switch (group.type()) { +case CONSUMER: +ConsumerGroup consumerGroup = (ConsumerGroup) group; +log.info("Loaded consumer group {} with {} members.", groupId, consumerGroup.members().size()); +consumerGroup.members().forEach((memberId, member) -> { +log.debug("Loaded member {} in consumer group {}.", memberId, groupId); +scheduleConsumerGroupSessionTimeout(groupId, memberId); +if (member.state() == ConsumerGroupMember.MemberState.REVOKING) { +scheduleConsumerGroupRevocationTimeout( +groupId, +memberId, +member.rebalanceTimeoutMs(), +member.memberEpoch() +); +} +}); +break; + +case GENERIC: +GenericGroup genericGroup = (GenericGroup) group; +log.info("Loaded generic group {} with {} members.", groupId, genericGroup.allMembers().size()); Review Comment: I will let you complete this part. I think that we need to log each member as I did. We also need to schedule the session timeouts here.
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259656449 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +764,121 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; Review Comment: They don't rely on the group epoch. Members are only fenced based on their member epoch.
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259655653 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +764,121 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +// Cancel all the timers of the member. +cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId()); +cancelConsumerGroupRevocationTimeout(group.groupId(), member.memberId()); + +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. 
+ */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. 
+ */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING || +member.memberEpoch() != expectedMemberEpoch) { +log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for " + memberId + " because the member " + +"state does not match the expected state."); +return Collections.emptyList(); +} + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"it failed to revoke partitions within {}ms.", revocationTimeoutMs); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259654482 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -720,19 +764,121 @@ private CoordinatorResult consumerGr ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { -log.info("[GroupId " + groupId + "] Computed new subscription metadata: " +log.info("[GroupId " + group.groupId() + "] Computed new subscription metadata: " + subscriptionMetadata + "."); -records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); +records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; -records.add(newGroupEpochRecord(groupId, groupEpoch)); +records.add(newGroupEpochRecord(group.groupId(), groupEpoch)); -return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData() -.setMemberId(memberId) -.setMemberEpoch(-1) -); +// Cancel all the timers of the member. +cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId()); +cancelConsumerGroupRevocationTimeout(group.groupId(), member.memberId()); + +return records; +} + +/** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. 
+ */ +private void scheduleConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +String key = consumerGroupSessionTimeoutKey(groupId, memberId); +timer.schedule(key, consumerGroupSessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"its session expired."); + +return consumerGroupFenceMember(group, member); +} catch (GroupIdNotFoundException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the group " + +"does not exist."); +} catch (UnknownMemberIdException ex) { +log.debug("[GroupId " + groupId + "] Could not fence " + memberId + " because the member " + +"does not exist."); +} + +return Collections.emptyList(); +}); +} + +/** + * Cancels the session timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ +private void cancelConsumerGroupSessionTimeout( +String groupId, +String memberId +) { +timer.cancel(consumerGroupSessionTimeoutKey(groupId, memberId)); +} + +/** + * Schedules a revocation timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param revocationTimeoutMs The revocation timeout. + * @param expectedMemberEpoch The expected member epoch. 
+ */ +private void scheduleConsumerGroupRevocationTimeout( +String groupId, +String memberId, +long revocationTimeoutMs, +int expectedMemberEpoch +) { +String key = consumerGroupRevocationTimeoutKey(groupId, memberId); +timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { +try { +ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + +if (member.state() != ConsumerGroupMember.MemberState.REVOKING || +member.memberEpoch() != expectedMemberEpoch) { +log.debug("[GroupId " + groupId + "] Ignoring revocation timeout for " + memberId + " because the member " + +"state does not match the expected state."); +return Collections.emptyList(); +} + +log.info("[GroupId " + groupId + "] Member " + memberId + " fenced from the group because " + +"it failed to revoke partitions within {}ms.", revocationTimeoutMs); Review Comment: Yeah, that makes sense. Let me update all logging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259645303 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } +@Test +public void testSessionTimeoutLifecycle() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. +CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is rescheduled on second heartbeat. 
+result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(result.response().memberEpoch())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId, memberId, 45000); + +// Advance time. +assertEquals( +Collections.emptyList(), +context.sleep(result.response().heartbeatIntervalMs()) +); + +// Session timer is cancelled on leave. +result = context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(-1)); +assertEquals(-1, result.response().memberEpoch()); + +// Verify that there are no timers. +context.assertNoSessionTimeout(groupId, memberId); +context.assertNoRevocationTimeout(groupId, memberId); +} + +@Test +public void testSessionTimeoutExpiration() { +String groupId = "fooup"; +// Use a static member id as it makes the test easier. +String memberId = Uuid.randomUuid().toString(); + +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(assignor)) +.withMetadataImage(new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 6) +.build()) +.build(); + +assignor.prepareGroupAssignment(new GroupAssignment( +Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( +mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) +))) +)); + +// Session timer is scheduled on first heartbeat. 
+CoordinatorResult result = +context.consumerGroupHeartbeat( +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setMemberEpoch(0) +.setRebalanceTimeoutMs(9) +.setSubscribedTopicNames(Collections.singletonList("foo")) +.setTopicPartitions(Collections.emptyList())); +assertEquals(1, result.response().memberEpoch()); + +// Verify that there is a session time. +context.assertSessionTimeout(groupId,
[GitHub] [kafka] dajac commented on a diff in pull request #13963: KAFKA-14462; [22/N] Implement session and revocation timeouts
dajac commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259639025 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; + +public class MockCoordinatorTimer implements CoordinatorTimer { +public static class ScheduledTimeout { +public final String key; +public final long deadlineMs; +public final TimeoutOperation operation; + +ScheduledTimeout( +String key, +long deadlineMs, +TimeoutOperation operation +) { +this.key = key; +this.deadlineMs = deadlineMs; +this.operation = operation; +} +} + +public static class ExpiredTimeout { +public final String key; +public final List records; + +ExpiredTimeout( +String key, +List records +) { +this.key = key; +this.records = records; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; + +ExpiredTimeout that = (ExpiredTimeout) o; + +if (!Objects.equals(key, that.key)) return false; +return Objects.equals(records, that.records); +} + +@Override +public int hashCode() { +int result = key != null ? key.hashCode() : 0; +result = 31 * result + (records != null ? 
records.hashCode() : 0); +return result; +} +} + +private final Time time; + +private final Map> timeoutMap = new HashMap<>(); +private final PriorityQueue> timeoutQueue = new PriorityQueue<>( +Comparator.comparingLong(entry -> entry.deadlineMs) +); + +public MockCoordinatorTimer(Time time) { +this.time = time; +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +boolean retry, +TimeoutOperation operation +) { +cancel(key); + +long deadlineMs = time.milliseconds() + unit.toMillis(delay); +ScheduledTimeout timeout = new ScheduledTimeout<>(key, deadlineMs, operation); +timeoutQueue.add(timeout); +timeoutMap.put(key, timeout); +} + +@Override +public void cancel(String key) { +ScheduledTimeout timeout = timeoutMap.remove(key); +if (timeout != null) { +timeoutQueue.remove(timeout); +} +} + +public boolean contains(String key) { +return timeoutMap.containsKey(key); +} + +public ScheduledTimeout timeout(String key) { +return timeoutMap.get(key); +} + +public int size() { +return timeoutMap.size(); +} + +public List> poll() { Review Comment: Added some javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
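The quoted diff stops at the signature of `poll()`, so its body is not visible here. As a rough illustration of how a keyed mock timer with a deadline-ordered queue can expire entries, here is a minimal sketch. `SketchTimer`, `Entry`, `advance`, and the `Runnable` operation are all hypothetical simplifications for this note, not the PR's classes, and the clock is a plain counter rather than Kafka's `Time`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical, simplified stand-in for a mock timer; all names are illustrative.
final class SketchTimer {
    // One scheduled timeout: its key, absolute deadline, and the work to run on expiry.
    static final class Entry {
        final String key;
        final long deadlineMs;
        final Runnable operation;

        Entry(String key, long deadlineMs, Runnable operation) {
            this.key = key;
            this.deadlineMs = deadlineMs;
            this.operation = operation;
        }
    }

    private long nowMs = 0L; // manually advanced clock (assumption: replaces a mock Time)
    private final Map<String, Entry> byKey = new HashMap<>();
    private final PriorityQueue<Entry> byDeadline =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadlineMs));

    // Scheduling an existing key replaces the previous timeout for that key.
    void schedule(String key, long delayMs, Runnable operation) {
        cancel(key);
        Entry entry = new Entry(key, nowMs + delayMs, operation);
        byDeadline.add(entry);
        byKey.put(key, entry);
    }

    void cancel(String key) {
        Entry entry = byKey.remove(key);
        if (entry != null) {
            byDeadline.remove(entry);
        }
    }

    boolean contains(String key) {
        return byKey.containsKey(key);
    }

    void advance(long ms) {
        nowMs += ms;
    }

    // Runs every timeout whose deadline has passed, earliest first, and returns their keys.
    List<String> poll() {
        List<String> expired = new ArrayList<>();
        while (!byDeadline.isEmpty() && byDeadline.peek().deadlineMs <= nowMs) {
            Entry entry = byDeadline.poll();
            byKey.remove(entry.key);
            entry.operation.run();
            expired.add(entry.key);
        }
        return expired;
    }
}
```

Keeping both a map and a priority queue means cancel and reschedule are lookups by key, while expiry only ever inspects the head of the queue.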
[jira] [Commented] (KAFKA-15166) Add deletePartition API to the RemoteStorageManager
[ https://issues.apache.org/jira/browse/KAFKA-15166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17741962#comment-17741962 ]

Ivan Yurchenko commented on KAFKA-15166:

[~ckamal] A comment in advance for the future KIP: not all implementations may have the metadata needed to delete remote segments based only on the topic name and partition number. So this should probably be either a checkable capability (i.e. if the RSM cannot do this, the broker doesn't call it), or the method should receive the metadata of all the segments to be deleted.

> Add deletePartition API to the RemoteStorageManager
> ---
>
> Key: KAFKA-15166
> URL: https://issues.apache.org/jira/browse/KAFKA-15166
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
>
> Remote Storage Manager exposes {{deleteLogSegmentData}} API to delete the
> individual log segments. Storage providers such as HDFS have support to
> delete a directory. Having a {{deletePartition}} API to delete the data at
> the partition level will enhance the topic deletion.
> This task may require a KIP as it touches the user-facing APIs.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
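The two options raised in the comment (a checkable capability, or passing the metadata of every segment to delete) could look roughly like the sketch below. Every name here is hypothetical and exists only for illustration: `PartitionDeletingStore`, `supportsPartitionDeletion`, `deleteSegments`, and the use of plain strings for segment metadata are assumptions, not part of Kafka's `RemoteStorageManager`.

```java
import java.util.List;

// Hypothetical shapes only; none of these names exist in Kafka's RemoteStorageManager.
interface PartitionDeletingStore {
    // Option 1: a capability the broker can check before calling deletePartition.
    boolean supportsPartitionDeletion();

    // Bulk deletion addressed only by topic name and partition number.
    void deletePartition(String topic, int partition);

    // Option 2: the broker hands over metadata for every segment to be deleted.
    // Plain strings stand in for real segment metadata objects.
    void deleteSegments(List<String> segmentIds);
}

final class PartitionDeletionSketch {
    // Broker-side choice: use the bulk call only when the store advertises support,
    // otherwise fall back to deleting the listed segments.
    static void deleteAll(
        PartitionDeletingStore store,
        String topic,
        int partition,
        List<String> segmentIds
    ) {
        if (store.supportsPartitionDeletion()) {
            store.deletePartition(topic, partition);
        } else {
            store.deleteSegments(segmentIds);
        }
    }
}
```

With this shape, a store that cannot map a topic and partition to its remote objects simply reports `false` and still receives enough information to clean up.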
[GitHub] [kafka] dajac opened a new pull request, #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer
dajac opened a new pull request, #13991: URL: https://github.com/apache/kafka/pull/13991

_Temporarily includes d7ebb7bba5bcfe35dd2ee5e37f226a84c27ca638_

This is the last patch for KAFKA-14462. It wires all the pieces together in BrokerServer.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hudeqi commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
hudeqi commented on PR #13913: URL: https://github.com/apache/kafka/pull/13913#issuecomment-1630624613

By the way, I have a question: why do we not synchronize the write permission of `TopicAclBing` and `GroupAclBinding` in MirrorSourceConnector? @C0urante @gharris1727
[GitHub] [kafka] yashmayya commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
yashmayya commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1630582887

Thanks @vamossagar12, I have no comments beyond the currently open ones: primarily the timeout case when exactly-once support is enabled, the `ConnectorOffsetBackingStore::set` Javadoc, and the unit tests living in `OffsetStorageWriterTest` rather than a dedicated `ConnectorOffsetBackingStoreTest` class (this last one isn't blocking).

@C0urante could you please take a look at this whenever you get a chance?
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1259538919 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,8 +284,51 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +FutureCallback secondaryWriteFuture = new FutureCallback<>(); +secondaryStore.set(values, secondaryWriteFuture); +try { +// For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for +// commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment: Hmm, this is a bit tricky. This is what the `commitTransaction` method does: https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L303-L312 which basically says `Blocks until all outstanding records have been sent and ack'd`. This is what the KIP says as well:

```
The worker-level [offset.flush.timeout.ms](https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms) property will be ignored for exactly-once source tasks. They will be allowed to take as long as necessary to complete an offset commit, since the cost of failure at that point is to fail the source task. Currently, all source task offset commits take place on a single shared worker-global thread. In order to support source task commits without a timeout, but also prevent laggy tasks from disrupting the availability of other tasks on the cluster, the worker will be modified to permit simultaneous source task offset commits. It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or switch to the poll value for the transaction.boundary property. We will include these steps in the error message for a task that fails due to producer transaction timeout.
```

With this in mind, I had assumed it is fine to wait forever. But I also see your point: I don't think the producer used to write to the global offsets topic would even be a transactional one, and yet we are waiting. I think @C0urante would be the best person to answer this, but my opinion is to block forever in this case. I am open to suggestions.
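The trade-off under discussion (block indefinitely on the secondary store write, or bound the wait) comes down to which `Future.get` overload is used. The sketch below shows both paths on a plain `java.util.concurrent.Future`. `SecondaryWriteWaitSketch`, `await`, and the string results are hypothetical and only illustrate the control flow; this is not the code in `ConnectorOffsetBackingStore`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper contrasting an unbounded wait with a bounded one.
final class SecondaryWriteWaitSketch {
    // Waits for the write to finish. A null timeout means "block until done",
    // which mirrors the exactly-once case where no flush timeout applies.
    static String await(Future<Void> write, Long timeoutMs) {
        try {
            if (timeoutMs == null) {
                write.get(); // unbounded: returns only on completion or failure
            } else {
                write.get(timeoutMs, TimeUnit.MILLISECONDS); // bounded wait
            }
            return "ok";
        } catch (TimeoutException e) {
            return "timed out";
        } catch (ExecutionException e) {
            // The write itself failed; the cause carries the original error.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }
}
```

The unbounded path can never report a timeout, so a write that never completes stalls the caller; the bounded path trades that risk for the need to choose a timeout value.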
[GitHub] [kafka] yashmayya commented on a diff in pull request #13945: KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector
yashmayya commented on code in PR #13945: URL: https://github.com/apache/kafka/pull/13945#discussion_r1259462328 ## connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java: ## @@ -147,4 +151,59 @@ public void testInvalidBatchSize() { sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd"); assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties)); } + +@Test +public void testAlterOffsetsStdin() { +sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); +Map, Map> offsets = Collections.singletonMap( +Collections.singletonMap(FILENAME_FIELD, FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +); +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets)); +} + +@Test +public void testAlterOffsetsIncorrectPartitionKey() { +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +Collections.singletonMap("invalid_partition_key", FILENAME), +Collections.singletonMap(POSITION_FIELD, 0) +))); + +// null partitions are invalid +assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap( +null, +Collections.singletonMap(POSITION_FIELD, 0) +))); +} + +@Test +public void testAlterOffsetsMultiplePartitions() { +Map, Map> offsets = new HashMap<>(); +offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0)); +offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null); +connector.alterOffsets(sourceProperties, offsets); Review Comment: Ah, good point. Done. 
## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); Review Comment: Makes sense, done. ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -101,4 +105,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map props) { : ExactlyOnceSupport.UNSUPPORTED; } +@Override +public boolean alterOffsets(Map connectorConfig, Map, Map> offsets) { +AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig); +String filename = config.getString(FILE_CONFIG); +if (filename == null || filename.isEmpty()) { +// If the 'file' configuration is unspecified, stdin is used and no offsets are tracked +throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified"); +} + +// This connector makes use of a single source partition at a time which represents the file that it is configured to read from. +// However, there could also be source partitions from previous configurations of the connector. 
+for (Map.Entry, Map> partitionOffset : offsets.entrySet()) { +Map partition = partitionOffset.getKey(); +if (partition == null) { +throw new ConnectException("Partition objects cannot be null"); +} + +if (!partition.containsKey(FILENAME_FIELD)) { +throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); +} + +Map offset = partitionOffset.getValue(); +// null offsets are allowed and represent a deletion of offsets for a partition +if (offset != null && !offset.containsKey(POSITION_FIELD)) { +throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'"); +} +} + +// Let the task validate the actual value for the offset position on startup Review Comment: My initial reasoning was that we probably wouldn't want to open an input stream here to read from the file and verify whether the offset position is actually valid. But yeah, no reason we can't do basic validations
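As an illustration of the kind of basic validation mentioned in the review comment, the sketch below checks an offset's position value without opening the file. It is an assumption about what such a check could look like, not the change made in the PR: `OffsetPositionCheckSketch`, `validate`, the `"position"` key value, and the accepted types are all hypothetical.

```java
import java.util.Map;

// Hypothetical validation helper; it does not touch the file itself.
final class OffsetPositionCheckSketch {
    // Assumed key name, chosen for illustration.
    static final String POSITION_FIELD = "position";

    static void validate(Map<String, ?> offset) {
        if (offset == null) {
            return; // a null offset represents deletion and is allowed
        }
        Object position = offset.get(POSITION_FIELD);
        // Accept only integral numbers; a missing key yields null and is rejected here.
        if (!(position instanceof Long) && !(position instanceof Integer)) {
            throw new IllegalArgumentException(
                "The value for '" + POSITION_FIELD + "' must be an integer");
        }
        if (((Number) position).longValue() < 0) {
            throw new IllegalArgumentException(
                "The value for '" + POSITION_FIELD + "' must be non-negative");
        }
    }
}
```

Whether the position actually points inside the file is still left to the task at startup, as the inline comment in the diff suggests.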
[jira] [Created] (KAFKA-15177) MirrorMaker 2 should implement the alterOffsets KIP-875 API
Yash Mayya created KAFKA-15177: -- Summary: MirrorMaker 2 should implement the alterOffsets KIP-875 API Key: KAFKA-15177 URL: https://issues.apache.org/jira/browse/KAFKA-15177 Project: Kafka Issue Type: Improvement Components: KafkaConnect, mirrormaker Reporter: Yash Mayya The {{MirrorSourceConnector}} class should implement the new alterOffsets API added in [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]. We could also implement the API in {{MirrorCheckpointConnector}} and {{MirrorHeartbeatConnector}} to prevent external modification of offsets since the operation wouldn't really make sense in their case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #13944: KAFKA-14953: Add tiered storage related metrics
showuon commented on PR #13944: URL: https://github.com/apache/kafka/pull/13944#issuecomment-1630519509

> > @abhijeetk88 @showuon Let us have a followup JIRA/PR on adding more tests for the metrics.
>
> Sure, please open a JIRA ticket to track it. Thanks.

[KAFKA-15176](https://issues.apache.org/jira/browse/KAFKA-15176) has been opened.