[GitHub] [kafka] yashmayya commented on pull request #12615: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito
yashmayya commented on PR #12615: URL: https://github.com/apache/kafka/pull/12615#issuecomment-1244913715 @mimaison @C0urante would either of you be able to take a look at this one whenever possible? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #12629: MINOR: Clean up FetcherTest
philipnee commented on PR #12629: URL: https://github.com/apache/kafka/pull/12629#issuecomment-1244884249 @showuon @hachikuji - thanks for reviewing the PR. This is a follow-up PR to clean up the tests.
[GitHub] [kafka] philipnee opened a new pull request, #12629: MINOR: Clean up FetcherTest
philipnee opened a new pull request, #12629: URL: https://github.com/apache/kafka/pull/12629 Follow-up to clean up the residual test code from https://github.com/apache/kafka/pull/12603 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969137654 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } +@Test +public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { +buildFetcher(); +assignFromUser(singleton(tp0)); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +assertEquals(100, subscriptions.position(tp0).offset); + +assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused Review Comment: oops, this is a typo. Probably some residual after modifying the tests.
[GitHub] [kafka] showuon commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits
showuon commented on PR #12626: URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244881797 I can take a look when I'm available today
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
artemlivshits commented on code in PR #12570: URL: https://github.com/apache/kafka/pull/12570#discussion_r969101389 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java: ## @@ -170,13 +170,49 @@ boolean isPartitionChanged(StickyPartitionInfo partitionInfo) { * @param cluster The cluster information */ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) { +updatePartitionInfo(partitionInfo, appendedBytes, cluster, true); +} + +/** + * Update partition info with the number of bytes appended and maybe switch partition. + * NOTE this function needs to be called under the partition's batch queue lock. + * + * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo + * @param appendedBytes The number of bytes appended to this partition + * @param cluster The cluster information + * @param enableSwitch If true, switch partition once produced enough bytes + */ +void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) { // partitionInfo may be null if the caller didn't use built-in partitioner. if (partitionInfo == null) return; assert partitionInfo == stickyPartitionInfo.get(); int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes); -if (producedBytes >= stickyBatchSize) { + +// We're trying to switch partition once we produce stickyBatchSize bytes to a partition +// but doing so may hinder batching because partition switch may happen while batch isn't +// ready to send. This situation is especially likely with high linger.ms setting. +// Consider the following example: +// linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB +// - first batch collects 12KB in 500ms, gets sent +// - second batch collects 4KB, then we switch partition, so 4KB gets eventually sent +// - ... 
and so on - we'd get 12KB and 4KB batches +// To get more optimal batching and avoid 4KB fractional batches, the caller may disallow +// partition switch if batch is not ready to send, so with the example above we'd avoid +// fractional 4KB batches: in that case the scenario would look like this: +// - first batch collects 12KB in 500ms, gets sent +// - second batch collects 4KB, but partition switch doesn't happen because batch in not ready +// - second batch collects 12KB in 500ms, gets sent and now we switch partition. +// - ... and so on - we'd just send 12KB batches +// We cap the produced bytes to not exceed 2x of the batch size to avoid pathological cases +// (e.g. if we have a mix of keyed and unkeyed messages, key messages may create an +// unready batch after the batch that disabled partition switch becomes ready). +// As a result, with high latency.ms setting we end up switching partitions after producing +// between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary. +if (producedBytes >= stickyBatchSize * 2) +log.trace("Exceeded {} bytes, produced {} bytes, enable is {}", stickyBatchSize * 2, producedBytes, enableSwitch); Review Comment: ok ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { assertEquals(partition1, partition.get()); assertEquals(2, mockRandom.get()); -// Produce large record, we should switch to next partition. +// Produce large record, we switched to next partition by previous produce, but +// for this produce the switch would be disabled because of incomplete batch. 
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); assertEquals(partition2, partition.get()); -assertEquals(3, mockRandom.get()); +assertEquals(2, mockRandom.get()); Review Comment: What happens here is the following: 1. The first record (small) gets a new partition (because there was none). 2. The second record (large) doesn't fit, so the first record forms a batch (but it isn't large enough to trigger a switch). 3. The second record (large) creates a new batch, but that batch is not marked as full (so the switch is disabled). 4. The third record arrives and doesn't fit into the batch, so the batch is marked as full (completing the switch that was disabled in step 3). So effectively, in step 4 the switch happens before the record is added, rather than after.
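A minimal sketch of the switch rule described in the code comment and review above, assuming a simplified single-threaded model. The class name `StickySwitchSketch` and its methods are hypothetical; the real `BuiltInPartitioner` updates `producedBytes` atomically under the batch-queue lock, and choosing the next partition is reduced here to a counter.

```java
/**
 * Hypothetical, single-threaded model of the sticky-partition switch rule:
 * switch after stickyBatchSize bytes if the caller allows it, and always
 * switch once 2 * stickyBatchSize bytes have been produced.
 */
final class StickySwitchSketch {
    private final int stickyBatchSize;
    private int producedBytes = 0;      // bytes produced to the current sticky partition
    private int partitionSwitches = 0;  // stand-in for "pick the next partition"

    StickySwitchSketch(int stickyBatchSize) {
        this.stickyBatchSize = stickyBatchSize;
    }

    /** Records appended bytes and returns true if this append triggered a partition switch. */
    boolean append(int appendedBytes, boolean enableSwitch) {
        producedBytes += appendedBytes;
        // Normal switch: enough bytes produced and the caller says the batch is ready.
        boolean allowedSwitch = enableSwitch && producedBytes >= stickyBatchSize;
        // Cap from the code comment: never stay on one partition past 2x the batch size.
        boolean forcedSwitch = producedBytes >= stickyBatchSize * 2;
        if (allowedSwitch || forcedSwitch) {
            producedBytes = 0;
            partitionSwitches++;
            return true;
        }
        return false;
    }

    int partitionSwitches() {
        return partitionSwitches;
    }
}
```

With a 16KB `stickyBatchSize`, appending 12KB, then 4KB with switching disabled, then 12KB with switching enabled switches only on the third append, which mirrors the 12KB-batch scenario in the comment. A single 33,000-byte append switches even when switching is disabled, because it crosses the 2x cap.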
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969136963 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } +@Test +public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { +buildFetcher(); +assignFromUser(singleton(tp0)); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +assertEquals(100, subscriptions.position(tp0).offset); + +assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused Review Comment: I'll clean it up.
[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
hachikuji commented on PR #12611: URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244878455 I'll go ahead and close this since we're going to merge https://github.com/apache/kafka/pull/12626.
[GitHub] [kafka] hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery URL: https://github.com/apache/kafka/pull/12611
[GitHub] [kafka] hachikuji commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits
hachikuji commented on PR #12626: URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244877452 I have been seeing some recent build failures due to a non-zero exit code from `core:unitTest`. I cannot reproduce locally (of course) and this is making it tough to get some of these 3.3 patches over the line. I'll take a look tomorrow if I can, but if anyone has any ideas, do let me know.
[GitHub] [kafka] hachikuji commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits
hachikuji commented on PR #12626: URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244875608 @showuon Thanks! I appreciate it.
[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603328#comment-17603328 ] Jason Gustafson commented on KAFKA-14196: - > Also, this is currently marked as a blocker. Is there a crisp description of >the regression? Prior to revocation, eager rebalance strategies will attempt to auto-commit offsets before revoking partitions and joining the rebalance. Originally this logic was synchronous, which meant there was no opportunity for additional data to be returned before the revocation completed. This changed when we introduced asynchronous offset commit logic. Any progress made between the time the asynchronous offset commit was sent and the revocation completed would be lost. This results in duplicate consumption. > Duplicated consumption during rebalance, causing OffsetValidationTest to act > flaky > -- > > Key: KAFKA-14196 > URL: https://issues.apache.org/jira/browse/KAFKA-14196 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: new-consumer-threading-should-fix > Fix For: 3.3.0, 3.2.2 > > > Several flaky tests under OffsetValidationTest indicate a potential > consumer duplication issue when autocommit is enabled. I believe this is > affecting *3.2* and onward. The failure message is shown below: > > {code:java} > Total consumed records 3366 did not match consumed position 3331 {code} > > After investigating the log, I discovered that the data consumed between the > start of a rebalance event and the async commit was lost for those failing > tests. In the example below, the rebalance event kicks in at around > 1662054846995 (first record), and the async commit of the offset 3739 is > completed at around 1662054847015 (right before partitions_revoked).
> > {code:java} > {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} > {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} > {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} > {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} > {code} > A few things to note here: > # Manually calling commitSync in the onPartitionsRevoked callback seems to > alleviate the issue > # Setting includeMetadataInTimeout to false also seems to alleviate the > issue. > The above attempts seem to suggest that the contract between poll() and > asyncCommit() is broken. AFAIK, we implicitly use poll() to ack the > previously fetched data, and the consumer would (try to) commit these offsets > in the current poll() loop. However, it seems like as the poll continues to > loop, the "acked" data isn't being committed. > > I believe this could have been introduced in KAFKA-14024, which originated from > KAFKA-13310. > More specifically (see the comments below), the ConsumerCoordinator will > always return before the async commit, due to the previous incomplete commit. > However, this is a bit contradictory here because: > # I think we want to commit asynchronously while the poll continues, and if > we do that, we are back to KAFKA-14024, where the consumer hits the rebalance > timeout and gets kicked out of the group.
> # But we also need to commit all the "acked" offsets before revoking the > partition, and this commit has to block. > *Steps to Reproduce the Issue:* > # Check out AK 3.2 > # Run this several times (I recommend running only the cases with autocommit > enabled in consumer_test.py to save time): > {code:java} > _DUCKTAPE_OPTIONS="--debug" > TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" > bash tests/docker/run_tests.sh {code} > > *Steps to Diagnose the Issue:* > # Open the test results in *results/* > # Go to the consumer log. It might look like this: > > {code:java} > results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY > {code} > # Find the docker instance whose partition was revoked
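The duplicate count can be worked out directly from the log excerpt in this issue. The helper below is hypothetical, not Kafka code, and relies on the standard Kafka convention that a committed offset is the offset of the next record to consume. The async commit stored offset 3739; the consumer then returned records 3739 to 3745 (position 3746) before `partitions_revoked`; after reassignment, consumption restarted at 3739, so 3746 - 3739 = 7 records were delivered twice.

```java
/** Hypothetical helper: records re-delivered when a partition is revoked with an outdated commit. */
final class DuplicateConsumptionSketch {
    private DuplicateConsumptionSketch() {}

    /**
     * @param committedOffset      offset stored by the last completed commit (next record to read)
     * @param positionAtRevocation consumer position when the partition was revoked
     * @return number of records the next owner reads again
     */
    static long duplicatedRecords(long committedOffset, long positionAtRevocation) {
        // Everything in [committedOffset, positionAtRevocation) was already handed to the
        // application but is not covered by the commit, so it is consumed a second time.
        return Math.max(0L, positionAtRevocation - committedOffset);
    }
}
```

For the excerpt, `duplicatedRecords(3739, 3746)` gives 7, matching the 3 + 2 + 2 records consumed between the commit and the revocation. If the commit had covered the latest position, the result would be 0.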
[GitHub] [kafka] hachikuji merged pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji merged PR #12603: URL: https://github.com/apache/kafka/pull/12603
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969124759 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +switch (protocol) { +case EAGER: +partitions.addAll(subscriptions.assignedPartitions()); +break; + +case COOPERATIVE: +// Delay the partition revocation because we don't revoke the already owned partitions Review Comment: I filed this issue: https://issues.apache.org/jira/browse/KAFKA-14224.
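A minimal, stdlib-only sketch of the revocation selection in the `getPartitionsToRevoke` hunk above. Assumptions: partitions are plain `"topic-partition"` strings, `-1` and `""` stand in for the generation and member IDs of `Generation.NO_GENERATION`, and because the hunk is cut off inside the `COOPERATIVE` branch, that branch here only revokes partitions of topics that are no longer subscribed. That last rule is an assumption, not the PR's code; all names are hypothetical.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical model of choosing which partitions to revoke before rejoining the group. */
final class RevocationSketch {
    enum Protocol { EAGER, COOPERATIVE }

    static final int NO_GENERATION_ID = -1;  // assumed stand-in for NO_GENERATION.generationId
    static final String NO_MEMBER_ID = "";   // assumed stand-in for NO_GENERATION.memberId

    static SortedSet<String> partitionsToRevoke(Protocol protocol, int generation, String memberId,
                                                Set<String> assigned, Set<String> subscribedTopics) {
        SortedSet<String> partitions = new TreeSet<>();
        if (generation == NO_GENERATION_ID || memberId.equals(NO_MEMBER_ID)) {
            partitions.addAll(assigned);  // no valid generation: give up everything
            return partitions;
        }
        switch (protocol) {
            case EAGER:
                partitions.addAll(assigned);  // eager: revoke everything before rejoining
                break;
            case COOPERATIVE:
                // Assumption: only partitions of unsubscribed topics go now; the rest is deferred.
                for (String tp : assigned) {
                    if (!subscribedTopics.contains(topicOf(tp))) {
                        partitions.add(tp);
                    }
                }
                break;
        }
        return partitions;
    }

    /** Extracts the topic from a "topic-partition" string (expects at least one '-'). */
    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }
}
```

The deferred cooperative path is exactly where KAFKA-14224 notes that no auto-commit happens before revocation.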
[jira] [Resolved] (KAFKA-14215) KRaft forwarded requests have no quota enforcement
[ https://issues.apache.org/jira/browse/KAFKA-14215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-14215. - Resolution: Fixed > KRaft forwarded requests have no quota enforcement > -- > > Key: KAFKA-14215 > URL: https://issues.apache.org/jira/browse/KAFKA-14215 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.0, 3.3 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.3.0, 3.3 > > > On the broker, the `BrokerMetadataPublisher` is responsible for propagating > quota changes from `ClientQuota` records to `ClientQuotaManager`. On the > controller, there is no similar logic, so no client quotas are enforced on > the controller. > On the broker side, there is no enforcement either, since the broker assumes > that the controller will be the one to do it. Basically, it looks at the > throttle time returned in the response from the controller. If it is 0, then > the response is sent immediately without any throttling. > So the consequence of both of these issues is that controller-bound requests > have no throttling today. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
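The gap described in this issue can be modeled in a few lines. This is an illustrative sketch only, with hypothetical names. It does not reproduce `ClientQuotaManager` or the change merged in #12624, and taking the larger of the two throttle times is an assumption about how a broker-side quota could be combined with the controller's.

```java
/** Hypothetical model of throttling for a request the broker forwards to the controller. */
final class ForwardedThrottleSketch {
    private ForwardedThrottleSketch() {}

    /** Behaviour described in KAFKA-14215: the broker only honours the controller's throttle time. */
    static int throttleAsDescribed(int controllerThrottleMs) {
        // Per the issue, the controller enforces no client quotas, so this is 0 in practice.
        return controllerThrottleMs;
    }

    /** Hypothetical alternative: the broker also charges the request against its own quota. */
    static int throttleWithBrokerQuota(int controllerThrottleMs, int brokerQuotaThrottleMs) {
        return Math.max(controllerThrottleMs, brokerQuotaThrottleMs);
    }
}
```

Under the described behaviour, a client that has exhausted its broker-side quota still gets a throttle time of 0 for forwarded requests; the alternative returns the broker's throttle time whenever it is larger.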
[GitHub] [kafka] hachikuji merged pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota
hachikuji merged PR #12624: URL: https://github.com/apache/kafka/pull/12624
[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
showuon commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969115062 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } +@Test +public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { +buildFetcher(); +assignFromUser(singleton(tp0)); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); Review Comment: Any reason we seek tp0 to offset 100 three times? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -769,6 +773,7 @@ private static class TopicPartitionState { private Long logStartOffset; // the log start offset private Long lastStableOffset; private boolean paused; // whether this partition has been paused by the user +private boolean consumable; Review Comment: I like the name `pendingRevocation`, too. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } +@Test +public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() { +buildFetcher(); +assignFromUser(singleton(tp0)); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +subscriptions.seek(tp0, 100); +assertEquals(100, subscriptions.position(tp0).offset); + +assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused Review Comment: I don't understand the comment here. Where do we pause tp0? And it is fetchable now, right?
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +switch (protocol) { +case EAGER: +partitions.addAll(subscriptions.assignedPartitions()); +break; + +case COOPERATIVE: +// Delay the partition revocation because we don't revoke the already owned partitions Review Comment: Agreed, we should handle the cooperative issue separately.
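The idea behind #12603, as discussed in these comments, is to stop returning data for partitions that are about to be revoked. A minimal sketch, assuming a simplified per-partition state keyed by a hypothetical string ID. The flag uses the `pendingRevocation` name floated in the review rather than the diff's `consumable`, and the real `SubscriptionState` tracks much more (fetch state, epochs, offsets).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified subscription state with a "pending revocation" flag. */
final class FetchabilitySketch {
    static final class PartitionState {
        Long position;              // null until a position is known
        boolean paused;             // paused by the user
        boolean pendingRevocation;  // set just before the partition is revoked

        boolean isFetchable() {
            // A partition about to be revoked stops returning data, like a paused one.
            return !paused && !pendingRevocation && position != null;
        }
    }

    private final Map<String, PartitionState> states = new HashMap<>();

    void assign(String tp) { states.put(tp, new PartitionState()); }

    void seek(String tp, long offset) { states.get(tp).position = offset; }

    void markPendingRevocation(String tp) { states.get(tp).pendingRevocation = true; }

    boolean isFetchable(String tp) { return states.get(tp).isFetchable(); }
}
```

Mirroring the test under review: after assigning `tp0` and seeking to 100, the partition is fetchable without any pause; once it is marked pending revocation, it is not.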
[GitHub] [kafka] showuon commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
showuon commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1244838525 ZK 3.6.3 uses an old Netty version that has CVEs. ZK 3.7.1 upgrades Netty to fix them. I agree it's late for 3.3; I just want to make sure we're aware of it. [CVE-2021-37136](https://nvd.nist.gov/vuln/detail/CVE-2021-37136) [CVE-2021-37137](https://nvd.nist.gov/vuln/detail/CVE-2021-37137)
[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1
ijuma commented on PR #12620: URL: https://github.com/apache/kafka/pull/12620#issuecomment-1244830200 What CVEs are these? This kind of upgrade requires quite a lot of validation; it's too late for 3.3 unless the impact is severe.
[jira] [Updated] (KAFKA-14224) Consumer should auto-commit prior to cooperative partition revocation
[ https://issues.apache.org/jira/browse/KAFKA-14224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14224: Description: With the old "eager" reassignment logic, we always revoked all partitions prior to each rebalance. When auto-commit is enabled, a part of this process is committing the current position. Under the new "cooperative" logic, we defer revocation until after the rebalance, which means we can continue fetching while the rebalance is in progress. However, when reviewing KAFKA-14196, we noticed that there is no similar logic to commit offsets prior to this deferred revocation. This means that cooperative consumption is more likely to lead to duplicate consumption even when there is no failure involved. (was: With the old "eager" reassignment logic, we always revoked all partitions prior to each rebalance. When auto-commit is enabled, a part of this process is committing the current position. Under the cooperative logic, we defer revocation until after the rebalance, which means we can continue fetching while the rebalance is in progress. However, when reviewing KAFKA-14196, we noticed that there is no similar logic to commit offsets prior to this deferred revocation. This means that cooperative consumption is more likely to lead to duplicate consumption even when there is no failure involved.) > Consumer should auto-commit prior to cooperative partition revocation > - > > Key: KAFKA-14224 > URL: https://issues.apache.org/jira/browse/KAFKA-14224 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > With the old "eager" reassignment logic, we always revoked all partitions > prior to each rebalance. When auto-commit is enabled, a part of this process > is committing the current position. Under the new "cooperative" logic, we defer > revocation until after the rebalance, which means we can continue fetching > while the rebalance is in progress.
However, when reviewing KAFKA-14196, we > noticed that there is no similar logic to commit offsets prior to this > deferred revocation. This means that cooperative consumption is more likely > to lead to duplicate consumption even when there is no failure involved.
[jira] [Created] (KAFKA-14224) Consumer should auto-commit prior to cooperative partition revocation
Jason Gustafson created KAFKA-14224: --- Summary: Consumer should auto-commit prior to cooperative partition revocation Key: KAFKA-14224 URL: https://issues.apache.org/jira/browse/KAFKA-14224 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson With the old "eager" reassignment logic, we always revoked all partitions prior to each rebalance. When auto-commit is enabled, a part of this process is committing the current position. Under the cooperative logic, we defer revocation until after the rebalance, which means we can continue fetching while the rebalance is in progress. However, when reviewing KAFKA-14196, we noticed that there is no similar logic to commit offsets prior to this deferred revocation. This means that cooperative consumption is more likely to lead to duplicate consumption even when there is no failure involved.
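The ordering this issue asks for can be sketched with an in-memory model. Everything here is hypothetical: `CommitBeforeRevokeSketch`, its maps, and the immediate map update that stands in for a blocking offset commit; the actual fix for KAFKA-14224 is not part of this thread. Offsets follow the convention that the committed offset is the next record to consume, and toggling `commitBeforeRevoke` contrasts the eager path, which commits first, with the deferred cooperative revocation described above, which does not.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model: optionally commit current positions right before revoking partitions. */
final class CommitBeforeRevokeSketch {
    final Map<String, Long> positions = new HashMap<>();  // next offset the consumer would return
    final Map<String, Long> committed = new HashMap<>();  // last committed offset per partition
    private final boolean commitBeforeRevoke;

    CommitBeforeRevokeSketch(boolean commitBeforeRevoke) {
        this.commitBeforeRevoke = commitBeforeRevoke;
    }

    /** Revokes partitions and returns how many records the next owner would consume again. */
    long revoke(Set<String> partitions) {
        long duplicates = 0;
        for (String tp : partitions) {
            Long position = positions.remove(tp);
            if (position == null) {
                continue;  // not owned, nothing to revoke
            }
            if (commitBeforeRevoke) {
                committed.put(tp, position);  // stand-in for a blocking commit
            }
            // The next owner resumes from the committed offset (0 if nothing was committed).
            duplicates += position - committed.getOrDefault(tp, 0L);
        }
        return duplicates;
    }
}
```

Reusing the numbers from KAFKA-14196 (committed 3739, position 3746), revoking with `commitBeforeRevoke` enabled yields 0 duplicates, while skipping the commit yields 7.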
[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join
vvcephei commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r969070840 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -243,6 +244,26 @@ public class StreamsConfig extends AbstractConfig { */ public static final String OPTIMIZE = "all"; +/** + * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} + * for enabling the specific optimization that reuses source topic as changelog topic + * for KTables. + */ +public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics"; + +/** + * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} + * for enabling the specific optimization that merges duplicated repartition topics. + */ +public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics"; + +/** + * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} + * for enabling the optimization that optimizes inner stream-stream joins into self-joins when + * both arguments are the same stream. + */ +public static final String SELF_JOIN = "self.join"; Review Comment: Just a reminder to update this to match the KIP.
[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join
vvcephei commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r956220721 ## gradle.properties: ## @@ -20,7 +20,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py -version=3.3.0-SNAPSHOT +version=3.3.0-VICKY2 Review Comment: TODO: we need to remove this from the PR. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) { internalTopologyBuilder.validateCopartition(); } +/** + * A user can provide either the config OPTIMIZE which means all optimizations rules will be + * applied or they can provide a list of optimization rules. + */ +private void optimizeTopology(final Properties props) { +final List optimizationConfigs; +if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { +optimizationConfigs = new ArrayList<>(); +optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); Review Comment: could be `optimizationConfigs = Collections.singletonList` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig { public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization"; private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default"; +public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization"; Review Comment: Thanks for adding a separate config. I strongly feel this is the right approach for optimization flags. Can we make a config namespace convention to keep these things organized, like `topology.optimization.self.join`? 
## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() { assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } +@Test +public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() { Review Comment: A more general prohibition would be to disallow OPTIMIZE and NO_OPTIMIZATION in conjunction with a comma at all. ## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ## @@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() { assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } +@Test +public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() { +final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION); +props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value); +final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); +assertTrue(exception.getMessage().contains("A topology can either not be optimized with")); +} + +@Test +public void shouldEnableSelfJoin() { +final String value = StreamsConfig.SELF_JOIN; +props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value); +final StreamsConfig config = new StreamsConfig(props); +assertEquals(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG), StreamsConfig.SELF_JOIN); +} + +@Test +public void shouldMultipleOptimizations() { Review Comment: I get what you mean, but "should multiple optimizations" isn't exactly a sensible statement :) By the way, we might want to add at least one more test that we get the right error if you try to include some extra garbage flag in the list. 
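The validation rules discussed in this review could be sketched roughly as below:

- the value is a comma-separated list of rule names;
- `all` and `none` may not be combined with anything else;
- an unknown flag should produce an error.

This is a hypothetical, self-contained helper, not the code in PR #12555. The class and method names are invented. The value `"none"` for `NO_OPTIMIZATION` is an assumption; only `OPTIMIZE = "all"` and the three flag values are quoted above. It throws `IllegalArgumentException` where the real `StreamsConfig` would presumably throw `ConfigException`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class OptimizationConfigSketch {
    // Flag values as quoted in the PR; "none" for NO_OPTIMIZATION is an assumption.
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    static final String SELF_JOIN = "self.join";

    static final Set<String> ALL_FLAGS = Collections.unmodifiableSet(new LinkedHashSet<>(
        Arrays.asList(REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SELF_JOIN)));

    /** Hypothetical helper: turns a raw config value into the set of enabled rules. */
    static Set<String> parseOptimizations(final String value) {
        // A missing or blank value is treated like the default (no optimization) in this sketch.
        if (value == null || value.trim().isEmpty() || value.trim().equals(NO_OPTIMIZATION)) {
            return Collections.emptySet();
        }
        if (value.trim().equals(OPTIMIZE)) {
            return ALL_FLAGS;
        }
        final Set<String> enabled = new LinkedHashSet<>();
        for (final String raw : value.split(",")) {
            final String flag = raw.trim();
            // "all" and "none" are only valid on their own, never inside a list.
            if (flag.equals(OPTIMIZE) || flag.equals(NO_OPTIMIZATION)) {
                throw new IllegalArgumentException("'" + OPTIMIZE + "' and '" + NO_OPTIMIZATION
                    + "' cannot be combined with other values: " + value);
            }
            // Reject garbage flags instead of silently ignoring them.
            if (!ALL_FLAGS.contains(flag)) {
                throw new IllegalArgumentException("Unknown optimization flag: " + flag);
            }
            enabled.add(flag);
        }
        return enabled;
    }
}
```

Returning the shared unmodifiable set for `all` keeps callers from accidentally mutating the list of known flags.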
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) { internalTopologyBuilder.validateCopartition(); } +/** + * A user can provide either the config OPTIMIZE which means all optimizations rules will be + * applied or they can provide a list of optimization rules. + */ +private void optimizeTopology(final Properties props) { +final List optimizationConfigs; +if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { +optimizationConfigs = new ArrayList<>(); +optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); Review Comment: Also, maybe we can just pack this logic into the `StreamsConfig.verifyTopologyOptimizationConfigs` method: 1. if NO_OPTIMIZATION, return empty set 2. else if OPTIMIZE, add all the optimization flags to the set 3. else split on comma and add each configured flag to the
[GitHub] [kafka] showuon commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
showuon commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r969053802 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -1933,11 +1941,81 @@ private Map topicPartitionTags(TopicPartition tp) { } } +// Visible for testing +void maybeCloseFetchSessions(final Timer timer) { +final Cluster cluster = metadata.fetch(); +final List> requestFutures = new ArrayList<>(); +for (final Map.Entry entry : sessionHandlers.entrySet()) { +final FetchSessionHandler sessionHandler = entry.getValue(); +// set the session handler to notify close. This will set the next metadata request to send close message. +sessionHandler.notifyClose(); + +final int sessionId = sessionHandler.sessionId(); +final Integer fetchTargetNodeId = entry.getKey(); +// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will +// skip sending the close request. +final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); +if (fetchTarget == null || client.isUnavailable(fetchTarget)) { +log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); +continue; +} + +log.debug("Sending close request for fetch session: {} to node: {}", sessionId, fetchTarget); +final RequestFuture responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget); +responseFuture.addListener(new RequestFutureListener() { +@Override +public void onSuccess(ClientResponse value) { +log.info("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); +} + +@Override +public void onFailure(RuntimeException e) { +log.info("Unable to a close message for fetch session: {} to node: {}. " + +"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e); +} +}); + +requestFutures.add(responseFuture); +} + +// Poll to ensure that request has been written to the socket. 
Wait until either the timer has expired or until +// all requests have received a response. +do { +client.poll(timer, null, true); +} while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)); + +if (!requestFutures.stream().allMatch(RequestFuture::isDone)) { +// we ran out of time before completing all futures. It is ok since we don't want to block the shutdown +// here. +log.warn("All requests couldn't be sent in the specific timeout period {}ms. " + +"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + +"KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); +} +} + +public void close(final Timer timer) { +if (isClosed.get()) { +log.info("Fetcher {} is already closed.", this); +return; +} + +// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence, +// it is necessary to acquire a lock on the fetcher instance before modifying the states. +synchronized (Fetcher.this) { +// we do not need to re-enable wakeups since we are closing already +client.disableWakeups(); +if (nextInLineFetch != null) +nextInLineFetch.drain(); +maybeCloseFetchSessions(timer); +Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier"); +sessionHandlers.clear(); +} +this.isClosed.compareAndSet(false, true); Review Comment: I think we can set the `isClosed` earlier, so that it will not enter the synchronized block multiple times for different threads. ex: ```java if (this.isClosed.compareAndSet(false, true)) { // Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence, // it is necessary to acquire a lock on the fetcher instance before modifying the states.
synchronized (Fetcher.this) { // we do not need to re-enable wakeups since we are closing already client.disableWakeups(); if (nextInLineFetch != null) nextInLineFetch.drain(); maybeCloseFetchSessions(timer); Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier"); sessionHandlers.clear();
[GitHub] [kafka] showuon commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
showuon commented on code in PR #12590: URL: https://github.com/apache/kafka/pull/12590#discussion_r969044196 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java: ## @@ -446,6 +429,31 @@ private RequestFuture sendMetadataRequest(MetadataRequest.Builde return client.send(node, request); } +/** + * Send Fetch Request to Kafka cluster asynchronously. + * + * This method is visible for testing. + * + * @return A future that indicates result of sent Fetch request + */ +RequestFuture sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, + final Node fetchTarget) { +final short maxVersion = requestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12; + +final FetchRequest.Builder request = FetchRequest.Builder +.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, requestData.toSend()) +.isolationLevel(isolationLevel) +.setMaxBytes(this.maxBytes) +.metadata(requestData.metadata()) +.removed(requestData.toForget()) +.replaced(requestData.toReplace()) +.rackId(clientRackId); + +log.debug("Sending {} {} to broker {}", isolationLevel, requestData, fetchTarget); Review Comment: Thanks for the info. Makes sense!
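showuon's earlier suggestion for PR #12590 was to claim `isClosed` with `compareAndSet` before entering the synchronized block. That idea, combined with the bounded wait that `maybeCloseFetchSessions` performs, can be sketched with plain JDK types. This is a hypothetical illustration, not the Fetcher code:

- `CompletableFuture` stands in for Kafka's `RequestFuture`;
- a millisecond timeout stands in for `Timer`;
- the class and method names are made up.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentCloseSketch {
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // Counts how many times the cleanup body actually ran (for illustration and testing).
    final AtomicInteger cleanupRuns = new AtomicInteger();

    /** Returns true if this call performed the close, false if another caller already claimed it. */
    boolean close(List<CompletableFuture<Void>> closeRequests, long timeoutMs) {
        // Claim the close before taking the lock, so later callers return immediately
        // instead of queueing on the monitor and re-running the cleanup.
        if (!isClosed.compareAndSet(false, true)) {
            return false;
        }
        synchronized (this) {
            cleanupRuns.incrementAndGet();
            if (!awaitAll(closeRequests, timeoutMs)) {
                // Mirrors the PR's warning: do not block shutdown beyond the timeout.
                System.err.println("Not all close requests completed within " + timeoutMs + " ms");
            }
        }
        return true;
    }

    /** Waits until every future is done (successfully or not) or the timeout elapses. */
    static boolean awaitAll(List<CompletableFuture<Void>> futures, long timeoutMs) {
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // every future finished; at least one of them failed
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One trade-off of claiming the flag first: a second caller can return while the first caller is still inside the cleanup block. That second caller should therefore not assume the resources have already been released.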
[GitHub] [kafka] showuon merged pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()
showuon merged PR #12591: URL: https://github.com/apache/kafka/pull/12591
[GitHub] [kafka] showuon commented on pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()
showuon commented on PR #12591: URL: https://github.com/apache/kafka/pull/12591#issuecomment-1244750774 Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
Build / JDK 8 and Scala 2.12 / kafka.test.ClusterTestExtensionsTest.[1] Type=ZK, Name=Generated Test, MetadataVersion=3.3-IV3, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailurePositiveDelayTest.testInvalidPasswordSaslPlain()
Build / JDK 17 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=kraft
```
[GitHub] [kafka] showuon commented on a diff in pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits
showuon commented on code in PR #12626: URL: https://github.com/apache/kafka/pull/12626#discussion_r969036313 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java: ## @@ -211,8 +211,23 @@ public void poll(RequestFuture future) { * @throws InterruptException if the calling thread is interrupted */ public boolean poll(RequestFuture future, Timer timer) { +return poll(future, timer, false); +} + +/** + * Block until the provided request future request has finished or the timeout has expired. + * + * @param future The request future to wait for + * @param timer Timer bounding how long this method can block + * @param disableWakeup true if we should not check for wakeups, false otherwise + * + * @return true if the future is done, false otherwise + * @throws WakeupException if {@link #wakeup()} is called from another thread Review Comment: We should mention that `WakeupException` is only thrown when `disableWakeup` is false.
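To make the documented contract concrete, here is a hypothetical spin-loop sketch of the `poll(future, timer, disableWakeup)` overload. In it, a `WakeupException` can only surface when `disableWakeup` is false. It is not `ConsumerNetworkClient`:

- `CompletableFuture` replaces `RequestFuture`;
- a millisecond timeout replaces `Timer`;
- keeping a suppressed wakeup pending for the next poll is an assumption about the intended semantics.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupPollSketch {
    static class WakeupException extends RuntimeException {
        WakeupException() {
            super("poll interrupted by wakeup()");
        }
    }

    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

    /** May be called from another thread to abort a blocking poll. */
    void wakeup() {
        wakeupPending.set(true);
    }

    /** Existing overload: wakeups are honored. */
    boolean poll(CompletableFuture<?> future, long timeoutMs) {
        return poll(future, timeoutMs, false);
    }

    /**
     * Blocks until the future is done or the timeout expires.
     *
     * @throws WakeupException if wakeup() was called and disableWakeup is false;
     *         never thrown when disableWakeup is true
     */
    boolean poll(CompletableFuture<?> future, long timeoutMs, boolean disableWakeup) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            // Only consume (and act on) a pending wakeup when wakeups are enabled;
            // otherwise it stays pending for a later poll.
            if (!disableWakeup && wakeupPending.compareAndSet(true, false)) {
                throw new WakeupException();
            }
            if (future.isDone()) {
                return true;
            }
            Thread.yield(); // a real client would block in network I/O here
        } while (System.currentTimeMillis() < deadline);
        return future.isDone();
    }
}
```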
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg commented on code in PR #12627: URL: https://github.com/apache/kafka/pull/12627#discussion_r969037841 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -129,23 +151,24 @@ public synchronized void loadSnapshot(Map acls) { public List authorize( AuthorizableRequestContext requestContext, List actions) { -StandardAuthorizerData curData = data; -List results = new ArrayList<>(actions.size()); -for (Action action: actions) { -AuthorizationResult result = curData.authorize(requestContext, action); -results.add(result); -} -return results; +return inReadLock(() -> { Review Comment: Okay. I'll remove the functions. ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -534,49 +536,19 @@ Iterable acls(AclBindingFilter filter) { } class AclIterable implements Iterable { Review Comment: Done.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969035562 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -2283,6 +2307,37 @@ public void testRestOffsetsAuthorizationFailure() { assertEquals(5, subscriptions.position(tp0).offset); } +@Test +public void testPendingRevacationPartitionFetching() { Review Comment: nit: Revocation is misspelled. I did not find the name very clear. It looks like the main difference between this and `testFetchingPendingPartitions` is that this method tests that the pending state gets reset after reassignment? Perhaps the name should reflect that? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -272,6 +272,30 @@ public void testFetchNormal() { } } +@Test +public void testFetchingPendingPartitions() { +buildFetcher(); + +assignFromUser(singleton(tp0)); +subscriptions.seek(tp0, 0); + +// normal fetch +assertEquals(1, fetcher.sendFetches()); +client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0)); +consumerClient.poll(time.timer(0)); +assertTrue(fetcher.hasCompletedFetches()); +fetchedRecords(); +assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position + +// mark partition unfetchable +subscriptions.markPendingRevocation(singleton(tp0)); Review Comment: Another scenario is that we already have the fetch in flight when we mark pending revocation. Can we test that as well?
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java: ## @@ -256,6 +256,15 @@ public void partitionPause() { assertTrue(state.isFetchable(tp0)); } +@Test +public void testMarkingPartitionPending() { +state.assignFromUser(singleton(tp0)); +state.seek(tp0, 100); +assertTrue(state.isFetchable(tp0)); +state.markPendingRevocation(singleton(tp0)); +assertFalse(state.isFetchable(tp0)); Review Comment: Perhaps we can also assert `isPaused` is false?
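The behavior these tests pin down can be modeled with a small hypothetical state class. Marking a partition pending revocation makes it unfetchable without marking it paused, and a fresh assignment clears the flag. The method names echo `SubscriptionState` (`markPendingRevocation`, `isFetchable`, `isPaused`). This is a simplified sketch with string partition names, not Kafka's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class PendingRevocationSketch {
    /** Per-partition state (simplified). */
    private static final class PartitionState {
        Long position;              // null until seek() is called
        boolean paused;             // user-controlled via pause()
        boolean pendingRevocation;  // set internally before an eager revocation

        boolean isFetchable() {
            return position != null && !paused && !pendingRevocation;
        }
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    /** A new assignment creates fresh state, so any pending-revocation flag is reset. */
    void assign(Set<String> partitions) {
        assignment.clear();
        for (String tp : partitions) {
            assignment.put(tp, new PartitionState());
        }
    }

    void seek(String tp, long offset) {
        assignment.get(tp).position = offset;
    }

    void pause(String tp) {
        assignment.get(tp).paused = true;
    }

    /** Stops fetching for these partitions without touching the user-visible paused flag. */
    void markPendingRevocation(Set<String> partitions) {
        for (String tp : partitions) {
            PartitionState state = assignment.get(tp);
            if (state != null) {
                state.pendingRevocation = true;
            }
        }
    }

    boolean isFetchable(String tp) {
        PartitionState state = assignment.get(tp);
        return state != null && state.isFetchable();
    }

    boolean isPaused(String tp) {
        PartitionState state = assignment.get(tp);
        return state != null && state.paused;
    }
}
```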
[GitHub] [kafka] hachikuji commented on a diff in pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
hachikuji commented on code in PR #12627: URL: https://github.com/apache/kafka/pull/12627#discussion_r969028196 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -129,23 +151,24 @@ public synchronized void loadSnapshot(Map acls) { public List authorize( AuthorizableRequestContext requestContext, List actions) { -StandardAuthorizerData curData = data; -List results = new ArrayList<>(actions.size()); -for (Action action: actions) { -AuthorizationResult result = curData.authorize(requestContext, action); -results.add(result); -} -return results; +return inReadLock(() -> { Review Comment: I know it does not look as pretty, but perhaps we should just do the `try/finally` blocks. Especially for the case of `authorize`, it is annoying to have the additional allocations just to pass the callback. ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java: ## @@ -97,17 +119,17 @@ public void completeInitialLoad(Exception e) { @Override public void addAcl(Uuid id, StandardAcl acl) { -data.addAcl(id, acl); +inWriteLock(() -> data.addAcl(id, acl)); Review Comment: I am not sure how much it matters, but it would probably be more efficient for `addAcl` to be a batched API. Perhaps it is ok since hopefully most of the time the brunt of the initialization is in `loadSnapshot`. ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -534,49 +536,19 @@ Iterable acls(AclBindingFilter filter) { } class AclIterable implements Iterable { Review Comment: Do we still need this class since it is just wrapping a list iterator?
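The `try/finally` style suggested here, plus the batched-write idea for `addAcl`, might look like the following. This is a hypothetical sketch, not `StandardAuthorizer`: ACLs are reduced to a set of allowed principal strings, and `addAcls` is an invented batched method. Holding the read lock for the whole loop means every action in one `authorize` call sees the same data, and no capturing lambda is allocated per call.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockedAclsSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Simplified "ACL data": the set of principals that are allowed. Guarded by lock.
    private final Set<String> allowedPrincipals = new HashSet<>();

    void addAcl(String principal) {
        lock.writeLock().lock();
        try {
            allowedPrincipals.add(principal);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Hypothetical batched variant: one write-lock acquisition for many ACLs. */
    void addAcls(Collection<String> principals) {
        lock.writeLock().lock();
        try {
            allowedPrincipals.addAll(principals);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** All actions in one call are checked against the same consistent view of the data. */
    List<Boolean> authorize(List<String> principals) {
        lock.readLock().lock();
        try {
            List<Boolean> results = new ArrayList<>(principals.size());
            for (String principal : principals) {
                results.add(allowedPrincipals.contains(principal));
            }
            return results;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```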
[GitHub] [kafka] jsancio merged pull request #12597: KAFKA-14205; Document how to replace the disk for the KRaft Controller
jsancio merged PR #12597: URL: https://github.com/apache/kafka/pull/12597
[GitHub] [kafka] jsancio commented on a diff in pull request #12597: KAFKA-14205; Document how to replace the disk for the KRaft Controller
jsancio commented on code in PR #12597: URL: https://github.com/apache/kafka/pull/12597#discussion_r969023966 ## docs/ops.html: ## @@ -1373,6 +1373,27 @@ delalloc: Delayed allocation means that the filesystem avoid allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance. + Replace KRaft Controller Disk + When Kafka is configured to use KRaft, the controllers store the cluster metadata in the directory specified in metadata.log.dir -- or the first log directory, if metadata.log.dir is not configured. See the documentation for metadata.log.dir for details. + + If the data in the cluster metdata directory is lost either because of hardware failure or the hardware needs to be replace, care should be taken when provisioning the new controller node. The new controller node should not be formatted and started until the majority of the controllers have all of the committed data. To determine if the majority of the controllers have the committed data, run the kafka-metadata-quorum.sh tool to describe the replication status: Review Comment: Fixed. ## docs/ops.html: ## @@ -1373,6 +1373,27 @@ delalloc: Delayed allocation means that the filesystem avoid allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem which adds a bit of latency variance. + Replace KRaft Controller Disk + When Kafka is configured to use KRaft, the controllers store the cluster metadata in the directory specified in metadata.log.dir -- or the first log directory, if metadata.log.dir is not configured. 
See the documentation for metadata.log.dir for details. + + If the data in the cluster metdata directory is lost either because of hardware failure or the hardware needs to be replace, care should be taken when provisioning the new controller node. The new controller node should not be formatted and started until the majority of the controllers have all of the committed data. To determine if the majority of the controllers have the committed data, run the kafka-metadata-quorum.sh tool to describe the replication status: + + bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication + NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status + 1 25806 0 1662500992757 1662500992757 Leader + ... ... ... ... ... ... + + + Check and wait until the Lag is small for the majority of the controllers. Check and wait until the LastFetchTimestamp and LastCaughtUpTimestamp are close to each other for the majority of the controllers. At this point it is safer to format the controller's metadata log directory. This can be done by running the kafka-storage.sh command. Review Comment: Thanks for the suggestion. Fixed.
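The procedure above leaves "small" lag and timestamps "close to each other" to the operator's judgment. A hypothetical helper could encode that check over values copied from the `--replication` output. The class, the field names, and both thresholds are assumptions for illustration, and parsing the tool's actual output is left out.

```java
import java.util.List;

class QuorumCatchUpCheck {
    /** One row of the replication status table, with fields named after its columns. */
    static final class ReplicaStatus {
        final int nodeId;
        final long lag;
        final long lastFetchTimestamp;
        final long lastCaughtUpTimestamp;

        ReplicaStatus(int nodeId, long lag, long lastFetchTimestamp, long lastCaughtUpTimestamp) {
            this.nodeId = nodeId;
            this.lag = lag;
            this.lastFetchTimestamp = lastFetchTimestamp;
            this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
        }
    }

    /**
     * True when a strict majority of the listed controllers have a lag of at most maxLag and a
     * LastCaughtUpTimestamp within maxGapMs of their LastFetchTimestamp.
     * Both thresholds are operator choices, not values from the Kafka docs.
     */
    static boolean majorityCaughtUp(List<ReplicaStatus> controllers, long maxLag, long maxGapMs) {
        long caughtUp = controllers.stream()
            .filter(r -> r.lag <= maxLag)
            .filter(r -> Math.abs(r.lastFetchTimestamp - r.lastCaughtUpTimestamp) <= maxGapMs)
            .count();
        return caughtUp > controllers.size() / 2;
    }
}
```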
[GitHub] [kafka] akhileshchg commented on pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg commented on PR #12628: URL: https://github.com/apache/kafka/pull/12628#issuecomment-1244722682
```
Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score  Error  Units
AclAuthorizerBenchmark.testAclsIterator                     50               ACL                20               20  avgt         730.070         ms/op
AclAuthorizerBenchmark.testAuthorizeByResourceType          50               ACL                20               20  avgt           0.010         ms/op
AclAuthorizerBenchmark.testAuthorizer                       50               ACL                20               20  avgt           4.505         ms/op
AclAuthorizerBenchmark.testUpdateCache                      50               ACL                20               20  avgt        1936.356         ms/op

Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score  Error  Units
AclAuthorizerBenchmark.testAclsIterator                     50             KRAFT                20               20  avgt        2084.634         ms/op
AclAuthorizerBenchmark.testAuthorizeByResourceType          50             KRAFT                20               20  avgt        6180.318         ms/op
AclAuthorizerBenchmark.testAuthorizer                       50             KRAFT                20               20  avgt           2.768         ms/op
AclAuthorizerBenchmark.testUpdateCache                      50             KRAFT                20               20  avgt          ≈ 10⁻⁶         ms/op
```
NOTE: `authorizeByResourceType` is not implemented in `StandardAuthorizer`, so it uses the default implementation in `Authorizer`, hence it is not in the same ballpark as AclAuthorizer. Similarly `updateCache` is not implemented for `StandardAuthorizer` (we use `AclMutator`, so we cannot compare the numbers). With the new implementation, StandardAuthorizer seems to be doing worse on the `AclsIterator` benchmark than `AclAuthorizer` and doing better in `testAuthorizer` which calls `Authorizer#authorize`. I updated the iterator method to only loop once through acls and the performance is in the same ballpark as AclAuthorizer.
```
Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score  Error  Units
AclAuthorizerBenchmark.testAclsIterator                     50             KRAFT                20               20  avgt         833.482         ms/op
```
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969021668 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { autoCommitOffsetRequestFuture = null; timer.update(); -if (exception != null) { -throw new KafkaException("User rebalance callback throws an error", exception); +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); } return true; } +private SortedSet eagerPartitionsToRevoke(RebalanceProtocol protocol) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (protocol != RebalanceProtocol.EAGER) { +return partitions; +} + +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +private void markPendingPartitions() { Review Comment: How about `maybeMarkPartitionsPendingRevocation`? Otherwise it's a little unclear what exactly is pending.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969021352 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { autoCommitOffsetRequestFuture = null; timer.update(); -if (exception != null) { -throw new KafkaException("User rebalance callback throws an error", exception); +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); } return true; } +private SortedSet eagerPartitionsToRevoke(RebalanceProtocol protocol) { Review Comment: nit: not using this anymore
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969020626 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -935,6 +941,9 @@ private boolean isPaused() { return paused; } +private boolean isPendingRevocation() { Review Comment: nit: do we need this? Is it used anywhere?
[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
jsancio commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r969020182 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -29,15 +29,15 @@ public class BatchMemoryPool implements MemoryPool { Review Comment: Done.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
hachikuji commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r969019528 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -29,15 +29,15 @@ public class BatchMemoryPool implements MemoryPool { Review Comment: Could we update the javadoc above to match current behavior?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { joinPrepareTimer.update(); } -final SortedSet partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId); - +final SortedSet eagerPartitionsToRevoke = eagerPartitionsToRevoke(protocol); Review Comment: Hmm, thinking about this a little more since we're down to just the eager protocol. Since the assignment won't change until the rebalance completes, maybe we do not need to precompute it. In other words, maybe we can restore the original logic in `revokePartitions` and we can change `markPendingPartitions` to something like this:
```java
private void maybeMarkPartitionsPendingRevocation() {
    if (protocol == RebalanceProtocol.EAGER) {
        // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
        // window of time between when the offset commit is sent and when it returns and revocation completes. It is
        // possible for pending fetches for these partitions to return during this time, which means the application's
        // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
        // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
        // fetches or returning data from previous fetches to the user.
        Set<TopicPartition> partitions = subscriptions.assignedPartitions();
        log.debug("Marking assigned partitions pending for revocation: {}", partitions);
        subscriptions.markPendingRevocation(partitions);
    }
}
```
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -859,36 +848,49 @@ private Optional revokePartitions(SortedSet partition } else { switch (protocol) { case EAGER: -exception = Optional.ofNullable(invokePartitionsRevoked(partitions)); +exception = Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke)); subscriptions.assignFromSubscribed(Collections.emptySet()); - break; case COOPERATIVE: -Set ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); -partitions.addAll(ownedPartitions.stream() -.filter(tp -> !subscriptions.subscription().contains(tp.topic())) -.collect(Collectors.toSet())); - -if (!partitions.isEmpty()) { -exception = Optional.ofNullable(invokePartitionsRevoked(partitions)); -ownedPartitions.removeAll(partitions); -subscriptions.assignFromSubscribed(ownedPartitions); -} +exception = revokeUnsubscribedPartitions(); break; } } return exception; } -private void markPartitionsUnconsumable(final Set partitions) { -// KAFKA-14196 for more detail, we pause the partition from consumption to prevent duplicated -// data returned by the consumer poll loop. Without pausing the partitions, the consumer will move forward -// returning the data w/o committing them. And the progress will be lost once the partition is revoked. -// This only applies to autocommits, as we expect user to handle the offsets menually during the partition -// revocation. -log.debug("Marking assigned partitions unconsumable: {}", partitions); +private Optional revokeUnsubscribedPartitions() { +//For the cooperative strategy, partitions are usually revoked in onJoinComplete when the Review Comment: nit: space after `//`. Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?
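The `COOPERATIVE` branch removed in this diff (and moved into `revokeUnsubscribedPartitions`) revokes only the owned partitions whose topic is no longer subscribed. Below is a hypothetical, simplified sketch of that set logic. A made-up `Tp` class stands in for `TopicPartition`, and the rebalance-listener callback is left out.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

class CooperativeRevocationSketch {
    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) return false;
            Tp other = (Tp) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Owned partitions whose topic is no longer in the subscription. */
    static Set<Tp> unsubscribedPartitions(Set<Tp> owned, Set<String> subscription) {
        return owned.stream()
            .filter(tp -> !subscription.contains(tp.topic))
            .collect(Collectors.toSet());
    }

    /** What stays assigned after revoking the unsubscribed partitions. */
    static Set<Tp> remainingAfterRevocation(Set<Tp> owned, Set<String> subscription) {
        Set<Tp> remaining = new HashSet<>(owned);
        remaining.removeAll(unsubscribedPartitions(owned, subscription));
        return remaining;
    }
}
```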
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
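The suggestion above relies on one invariant: a partition that is "pending revocation" is still assigned (so its offsets can still be committed), but the fetch path must skip it. A minimal sketch of that invariant, assuming a hypothetical `PendingRevocationSketch` class that uses plain string partition names; it is not Kafka's actual `SubscriptionState` API.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the consumer's subscription state.
// Partitions are plain strings such as "foo-0"; this is not Kafka's SubscriptionState.
class PendingRevocationSketch {
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> pendingRevocation = new LinkedHashSet<>();

    void assign(Set<String> partitions) {
        assigned.clear();
        pendingRevocation.clear();
        assigned.addAll(partitions);
    }

    // The partitions stay assigned (so their offsets can still be committed),
    // but they are no longer handed to the fetch path.
    void markPendingRevocation(Set<String> partitions) {
        for (String tp : partitions) {
            if (assigned.contains(tp)) {
                pendingRevocation.add(tp);
            }
        }
    }

    boolean isAssigned(String tp) {
        return assigned.contains(tp);
    }

    // A fetcher would check this before sending new fetches or returning buffered records.
    boolean isFetchable(String tp) {
        return assigned.contains(tp) && !pendingRevocation.contains(tp);
    }

    Set<String> fetchablePartitions() {
        return assigned.stream()
                .filter(this::isFetchable)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

Revocation itself would then remove the partitions from `assigned` once the offset commit has completed.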
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg commented on code in PR #12628: URL: https://github.com/apache/kafka/pull/12628#discussion_r969009047 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -534,49 +535,19 @@ Iterable acls(AclBindingFilter filter) { } class AclIterable implements Iterable { -private final AclBindingFilter filter; +private final List aclBindingList; AclIterable(AclBindingFilter filter) { -this.filter = filter; +this.aclBindingList = aclsByResource Review Comment: Yes, you're right. I think there is no other way we can guarantee the consistency here other than giving an Iterable that stays constant to the client accessing `Authorizer#acls`.
[GitHub] [kafka] mumrah commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
mumrah commented on code in PR #12628: URL: https://github.com/apache/kafka/pull/12628#discussion_r969006371 ## metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java: ## @@ -534,49 +535,19 @@ Iterable acls(AclBindingFilter filter) { } class AclIterable implements Iterable { -private final AclBindingFilter filter; +private final List aclBindingList; AclIterable(AclBindingFilter filter) { -this.filter = filter; +this.aclBindingList = aclsByResource Review Comment: Just to clarify my understanding: previously, we were wrapping the aclsByResource iterator. This was intended to be thread-safe, but as you mentioned offline, there was no guard against the underlying map getting modified during iteration (since the ConcurrentSkipListMap might show updated elements to the iterator). Instead, we are now copying the matching AclBindings into a list and returning that list's iterator to the caller. So we are basically trading off memory for consistency.
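The trade-off described above (copy the matching bindings so the caller iterates over something that cannot change underneath it) can be sketched as follows. `AclSnapshotSketch` is hypothetical: it models a binding as a plain string keyed by resource name and is not the real `StandardAuthorizerData`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;

// Hypothetical simplification: one ACL binding (a string) per resource name.
class AclSnapshotSketch {
    private final ConcurrentSkipListMap<String, String> aclsByResource = new ConcurrentSkipListMap<>();

    void add(String resource, String binding) {
        aclsByResource.put(resource, binding);
    }

    void remove(String resource) {
        aclsByResource.remove(resource);
    }

    // Copies the matching bindings up front. The caller iterates over a list that
    // later writes cannot change, at the cost of O(matches) extra memory per call.
    Iterable<String> acls(Predicate<String> filter) {
        List<String> snapshot = new ArrayList<>();
        for (Map.Entry<String, String> entry : aclsByResource.entrySet()) {
            if (filter.test(entry.getValue())) {
                snapshot.add(entry.getValue());
            }
        }
        return Collections.unmodifiableList(snapshot);
    }
}
```

Note that the copy loop itself still walks the `ConcurrentSkipListMap` with its weakly consistent iterator. The list is therefore only a true point-in-time view if writers are excluded while it is built, which is what the read-write lock in the PR description provides.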
[GitHub] [kafka] akhileshchg opened a new pull request, #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg opened a new pull request, #12628: URL: https://github.com/apache/kafka/pull/12628 KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. The issue with StandardAuthorizer#authorize is that it looks up aclsByResources (which is of type ConcurrentSkipListMap) twice for every authorize call and uses an Iterator with weak consistency guarantees on top of aclsByResources. This can cause the authorize call to process concurrent writes out of order. Implemented a read-write lock at the StandardAuthorizer level to make sure reads are strongly consistent with the write order.
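The fix described above takes a shared read lock for the whole authorize call and an exclusive write lock for updates. It looks roughly like the sketch below. `RwLockedAclStore`, its deny/allow maps, and its method names are hypothetical. The point is that both lookups run under the same read lock, so no write can land between them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical ACL store: principals denied/allowed per resource, guarded by one lock.
class RwLockedAclStore {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Set<String>> denyByResource = new HashMap<>();
    private final Map<String, Set<String>> allowByResource = new HashMap<>();

    void allow(String resource, String principal) {
        update(allowByResource, resource, principal);
    }

    void deny(String resource, String principal) {
        update(denyByResource, resource, principal);
    }

    // Writers take the exclusive lock, so readers never observe a half-applied change.
    private void update(Map<String, Set<String>> target, String resource, String principal) {
        lock.writeLock().lock();
        try {
            target.computeIfAbsent(resource, r -> new HashSet<>()).add(principal);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Both lookups happen under one read lock: concurrent readers proceed in parallel,
    // but no write can be applied between the deny check and the allow check.
    boolean authorize(String resource, String principal) {
        lock.readLock().lock();
        try {
            if (denyByResource.getOrDefault(resource, Set.of()).contains(principal)) {
                return false;
            }
            return allowByResource.getOrDefault(resource, Set.of()).contains(principal);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```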
[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
jsancio commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r968998338 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -90,18 +98,12 @@ public long size() { @Override public long availableMemory() { -lock.lock(); -try { -int freeBatches = free.size() + (maxBatches - numAllocatedBatches); -return freeBatches * (long) batchSize; -} finally { -lock.unlock(); -} +return Integer.MAX_VALUE; Review Comment: Yes. This should be `Long.MAX_VALUE`.
[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
jsancio commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r968998214 ## raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java: ## @@ -104,4 +128,15 @@ public void testReleaseBufferNotMatchingBatchSize() { assertThrows(IllegalArgumentException.class, () -> pool.release(buffer)); } +private ByteBuffer touch(ByteBuffer buffer) { Review Comment: Renamed the function to `update`.
[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
jsancio commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r968998063 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -72,7 +74,13 @@ public void release(ByteBuffer previouslyAllocated) { + previouslyAllocated.limit()); } -free.offer(previouslyAllocated); +// Free the buffer if the number of pooled buffers is already the maximum number of batches. +// Otherwise return the buffer to the memory pool. +if (free.size() >= maxBatches) { Review Comment: Yes. Fixed the variable name.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join
jnh5y commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r968993197 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java: ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamSelfJoin implements ProcessorSupplier { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamSelfJoin.class); + +private final String windowName; +private final long joinThisBeforeMs; +private final long joinThisAfterMs; +private final long joinOtherBeforeMs; +private final long joinOtherAfterMs; +private final ValueJoinerWithKey joinerThis; +private final ValueJoinerWithKey joinerOther; + +private final TimeTracker sharedTimeTracker; + +KStreamKStreamSelfJoin( +final String windowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joinerThis, +final ValueJoinerWithKey joinerOther, +final TimeTracker sharedTimeTracker) { + +this.windowName = windowName; +this.joinThisBeforeMs = windows.beforeMs; +this.joinThisAfterMs = windows.afterMs; +this.joinOtherBeforeMs = windows.afterMs; +this.joinOtherAfterMs = windows.beforeMs; +this.joinerThis = joinerThis; +this.joinerOther = joinerOther; +this.sharedTimeTracker = sharedTimeTracker; +} + +@Override +public Processor get() { +return new KStreamKStreamSelfJoinProcessor(); +} + +private 
class KStreamKStreamSelfJoinProcessor extends StreamStreamJoinProcessor { +private WindowStore windowStore; +private Sensor droppedRecordsSensor; + +@Override +public void init(final ProcessorContext context) { +super.init(context); + +final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +windowStore = context.getStateStore(windowName); +} + +@SuppressWarnings("unchecked") +@Override +public void process(final Record record) { +System.out.println("---> Processing record: " + record); +if (skipRecord(record, LOG, droppedRecordsSensor)) { +return; +} + +final long inputRecordTimestamp = record.timestamp(); +long timeFrom = Math.max(0L, inputRecordTimestamp - joinThisBeforeMs); +long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs); +boolean emittedJoinWithSelf = false; + +sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); +System.out.println("> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo); + +// Join current record with other +System.out.println("> Window store fetch, timeFrom=" + timeFrom + " timeTo=" + timeTo); Review Comment: `System.out.println` should likely be removed or replaced with `LOG.trace`/`LOG.debug`?
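The fetch range in `process` runs from the record timestamp minus the "before" bound to the timestamp plus the "after" bound, clamped at zero. The constructor stores the mirrored bounds (before and after swapped) for the other side of the join. The hypothetical helper below reproduces just that arithmetic; it is not part of the PR.

```java
// Hypothetical helper: reproduces only the window-range arithmetic from the excerpt.
final class SelfJoinWindowBounds {
    final long timeFrom;
    final long timeTo;

    private SelfJoinWindowBounds(long timeFrom, long timeTo) {
        this.timeFrom = timeFrom;
        this.timeTo = timeTo;
    }

    // Window-store fetch range for a record at recordTs; negative bounds are clamped to 0.
    static SelfJoinWindowBounds forRecord(long recordTs, long beforeMs, long afterMs) {
        return new SelfJoinWindowBounds(
                Math.max(0L, recordTs - beforeMs),
                Math.max(0L, recordTs + afterMs));
    }
}
```

With `beforeMs = 100` and `afterMs = 50`, a record at timestamp 1000 fetches `[900, 1050]` for the "this" side. With the bounds swapped, the "other" side fetches `[950, 1100]`.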
[GitHub] [kafka] akhileshchg closed pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg closed pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. URL: https://github.com/apache/kafka/pull/12627
[GitHub] [kafka] hachikuji commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
hachikuji commented on code in PR #12625: URL: https://github.com/apache/kafka/pull/12625#discussion_r968988957 ## raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java: ## @@ -104,4 +128,15 @@ public void testReleaseBufferNotMatchingBatchSize() { assertThrows(IllegalArgumentException.class, () -> pool.release(buffer)); } +private ByteBuffer touch(ByteBuffer buffer) { Review Comment: nit: `touch` seems a little vague. I think we're just trying to simulate some buffer usage? ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -72,7 +74,13 @@ public void release(ByteBuffer previouslyAllocated) { + previouslyAllocated.limit()); } -free.offer(previouslyAllocated); +// Free the buffer if the number of pooled buffers is already the maximum number of batches. +// Otherwise return the buffer to the memory pool. +if (free.size() >= maxBatches) { Review Comment: Perhaps we should rename `maxBatches` since it is no longer serving as a max. How about `maxRetainedBatches` or something like that since it is still a bound on the number of batches which the pool can hold onto indefinitely. ## raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java: ## @@ -90,18 +98,12 @@ public long size() { @Override public long availableMemory() { -lock.lock(); -try { -int freeBatches = free.size() + (maxBatches - numAllocatedBatches); -return freeBatches * (long) batchSize; -} finally { -lock.unlock(); -} +return Integer.MAX_VALUE; Review Comment: 2 billion bytes is 2GB? Is that enough?
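The three review points can be combined: always allocate, keep at most a bounded number of buffers for reuse, and report available memory as a `long`. A single-threaded sketch of that combination follows. `RetainingBatchPool` is hypothetical. Its method names echo Kafka's `MemoryPool` interface, but it does not implement that interface and omits any locking. For scale, `Integer.MAX_VALUE` is 2,147,483,647 bytes, one byte short of 2 GiB, whereas `Long.MAX_VALUE` is effectively unbounded.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, single-threaded sketch: always hands out a buffer, but keeps
// at most maxRetainedBatches released buffers around for reuse.
class RetainingBatchPool {
    private final int batchSize;
    private final int maxRetainedBatches;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    RetainingBatchPool(int batchSize, int maxRetainedBatches) {
        this.batchSize = batchSize;
        this.maxRetainedBatches = maxRetainedBatches;
    }

    ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes > batchSize) {
            throw new IllegalArgumentException("Cannot allocate " + sizeBytes + " bytes; batch size is " + batchSize);
        }
        ByteBuffer buffer = free.poll();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(batchSize); // never refuse: allocate a fresh buffer
        }
        buffer.clear();
        return buffer;
    }

    void release(ByteBuffer previouslyAllocated) {
        if (previouslyAllocated.capacity() != batchSize) {
            throw new IllegalArgumentException("Released buffer does not match the batch size");
        }
        // Drop the buffer (leaving it to the GC) once enough are retained; otherwise pool it.
        if (free.size() >= maxRetainedBatches) {
            return;
        }
        free.offer(previouslyAllocated);
    }

    int retainedBuffers() {
        return free.size();
    }

    // Allocation is unbounded, so report "no limit" using the full range of long.
    long availableMemory() {
        return Long.MAX_VALUE;
    }
}
```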
[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join
jnh5y commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r968986084 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() { } } +/** + * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the + */ +@SuppressWarnings("unchecked") +private void rewriteSelfJoin(final GraphNode currentNode, final Set visited) { +visited.add(currentNode); +if (currentNode instanceof StreamStreamJoinNode && isSelfJoin(currentNode)) { +((StreamStreamJoinNode) currentNode).setSelfJoin(); +// Remove JoinOtherWindowed node +final GraphNode parent = currentNode.parentNodes().stream().findFirst().get(); +GraphNode left = null, right = null; +for (final GraphNode child: parent.children()) { Review Comment: If there are more than 2 children, is it the case that only two satisfy `isStreamJoinWindowNode`?
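`rewriteSelfJoin` walks the topology graph and records which nodes it has visited. A hypothetical sketch of that kind of walk follows, with a `SketchNode` class standing in for `GraphNode`. Nodes are tracked by reference identity. Because `IdentityHashMap` is a `Map` rather than a `Set`, an identity-based `Set` is obtained with `Collections.newSetFromMap(new IdentityHashMap<>())`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for a Streams GraphNode.
final class SketchNode {
    final String name;
    final List<SketchNode> children = new ArrayList<>();

    SketchNode(String name) {
        this.name = name;
    }

    // Returns this (the parent) so several children can be added in a chain.
    SketchNode addChild(SketchNode child) {
        children.add(child);
        return this;
    }
}

final class GraphWalk {
    // Depth-first walk that visits each node once, tracking nodes by reference
    // identity (not equals()) via a Set view backed by an IdentityHashMap.
    static List<SketchNode> collect(SketchNode root, Predicate<SketchNode> matches) {
        Set<SketchNode> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        List<SketchNode> out = new ArrayList<>();
        visit(root, matches, visited, out);
        return out;
    }

    private static void visit(SketchNode node, Predicate<SketchNode> matches,
                              Set<SketchNode> visited, List<SketchNode> out) {
        if (!visited.add(node)) {
            return; // already reached through another parent
        }
        if (matches.test(node)) {
            out.add(node);
        }
        for (SketchNode child : node.children) {
            visit(child, matches, visited, out);
        }
    }
}
```

In a diamond-shaped graph, the shared child is visited only once. If a parent can have more than two matching children, which is what the review asks about, the caller has to pick the right pair rather than assume the result has exactly two elements.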
[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join
jnh5y commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r968982353 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) { internalTopologyBuilder.validateCopartition(); } +/** + * A user can provide either the config OPTIMIZE which means all optimizations rules will be + * applied or they can provide a list of optimization rules. + */ +private void optimizeTopology(final Properties props) { +final List optimizationConfigs; +if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { +optimizationConfigs = new ArrayList<>(); +optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION); +} else { +optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs( +(String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) { +LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes"); +optimizeKTableSourceTopics(); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) { +LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); +maybeOptimizeRepartitionOperations(); +} +if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE) +|| optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) { +LOG.debug("Optimizing the Kafka Streams graph for self-joins"); +rewriteSelfJoin(root, new IdentityHashMap<>()); Review Comment: In this method, could order matter?
[GitHub] [kafka] akhileshchg opened a new pull request, #12627: KGLOBAL-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.
akhileshchg opened a new pull request, #12627: URL: https://github.com/apache/kafka/pull/12627 KGLOBAL-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads. The issue with StandardAuthorizer#authorize is that it looks up aclsByResources (which is of type ConcurrentSkipListMap) twice for every authorize call and uses an Iterator with weak consistency guarantees on top of aclsByResources. This can cause the authorize call to process concurrent writes out of order. Implemented a read-write lock at the StandardAuthorizer level to make sure reads are strongly consistent with the write order.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join
jnh5y commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r968974464 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { -buildAndOptimizeTopology(false); +buildAndOptimizeTopology(null); } -public void buildAndOptimizeTopology(final boolean optimizeTopology) { Review Comment: Is this part of the public API?
[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join
jnh5y commented on code in PR #12555: URL: https://github.com/apache/kafka/pull/12555#discussion_r968974016 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -1644,6 +1668,29 @@ private Map getClientCustomProps() { return props; } +public static List verifyTopologyOptimizationConfigs(final String config) { +final List acceptableConfigs = Arrays.asList( Review Comment: I wonder if this list of optimization flags should be a static variable somewhere? Maybe it is fine if it is only used here.
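The two snippets above imply a parsing step: split the comma-separated `topology.optimization` value, validate each entry against a single static set of accepted values (as suggested in the comment), and treat `all` as enabling every rule. A hypothetical sketch of that step follows. The `all` and `none` values match the existing `StreamsConfig.OPTIMIZE` and `StreamsConfig.NO_OPTIMIZATION` constants. The strings for the individual rules are placeholders, not the PR's actual values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class OptimizationConfigSketch {
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none";
    // Hypothetical rule names, for illustration only.
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    static final String SELF_JOIN = "self.join";

    // Declared once as a constant instead of being rebuilt on every call.
    static final Set<String> ACCEPTABLE_CONFIGS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
            OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SELF_JOIN)));

    // Splits a comma-separated value, trims each entry, and rejects unknown entries.
    static List<String> verify(String config) {
        List<String> configs = new ArrayList<>();
        for (String raw : config.split(",")) {
            String value = raw.trim();
            if (!ACCEPTABLE_CONFIGS.contains(value)) {
                throw new IllegalArgumentException("Unrecognized topology optimization: '" + value + "'");
            }
            configs.add(value);
        }
        return configs;
    }

    // A rule runs if it was listed explicitly or if "all" was given.
    static boolean enabled(List<String> configs, String rule) {
        return configs.contains(OPTIMIZE) || configs.contains(rule);
    }
}
```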
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +switch (protocol) { +case EAGER: +partitions.addAll(subscriptions.assignedPartitions()); +break; + +case COOPERATIVE: +// Delay the partition revocation because we don't revoke the already owned partitions Review Comment: I was looking into the cooperative code path. We revoke the partitions in `onJoinComplete`, so that made me wonder why we don't have the same issue there. In fact, there is no additional offset commit in the current logic, which makes me think that the cooperative logic would already be more prone to duplicate consumption. We don't need to fix this here since it seems to be a pre-existing issue, but I am wondering if the failing system tests also cover cooperative assignment?
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968970980 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +switch (protocol) { +case EAGER: +partitions.addAll(subscriptions.assignedPartitions()); +break; + +case COOPERATIVE: +// Delay the partition revocation because we don't revoke the already owned partitions Review Comment: I don't think we have a system test covering the cooperative strategy, so it is quite possible that the same duplication happens there. I think it would be good to update these tests to cover the cooperative strategy as well.
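The revocation rule under discussion can be written as a pure function. On a reset generation or with the eager protocol, every assigned partition is given up. With the cooperative protocol, only partitions whose topic is no longer subscribed are given up at this stage. The sketch below is hypothetical: `RevocationSketch` and its `Tp` type are illustrations, not the coordinator's API. It also ignores callbacks and offset commits entirely.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class RevocationSketch {
    enum Protocol { EAGER, COOPERATIVE }

    // Hypothetical stand-in for TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Tp)) {
                return false;
            }
            Tp other = (Tp) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    static final Comparator<Tp> ORDER =
            Comparator.comparing((Tp tp) -> tp.topic).thenComparingInt(tp -> tp.partition);

    // Which partitions are given up in the join-prepare phase.
    static SortedSet<Tp> partitionsToRevoke(Protocol protocol, boolean generationReset,
                                            Set<Tp> assigned, Set<String> subscribedTopics) {
        SortedSet<Tp> result = new TreeSet<>(ORDER);
        if (generationReset || protocol == Protocol.EAGER) {
            // Reset generation (partitions are "lost") or eager protocol: give up everything.
            result.addAll(assigned);
        } else {
            // Cooperative: only partitions of topics that are no longer subscribed; other
            // revocations wait until the new assignment arrives in onJoinComplete.
            for (Tp tp : assigned) {
                if (!subscribedTopics.contains(tp.topic)) {
                    result.add(tp);
                }
            }
        }
        return result;
    }
}
```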
[GitHub] [kafka] hachikuji commented on pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota
hachikuji commented on PR #12624: URL: https://github.com/apache/kafka/pull/12624#issuecomment-1244563584 @mumrah Thanks for reviewing. Note that the controller will not actually be applying the throttles in the future since it does not have a direct client connection to throttle. The intention is just to let it set the throttle value and to have the broker throttle the client.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota
hachikuji commented on code in PR #12624: URL: https://github.com/apache/kafka/pull/12624#discussion_r968967605 ## clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java: ## @@ -268,6 +268,8 @@ public ApiKeys apiKey() { public abstract int throttleTimeMs(); +public abstract void setThrottleTimeMs(int throttleTimeMs); Review Comment: Makes sense. Will change.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968922937 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// Revoke all partitions +if (protocol == RebalanceProtocol.EAGER) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// only revoke those partitions that are not in the subscription any more. +if (protocol == RebalanceProtocol.COOPERATIVE) { +// Delay the partition revocation because we don't revoke the already owned partitions +return partitions; +} + +log.warn("Invalid protocol: {}. 
No partition will be revoked.", protocol); +return partitions; +} + +private Optional revokePartitions(SortedSet partitions, int generation, String memberId) { + // the generation / member-id can possibly be reset by the heartbeat thread // upon getting errors or heartbeat timeouts; in this case whatever is previously // owned partitions would be lost, we should trigger the callback and cleanup the assignment; // otherwise we can proceed normally and revoke the partitions depending on the protocol, // and in that case we should only change the assignment AFTER the revoke callback is triggered // so that users can still access the previously owned partitions to commit offsets etc. -Exception exception = null; -final SortedSet revokedPartitions = new TreeSet<>(COMPARATOR); +Optional exception = Optional.empty(); if (generation == Generation.NO_GENERATION.generationId || -memberId.equals(Generation.NO_GENERATION.memberId)) { -revokedPartitions.addAll(subscriptions.assignedPartitions()); - -if (!revokedPartitions.isEmpty()) { +memberId.equals(Generation.NO_GENERATION.memberId)) { +if (!partitions.isEmpty()) { log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," + -"indicating that consumer is in old state or no longer part of the group"); -exception = invokePartitionsLost(revokedPartitions); - +"indicating that consumer is in old state or no longer part of the group"); +exception = Optional.ofNullable(invokePartitionsLost(partitions)); subscriptions.assignFromSubscribed(Collections.emptySet()); } } else { switch (protocol) { case EAGER: -// revoke all partitions - revokedPartitions.addAll(subscriptions.assignedPartitions()); -exception = invokePartitionsRevoked(revokedPartitions); - +exception = Optional.ofNullable(invokePartitionsRevoked(partitions)); subscriptions.assignFromSubscribed(Collections.emptySet()); break; case COOPERATIVE: -// only revoke those partitions that are not in the subscription any more. 
Set ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); -revokedPartitions.addAll(ownedPartitions.stream() -.filter(tp -> !subscriptions.subscription().contains(tp.topic())) -.collect(Collectors.toSet())); - -if (!revokedPartitions.isEmpty()) { -exception = invokePartitionsRevoked(revokedPartitions); +partitions.addAll(ownedPartitions.stream() +.filter(tp -> !subscriptions.subscription().contains(tp.topic())) +.collect(Collectors.toSet())); -
[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260 ] Matthew de Detrich edited comment on KAFKA-14221 at 9/12/22 9:52 PM: - Thanks, INFRA tickets created. I checked the matrix, and indeed even the latest JDK variants there are older than the JDK version in which this bug was fixed. was (Author: mdedetrich-aiven): Thanks, INFRA tickets created
> Update Apache Kafka JVM version in CI to latest versions
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
> Issue Type: Task
> Reporter: Matthew de Detrich
> Priority: Major
>
> In a recent test run (see
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16])
> the JVM crashed with the following stack trace:
>
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
> {code}
> After some research online I found that a JDK bug was filed for the same kind of crash, see
> [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]
> This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39). Note that locally, with
> the latest OpenJDK (specifically 17.0.4.1), I cannot reproduce this crash.
> We should update both JDK 11 and JDK 17 to the latest versions in CI to
> see if this solves the problem.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota
mumrah commented on code in PR #12624: URL: https://github.com/apache/kafka/pull/12624#discussion_r968961932 ## clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java: ## @@ -268,6 +268,8 @@ public ApiKeys apiKey() { public abstract int throttleTimeMs(); +public abstract void setThrottleTimeMs(int throttleTimeMs); Review Comment: How about `maybeSetThrottleTimeMs`, since not all response schemas support it?
[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260 ] Matthew de Detrich edited comment on KAFKA-14221 at 9/12/22 9:49 PM: - Thanks, INFRA tickets created was (Author: mdedetrich-aiven): Thanks, INFRA ticket created
[jira] [Resolved] (KAFKA-14223) Update jdk_17_latest to adoptium 17.0.4+1
[ https://issues.apache.org/jira/browse/KAFKA-14223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich resolved KAFKA-14223. Resolution: Invalid Created in wrong project > Update jdk_17_latest to adoptium 17.0.4+1 > - > > Key: KAFKA-14223 > URL: https://issues.apache.org/jira/browse/KAFKA-14223 > Project: Kafka > Issue Type: Task >Reporter: Matthew de Detrich >Priority: Major > > The current latest JDK 17 is outdated and Apache Kafka appeared to have hit a > bug due to this (see https://issues.apache.org/jira/browse/KAFKA-14221). > Would it be possible to update jdk_17_latest to the adoptium 17.0.4+1 release?
[GitHub] [kafka] mumrah merged pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
mumrah merged PR #12596: URL: https://github.com/apache/kafka/pull/12596
[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260 ] Matthew de Detrich commented on KAFKA-14221: Thanks, INFRA ticket created
[jira] [Created] (KAFKA-14223) Update jdk_17_latest to adoptium 17.0.4+1
Matthew de Detrich created KAFKA-14223: -- Summary: Update jdk_17_latest to adoptium 17.0.4+1 Key: KAFKA-14223 URL: https://issues.apache.org/jira/browse/KAFKA-14223 Project: Kafka Issue Type: Task Reporter: Matthew de Detrich The current latest JDK 17 is outdated and Apache Kafka appeared to have hit a bug due to this (see https://issues.apache.org/jira/browse/KAFKA-14221). Would it be possible to update jdk_17_latest to the adoptium 17.0.4+1 release?
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968910809 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { joinPrepareTimer.update(); } +final SortedSet partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId); Review Comment: I think it makes sense. We can probably compute the partitions during the revocation, due to the nature of the cooperative protocol.
[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
mumrah commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r968903902 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -325,7 +325,7 @@ class BrokerMetadataListener( try { _image = _delta.apply() } catch { - case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) Review Comment: Hm, yeah, it does look like we can return `null`. In this case I believe we will see a `NullPointerException`. The EventQueue should still keep going in this case.
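The `NullPointerException` point follows from JVM semantics: in Java, `throw` on an expression that evaluates to `null` throws a `NullPointerException` instead (JLS 14.18), and Scala's `throw` compiles to the same JVM `athrow` instruction, which behaves the same way on `null`. A minimal Java sketch of the `throw handler.handleFault(...)` pattern; `FaultHandlerLike` and `ThrowNullDemo` are hypothetical stand-ins, not Kafka's `FaultHandler`.

```java
// Hypothetical stand-in for a fault handler whose handleFault may return null.
interface FaultHandlerLike {
    RuntimeException handleFault(String message, Throwable cause);
}

class ThrowNullDemo {
    // Mirrors the `throw handler.handleFault(...)` pattern from the diff.
    static void applyDelta(FaultHandlerLike handler) {
        try {
            throw new IllegalStateException("simulated failure while applying a delta");
        } catch (IllegalStateException e) {
            // If handleFault returns null, this statement throws NullPointerException.
            throw handler.handleFault("Error applying metadata delta", e);
        }
    }

    // Returns the class of whatever exception escapes applyDelta.
    static Class<?> thrownBy(FaultHandlerLike handler) {
        try {
            applyDelta(handler);
            return null;
        } catch (RuntimeException e) {
            return e.getClass();
        }
    }

    public static void main(String[] args) {
        // A handler that only logs and returns null: prints "NullPointerException".
        System.out.println(thrownBy((msg, cause) -> null).getSimpleName());
        // A handler that returns a wrapping exception: prints "RuntimeException".
        System.out.println(thrownBy((msg, cause) -> new RuntimeException(msg, cause)).getSimpleName());
    }
}
```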
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968897177 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// Revoke all partitions +if (protocol == RebalanceProtocol.EAGER) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// only revoke those partitions that are not in the subscription any more. +if (protocol == RebalanceProtocol.COOPERATIVE) { +// Delay the partition revocation because we don't revoke the already owned partitions +return partitions; +} + +log.warn("Invalid protocol: {}. No partition will be revoked.", protocol); Review Comment: I think this code is dead? Why don't we use a `switch` like we had before? Then the compiler can help us ensure we handle new cases.
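A `switch`-based version of the partition selection could look roughly like the sketch below. It is a simplified illustration, not the PR's code: `Protocol` is a hypothetical stand-in for `RebalanceProtocol`, partitions are modeled as `"topic-partition"` strings, and the no-generation branch is omitted. Note that javac does not check a classic `switch` statement for exhaustiveness, so the sketch ends with a `default` that throws; Java 14+ switch expressions, static-analysis tools, and IDE inspections can check enum coverage at build time.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

class RevokeSketch {
    // Hypothetical stand-in for the consumer's RebalanceProtocol enum.
    enum Protocol { EAGER, COOPERATIVE }

    static SortedSet<String> partitionsToRevoke(Protocol protocol,
                                                Set<String> owned,
                                                Set<String> subscribedTopics) {
        SortedSet<String> partitions = new TreeSet<>();
        switch (protocol) {
            case EAGER:
                // Eager rebalancing revokes every partition currently owned.
                partitions.addAll(owned);
                break;
            case COOPERATIVE:
                // Cooperative rebalancing revokes only partitions of unsubscribed topics.
                partitions.addAll(owned.stream()
                        .filter(tp -> !subscribedTopics.contains(topicOf(tp)))
                        .collect(Collectors.toSet()));
                break;
            default:
                // Fails loudly if a new protocol constant is added but not handled here.
                throw new IllegalStateException("Unhandled protocol: " + protocol);
        }
        return partitions;
    }

    // "orders-0" -> "orders"
    static String topicOf(String topicPartition) {
        return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        Set<String> owned = new TreeSet<>(Arrays.asList("orders-0", "orders-1", "payments-0"));
        Set<String> subscribed = Collections.singleton("orders");
        System.out.println(partitionsToRevoke(Protocol.EAGER, owned, subscribed));       // [orders-0, orders-1, payments-0]
        System.out.println(partitionsToRevoke(Protocol.COOPERATIVE, owned, subscribed)); // [payments-0]
    }
}
```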
[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
hachikuji commented on PR #12611: URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244352156 @showuon Since Guozhang is out sick, I raised https://github.com/apache/kafka/pull/12626 which addresses your (and my) comments. Please take a look. I'll leave this open for now.
[GitHub] [kafka] hachikuji opened a new pull request, #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits
hachikuji opened a new pull request, #12626: URL: https://github.com/apache/kafka/pull/12626 Asynchronous offset commits may throw an unexpected `WakeupException` following https://github.com/apache/kafka/pull/11631 and https://github.com/apache/kafka/pull/12244. This patch fixes the problem by passing through a flag to `ensureCoordinatorReady` to indicate whether wakeups should be disabled. Note: this patch builds on top of https://github.com/apache/kafka/pull/12611. Co-Authored-By: Guozhang Wang ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r968872243 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -325,7 +325,7 @@ class BrokerMetadataListener( try { _image = _delta.apply() } catch { - case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) + case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) Review Comment: Looking at the documentation for `FaultHandler`, it looks like `handleFault` is allowed to return `null`.
[GitHub] [kafka] jsancio opened a new pull request, #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer
jsancio opened a new pull request, #12625: URL: https://github.com/apache/kafka/pull/12625 Because the snapshot writer sets a linger ms of Integer.MAX_VALUE, it is possible for the memory pool to run out of memory if the snapshot is larger than 5 * 8MB. This change allows the BatchMemoryPool to always allocate a buffer when requested. When a buffer is released, the memory pool frees the extra allocated buffer if the number of pooled buffers is greater than the configured maximum number of batches. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
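The allocation policy the PR describes can be sketched as a pool that never refuses an allocation but retains only a bounded number of buffers on release. With the defaults implied by the description (5 retained batches of 8MB each), the old behavior failed once a snapshot needed more than 40MB of in-flight batches. This is a simplified illustration: `SimpleBatchPool`, its methods, and the small sizes in `main` are hypothetical, and the real `BatchMemoryPool` API is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified pool: allocate() always succeeds, and release()
// keeps at most maxRetainedBatches buffers for reuse.
class SimpleBatchPool {
    private final int batchSize;
    private final int maxRetainedBatches;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleBatchPool(int maxRetainedBatches, int batchSize) {
        this.maxRetainedBatches = maxRetainedBatches;
        this.batchSize = batchSize;
    }

    synchronized ByteBuffer allocate() {
        ByteBuffer pooled = free.poll();
        // Instead of failing when the pool is exhausted, allocate a fresh buffer.
        return pooled != null ? pooled : ByteBuffer.allocate(batchSize);
    }

    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        // Retain the buffer only while below the limit; otherwise drop it for GC.
        if (free.size() < maxRetainedBatches) {
            free.push(buffer);
        }
    }

    synchronized int pooledCount() {
        return free.size();
    }

    public static void main(String[] args) {
        // Small illustrative sizes: retain at most 5 buffers of 1 KiB each.
        SimpleBatchPool pool = new SimpleBatchPool(5, 1024);
        ByteBuffer[] inUse = new ByteBuffer[8];
        for (int i = 0; i < inUse.length; i++) {
            inUse[i] = pool.allocate(); // never null, even past the retention limit
        }
        for (ByteBuffer b : inUse) {
            pool.release(b);
        }
        System.out.println("pooled after release: " + pool.pooledCount()); // 5
    }
}
```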
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968822443 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// Revoke all partitions +if (protocol == RebalanceProtocol.EAGER) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// only revoke those partitions that are not in the subscription any more. +if (protocol == RebalanceProtocol.COOPERATIVE) { +// Delay the partition revocation because we don't revoke the already owned partitions Review Comment: Leaving this purely for documentation purposes.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
hachikuji commented on code in PR #12611: URL: https://github.com/apache/kafka/pull/12611#discussion_r968778503 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) { throw fatalException; } final RequestFuture future = lookupCoordinator(); -client.poll(future, timer); + +// if we do not want to block on discovering coordinator at all, +// then we should not try to poll in a loop, and should not throw wake-up exception either +if (timer.timeoutMs() == 0L) { Review Comment: Yeah, I feel it's a bit slippery to leave the logic in place with just a TODO somewhere to go back and fix it. Too many times, the follow-up never happens. Is it really that much additional effort to add an overload?
```java
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    return ensureCoordinatorReady(timer, true);
}

private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean checkWakeup) {
    ...
```
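The overload keeps the public signature and its wakeup behavior unchanged while giving internal callers, such as asynchronous commits, a path that never surfaces a pending wakeup. A self-contained sketch of that shape: `CoordinatorSketch`, its `WakeupSignal` exception (standing in for the consumer's `WakeupException`), and the `timeoutMs` parameter (standing in for `Timer`) are all hypothetical simplifications.

```java
// Hypothetical, heavily simplified coordinator showing only the overload shape.
class CoordinatorSketch {
    // Stand-in for the consumer's WakeupException.
    static class WakeupSignal extends RuntimeException {
        WakeupSignal() { super("wakeup requested"); }
    }

    private boolean wakeupPending = false;
    private boolean coordinatorKnown = false;

    synchronized void wakeup() { wakeupPending = true; }

    synchronized void markCoordinatorKnown() { coordinatorKnown = true; }

    // Existing entry point: behavior unchanged, wakeups are honored.
    synchronized boolean ensureCoordinatorReady(long timeoutMs) {
        return ensureCoordinatorReady(timeoutMs, true);
    }

    // Hypothetical path for asynchronous commits: never throws a wakeup.
    synchronized boolean ensureCoordinatorReadyWithoutWakeup() {
        return ensureCoordinatorReady(0L, false);
    }

    private synchronized boolean ensureCoordinatorReady(long timeoutMs, boolean checkWakeup) {
        if (checkWakeup && wakeupPending) {
            wakeupPending = false;
            throw new WakeupSignal();
        }
        // A real implementation would send a lookup request and poll for up to timeoutMs.
        return coordinatorKnown;
    }

    public static void main(String[] args) {
        CoordinatorSketch c = new CoordinatorSketch();
        c.wakeup();
        // The non-blocking path ignores the pending wakeup and leaves it for later.
        System.out.println("async path ready: " + c.ensureCoordinatorReadyWithoutWakeup());
        try {
            c.ensureCoordinatorReady(100L);
        } catch (WakeupSignal e) {
            System.out.println("default path: " + e.getMessage());
        }
    }
}
```

The pending wakeup is not consumed by the non-blocking path, so the next blocking call still observes it, which matches the intent of disabling wakeups only for the asynchronous case.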
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r968754388 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -769,6 +773,7 @@ private static class TopicPartitionState { private Long logStartOffset; // the log start offset private Long lastStableOffset; private boolean paused; // whether this partition has been paused by the user +private boolean consumable; Review Comment: How about calling this `pendingRevocation` or something like that? That might make the usage clearer. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,94 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// Revoke all partitions +if (protocol == RebalanceProtocol.EAGER) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// only revoke those partitions that are not in the subscription any more. 
+if (protocol == RebalanceProtocol.COOPERATIVE) { +Set ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); +partitions.addAll(ownedPartitions.stream() +.filter(tp -> !subscriptions.subscription().contains(tp.topic())) +.collect(Collectors.toSet())); +return partitions; +} + +log.debug("Invalid protocol: {}. No partition will be revoked.", protocol); +return partitions; +} + +private Optional revokePartitions(SortedSet partitions, int generation, String memberId) { + // the generation / member-id can possibly be reset by the heartbeat thread // upon getting errors or heartbeat timeouts; in this case whatever is previously // owned partitions would be lost, we should trigger the callback and cleanup the assignment; // otherwise we can proceed normally and revoke the partitions depending on the protocol, // and in that case we should only change the assignment AFTER the revoke callback is triggered // so that users can still access the previously owned partitions to commit offsets etc. 
-Exception exception = null; -final SortedSet revokedPartitions = new TreeSet<>(COMPARATOR); +Optional exception = Optional.empty(); if (generation == Generation.NO_GENERATION.generationId || -memberId.equals(Generation.NO_GENERATION.memberId)) { -revokedPartitions.addAll(subscriptions.assignedPartitions()); - -if (!revokedPartitions.isEmpty()) { +memberId.equals(Generation.NO_GENERATION.memberId)) { +if (!partitions.isEmpty()) { log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," + -"indicating that consumer is in old state or no longer part of the group"); -exception = invokePartitionsLost(revokedPartitions); - +"indicating that consumer is in old state or no longer part of the group"); +exception = Optional.ofNullable(invokePartitionsLost(partitions)); subscriptions.assignFromSubscribed(Collections.emptySet()); } } else { switch (protocol) { case EAGER: -// revoke all partitions - revokedPartitions.addAll(subscriptions.assignedPartitions()); -exception = invokePartitionsRevoked(revokedPartitions); - +exception = Optional.ofNullable(invokePartitionsRevoked(partitions)); subscriptions.assignFromSubscribed(Collections.emptySet()); break; case COOPERATIVE: -// only revoke those partitions that are not in the subscription any more. Set
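The idea behind the flag that the review suggests renaming to `pendingRevocation` is that a partition about to be revoked should stop being returned to the application, much like a paused partition, so that records are not processed twice once autocommit runs. A minimal sketch of that state logic, assuming fetchability is simply "not paused and not pending revocation"; `PartitionStateSketch` and its methods are hypothetical and do not reproduce `SubscriptionState`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified per-partition state keyed by "topic-partition" strings.
class PartitionStateSketch {
    static final class State {
        boolean paused;            // paused by the user
        boolean pendingRevocation; // set before the revocation callback runs

        boolean isFetchable() {
            return !paused && !pendingRevocation;
        }
    }

    private final Map<String, State> states = new LinkedHashMap<>();

    void assign(String partition) { states.put(partition, new State()); }
    void pause(String partition) { states.get(partition).paused = true; }
    void markPendingRevocation(String partition) { states.get(partition).pendingRevocation = true; }

    // Only partitions that are neither paused nor awaiting revocation are fetched.
    List<String> fetchablePartitions() {
        return states.entrySet().stream()
                .filter(e -> e.getValue().isFetchable())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PartitionStateSketch s = new PartitionStateSketch();
        s.assign("orders-0");
        s.assign("orders-1");
        s.assign("payments-0");
        s.pause("orders-1");
        s.markPendingRevocation("payments-0");
        System.out.println(s.fetchablePartitions()); // [orders-0]
    }
}
```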
[jira] [Created] (KAFKA-14222) Exhausted BatchMemoryPool
Jose Armando Garcia Sancio created KAFKA-14222: -- Summary: Exhausted BatchMemoryPool Key: KAFKA-14222 URL: https://issues.apache.org/jira/browse/KAFKA-14222 Project: Kafka Issue Type: Bug Components: kraft Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 3.3.0 For a large number of topics and partitions, the broker can encounter this issue:
{code:java}
[2022-09-12 14:14:42,114] ERROR [BrokerMetadataSnapshotter id=4] Unexpected error handling CreateSnapshotEvent (kafka.server.metadata.BrokerMetadataSnapshotter)
org.apache.kafka.raft.errors.BufferAllocationException: Append failed because we failed to allocate memory to write the batch
    at org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:161)
    at org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:112)
    at org.apache.kafka.snapshot.RecordsSnapshotWriter.append(RecordsSnapshotWriter.java:167)
    at kafka.server.metadata.RecordListConsumer.accept(BrokerMetadataSnapshotter.scala:49)
    at kafka.server.metadata.RecordListConsumer.accept(BrokerMetadataSnapshotter.scala:42)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:78)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:79)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:129)
    at kafka.server.metadata.BrokerMetadataSnapshotter$CreateSnapshotEvent.run(BrokerMetadataSnapshotter.scala:116)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
This can happen because the snapshot is larger than {{5 * 8MB}}.
[GitHub] [kafka] hachikuji opened a new pull request, #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota
hachikuji opened a new pull request, #12624: URL: https://github.com/apache/kafka/pull/12624 Currently, forwarded requests are not applied to any quotas on either the controller or the broker. The controller-side throttling requires the controller to apply the quota changes from the log to the quota managers, which will be done separately. In this patch, we change the response logic on the broker side to also apply the broker's request quota. The enforced throttle time is the maximum of the throttle returned from the controller (which is 0 until we fix the aforementioned issue) and the broker's request throttle time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
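The throttle rule in this description is a maximum over two values. The sketch below also follows the naming suggested in the review of this PR (`maybeSetThrottleTimeMs` as a no-op for response schemas without a throttle field). Whether the merged patch adopted that name is not stated here, and every class in the sketch is hypothetical rather than Kafka's `AbstractResponse`.

```java
// Hypothetical response base type; not Kafka's AbstractResponse.
abstract class ResponseSketch {
    abstract int throttleTimeMs();

    // No-op by default, for response schemas that have no throttle-time field.
    void maybeSetThrottleTimeMs(int throttleTimeMs) { }
}

// A response type whose schema carries a throttle time.
class ThrottledResponse extends ResponseSketch {
    private int throttleTimeMs;

    ThrottledResponse(int controllerThrottleMs) { this.throttleTimeMs = controllerThrottleMs; }

    @Override int throttleTimeMs() { return throttleTimeMs; }

    @Override void maybeSetThrottleTimeMs(int throttleTimeMs) { this.throttleTimeMs = throttleTimeMs; }
}

// A response type without a throttle field: the setter stays a no-op.
class UnthrottledResponse extends ResponseSketch {
    @Override int throttleTimeMs() { return 0; }
}

class ForwardingThrottle {
    // Enforced throttle = max(controller-returned throttle, broker request-quota throttle).
    static void applyBrokerQuota(ResponseSketch response, int brokerThrottleMs) {
        response.maybeSetThrottleTimeMs(Math.max(response.throttleTimeMs(), brokerThrottleMs));
    }

    public static void main(String[] args) {
        // The controller returns 0 until controller-side throttling is implemented.
        ThrottledResponse forwarded = new ThrottledResponse(0);
        applyBrokerQuota(forwarded, 250);
        System.out.println("enforced throttle ms: " + forwarded.throttleTimeMs()); // 250
    }
}
```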
[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…
jolshan commented on code in PR #12392: URL: https://github.com/apache/kafka/pull/12392#discussion_r968682161 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ## @@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof TimeoutException); } + runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed. assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed. -assertThrows(TimeoutException.class, commitResult::await); +assertThrows(KafkaException.class, commitResult::await); -assertTrue(transactionManager.hasAbortableError()); -assertTrue(transactionManager.hasOngoingTransaction()); +assertTrue(transactionManager.hasFatalBumpableError()); +assertFalse(transactionManager.hasOngoingTransaction()); assertFalse(transactionManager.isCompleting()); -assertTrue(transactionManager.transactionContainsPartition(tp0)); -TransactionalRequestResult abortResult = transactionManager.beginAbort(); - -prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch); -prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1)); -runUntil(abortResult::isCompleted); -assertTrue(abortResult.isSuccessful()); -assertFalse(transactionManager.hasOngoingTransaction()); -assertFalse(transactionManager.transactionContainsPartition(tp0)); +assertThrows(KafkaException.class, () -> transactionManager.beginAbort()); Review Comment: I'm wondering, is there a way that we could mitigate this server side? Is it possible to prevent writing the late records after the abort marker? I might be missing something though, so let me know.
[GitHub] [kafka] C0urante commented on pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build
C0urante commented on PR #12616: URL: https://github.com/apache/kafka/pull/12616#issuecomment-1243995392 @ijuma @jsancio I'm assuming we don't need to backport this to 3.3 since the immediate issues with Swagger deps and Connect OpenAPI docs generation have already been fixed on that branch; let me know if that's not the case, though.
[GitHub] [kafka] C0urante merged pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build
C0urante merged PR #12616: URL: https://github.com/apache/kafka/pull/12616
[GitHub] [kafka] C0urante commented on pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build
C0urante commented on PR #12616: URL: https://github.com/apache/kafka/pull/12616#issuecomment-1243994114 Failures appear unrelated, merging.
[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603173#comment-17603173 ]

Ismael Juma edited comment on KAFKA-14221 at 9/12/22 4:21 PM:
--
It seems that we may also want to switch the reference in Apache Kafka's Jenkinsfile based on the latest updates to [https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix]

was (Author: ijuma):
Actually, it seems that we may want to switch the reference in Apache Kafka's Jenkinsfile based on the latest updates to https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix

> Update Apache Kafka JVM version in CI to latest versions
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
> Issue Type: Task
> Reporter: Matthew de Detrich
> Priority: Major
>
> In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:
>
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
> {code}
>
> After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]
>
> This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39). Note that locally, with the latest OpenJDK (specifically 17.0.4.1), I cannot reproduce this crash.
>
> We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603173#comment-17603173 ]

Ismael Juma commented on KAFKA-14221:
-
Actually, it seems that we may want to switch the reference in Apache Kafka's Jenkinsfile based on the latest updates to https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix
[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603170#comment-17603170 ]

Ismael Juma commented on KAFKA-14221:
-
We are running the latest JDK 17 build available, as can be seen here: [https://github.com/apache/kafka/blob/trunk/Jenkinsfile#L218]. An INFRA ticket can be filed to request an update; see https://issues.apache.org/jira/browse/INFRA-22363 for an example.
[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r968614919

## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ##
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
       try {
         _image = _delta.apply()
       } catch {
-        case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+        case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
@mumrah and I spoke offline about this. How about documenting the state of the broker at this point?
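The diff changes the catch block so that the exception returned by `handleFault` is thrown instead of discarded, so execution no longer continues past a failed delta. A minimal Java sketch of that control flow follows. It assumes a simplified fault-handler shape whose `handleFault` returns a `RuntimeException` (the `throw` in the diff implies a returned throwable); `SimpleFaultHandler`, `MetadataApplier`, and the snapshot counter are hypothetical names, not Kafka's classes.

```java
import java.util.function.Supplier;

// Hypothetical, simplified fault-handler shape: handleFault returns the exception
// instead of throwing it, so the caller decides to `throw` it (as in the diff).
@FunctionalInterface
interface SimpleFaultHandler {
    RuntimeException handleFault(String failureMessage, Throwable cause);
}

// Hypothetical listener that applies a metadata "delta" and snapshots only on success.
class MetadataApplier {
    private final SimpleFaultHandler faultHandler;
    private String image = "initial";
    private int snapshotsTaken = 0;

    MetadataApplier(SimpleFaultHandler faultHandler) {
        this.faultHandler = faultHandler;
    }

    void applyDelta(Supplier<String> delta) {
        try {
            image = delta.get();
        } catch (Throwable t) {
            // Throwing the handler's return value ends this method here, so the
            // snapshot step below is skipped and the previous image is kept.
            throw faultHandler.handleFault("Error applying metadata delta", t);
        }
        snapshotsTaken++; // stands in for "write a snapshot of the new image"
    }

    String image() {
        return image;
    }

    int snapshotsTaken() {
        return snapshotsTaken;
    }
}
```

In this sketch, dropping the `throw` would let the catch block fall through, so `snapshotsTaken` would be incremented even though `image` still holds the previous state, which is the kind of behavior the PR title ("Don't make snapshots on broker after metadata errors") describes avoiding.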
[GitHub] [kafka] mdedetrich commented on pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test
mdedetrich commented on PR #12524: URL: https://github.com/apache/kafka/pull/12524#issuecomment-1243928456 Thanks for the comments. I am still getting my head around the tests and what precisely is being tested. I forgot to mention earlier that `StreamTaskTest` needs more work (I'm doing this now and will push changes). Other fixes are also needed because of `MockitoJUnitRunner.StrictStubs`: some tests set up exceptions to be thrown and expect that exception path never to be hit, and under strict stubs this needs to be replaced with `verify(times(0))` or something similar.
[jira] [Resolved] (KAFKA-14216) Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc
[ https://issues.apache.org/jira/browse/KAFKA-14216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-14216.
--
Fix Version/s: 3.3
Reviewer: Luke Chen
Resolution: Fixed

> Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc
> --
>
> Key: KAFKA-14216
> URL: https://issues.apache.org/jira/browse/KAFKA-14216
> Project: Kafka
> Issue Type: Bug
> Components: docs, documentation
> Affects Versions: 3.3.0, 3.3
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Blocker
> Fix For: 3.3
[jira] [Resolved] (KAFKA-14217) app-reset-tool.html should remove reference to --zookeeper flag that no longer exists
[ https://issues.apache.org/jira/browse/KAFKA-14217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-14217.
--
Fix Version/s: 3.3
Reviewer: Luke Chen
Resolution: Fixed

> app-reset-tool.html should remove reference to --zookeeper flag that no longer exists
> -
>
> Key: KAFKA-14217
> URL: https://issues.apache.org/jira/browse/KAFKA-14217
> Project: Kafka
> Issue Type: Bug
> Components: docs, documentation
> Affects Versions: 3.3.0, 3.3
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Blocker
> Fix For: 3.3
>
> app-reset-tool.html should remove reference to --zookeeper flag that no longer exists
[GitHub] [kafka] cmccabe merged pull request #12617: KAFKA-14216: Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc
cmccabe merged PR #12617: URL: https://github.com/apache/kafka/pull/12617
[GitHub] [kafka] cmccabe merged pull request #12618: KAFKA-14217: app-reset-tool.html should remove reference to --zookeeper flag that no longer exists
cmccabe merged PR #12618: URL: https://github.com/apache/kafka/pull/12618
[jira] [Updated] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew de Detrich updated KAFKA-14221:
---
Description:
In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:

{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}

After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39). Note that locally, with the latest OpenJDK (specifically 17.0.4.1), I cannot reproduce this crash.

We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.
was:
In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:

{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}

After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.
[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603151#comment-17603151 ]

Matthew de Detrich commented on KAFKA-14221:
[~ijuma] Maybe you have some idea on how to update the JDK on the CI runner?
[jira] [Updated] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
[ https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew de Detrich updated KAFKA-14221:
---
Description:
In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:

{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}

After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.
was:
In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:

[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52

After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.
[jira] [Created] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions
Matthew de Detrich created KAFKA-14221:
--
Summary: Update Apache Kafka JVM version in CI to latest versions
Key: KAFKA-14221
URL: https://issues.apache.org/jira/browse/KAFKA-14221
Project: Kafka
Issue Type: Task
Reporter: Matthew de Detrich

In a recent test run (see [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]), the JVM crashed with the following stack trace:

[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52

After some research online, I found a JDK bug filed for the same kind of crash: [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see if this solves the problem.
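The description attributes the crash to JDK-8270886, fixed in 17.0.2, while CI reported 17.0.1+12-LTS-39. As a sketch only, a guard like the following could flag an affected JDK 17 runtime using `java.lang.Runtime.Version` (the `feature()` accessor requires Java 10 or later). The class name is hypothetical, and treating only the 17 feature line as affected is an assumption drawn from the description, not from the JDK bug record.

```java
// Hypothetical helper: flags JDK 17 builds older than 17.0.2, the release the
// issue description says contains the fix for JDK-8270886.
final class Jdk8270886Check {
    private static final Runtime.Version FIRST_FIXED = Runtime.Version.parse("17.0.2");

    private Jdk8270886Check() {
    }

    static boolean isAffected(Runtime.Version version) {
        // Only the JDK 17 feature line is considered; other releases are out of scope here.
        if (version.feature() != 17) {
            return false;
        }
        // Ignore optional build information such as "-LTS-39" when comparing.
        return version.compareToIgnoreOptional(FIRST_FIXED) < 0;
    }

    public static void main(String[] args) {
        Runtime.Version current = Runtime.version();
        System.out.println(current + " affected by JDK-8270886: " + isAffected(current));
    }
}
```

For example, `Runtime.Version.parse("17.0.1+12-LTS-39")` is reported as affected, while `17.0.2` and the `17.0.4.1` build mentioned in the description are not.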
[GitHub] [kafka] cadonna commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test
cadonna commented on code in PR #12524: URL: https://github.com/apache/kafka/pull/12524#discussion_r968474491

## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ##
@@ -308,29 +300,21 @@ public void shouldRestoreTimestampedStoreWithConverter() {
     @Test
     public void shouldUnregisterChangelogsDuringClose() {
         final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final StateStore store = EasyMock.createMock(StateStore.class);
-        expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-
-        context.uninitialize();
-        store.init((StateStoreContext) context, store);
-        replay(storeMetadata, context, store);
+        final StateStore store = mock(StateStore.class);
+        when(store.name()).thenReturn(persistentStoreName);
         stateMgr.registerStateStores(singletonList(store), context);
-        verify(context, store);
+        verify(context).uninitialize();
+        verify(store).init((StateStoreContext) context, store);
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
         assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
         reset(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.close();
-        replay(store);
+        when(store.name()).thenReturn(persistentStoreName);

Review Comment:
Are the `reset()` and the stubbing of `store.name()` still needed? The stubbing is the same as on line 304. Since we are not doing a new replay, I am not sure that we still need the `reset()` with Mockito. Do you think you could move all `verify()` calls to the end of the method?
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ##
@@ -195,12 +196,11 @@ public void shouldTransitToRunningAfterInitialization() {
         assertEquals(RUNNING, task.state());
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).registerStateStores(any(), any());

Review Comment:
```suggestion
        verify(stateManager).registerStateStores(any(), any());
```

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
         stateManager.flush();
-        EasyMock.expectLastCall().once();
+        verify(stateManager, times(1)).flush();

Review Comment:
Again, you make a call on the mock and then verify that you made the call. In EasyMock, you make the calls on the mock to specify which calls you expect, and the call to `replay()` ends the specification phase. When you `verify()`, you verify that the call under test indeed called the methods you specified. This is not the case in Mockito. In Mockito, the mocks record the calls and you verify what was called. Making calls on the mock directly in the test method does not make sense; the methods of the mocks should be called within the call under test.

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
         stateManager.flush();
-        EasyMock.expectLastCall().once();
+        verify(stateManager, times(1)).flush();
         stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        verify(stateManager, times(1)).checkpoint();
+        when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());

Review Comment:
Again here.
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ##
@@ -177,10 +181,7 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
     @Test
     public void shouldTransitToRunningAfterInitialization() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
+        stateManager.registerStateStores(any(), any());

Review Comment:
This is also a call on the mock, which makes the verification on line 199 always true, independently of the call under test.

##
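cadonna's point is that a Mockito mock records the calls it receives and `verify()` only inspects that record afterwards, so a call made on the mock directly in the test method satisfies the verification regardless of what the code under test did. The sketch below illustrates this with a hand-rolled recording fake in plain Java rather than Mockito, so it runs without dependencies; `StateManagerLike`, `RecordingStateManager`, and `TaskUnderTest` are hypothetical names, not the Streams classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical collaborator interface standing in for the mocked state manager.
interface StateManagerLike {
    void flush();

    void checkpoint();
}

// Hand-rolled recording fake: like a Mockito mock, it only records the calls it receives.
class RecordingStateManager implements StateManagerLike {
    private final List<String> calls = new ArrayList<>();

    @Override
    public void flush() {
        calls.add("flush");
    }

    @Override
    public void checkpoint() {
        calls.add("checkpoint");
    }

    // Rough analogue of verify(mock, times(n)).method(): count recorded calls by name.
    int timesCalled(String method) {
        return Collections.frequency(calls, method);
    }
}

// Hypothetical code under test: this class, not the test method, should call the collaborator.
class TaskUnderTest {
    private final StateManagerLike stateManager;

    TaskUnderTest(StateManagerLike stateManager) {
        this.stateManager = stateManager;
    }

    void maybeCheckpoint(boolean enforce) {
        if (enforce) {
            stateManager.flush();
            stateManager.checkpoint();
        }
    }
}
```

Calling `flush()` on the fake directly and then calling `maybeCheckpoint(false)` still yields a recorded count of 1, which is why pairs like `stateManager.flush(); verify(stateManager, times(1)).flush();` prove nothing about the task; the verification has to follow the call under test. A count of 0 after the call under test plays the role of the `verify(times(0))` (or `never()`) check mentioned earlier in the thread.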
[GitHub] [kafka] divijvaidya commented on pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()
divijvaidya commented on PR #12591: URL: https://github.com/apache/kafka/pull/12591#issuecomment-1243849390 Thank you both for your review. Since we agree to pick up this incremental change, I am assuming no action is required from my side before you approve and merge? Note that the test failures are unrelated; I was able to run those tests successfully on my local machine.
[GitHub] [kafka] junrao commented on pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches
junrao commented on PR #12570: URL: https://github.com/apache/kafka/pull/12570#issuecomment-1243846932 Thanks for the feedback, @Kaiserchen. Could you run your MM test after changing `last == null || last.isFull()` to `deque.size() > 1 || last == null || last.isFull()` and see if it still works?
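junrao's suggested change adds a single disjunct to the existing condition: with it, the condition also holds whenever the partition's deque already contains more than one batch, even if the last batch is not full. A minimal sketch comparing the two predicates, assuming a hypothetical `Batch` type with an `isFull()` method and assuming `last` is the deque's last element (the surrounding `RecordAccumulator` code is not shown in the thread):

```java
import java.util.Deque;

final class BatchConditionSketch {
    // Hypothetical stand-in for a producer batch; only fullness matters here.
    static final class Batch {
        private final boolean full;

        Batch(boolean full) {
            this.full = full;
        }

        boolean isFull() {
            return full;
        }
    }

    private BatchConditionSketch() {
    }

    // Existing condition quoted in the comment.
    static boolean current(Deque<Batch> deque) {
        Batch last = deque.peekLast();
        return last == null || last.isFull();
    }

    // Proposed condition: additionally true when more than one batch is queued.
    static boolean proposed(Deque<Batch> deque) {
        Batch last = deque.peekLast();
        return deque.size() > 1 || last == null || last.isFull();
    }
}
```

With two non-full batches queued, `current` returns false and `proposed` returns true; with a single non-full batch both return false, so the change only alters the outcome when more than one batch is already waiting.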