[GitHub] [kafka] yashmayya commented on pull request #12615: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito

2022-09-12 Thread GitBox


yashmayya commented on PR #12615:
URL: https://github.com/apache/kafka/pull/12615#issuecomment-1244913715

   @mimaison @C0urante would either of you be able to take a look at this one 
whenever possible?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12629: MINOR: Clean up FetcherTest

2022-09-12 Thread GitBox


philipnee commented on PR #12629:
URL: https://github.com/apache/kafka/pull/12629#issuecomment-1244884249

   @showuon @hachikuji - thanks for reviewing the PR. This is a follow-up PR for 
cleaning up the tests.





[GitHub] [kafka] philipnee opened a new pull request, #12629: MINOR: Clean up FetcherTest

2022-09-12 Thread GitBox


philipnee opened a new pull request, #12629:
URL: https://github.com/apache/kafka/pull/12629

   Follow-up to clean up the residual test code from https://github.com/apache/kafka/pull/12603

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969137654


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+buildFetcher();
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+assertEquals(100, subscriptions.position(tp0).offset);
+
+assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   Oops, this is a typo, probably left over from modifying the tests.






[GitHub] [kafka] showuon commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits

2022-09-12 Thread GitBox


showuon commented on PR #12626:
URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244881797

   I can take a look when I'm available today
   





[GitHub] [kafka] artemlivshits commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-12 Thread GitBox


artemlivshits commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r969101389


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##
@@ -170,13 +170,49 @@ boolean isPartitionChanged(StickyPartitionInfo partitionInfo) {
  * @param cluster The cluster information
  */
 void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster) {
+updatePartitionInfo(partitionInfo, appendedBytes, cluster, true);
+}
+
+/**
+ * Update partition info with the number of bytes appended and maybe switch partition.
+ * NOTE this function needs to be called under the partition's batch queue lock.
+ *
+ * @param partitionInfo The sticky partition info object returned by peekCurrentPartitionInfo
+ * @param appendedBytes The number of bytes appended to this partition
+ * @param cluster The cluster information
+ * @param enableSwitch If true, switch partition once produced enough bytes
+ */
+void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) {
 // partitionInfo may be null if the caller didn't use built-in partitioner.
 if (partitionInfo == null)
 return;
 
 assert partitionInfo == stickyPartitionInfo.get();
 int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);
-if (producedBytes >= stickyBatchSize) {
+
+// We're trying to switch partition once we produce stickyBatchSize bytes to a partition
+// but doing so may hinder batching because partition switch may happen while batch isn't
+// ready to send.  This situation is especially likely with high linger.ms setting.
+// Consider the following example:
+//   linger.ms=500, producer produces 12KB in 500ms, batch.size=16KB
+// - first batch collects 12KB in 500ms, gets sent
+// - second batch collects 4KB, then we switch partition, so 4KB gets eventually sent
+// - ... and so on - we'd get 12KB and 4KB batches
+// To get more optimal batching and avoid 4KB fractional batches, the caller may disallow
+// partition switch if batch is not ready to send, so with the example above we'd avoid
+// fractional 4KB batches: in that case the scenario would look like this:
+// - first batch collects 12KB in 500ms, gets sent
+// - second batch collects 4KB, but partition switch doesn't happen because batch in not ready
+// - second batch collects 12KB in 500ms, gets sent and now we switch partition.
+// - ... and so on - we'd just send 12KB batches
+// We cap the produced bytes to not exceed 2x of the batch size to avoid pathological cases
+// (e.g. if we have a mix of keyed and unkeyed messages, key messages may create an
+// unready batch after the batch that disabled partition switch becomes ready).
+// As a result, with high latency.ms setting we end up switching partitions after producing
+// between stickyBatchSize and stickyBatchSize * 2 bytes, to better align with batch boundary.
+if (producedBytes >= stickyBatchSize * 2)
+log.trace("Exceeded {} bytes, produced {} bytes, enable is {}", stickyBatchSize * 2, producedBytes, enableSwitch);

Review Comment:
   ok
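To make the trade-off in the comment above concrete, here is a toy Java model of the example it describes (linger.ms=500, 12KB produced per 500ms, batch.size=16KB). It is a hypothetical sketch, not `BuiltInPartitioner` or `RecordAccumulator` code. Assumptions: time is measured in 1 KB appends, so the linger becomes 12 ticks; stickyBatchSize equals batch.size; a batch counts as "ready" when it is full or its linger has expired; and partitions rotate round-robin instead of being chosen the way the real partitioner does.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical; not Kafka code) of the sticky-partition switching
 * example above. One tick = one 1 KB append, so a 500 ms linger at 12 KB per
 * 500 ms becomes LINGER_TICKS = 12.
 */
final class StickySwitchModel {
    static final int LINGER_TICKS = 12;   // linger.ms=500 expressed in 1 KB appends
    static final int BATCH_SIZE_KB = 16;  // batch.size=16KB
    static final int STICKY_KB = 16;      // assumption: stickyBatchSize == batch.size

    /** Returns the sizes (in KB) of the batches sent, in send order. */
    static List<Integer> run(int ticks, int numPartitions, boolean gateOnReady) {
        int[] openSize = new int[numPartitions];    // 0 means no open batch
        int[] openCreated = new int[numPartitions]; // tick at which the open batch was created
        List<Integer> sent = new ArrayList<>();
        int current = 0;
        int producedBytes = 0;

        for (int t = 0; t < ticks; t++) {
            // 1. Append 1 KB to the current sticky partition, opening a batch if needed.
            if (openSize[current] == 0) {
                openCreated[current] = t;
            }
            openSize[current]++;
            producedBytes++;

            // 2. Switch once STICKY_KB is produced; when gated, only if the current
            //    batch is ready. The 2x cap switches unconditionally.
            boolean enableSwitch = !gateOnReady || isReady(openSize[current], openCreated[current], t);
            if ((producedBytes >= STICKY_KB && enableSwitch) || producedBytes >= 2 * STICKY_KB) {
                current = (current + 1) % numPartitions;
                producedBytes = 0;
            }

            // 3. Sender: drain every batch that is full or whose linger has expired.
            for (int p = 0; p < numPartitions; p++) {
                if (openSize[p] > 0 && isReady(openSize[p], openCreated[p], t)) {
                    sent.add(openSize[p]);
                    openSize[p] = 0;
                }
            }
        }
        return sent;
    }

    static boolean isReady(int sizeKb, int createdTick, int nowTick) {
        return sizeKb >= BATCH_SIZE_KB || nowTick - createdTick + 1 >= LINGER_TICKS;
    }
}
```

In this model, `StickySwitchModel.run(48, 3, false)` sends batches of [12, 4, 12, 4, 12] KB, while `run(48, 3, true)` sends [12, 12, 12, 12]. In the gated run each partition receives 24KB before switching, which falls between stickyBatchSize and 2 * stickyBatchSize, as the comment predicts.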



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1137,23 +1137,26 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 assertEquals(partition1, partition.get());
 assertEquals(2, mockRandom.get());
 
-// Produce large record, we should switch to next partition.
+// Produce large record, we switched to next partition by previous produce, but
+// for this produce the switch would be disabled because of incomplete batch.
 accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
 callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster);
 assertEquals(partition2, partition.get());
-assertEquals(3, mockRandom.get());
+assertEquals(2, mockRandom.get());

Review Comment:
   What happens here is the following:
   
   1. The first record (small) gets a new partition (because there was none).
   2. The second record (large) doesn't fit, so the first record forms a batch (but not enough bytes have been produced to switch).
   3. The second record (large) creates a new batch, but that batch is not marked as full (disabling the switch).
   4. The third record arrives and doesn't fit into the batch, so the batch is marked as full (completing the switch that was disabled in step 3).
   
   So effectively, in step 4 the switch happens before the record is added, rather than after.

[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969136963


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+buildFetcher();
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+assertEquals(100, subscriptions.position(tp0).offset);
+
+assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   I'll clean it up.






[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-12 Thread GitBox


hachikuji commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244878455

   I'll go ahead and close this since we're going to merge 
https://github.com/apache/kafka/pull/12626.





[GitHub] [kafka] hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-12 Thread GitBox


hachikuji closed pull request #12611: KAFKA-14208: Should not wake-up with 
non-blocking coordinator discovery
URL: https://github.com/apache/kafka/pull/12611





[GitHub] [kafka] hachikuji commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits

2022-09-12 Thread GitBox


hachikuji commented on PR #12626:
URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244877452

   I have been seeing some recent build failures due to a non-zero exit code 
from `core:unitTest`. I cannot reproduce it locally (of course), and this is making 
it tough to get some of these 3.3 patches over the line. I'll take a look 
tomorrow if I can, but if anyone has any ideas, do let me know.





[GitHub] [kafka] hachikuji commented on pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits

2022-09-12 Thread GitBox


hachikuji commented on PR #12626:
URL: https://github.com/apache/kafka/pull/12626#issuecomment-1244875608

   @showuon Thanks! I appreciate it.





[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-12 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603328#comment-17603328
 ] 

Jason Gustafson commented on KAFKA-14196:
-

>  Also, this is currently marked as a blocker. Is there a crisp description of 
>the regression?

With eager rebalance strategies, the consumer attempts to auto-commit offsets 
before revoking partitions and joining the rebalance. Originally this logic was 
synchronous, which meant there was no opportunity for additional data to be 
returned before the revocation completed. This changed when we introduced 
asynchronous offset commit logic: any progress made between the time the 
asynchronous offset commit was sent and the time the revocation completed was 
never committed, so the next owner of the partition consumes it again. This 
results in duplicate consumption.
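A minimal sketch of the arithmetic behind this explanation, under stated assumptions: a single partition, a fixed number of records returned per poll() while the asynchronous commit is in flight, and a new owner that resumes from the committed offset. `RevocationModel` and its parameters are hypothetical; this is not `ConsumerCoordinator` code.

```java
/**
 * Hypothetical model (not Kafka code) of a single partition during an eager
 * rebalance with auto-commit enabled. The auto-commit carries the position at
 * the moment the rebalance starts; the new owner later resumes from that offset.
 */
final class RevocationModel {
    enum CommitMode { SYNC, ASYNC }

    /**
     * @param commitPosition  position carried by the pre-revocation auto-commit
     * @param recordsPerPoll  records each poll() returns while the commit is in flight
     * @param pollsInFlight   polls completed before the commit finishes and the
     *                        partition is revoked (ignored for a blocking commit)
     * @return number of records the next owner consumes a second time
     */
    static long duplicates(long commitPosition, int recordsPerPoll, int pollsInFlight, CommitMode mode) {
        long consumedUpTo = commitPosition; // next offset not yet handed to the application
        if (mode == CommitMode.ASYNC) {
            // The commit has only been sent, so poll() keeps returning records.
            consumedUpTo += (long) recordsPerPoll * pollsInFlight;
        }
        // Everything in [commitPosition, consumedUpTo) was processed but never
        // committed, so the next owner re-consumes it.
        return consumedUpTo - commitPosition;
    }
}
```

With a blocking commit the model reports no duplicates; with an asynchronous commit, every record returned while the commit is in flight is delivered twice (for example, 3 polls of 5 records give 15 duplicates).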

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.2
>
>
> Several flaky tests under OffsetValidationTest indicate a potential 
> duplicate-consumption issue when autocommit is enabled. I believe this 
> affects *3.2* and onward. The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoked callback seems to 
> alleviate the issue.
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> These attempts suggest that the contract between poll() and 
> asyncCommit() is broken. AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop. However, as the poll continues to loop, the 
> "acked" data doesn't seem to be committed.
>  
> I believe this could have been introduced in KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit. 
> However, this is a bit contradictory because:
>  # I think we want to commit asynchronously while the poll continues, but if 
> we do that, we are back to KAFKA-14024, where the consumer hits the rebalance 
> timeout and gets kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocking.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times (to save time, we recommend running only the cases 
> with autocommit enabled in consumer_test.py):
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log. It might look like this:
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the Docker instance whose partition is getting revoked
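When diagnosing runs like the one in the issue description, it can help to count re-consumed offsets mechanically from the VerifiableConsumer events. A minimal sketch, assuming the field order shown in the log excerpt above and contiguous offsets within each minOffset/maxOffset range; it uses a regex instead of a JSON parser to stay dependency-free, and `DuplicateOffsetCounter` is a hypothetical name, not an existing Kafka tool.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of Kafka's test suite) that counts offsets
 * consumed more than once across "records_consumed" events.
 */
final class DuplicateOffsetCounter {
    // Assumes the per-partition field order shown in the log excerpt.
    private static final Pattern RANGE = Pattern.compile(
        "\"topic\":\"([^\"]+)\",\"partition\":(\\d+),\"count\":\\d+,\"minOffset\":(\\d+),\"maxOffset\":(\\d+)");

    static int countDuplicates(List<String> events) {
        Map<String, Set<Long>> seen = new HashMap<>(); // topic-partition -> offsets consumed so far
        int duplicates = 0;
        for (String event : events) {
            if (!event.contains("\"name\":\"records_consumed\"")) {
                continue; // skip partitions_revoked, partitions_assigned, etc.
            }
            Matcher m = RANGE.matcher(event);
            while (m.find()) { // one match per partition entry in the event
                String key = m.group(1) + "-" + m.group(2);
                long min = Long.parseLong(m.group(3));
                long max = Long.parseLong(m.group(4));
                Set<Long> offsets = seen.computeIfAbsent(key, k -> new HashSet<>());
                for (long offset = min; offset <= max; offset++) {
                    if (!offsets.add(offset)) {
                        duplicates++; // this offset was already delivered once
                    }
                }
            }
        }
        return duplicates;
    }
}
```

Applied to the six events in the log excerpt, this reports 7 duplicates: offsets 3739 through 3745 were consumed before the revocation and again after the reassignment.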

[GitHub] [kafka] hachikuji merged pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji merged PR #12603:
URL: https://github.com/apache/kafka/pull/12603





[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969124759


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I filed this issue: https://issues.apache.org/jira/browse/KAFKA-14224.






[jira] [Resolved] (KAFKA-14215) KRaft forwarded requests have no quota enforcement

2022-09-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14215.
-
Resolution: Fixed

> KRaft forwarded requests have no quota enforcement
> --
>
> Key: KAFKA-14215
> URL: https://issues.apache.org/jira/browse/KAFKA-14215
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0, 3.3
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0, 3.3
>
>
> On the broker, the `BrokerMetadataPublisher` is responsible for propagating 
> quota changes from `ClientQuota` records to `ClientQuotaManager`. On the 
> controller, there is no similar logic, so no client quotas are enforced on 
> the controller.
> The broker does not enforce quotas on forwarded requests either, since it 
> assumes that the controller will do it: it looks at the throttle time 
> returned in the controller's response, and if it is 0, the response is sent 
> immediately without any throttling.
> The combined consequence of these two issues is that controller-bound 
> requests have no throttling today.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji merged pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota

2022-09-12 Thread GitBox


hachikuji merged PR #12624:
URL: https://github.com/apache/kafka/pull/12624





[GitHub] [kafka] showuon commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


showuon commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969115062


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+buildFetcher();
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);

Review Comment:
   Any reason we seek tp0 to offset 100 three times?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
 private Long logStartOffset; // the log start offset
 private Long lastStableOffset;
 private boolean paused;  // whether this partition has been paused by the user
+private boolean consumable;

Review Comment:
   I like the name: `pendingRevocation`, too.
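The idea behind the flag under discussion (called `pendingRevocation` here) can be sketched as a fetchability check: a partition awaiting revocation stops being fetched, so its position stops advancing while the pre-revocation auto-commit completes. This is a hypothetical, simplified class, not Kafka's `SubscriptionState` or `TopicPartitionState`; the field names follow the diff and the review, and the exact conditions in the merged code may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified per-partition state (not Kafka's SubscriptionState)
 * showing how a "pending revocation" flag can stop a partition from being fetched.
 */
final class PartitionStateSketch {
    private Long position;             // null until the partition has a valid position
    private boolean paused;            // paused by the user via pause()
    private boolean pendingRevocation; // set before the partition is revoked

    void seek(long offset) { position = offset; }
    void pause() { paused = true; }
    void markPendingRevocation() { pendingRevocation = true; }

    boolean isFetchable() {
        // A partition awaiting revocation returns no more records, so its
        // position stops advancing while the pre-revocation commit completes.
        return !paused && !pendingRevocation && position != null;
    }

    /** Names of the partitions a fetcher would include in its next fetch. */
    static List<String> fetchable(Map<String, PartitionStateSketch> states) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, PartitionStateSketch> e : states.entrySet()) {
            if (e.getValue().isFetchable()) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Keeping the flag separate from `paused` means user-initiated pauses and coordinator-initiated revocation do not overwrite each other, which is one argument for a distinct, descriptively named field.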



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2322,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
+buildFetcher();
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+subscriptions.seek(tp0, 100);
+assertEquals(100, subscriptions.position(tp0).offset);
+
+assertTrue(subscriptions.isFetchable(tp0)); // because tp is paused

Review Comment:
   I don't understand the comment here. Where do we pause tp0? And it is 
fetchable now, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   Agreed that we should handle the cooperative issue separately.






[GitHub] [kafka] showuon commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2022-09-12 Thread GitBox


showuon commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1244838525

   ZK 3.6.3 uses an old Netty version that has CVEs; ZK 3.7.1 upgrades Netty to 
fix them. I agree it's late for 3.3; I just want to make sure we're aware of it.
   [CVE-2021-37136](https://nvd.nist.gov/vuln/detail/CVE-2021-37136) 
   [CVE-2021-37137](https://nvd.nist.gov/vuln/detail/CVE-2021-37137)





[GitHub] [kafka] ijuma commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2022-09-12 Thread GitBox


ijuma commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1244830200

   Which CVEs are these? This kind of upgrade requires quite a lot of validation; 
it's too late for 3.3 unless the impact is severe.





[jira] [Updated] (KAFKA-14224) Consumer should auto-commit prior to cooperative partition revocation

2022-09-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14224:

Description: With the old "eager" reassignment logic, we always revoked all 
partitions prior to each rebalance. When auto-commit is enabled, part of this 
process is committing the current position. Under the new "cooperative" logic, we 
defer revocation until after the rebalance, which means we can continue 
fetching while the rebalance is in progress. However, when reviewing 
KAFKA-14196, we noticed that there is no similar logic to commit offsets prior 
to this deferred revocation. This means that cooperative consumption is more 
likely to lead to duplicate consumption even when there is no failure 
involved.  (was: With the old "eager" reassignment logic, we always revoked all 
partitions prior to each rebalance. When auto-commit is enabled, part of this 
process is committing the current position. Under the cooperative logic, we defer 
revocation until after the rebalance, which means we can continue fetching 
while the rebalance is in progress. However, when reviewing KAFKA-14196, we 
noticed that there is no similar logic to commit offsets prior to this deferred 
revocation. This means that cooperative consumption is more likely to lead to 
duplicate consumption even when there is no failure involved.)

> Consumer should auto-commit prior to cooperative partition revocation
> -
>
> Key: KAFKA-14224
> URL: https://issues.apache.org/jira/browse/KAFKA-14224
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> With the old "eager" reassignment logic, we always revoked all partitions 
> prior to each rebalance. When auto-commit is enabled, part of this process 
> is committing the current position. Under the new "cooperative" logic, we defer 
> revocation until after the rebalance, which means we can continue fetching 
> while the rebalance is in progress. However, when reviewing KAFKA-14196, we 
> noticed that there is no similar logic to commit offsets prior to this 
> deferred revocation. This means that cooperative consumption is more likely 
> to lead to duplicate consumption even when there is no failure involved.





[jira] [Created] (KAFKA-14224) Consumer should auto-commit prior to cooperative partition revocation

2022-09-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14224:
---

 Summary: Consumer should auto-commit prior to cooperative 
partition revocation
 Key: KAFKA-14224
 URL: https://issues.apache.org/jira/browse/KAFKA-14224
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


With the old "eager" reassignment logic, we always revoked all partitions prior 
to each rebalance. When auto-commit is enabled, part of this process is 
committing the current position. Under the cooperative logic, we defer revocation 
until after the rebalance, which means we can continue fetching while the 
rebalance is in progress. However, when reviewing KAFKA-14196, we noticed that 
there is no similar logic to commit offsets prior to this deferred revocation. 
This means that cooperative consumption is more likely to lead to 
duplicate consumption even when there is no failure involved.





[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


vvcephei commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r969070840


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -243,6 +244,26 @@ public class StreamsConfig extends AbstractConfig {
  */
 public static final String OPTIMIZE = "all";
 
+/**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+ * for enabling the specific optimization that reuses source topic as changelog topic
+ * for KTables.
+ */
+public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
+
+/**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+ * for enabling the specific optimization that merges duplicated repartition topics.
+ */
+public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
+
+/**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+ * for enabling the optimization that optimizes inner stream-stream joins into self-joins when
+ * both arguments are the same stream.
+ */
+public static final String SELF_JOIN = "self.join";

Review Comment:
   Just a reminder to update this to match the KIP.






[GitHub] [kafka] vvcephei commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


vvcephei commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r956220721


##
gradle.properties:
##
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.3.0-SNAPSHOT
+version=3.3.0-VICKY2

Review Comment:
   TODO: we need to remove this from the PR.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
 internalTopologyBuilder.validateCopartition();
 }
 
+/**
+ * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+ * applied or they can provide a list of optimization rules.
+ */
+private void optimizeTopology(final Properties props) {
+final List optimizationConfigs;
+if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   could be `optimizationConfigs = Collections.singletonList`
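A small standalone sketch of the suggested change. `NO_OPTIMIZATION` here is a local stand-in assumed to equal `StreamsConfig.NO_OPTIMIZATION` ("none"), and the method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DefaultOptimizationConfigs {
    // Local stand-in, assumed to equal StreamsConfig.NO_OPTIMIZATION.
    static final String NO_OPTIMIZATION = "none";

    // Shape used in the diff: a mutable list built in two steps.
    static List<String> twoStep() {
        final List<String> optimizationConfigs = new ArrayList<>();
        optimizationConfigs.add(NO_OPTIMIZATION);
        return optimizationConfigs;
    }

    // Suggested shape: a single expression producing an immutable one-element list.
    static List<String> singleton() {
        return Collections.singletonList(NO_OPTIMIZATION);
    }
}
```

One caveat: `Collections.singletonList` returns an immutable list, so the swap is only safe if nothing later adds to `optimizationConfigs`.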



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -663,6 +663,12 @@ public class StreamsConfig extends AbstractConfig {
 public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
 private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
+public static final String SELF_JOIN_OPTIMIZATION_CONFIG = "self.join.optimization";

Review Comment:
   Thanks for adding a separate config. I strongly feel this is the right 
approach for optimization flags.
   
   Can we make a config namespace convention to keep these things organized, 
like `topology.optimization.self.join`?



##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
 assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
 
+@Test
+public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {

Review Comment:
   A more general prohibition would be to disallow OPTIMIZE and NO_OPTIMIZATION 
from appearing in a comma-separated list at all.
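One way the broader rule could look, as a hypothetical standalone helper: reject `all` or `none` whenever the value contains more than one comma-separated entry. `IllegalArgumentException` stands in for Kafka's `ConfigException`, and the literal values are assumed to match `StreamsConfig.OPTIMIZE` and `StreamsConfig.NO_OPTIMIZATION`:

```java
import java.util.ArrayList;
import java.util.List;

class OptimizationValueValidator {
    // Assumed to match StreamsConfig.OPTIMIZE and StreamsConfig.NO_OPTIMIZATION.
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none";

    // Splits the comma-separated value and rejects "all"/"none" unless it is the only entry.
    static List<String> validate(final String value) {
        final List<String> flags = new ArrayList<>();
        for (final String raw : value.split(",")) {
            flags.add(raw.trim());
        }
        if (flags.size() > 1 && (flags.contains(OPTIMIZE) || flags.contains(NO_OPTIMIZATION))) {
            // IllegalArgumentException stands in for Kafka's ConfigException.
            throw new IllegalArgumentException("'" + OPTIMIZE + "' and '" + NO_OPTIMIZATION
                + "' cannot be combined with other values in: " + value);
        }
        return flags;
    }
}
```

This covers the "on and off" case tested above as a special case, and also rejects values such as `all,self.join`.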



##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1263,6 +1263,37 @@ public void testInvalidSecurityProtocol() {
 assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
 
+@Test
+public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+}
+
+@Test
+public void shouldEnableSelfJoin() {
+final String value = StreamsConfig.SELF_JOIN;
+props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+final StreamsConfig config = new StreamsConfig(props);
+assertEquals(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG), StreamsConfig.SELF_JOIN);
+}
+
+@Test
+public void shouldMultipleOptimizations() {

Review Comment:
   I get what you mean, but "should multiple optimizations" isn't exactly a 
sensible statement :)
   
   By the way, we might want to add at least one more test that we get the 
right error if you try to include some extra garbage flag in the list.
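A sketch of the check such a test could exercise, assuming the known values are exactly the constants from the StreamsConfig diff plus `all` and `none`; the class name and error message are hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class UnknownOptimizationFlagCheck {
    // Flag values from the StreamsConfig diff, plus "all" and the assumed "none".
    static final Set<String> KNOWN = new HashSet<>(Arrays.asList(
        "all", "none", "reuse.ktable.source.topics", "merge.repartition.topics", "self.join"));

    // Throws on the first comma-separated entry that is not a known value.
    static void check(final String value) {
        for (final String raw : value.split(",")) {
            final String flag = raw.trim();
            if (!KNOWN.contains(flag)) {
                throw new IllegalArgumentException(
                    "Unrecognized topology.optimization value '" + flag + "' in: " + value);
            }
        }
    }
}
```

Naming the offending entry in the message gives the test something precise to assert on.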



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean optimizeTopology) {
 internalTopologyBuilder.validateCopartition();
 }
 
+/**
+ * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+ * applied or they can provide a list of optimization rules.
+ */
+private void optimizeTopology(final Properties props) {
+final List optimizationConfigs;
+if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);

Review Comment:
   Also, maybe we can just pack this logic into the 
`StreamsConfig.verifyTopologyOptimizationConfigs` method:
   1. if NO_OPTIMIZATION, return empty set
   2. else if OPTIMIZE, add all the optimization flags to the set
   3. else split on comma and add each configured flag to the 
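A minimal sketch of the three listed steps as a standalone method, assuming the flag values from the StreamsConfig diff. It is not the actual `StreamsConfig.verifyTopologyOptimizationConfigs` implementation, it treats a missing value like `none` (mirroring the default in the diff), and it leaves validation (unknown flags, mixing `all`/`none` with other values) to a separate check:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class OptimizationFlagResolver {
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none"; // assumed value of StreamsConfig.NO_OPTIMIZATION
    // Individual flags from the StreamsConfig diff.
    static final List<String> ALL_FLAGS = Arrays.asList(
        "reuse.ktable.source.topics", "merge.repartition.topics", "self.join");

    // Resolves the configured value into the set of enabled flags (insertion-ordered).
    static Set<String> resolve(final String value) {
        final Set<String> enabled = new LinkedHashSet<>();
        // Step 1: not configured, or explicitly "none": no optimizations.
        if (value == null || value.trim().equals(NO_OPTIMIZATION)) {
            return enabled;
        }
        // Step 2: "all" enables every individual flag.
        if (value.trim().equals(OPTIMIZE)) {
            enabled.addAll(ALL_FLAGS);
            return enabled;
        }
        // Step 3: otherwise split on commas and enable each listed flag.
        for (final String flag : value.split(",")) {
            enabled.add(flag.trim());
        }
        return enabled;
    }
}
```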

[GitHub] [kafka] showuon commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-12 Thread GitBox


showuon commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r969053802


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -1933,11 +1941,81 @@ private Map topicPartitionTags(TopicPartition tp) {
 }
 }
 
+// Visible for testing
+void maybeCloseFetchSessions(final Timer timer) {
+final Cluster cluster = metadata.fetch();
+final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+final FetchSessionHandler sessionHandler = entry.getValue();
+// set the session handler to notify close. This will set the next metadata request to send close message.
+sessionHandler.notifyClose();
+
+final int sessionId = sessionHandler.sessionId();
+final Integer fetchTargetNodeId = entry.getKey();
+// FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+// skip sending the close request.
+final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+continue;
+}
+
+log.debug("Sending close request for fetch session: {} to node: {}", sessionId, fetchTarget);
+final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+@Override
+public void onSuccess(ClientResponse value) {
+log.info("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+}
+
+@Override
+public void onFailure(RuntimeException e) {
+log.info("Unable to a close message for fetch session: {} to node: {}. " +
+"This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+}
+});
+
+requestFutures.add(responseFuture);
+}
+
+// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
+// all requests have received a response.
+do {
+client.poll(timer, null, true);
+} while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone));
+
+if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
+// we ran out of time before completing all futures. It is ok since we don't want to block the shutdown
+// here.
+log.warn("All requests couldn't be sent in the specific timeout period {}ms. " +
+"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " +
+"KafkaConsumer.close(Duration timeout)", timer.timeoutMs());
+}
+}
+
+public void close(final Timer timer) {
+if (isClosed.get()) {
+log.info("Fetcher {} is already closed.", this);
+return;
+}
+
+// Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
+// it is necessary to acquire a lock on the fetcher instance before modifying the states.
+synchronized (Fetcher.this) {
+// we do not need to re-enable wakeups since we are closing already
+client.disableWakeups();
+if (nextInLineFetch != null)
+nextInLineFetch.drain();
+maybeCloseFetchSessions(timer);
+Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
+sessionHandlers.clear();
+}
+this.isClosed.compareAndSet(false, true);

Review Comment:
   I think we can set the `isClosed` earlier, so that it will not enter the 
synchronized block multiple times for different threads. For example:
   
   ```java
   if (this.isClosed.compareAndSet(false, true)) {
       // Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
       // it is necessary to acquire a lock on the fetcher instance before modifying the states.
       synchronized (Fetcher.this) {
           // we do not need to re-enable wakeups since we are closing already
           client.disableWakeups();
           if (nextInLineFetch != null)
               nextInLineFetch.drain();
           maybeCloseFetchSessions(timer);
           Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier");
           sessionHandlers.clear();
   

[GitHub] [kafka] showuon commented on a diff in pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-12 Thread GitBox


showuon commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r969044196


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##
@@ -446,6 +429,31 @@ private RequestFuture sendMetadataRequest(MetadataRequest.Builde
 return client.send(node, request);
 }
 
+/**
+ * Send Fetch Request to Kafka cluster asynchronously.
+ *
+ * This method is visible for testing.
+ *
+ * @return A future that indicates result of sent Fetch request
+ */
+RequestFuture sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData,
+ final Node fetchTarget) {
+final short maxVersion = requestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12;
+
+final FetchRequest.Builder request = FetchRequest.Builder
+.forConsumer(maxVersion, this.maxWaitMs, this.minBytes, requestData.toSend())
+.isolationLevel(isolationLevel)
+.setMaxBytes(this.maxBytes)
+.metadata(requestData.metadata())
+.removed(requestData.toForget())
+.replaced(requestData.toReplace())
+.rackId(clientRackId);
+
+log.debug("Sending {} {} to broker {}", isolationLevel, requestData, fetchTarget);

Review Comment:
   Thanks for the info. Makes sense!






[GitHub] [kafka] showuon merged pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()

2022-09-12 Thread GitBox


showuon merged PR #12591:
URL: https://github.com/apache/kafka/pull/12591





[GitHub] [kafka] showuon commented on pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()

2022-09-12 Thread GitBox


showuon commented on PR #12591:
URL: https://github.com/apache/kafka/pull/12591#issuecomment-1244750774

   Failed tests are unrelated
   ```
   Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   Build / JDK 8 and Scala 2.12 / kafka.test.ClusterTestExtensionsTest.[1] Type=ZK, Name=Generated Test, MetadataVersion=3.3-IV3, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailurePositiveDelayTest.testInvalidPasswordSaslPlain()
   Build / JDK 17 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=kraft
   ```





[GitHub] [kafka] showuon commented on a diff in pull request #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits

2022-09-12 Thread GitBox


showuon commented on code in PR #12626:
URL: https://github.com/apache/kafka/pull/12626#discussion_r969036313


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:
##
@@ -211,8 +211,23 @@ public void poll(RequestFuture future) {
  * @throws InterruptException if the calling thread is interrupted
  */
 public boolean poll(RequestFuture future, Timer timer) {
+return poll(future, timer, false);
+}
+
+/**
+ * Block until the provided request future request has finished or the timeout has expired.
+ *
+ * @param future The request future to wait for
+ * @param timer Timer bounding how long this method can block
+ * @param disableWakeup true if we should not check for wakeups, false otherwise
+ *
+ * @return true if the future is done, false otherwise
+ * @throws WakeupException if {@link #wakeup()} is called from another thread

Review Comment:
   We should mention that `WakeupException` is only thrown when `disableWakeup` 
is false.
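A toy model of the documented contract, with the suggested javadoc wording applied. It assumes that a wakeup requested while checks are disabled simply stays pending; the class is hypothetical, and `WakeupException` here is a local stand-in rather than `org.apache.kafka.common.errors.WakeupException`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupAwarePoller {
    // Local stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException { }

    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

    // May be called from another thread to interrupt a blocking poll.
    void wakeup() {
        wakeupPending.set(true);
    }

    /**
     * One iteration of a blocking poll.
     *
     * @param disableWakeup true if we should not check for wakeups, false otherwise
     * @throws WakeupException if {@link #wakeup()} is called from another thread
     *         and {@code disableWakeup} is false
     */
    void pollOnce(final boolean disableWakeup) {
        // Assumption of this model: a wakeup requested while checks are disabled stays pending.
        if (!disableWakeup && wakeupPending.compareAndSet(true, false)) {
            throw new WakeupException();
        }
        // Network I/O would happen here.
    }
}
```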






[GitHub] [kafka] akhileshchg commented on a diff in pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg commented on code in PR #12627:
URL: https://github.com/apache/kafka/pull/12627#discussion_r969037841


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -129,23 +151,24 @@ public synchronized void loadSnapshot(Map acls) {
 public List authorize(
 AuthorizableRequestContext requestContext,
 List actions) {
-StandardAuthorizerData curData = data;
-List results = new ArrayList<>(actions.size());
-for (Action action: actions) {
-AuthorizationResult result = curData.authorize(requestContext, action);
-results.add(result);
-}
-return results;
+return inReadLock(() -> {

Review Comment:
   Okay. I'll remove the functions. 



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -534,49 +536,19 @@ Iterable acls(AclBindingFilter filter) {
 }
 
 class AclIterable implements Iterable {

Review Comment:
   Done.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969035562


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -2283,6 +2307,37 @@ public void testRestOffsetsAuthorizationFailure() {
 assertEquals(5, subscriptions.position(tp0).offset);
 }
 
+@Test
+public void testPendingRevacationPartitionFetching() {

Review Comment:
   nit: Revocation is misspelled
   
   I did not find the name very clear. It looks like the main difference 
between this and `testFetchingPendingPartitions` is that this method tests that 
the pending state gets reset after reassignment? Perhaps the name should 
reflect that?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -272,6 +272,30 @@ public void testFetchNormal() {
 }
 }
 
+@Test
+public void testFetchingPendingPartitions() {
+buildFetcher();
+
+assignFromUser(singleton(tp0));
+subscriptions.seek(tp0, 0);
+
+// normal fetch
+assertEquals(1, fetcher.sendFetches());
+client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+consumerClient.poll(time.timer(0));
+assertTrue(fetcher.hasCompletedFetches());
+fetchedRecords();
+assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+// mark partition unfetchable
+subscriptions.markPendingRevocation(singleton(tp0));

Review Comment:
   Another scenario is that we already have the fetch inflight when we mark 
pending revocation. Can we test that as well?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##
@@ -256,6 +256,15 @@ public void partitionPause() {
 assertTrue(state.isFetchable(tp0));
 }
 
+@Test
+public void testMarkingPartitionPending() {
+state.assignFromUser(singleton(tp0));
+state.seek(tp0, 100);
+assertTrue(state.isFetchable(tp0));
+state.markPendingRevocation(singleton(tp0));
+assertFalse(state.isFetchable(tp0));

Review Comment:
   Perhaps we can also assert `isPaused` is false?
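A simplified, hypothetical model of the per-partition state exercised by this test (not the real `SubscriptionState`). It shows why both assertions are worth having: pending revocation makes the partition unfetchable without marking it paused.

```java
// Hypothetical model of per-partition fetch state; not Kafka's SubscriptionState.
class PartitionFetchState {
    private Long position;              // null until seek() is called
    private boolean paused;             // set only by an explicit pause()
    private boolean pendingRevocation;  // set when revocation is pending

    void seek(long offset) { position = offset; }
    void pause() { paused = true; }
    void markPendingRevocation() { pendingRevocation = true; }

    boolean isPaused() { return paused; }

    // Fetchable only with a known position, not paused, and not pending revocation.
    boolean isFetchable() {
        return position != null && !paused && !pendingRevocation;
    }
}
```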






[GitHub] [kafka] hachikuji commented on a diff in pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12627:
URL: https://github.com/apache/kafka/pull/12627#discussion_r969028196


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -129,23 +151,24 @@ public synchronized void loadSnapshot(Map acls) {
 public List authorize(
 AuthorizableRequestContext requestContext,
 List actions) {
-StandardAuthorizerData curData = data;
-List results = new ArrayList<>(actions.size());
-for (Action action: actions) {
-AuthorizationResult result = curData.authorize(requestContext, action);
-results.add(result);
-}
-return results;
+return inReadLock(() -> {

Review Comment:
   I know it does not look as pretty, but perhaps we should just do the 
`try/finally` blocks. Especially for the case of `authorize`, it is annoying to 
have the additional allocations just to pass the callback.
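A sketch of the `try/finally` shape with a `ReentrantReadWriteLock`, using a hypothetical set of allowed resource names in place of the real ACL data. The read lock is acquired once per batch and no capturing lambda is created per call:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical authorizer; the allowed set stands in for the real ACL data.
class ReadLockedAuthorizer {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<String> allowed = new HashSet<>();

    void addAcl(final String resource) {
        lock.writeLock().lock();
        try {
            allowed.add(resource);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Takes the read lock once for the whole batch, with no per-call lambda.
    List<Boolean> authorize(final List<String> resources) {
        lock.readLock().lock();
        try {
            final List<Boolean> results = new ArrayList<>(resources.size());
            for (final String resource : resources) {
                results.add(allowed.contains(resource));
            }
            return results;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```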



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -97,17 +119,17 @@ public void completeInitialLoad(Exception e) {
 
 @Override
 public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
+inWriteLock(() -> data.addAcl(id, acl));

Review Comment:
   I am not sure how much it matters, but it would probably be more efficient 
for `addAcl` to be a batched API. Perhaps it is ok since hopefully most of the 
time the brunt of the initialization is in `loadSnapshot`. 
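If `addAcl` were batched, the write lock could be taken once per batch instead of once per ACL. A hypothetical sketch (the `addAcls` name and plain-string ACLs are illustrative only):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical batched variant of addAcl; ACLs are plain strings for illustration.
class BatchedAclStore {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> acls = new ArrayList<>();
    private int writeLockCount = 0; // only to show the effect of batching

    // Adds the whole batch under a single write-lock acquisition.
    void addAcls(final Collection<String> batch) {
        lock.writeLock().lock();
        try {
            writeLockCount++;
            acls.addAll(batch);
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return acls.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    int writeLockAcquisitions() {
        lock.readLock().lock();
        try {
            return writeLockCount;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```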



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -534,49 +536,19 @@ Iterable acls(AclBindingFilter filter) {
 }
 
 class AclIterable implements Iterable {

Review Comment:
   Do we still need this class since it is just wrapping a list iterator?
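If the wrapper only delegates to a list iterator, returning an unmodifiable `List` may be enough, since `List` already implements `Iterable`. A hypothetical sketch, assuming the caller holds the read lock while the matching ACLs are copied:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

class AclSnapshot {
    // Copies the matching ACLs into a list and returns a read-only view of it.
    // Assumes the caller holds the read lock while this method runs.
    static <T> Iterable<T> matching(final List<T> acls, final Predicate<T> filter) {
        final List<T> result = new ArrayList<>();
        for (final T acl : acls) {
            if (filter.test(acl)) {
                result.add(acl);
            }
        }
        return Collections.unmodifiableList(result);
    }
}
```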






[GitHub] [kafka] jsancio merged pull request #12597: KAFKA-14205; Document how to replace the disk for the KRaft Controller

2022-09-12 Thread GitBox


jsancio merged PR #12597:
URL: https://github.com/apache/kafka/pull/12597





[GitHub] [kafka] jsancio commented on a diff in pull request #12597: KAFKA-14205; Document how to replace the disk for the KRaft Controller

2022-09-12 Thread GitBox


jsancio commented on code in PR #12597:
URL: https://github.com/apache/kafka/pull/12597#discussion_r969023966


##
docs/ops.html:
##
@@ -1373,6 +1373,27 @@ 
 delalloc: Delayed allocation means that the filesystem avoid 
allocating any blocks until the physical write occurs. This allows ext4 to 
allocate a large extent instead of smaller pages and helps ensure the data is 
written sequentially. This feature is great for throughput. It does seem to 
involve some locking in the filesystem which adds a bit of latency variance.
   
 
+  Replace KRaft Controller Disk
+  When Kafka is configured to use KRaft, the controllers store the cluster 
metadata in the directory specified in metadata.log.dir -- or the 
first log directory, if metadata.log.dir is not configured. See 
the documentation for metadata.log.dir for details.
+
+  If the data in the cluster metdata directory is lost either because of 
hardware failure or the hardware needs to be replace, care should be taken when 
provisioning the new controller node. The new controller node should not be 
formatted and started until the majority of the controllers have all of the 
committed data. To determine if the majority of the controllers have the 
committed data, run the kafka-metadata-quorum.sh tool to describe 
the replication status:

Review Comment:
   Fixed.



##
docs/ops.html:
##
@@ -1373,6 +1373,27 @@ 
 delalloc: Delayed allocation means that the filesystem avoid 
allocating any blocks until the physical write occurs. This allows ext4 to 
allocate a large extent instead of smaller pages and helps ensure the data is 
written sequentially. This feature is great for throughput. It does seem to 
involve some locking in the filesystem which adds a bit of latency variance.
   
 
+  Replace KRaft Controller Disk
+  When Kafka is configured to use KRaft, the controllers store the cluster 
metadata in the directory specified in metadata.log.dir -- or the 
first log directory, if metadata.log.dir is not configured. See 
the documentation for metadata.log.dir for details.
+
+  If the data in the cluster metdata directory is lost either because of 
hardware failure or the hardware needs to be replace, care should be taken when 
provisioning the new controller node. The new controller node should not be 
formatted and started until the majority of the controllers have all of the 
committed data. To determine if the majority of the controllers have the 
committed data, run the kafka-metadata-quorum.sh tool to describe 
the replication status:
+
+
bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication
 NodeId  LogEndOffset  Lag  LastFetchTimestamp  LastCaughtUpTimestamp  Status
 1       25806         0    1662500992757       1662500992757          Leader
 ...     ...           ...  ...                 ...                    ...
+  
+
+  Check and wait until the Lag is small for the majority of the 
controllers. Check and wait until the LastFetchTimestamp and 
LastCaughtUpTimestamp are close to each other for the majority of 
the controllers. At this point it is safer to format the controller's metadata 
log directory. This can be done by running the kafka-storage.sh 
command.

Review Comment:
   Thanks for the suggestion. Fixed.
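The catch-up check described in this documentation change can be expressed as a small helper over the rows printed by `kafka-metadata-quorum.sh ... describe --replication`. This is a hypothetical sketch: the `maxLag` and `maxGapMs` thresholds are operator choices (the text only says "small" and "close"), and the list should contain every controller, including the one being replaced:

```java
import java.util.List;

class QuorumCatchUpCheck {
    // One row of the `describe --replication` output, reduced to the columns the check uses.
    static final class Replica {
        final int nodeId;
        final long lag;
        final long lastFetchTimestamp;
        final long lastCaughtUpTimestamp;

        Replica(int nodeId, long lag, long lastFetchTimestamp, long lastCaughtUpTimestamp) {
            this.nodeId = nodeId;
            this.lag = lag;
            this.lastFetchTimestamp = lastFetchTimestamp;
            this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
        }
    }

    // True if a strict majority of the controllers have a lag of at most maxLag and a
    // LastFetchTimestamp within maxGapMs of their LastCaughtUpTimestamp.
    static boolean majorityCaughtUp(List<Replica> controllers, long maxLag, long maxGapMs) {
        int caughtUp = 0;
        for (Replica r : controllers) {
            boolean smallLag = r.lag <= maxLag;
            boolean timestampsClose = Math.abs(r.lastFetchTimestamp - r.lastCaughtUpTimestamp) <= maxGapMs;
            if (smallLag && timestampsClose) {
                caughtUp++;
            }
        }
        return caughtUp > controllers.size() / 2;
    }
}
```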






[GitHub] [kafka] akhileshchg commented on pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg commented on PR #12628:
URL: https://github.com/apache/kafka/pull/12628#issuecomment-1244722682

   ```
   Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score   Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50               ACL                20               20  avgt        730.070          ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType          50               ACL                20               20  avgt          0.010          ms/op
   AclAuthorizerBenchmark.testAuthorizer                       50               ACL                20               20  avgt          4.505          ms/op
   AclAuthorizerBenchmark.testUpdateCache                      50               ACL                20               20  avgt       1936.356          ms/op
   
   Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score   Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50             KRAFT                20               20  avgt       2084.634          ms/op
   AclAuthorizerBenchmark.testAuthorizeByResourceType          50             KRAFT                20               20  avgt       6180.318          ms/op
   AclAuthorizerBenchmark.testAuthorizer                       50             KRAFT                20               20  avgt          2.768          ms/op
   AclAuthorizerBenchmark.testUpdateCache                      50             KRAFT                20               20  avgt         ≈ 10⁻⁶          ms/op
   ```
   
   NOTE: `authorizeByResourceType` is not implemented in `StandardAuthorizer`, 
so it uses the default implementation in `Authorizer`, hence it is not in the 
same ballpark as AclAuthorizer. Similarly `updateCache` is not implemented for 
`StandardAuthorizer` (we use `AclMutator`, so we cannot compare the numbers).
   
   With the new implementation, StandardAuthorizer seems to be doing worse on 
the `AclsIterator` benchmark than `AclAuthorizer` and doing better in 
`testAuthorizer` which calls `Authorizer#authorize`.
   
   I updated the iterator method to only loop once through acls and the 
performance is in the same ballpark as AclAuthorizer.
   
   ```
   Benchmark                                           (aclCount)  (authorizerType)  (denyPercentage)  (resourceCount)  Mode  Cnt     Score   Error  Units
   AclAuthorizerBenchmark.testAclsIterator                     50             KRAFT                20               20  avgt        833.482          ms/op
   ```





[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021668


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 autoCommitOffsetRequestFuture = null;
 timer.update();
 
-if (exception != null) {
-throw new KafkaException("User rebalance callback throws an error", exception);
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
 }
 return true;
 }
 
+private SortedSet eagerPartitionsToRevoke(RebalanceProtocol protocol) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (protocol != RebalanceProtocol.EAGER) {
+return partitions;
+}
+
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+private void markPendingPartitions() {

Review Comment:
   How about `maybeMarkPartitionsPendingRevocation`? Otherwise it's a little 
unclear what exactly is pending.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969021352


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -853,12 +850,38 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 autoCommitOffsetRequestFuture = null;
 timer.update();
 
-if (exception != null) {
-throw new KafkaException("User rebalance callback throws an error", exception);
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
 }
 return true;
 }
 
+private SortedSet eagerPartitionsToRevoke(RebalanceProtocol protocol) {

Review Comment:
   nit: not using this anymore






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969020626


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -935,6 +941,9 @@ private boolean isPaused() {
 return paused;
 }
 
+private boolean isPendingRevocation() {

Review Comment:
   nit: do we need this? Is it used anywhere?






[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


jsancio commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r969020182


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -29,15 +29,15 @@
 public class BatchMemoryPool implements MemoryPool {

Review Comment:
   Done.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r969019528


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -29,15 +29,15 @@
 public class BatchMemoryPool implements MemoryPool {

Review Comment:
   Could we update the javadoc above to match current behavior?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969009257


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,12 +756,11 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 joinPrepareTimer.update();
 }
 
-final SortedSet partitionsToRevoke = 
getPartitionsToRevoke(protocol, generation, memberId);
-
+final SortedSet eagerPartitionsToRevoke = 
eagerPartitionsToRevoke(protocol);

Review Comment:
   Hmm, thinking about this a little more now that we're down to just the eager protocol: since the assignment won't change until the rebalance completes, maybe we do not need to precompute it. In other words, maybe we can restore the original logic in `revokePartitions` and change `markPendingPartitions` to something like this:
   
   ```java
   private void maybeMarkPartitionsPendingRevocation() {
       if (protocol == RebalanceProtocol.EAGER) {
           // When asynchronously committing offsets prior to the revocation of a set of partitions, there will be a
           // window of time between when the offset commit is sent and when it returns and revocation completes. It is
           // possible for pending fetches for these partitions to return during this time, which means the application's
           // position may get ahead of the committed position prior to revocation. This can cause duplicate consumption.
           // To prevent this, we mark the partitions as "pending revocation," which stops the Fetcher from sending new
           // fetches or returning data from previous fetches to the user.
           Set<TopicPartition> partitions = subscriptions.assignedPartitions();
           log.debug("Marking assigned partitions pending for revocation: {}", partitions);
           subscriptions.markPendingRevocation(partitions);
       }
   }
   ```
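   To make the "pending revocation" idea concrete, here is a toy model, not Kafka's `SubscriptionState`: the class name `ToySubscriptionState`, its methods, and the use of plain strings in place of `TopicPartition` are all hypothetical. It only shows how a per-partition flag could remove partitions from the set the fetcher is allowed to fetch from or return records for, while leaving the assignment itself untouched.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Toy model only: strings stand in for TopicPartition, and none of these
   // names are Kafka APIs.
   class ToySubscriptionState {
       private static final class PartitionState {
           boolean pendingRevocation = false;
       }

       private final Map<String, PartitionState> assignment = new HashMap<>();

       void assign(Set<String> partitions) {
           assignment.clear();
           for (String tp : partitions) {
               assignment.put(tp, new PartitionState());
           }
       }

       Set<String> assignedPartitions() {
           return Collections.unmodifiableSet(new HashSet<>(assignment.keySet()));
       }

       // Flag partitions so a (hypothetical) fetcher neither sends new fetches
       // for them nor returns already-buffered records to the application.
       void markPendingRevocation(Set<String> partitions) {
           for (String tp : partitions) {
               PartitionState state = assignment.get(tp);
               if (state != null) {
                   state.pendingRevocation = true;
               }
           }
       }

       // Only partitions that are not pending revocation remain fetchable.
       Set<String> fetchablePartitions() {
           Set<String> fetchable = new HashSet<>();
           for (Map.Entry<String, PartitionState> entry : assignment.entrySet()) {
               if (!entry.getValue().pendingRevocation) {
                   fetchable.add(entry.getKey());
               }
           }
           return fetchable;
       }
   }
   ```

   In the eager case the whole current assignment would be marked, mirroring the `subscriptions.assignedPartitions()` call in the suggestion above.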



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -859,36 +848,49 @@ private Optional 
revokePartitions(SortedSet partition
 } else {
 switch (protocol) {
 case EAGER:
-exception = 
Optional.ofNullable(invokePartitionsRevoked(partitions));
+exception = 
Optional.ofNullable(invokePartitionsRevoked(eagerPartitionsToRevoke));
 subscriptions.assignFromSubscribed(Collections.emptySet());
-
 break;
 
 case COOPERATIVE:
-Set ownedPartitions = new 
HashSet<>(subscriptions.assignedPartitions());
-partitions.addAll(ownedPartitions.stream()
-.filter(tp -> 
!subscriptions.subscription().contains(tp.topic()))
-.collect(Collectors.toSet()));
-
-if (!partitions.isEmpty()) {
-exception = 
Optional.ofNullable(invokePartitionsRevoked(partitions));
-ownedPartitions.removeAll(partitions);
-subscriptions.assignFromSubscribed(ownedPartitions);
-}
+exception = revokeUnsubscribedPartitions();
 break;
 }
 }
 
 return exception;
 }
 
-private void markPartitionsUnconsumable(final Set 
partitions) {
-// KAFKA-14196 for more detail, we pause the partition from 
consumption to prevent duplicated
-//  data returned by the consumer poll loop.  Without pausing the 
partitions, the consumer will move forward
-//  returning the data w/o committing them.  And the progress will be 
lost once the partition is revoked.
-//  This only applies to autocommits, as we expect user to handle the 
offsets menually during the partition
-//  revocation.
-log.debug("Marking assigned partitions unconsumable: {}", partitions);
+private Optional revokeUnsubscribedPartitions() {
+//For the cooperative strategy, partitions are usually revoked in 
onJoinComplete when the

Review Comment:
   nit: space after `//`
   
   Can we move this comment into the `COOPERATIVE` case in `revokePartitions`?
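   For reference, the cooperative-case logic removed in this diff revokes owned partitions whose topic has dropped out of the subscription. A minimal, self-contained sketch of that set computation, using a hypothetical `Tp` class in place of `TopicPartition` and hypothetical helper names (not the code in this PR):

   ```java
   import java.util.HashSet;
   import java.util.Objects;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
   final class Tp {
       final String topic;
       final int partition;

       Tp(String topic, int partition) {
           this.topic = topic;
           this.partition = partition;
       }

       @Override
       public boolean equals(Object o) {
           if (!(o instanceof Tp)) {
               return false;
           }
           Tp other = (Tp) o;
           return topic.equals(other.topic) && partition == other.partition;
       }

       @Override
       public int hashCode() {
           return Objects.hash(topic, partition);
       }
   }

   final class CooperativeRevocationSketch {
       // Owned partitions whose topic is no longer in the subscription.
       static Set<Tp> unsubscribedPartitions(Set<Tp> owned, Set<String> subscribedTopics) {
           return owned.stream()
                   .filter(tp -> !subscribedTopics.contains(tp.topic))
                   .collect(Collectors.toSet());
       }

       // The assignment that would remain after revoking those partitions.
       static Set<Tp> remainingAssignment(Set<Tp> owned, Set<String> subscribedTopics) {
           Set<Tp> remaining = new HashSet<>(owned);
           remaining.removeAll(unsubscribedPartitions(owned, subscribedTopics));
           return remaining;
       }
   }
   ```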






[GitHub] [kafka] akhileshchg commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg commented on code in PR #12628:
URL: https://github.com/apache/kafka/pull/12628#discussion_r969009047


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -534,49 +535,19 @@ Iterable acls(AclBindingFilter filter) {
 }
 
 class AclIterable implements Iterable {
-private final AclBindingFilter filter;
+private final List aclBindingList;
 
 AclIterable(AclBindingFilter filter) {
-this.filter = filter;
+this.aclBindingList = aclsByResource

Review Comment:
   Yes, you're right. I don't think there is any way to guarantee consistency here other than returning an Iterable that stays constant for the client accessing `Authorizer#acls`.






[GitHub] [kafka] mumrah commented on a diff in pull request #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


mumrah commented on code in PR #12628:
URL: https://github.com/apache/kafka/pull/12628#discussion_r969006371


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -534,49 +535,19 @@ Iterable acls(AclBindingFilter filter) {
 }
 
 class AclIterable implements Iterable {
-private final AclBindingFilter filter;
+private final List aclBindingList;
 
 AclIterable(AclBindingFilter filter) {
-this.filter = filter;
+this.aclBindingList = aclsByResource

Review Comment:
   Just to clarify my understanding.
   
   Previously, we were wrapping the aclsByResource iterator. This was intended 
to be thread-safe, but as you mentioned offline, there was no guard against the 
underlying map getting modified during iteration (since the 
ConcurrentSkipListMap might show updated elements to the iterator).
   
   Instead, we are now making a copy of the matching AclBindings and returning that list's iterator to the caller. So, basically trading off memory for consistency.
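   The trade-off described here can be sketched with a plain `ConcurrentSkipListMap`. This is a simplified illustration (resources and ACLs as strings, hypothetical class name), not the `StandardAuthorizerData` code. Note that the copy itself is still built through a weakly consistent iterator, so copying alone keeps the returned list stable for the caller but does not by itself make it an atomic snapshot with respect to concurrent writers; that is presumably what the read-write lock in this PR addresses.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentSkipListMap;
   import java.util.function.Predicate;

   // Simplified illustration: each resource name maps to a single ACL string.
   final class AclCopySketch {
       private final ConcurrentSkipListMap<String, String> aclsByResource = new ConcurrentSkipListMap<>();

       void put(String resource, String acl) {
           aclsByResource.put(resource, acl);
       }

       // Copy matching entries up front: the caller iterates a list that later
       // writes to the map cannot change, at the cost of extra memory.
       List<String> copyMatching(Predicate<String> resourceFilter) {
           List<String> result = new ArrayList<>();
           for (Map.Entry<String, String> entry : aclsByResource.entrySet()) {
               if (resourceFilter.test(entry.getKey())) {
                   result.add(entry.getValue());
               }
           }
           return Collections.unmodifiableList(result);
       }
   }
   ```

   Iterating the live `entrySet()` directly instead would avoid the copy, but a write made during iteration may or may not be observed.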






[GitHub] [kafka] akhileshchg opened a new pull request, #12628: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg opened a new pull request, #12628:
URL: https://github.com/apache/kafka/pull/12628

   KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent 
ACL reads.
   
   The issue with StandardAuthorizer#authorize is that it looks up
   aclsByResource (which is of type ConcurrentSkipListMap) twice for every
   authorize call and uses an Iterator with weak consistency guarantees on top of
   aclsByResource. This can cause the authorize call to process concurrent
   writes out of order.
   
   Implemented a read-write lock at the StandardAuthorizer level to make sure
   reads are strongly consistent with write order.
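   A minimal sketch of the locking pattern this description refers to, assuming a plain `HashMap` from resource name to ACL strings, with the two lookups modeled as an exact and a wildcard resource (the class and method names are hypothetical; this is not the actual `StandardAuthorizer` change). Writers take the write lock, and `authorize` performs both lookups under a single read lock, so no write can be applied between them.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.locks.ReadWriteLock;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   // Hypothetical sketch, not the StandardAuthorizer implementation.
   final class LockedAclStore {
       private final ReadWriteLock lock = new ReentrantReadWriteLock();
       // A plain HashMap suffices because every access goes through the lock.
       private final Map<String, List<String>> aclsByResource = new HashMap<>();

       void addAcl(String resource, String acl) {
           lock.writeLock().lock();
           try {
               aclsByResource.computeIfAbsent(resource, r -> new ArrayList<>()).add(acl);
           } finally {
               lock.writeLock().unlock();
           }
       }

       // Both lookups run under one read lock, so a concurrent write is either
       // visible to both of them or to neither.
       boolean authorize(String resource, String wildcardResource, String acl) {
           lock.readLock().lock();
           try {
               List<String> exact = aclsByResource.getOrDefault(resource, Collections.emptyList());
               List<String> wildcard = aclsByResource.getOrDefault(wildcardResource, Collections.emptyList());
               return exact.contains(acl) || wildcard.contains(acl);
           } finally {
               lock.readLock().unlock();
           }
       }
   }
   ```

   Many concurrent `authorize` calls can hold the read lock at once; only ACL updates are serialized against them.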





[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


jsancio commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r968998338


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -90,18 +98,12 @@ public long size() {
 
 @Override
 public long availableMemory() {
-lock.lock();
-try {
-int freeBatches = free.size() + (maxBatches - numAllocatedBatches);
-return freeBatches * (long) batchSize;
-} finally {
-lock.unlock();
-}
+return Integer.MAX_VALUE;

Review Comment:
   Yes. This should be `Long.MAX_VALUE`.






[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


jsancio commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r968998214


##
raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java:
##
@@ -104,4 +128,15 @@ public void testReleaseBufferNotMatchingBatchSize() {
 assertThrows(IllegalArgumentException.class, () -> 
pool.release(buffer));
 }
 
+private ByteBuffer touch(ByteBuffer buffer) {

Review Comment:
   Renamed the function to `update`.






[GitHub] [kafka] jsancio commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


jsancio commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r968998063


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -72,7 +74,13 @@ public void release(ByteBuffer previouslyAllocated) {
 + previouslyAllocated.limit());
 }
 
-free.offer(previouslyAllocated);
+// Free the buffer if the number of pooled buffers is already the 
maximum number of batches.
+// Otherwise return the buffer to the memory pool.
+if (free.size() >= maxBatches) {

Review Comment:
   Yes. Fixed the variable name.






[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968993197


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+private final ValueJoinerWithKey joinerOther;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final ValueJoinerWithKey joinerOther,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.joinerOther = joinerOther;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
StreamStreamJoinProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+System.out.println("---> Processing record: " + record);
+if (skipRecord(record, LOG, droppedRecordsSensor)) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+System.out.println("> Window store fetch, timeFrom=" + 
timeFrom + " timeTo=" + timeTo);
+
+// Join current record with other
+System.out.println("> Window store fetch, timeFrom=" + 
timeFrom + " timeTo=" + timeTo);

Review Comment:
   The `System.out.println` calls should likely be removed or replaced with `LOG.trace`/`LOG.debug`.




[GitHub] [kafka] akhileshchg closed pull request #12627: KAFKA-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg closed pull request #12627: KAFKA-14214: Introduce read-write lock 
to StandardAuthorizer for consistent ACL reads.
URL: https://github.com/apache/kafka/pull/12627





[GitHub] [kafka] hachikuji commented on a diff in pull request #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12625:
URL: https://github.com/apache/kafka/pull/12625#discussion_r968988957


##
raft/src/test/java/org/apache/kafka/raft/internals/BatchMemoryPoolTest.java:
##
@@ -104,4 +128,15 @@ public void testReleaseBufferNotMatchingBatchSize() {
 assertThrows(IllegalArgumentException.class, () -> 
pool.release(buffer));
 }
 
+private ByteBuffer touch(ByteBuffer buffer) {

Review Comment:
   nit: `touch` seems a little vague. I think we're just trying to simulate 
some buffer usage?



##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -72,7 +74,13 @@ public void release(ByteBuffer previouslyAllocated) {
 + previouslyAllocated.limit());
 }
 
-free.offer(previouslyAllocated);
+// Free the buffer if the number of pooled buffers is already the 
maximum number of batches.
+// Otherwise return the buffer to the memory pool.
+if (free.size() >= maxBatches) {

Review Comment:
   Perhaps we should rename `maxBatches` since it is no longer serving as a 
max. How about `maxRetainedBatches` or something like that since it is still a 
bound on the number of batches which the pool can hold onto indefinitely.
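   A sketch of the behavior being named here, assuming a simplified pool with one fixed batch size (class and method names are hypothetical, loosely modeled on the diff; the capacity check stands in for the real size check). Allocation always succeeds, and `release` only keeps a buffer while fewer than `maxRetainedBatches` are already pooled, which is why "retained" describes the bound better than "max".

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.locks.ReentrantLock;

   // Hypothetical, simplified pool; not the BatchMemoryPool implementation.
   final class RetainingBatchPool {
       private final int batchSize;
       private final int maxRetainedBatches;
       private final Deque<ByteBuffer> free = new ArrayDeque<>();
       private final ReentrantLock lock = new ReentrantLock();

       RetainingBatchPool(int batchSize, int maxRetainedBatches) {
           this.batchSize = batchSize;
           this.maxRetainedBatches = maxRetainedBatches;
       }

       // Always returns a buffer: reuse a pooled one if available, else allocate.
       ByteBuffer allocate(int sizeBytes) {
           if (sizeBytes > batchSize) {
               throw new IllegalArgumentException("Requested " + sizeBytes + " bytes, batch size is " + batchSize);
           }
           lock.lock();
           try {
               ByteBuffer buffer = free.poll();
               if (buffer == null) {
                   buffer = ByteBuffer.allocate(batchSize);
               }
               buffer.clear();
               return buffer;
           } finally {
               lock.unlock();
           }
       }

       // Keep the buffer only while fewer than maxRetainedBatches are pooled;
       // otherwise drop it and let the garbage collector reclaim it.
       void release(ByteBuffer previouslyAllocated) {
           if (previouslyAllocated.capacity() != batchSize) {
               throw new IllegalArgumentException("Released buffer with unexpected capacity " + previouslyAllocated.capacity());
           }
           lock.lock();
           try {
               if (free.size() < maxRetainedBatches) {
                   free.offer(previouslyAllocated);
               }
           } finally {
               lock.unlock();
           }
       }

       int retainedBatches() {
           lock.lock();
           try {
               return free.size();
           } finally {
               lock.unlock();
           }
       }
   }
   ```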



##
raft/src/main/java/org/apache/kafka/raft/internals/BatchMemoryPool.java:
##
@@ -90,18 +98,12 @@ public long size() {
 
 @Override
 public long availableMemory() {
-lock.lock();
-try {
-int freeBatches = free.size() + (maxBatches - numAllocatedBatches);
-return freeBatches * (long) batchSize;
-} finally {
-lock.unlock();
-}
+return Integer.MAX_VALUE;

Review Comment:
   `Integer.MAX_VALUE` is only about 2 billion bytes, i.e. roughly 2GB. Is that enough?
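   The arithmetic behind this question, as a small sketch: `availableMemory()` returns a `long`, so returning `Integer.MAX_VALUE` reports 2,147,483,647 bytes (2^31 - 1, one byte short of 2 GiB), whereas `Long.MAX_VALUE` is 2^63 - 1. Whether 2 GiB is enough depends on how callers use the value, which is not shown here; the helper names below are hypothetical.

   ```java
   // Compares the two candidate return values for availableMemory(), in bytes.
   final class AvailableMemoryBounds {
       static final long GIB = 1L << 30;

       // Integer.MAX_VALUE widened to long: 2^31 - 1 bytes, one byte short of 2 GiB.
       static long intMaxBytes() {
           return Integer.MAX_VALUE;
       }

       // Long.MAX_VALUE: 2^63 - 1 bytes, effectively unbounded for a memory pool.
       static long longMaxBytes() {
           return Long.MAX_VALUE;
       }

       // Whether a pool reporting `available` bytes appears able to satisfy `requested`.
       static boolean appearsSufficient(long available, long requested) {
           return requested <= available;
       }
   }
   ```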






[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968986084


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
 }
 }
 
+/**
+ * If the join is a self-join, remove the node KStreamJoinWindow 
corresponding to the
+ */
+@SuppressWarnings("unchecked")
+private void rewriteSelfJoin(final GraphNode currentNode, final 
Set visited) {
+visited.add(currentNode);
+if (currentNode instanceof StreamStreamJoinNode && 
isSelfJoin(currentNode)) {
+((StreamStreamJoinNode) currentNode).setSelfJoin();
+// Remove JoinOtherWindowed node
+final GraphNode parent = 
currentNode.parentNodes().stream().findFirst().get();
+GraphNode left = null, right = null;
+for (final GraphNode child: parent.children()) {

Review Comment:
   If there are more than 2 children, is it the case that only two satisfy 
`isStreamJoinWindowNode`?






[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968982353


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -305,6 +307,36 @@ public void buildAndOptimizeTopology(final boolean 
optimizeTopology) {
 internalTopologyBuilder.validateCopartition();
 }
 
+/**
+ * A user can provide either the config OPTIMIZE which means all 
optimizations rules will be
+ * applied or they can provide a list of optimization rules.
+ */
+private void optimizeTopology(final Properties props) {
+final List optimizationConfigs;
+if (props == null || 
!props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+optimizationConfigs = new ArrayList<>();
+optimizationConfigs.add(StreamsConfig.NO_OPTIMIZATION);
+} else {
+optimizationConfigs = 
StreamsConfig.verifyTopologyOptimizationConfigs(
+(String) 
props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| 
optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+LOG.debug("Optimizing the Kafka Streams graph for ktable source 
nodes");
+optimizeKTableSourceTopics();
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| 
optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+maybeOptimizeRepartitionOperations();
+}
+if (optimizationConfigs.contains(StreamsConfig.OPTIMIZE)
+|| optimizationConfigs.contains(StreamsConfig.SELF_JOIN)) {
+LOG.debug("Optimizing the Kafka Streams graph for self-joins");
+rewriteSelfJoin(root, new IdentityHashMap<>());

Review Comment:
   In this method, could order matter?
   






[GitHub] [kafka] akhileshchg opened a new pull request, #12627: KGLOBAL-14214: Introduce read-write lock to StandardAuthorizer for consistent ACL reads.

2022-09-12 Thread GitBox


akhileshchg opened a new pull request, #12627:
URL: https://github.com/apache/kafka/pull/12627

   KGLOBAL-14214: Introduce read-write lock to StandardAuthorizer for 
consistent ACL reads.
   
   The issue with StandardAuthorizer#authorize is that it looks up 
aclsByResource (which is of type ConcurrentSkipListMap) twice for every 
authorize call and uses an Iterator with weak consistency guarantees on top of 
aclsByResource. This can cause the authorize call to process concurrent 
writes out of order.
   
   Implemented a read-write lock at the StandardAuthorizer level to make sure 
reads are strongly consistent with write order.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968974464


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -270,17 +277,12 @@ private void maybeAddNodeForOptimizationMetadata(final 
GraphNode node) {
 
 // use this method for testing only
 public void buildAndOptimizeTopology() {
-buildAndOptimizeTopology(false);
+buildAndOptimizeTopology(null);
 }
 
-public void buildAndOptimizeTopology(final boolean optimizeTopology) {

Review Comment:
   Is this part of the public API?






[GitHub] [kafka] jnh5y commented on a diff in pull request #12555: Optimize self-join

2022-09-12 Thread GitBox


jnh5y commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r968974016


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1644,6 +1668,29 @@ private Map getClientCustomProps() {
 return props;
 }
 
+public static List verifyTopologyOptimizationConfigs(final String 
config) {
+final List acceptableConfigs = Arrays.asList(

Review Comment:
   I wonder if this list of optimization flags should be a static variable 
somewhere?  
   
   Maybe it is fine if it is only used here.
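   One way to act on this suggestion, sketched with placeholder constant values (the string values, the class name `OptimizationConfigs`, and the comma-splitting behavior are assumptions, not taken from this PR): declare the accepted values once as an unmodifiable static list and validate a comma-separated config against it.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical sketch: class name and string values are placeholders.
   final class OptimizationConfigs {
       static final String OPTIMIZE = "all";
       static final String NO_OPTIMIZATION = "none";
       static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
       static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
       static final String SELF_JOIN = "self.join";

       // Declared once so validation and any documentation share the same list.
       static final List<String> ACCEPTABLE_CONFIGS = Collections.unmodifiableList(Arrays.asList(
               OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SELF_JOIN));

       // Split a comma-separated value and reject anything not in the list.
       static List<String> verifyTopologyOptimizationConfigs(String config) {
           List<String> configs = new ArrayList<>();
           for (String raw : config.split(",")) {
               String value = raw.trim();
               if (!ACCEPTABLE_CONFIGS.contains(value)) {
                   throw new IllegalArgumentException("Unrecognized topology optimization config: " + value);
               }
               configs.add(value);
           }
           return configs;
       }
   }
   ```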






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 }
 }
 
+Optional exception = revokePartitions(partitionsToRevoke, 
generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an 
error", exception.get());
+}
+return true;
+}
+
+private SortedSet getPartitionsToRevoke(RebalanceProtocol 
protocol, int generation, String memberId) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the 
already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partitions in 
`onJoinComplete`, so that made me wonder why we don't have the same issue 
there. In fact, there is no additional offset commit in the current logic, 
which makes me think that the cooperative logic would already be more prone to 
duplicate consumption. We don't need to fix this here since it seems to be a 
pre-existing issue, but I am wondering if the failing system tests also cover 
cooperative assignment?






[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968970980


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 }
 }
 
+Optional exception = revokePartitions(partitionsToRevoke, 
generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an 
error", exception.get());
+}
+return true;
+}
+
+private SortedSet getPartitionsToRevoke(RebalanceProtocol 
protocol, int generation, String memberId) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the 
already owned partitions

Review Comment:
   I don't think we have system tests covering the cooperative strategy, so it is quite possible that the duplication happens there too. I think it would be good to update these tests to cover the cooperative strategy.






[GitHub] [kafka] hachikuji commented on pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota

2022-09-12 Thread GitBox


hachikuji commented on PR #12624:
URL: https://github.com/apache/kafka/pull/12624#issuecomment-1244563584

   @mumrah Thanks for reviewing. Note that the controller will not actually be 
applying the throttles in the future since it does not have a direct client 
connection to throttle. The intention is just to let it set the throttle value 
and to have the broker throttle the client. 





[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968965173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,92 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 }
 }
 
+Optional exception = revokePartitions(partitionsToRevoke, 
generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an 
error", exception.get());
+}
+return true;
+}
+
+private SortedSet getPartitionsToRevoke(RebalanceProtocol 
protocol, int generation, String memberId) {
+SortedSet partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+switch (protocol) {
+case EAGER:
+partitions.addAll(subscriptions.assignedPartitions());
+break;
+
+case COOPERATIVE:
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   I was looking into the cooperative code path. We revoke the partition in 
`onJoinComplete`, so that made me wonder why we don't have the same issue 
there. In fact, there is no additional offset commit in the current logic, 
which makes me think that the cooperative logic would already be more prone to 
duplicate consumption. We don't need to fix this here since it seems to be a 
pre-existing issue, but I am wondering if the failing system tests also cover 
cooperative assignment?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12624:
URL: https://github.com/apache/kafka/pull/12624#discussion_r968967605


##
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java:
##
@@ -268,6 +268,8 @@ public ApiKeys apiKey() {
 
 public abstract int throttleTimeMs();
 
+public abstract void setThrottleTimeMs(int throttleTimeMs);

Review Comment:
   Makes sense. Will change.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968922937


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+// Delay the partition revocation because we don't revoke the already owned partitions
+return partitions;
+}
+
+log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);
+return partitions;
+}
+
+private Optional<Exception> revokePartitions(SortedSet<TopicPartition> partitions, int generation, String memberId) {
+
 // the generation / member-id can possibly be reset by the heartbeat thread
 // upon getting errors or heartbeat timeouts; in this case whatever is previously
 // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
 // otherwise we can proceed normally and revoke the partitions depending on the protocol,
 // and in that case we should only change the assignment AFTER the revoke callback is triggered
 // so that users can still access the previously owned partitions to commit offsets etc.
-Exception exception = null;
-final SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
+Optional<Exception> exception = Optional.empty();
 if (generation == Generation.NO_GENERATION.generationId ||
-memberId.equals(Generation.NO_GENERATION.memberId)) {
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-if (!revokedPartitions.isEmpty()) {
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+if (!partitions.isEmpty()) {
 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-"indicating that consumer is in old state or no longer part of the group");
-exception = invokePartitionsLost(revokedPartitions);
-
+"indicating that consumer is in old state or no longer part of the group");
+exception = Optional.ofNullable(invokePartitionsLost(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 }
 } else {
 switch (protocol) {
 case EAGER:
-// revoke all partitions
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-exception = invokePartitionsRevoked(revokedPartitions);
-
+exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 
 break;
 
 case COOPERATIVE:
-// only revoke those partitions that are not in the subscription any more.
 Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
-revokedPartitions.addAll(ownedPartitions.stream()
-.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
-.collect(Collectors.toSet()));
-
-if (!revokedPartitions.isEmpty()) {
-exception = invokePartitionsRevoked(revokedPartitions);
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
 
-

[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260
 ] 

Matthew de Detrich edited comment on KAFKA-14221 at 9/12/22 9:52 PM:
-

Thanks, INFRA tickets created. I checked the matrix, and indeed even the latest
JDK variants there are older than the JDK versions in which this bug was fixed.


was (Author: mdedetrich-aiven):
Thanks, INFRA tickets created

> Update Apache Kafka JVM version in CI to latest versions
> 
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> In a recent test run (see
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
> the JVM crashed with the following stack trace:
>  
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java 
> Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, 
> pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment 
> (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM 
> (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed 
> class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] 
> PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) 
> [clone .part.0]+0x52
> {code}
> After some research online, I found a JDK bug filed for the same kind of
> crash, see [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886].
> This bug was fixed in JDK 17.0.2, which is newer than the version run in
> Apache Kafka CI (17.0.1+12-LTS-39). Locally, with the latest OpenJDK
> (specifically 17.0.4.1), I cannot reproduce this crash.
> We should update both JDK 11 and JDK 17 to their latest versions in the CI to
> see if this solves the problem.
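The following is a minimal sketch of how a CI job could flag a JDK 17 build that predates 17.0.2, the release cited above as fixing JDK-8270886. It assumes a Java 10+ runtime. The class `JdkFixCheck` is hypothetical and not part of Kafka or its build. It only evaluates the 17.x line, because the issue does not say which JDK 11 build carries the fix.

```java
import java.util.Optional;

// Hypothetical CI helper (not part of Kafka or its build): reports whether a
// JDK 17 version string predates 17.0.2, the release the issue cites as fixing
// JDK-8270886. Other feature releases return Optional.empty(), because the issue
// does not say which JDK 11 build carries the fix.
final class JdkFixCheck {
    private static final Runtime.Version FIXED_IN_17 = Runtime.Version.parse("17.0.2");

    private JdkFixCheck() { }

    /** Optional.of(true) if a 17.x version lacks the fix, Optional.of(false) if it has it, else empty. */
    static Optional<Boolean> lacksFix(String version) {
        Runtime.Version v = Runtime.Version.parse(version);
        if (v.feature() != 17) {
            return Optional.empty();
        }
        // Ignore the optional suffix (e.g. "LTS-39") when comparing versions.
        return Optional.of(v.compareToIgnoreOptional(FIXED_IN_17) < 0);
    }

    /** Same check for the JVM that is currently running. */
    static Optional<Boolean> currentJvmLacksFix() {
        return lacksFix(Runtime.version().toString());
    }
}
```

For the version in the crash log above, `JdkFixCheck.lacksFix("17.0.1+12-LTS-39")` returns `Optional[true]`.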



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on a diff in pull request #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota

2022-09-12 Thread GitBox


mumrah commented on code in PR #12624:
URL: https://github.com/apache/kafka/pull/12624#discussion_r968961932


##
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java:
##
@@ -268,6 +268,8 @@ public ApiKeys apiKey() {
 
 public abstract int throttleTimeMs();
 
+public abstract void setThrottleTimeMs(int throttleTimeMs);

Review Comment:
   How about `maybeSetThrottleTimeMs`, since not all response schemas support it?
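The following hypothetical sketch illustrates the naming suggestion. The classes are illustrative and are not Kafka's `AbstractResponse` hierarchy. A response whose schema has no throttle field treats `maybeSetThrottleTimeMs` as a no-op, so a caller applying a quota could invoke it unconditionally.

```java
// Hypothetical sketch (class names are illustrative, not Kafka's): the setter
// is a no-op for schemas without a throttle field, which is what the name
// `maybeSetThrottleTimeMs` would advertise.
abstract class SketchResponse {
    static final int DEFAULT_THROTTLE_TIME = 0;

    abstract int throttleTimeMs();

    /** Sets the throttle time if this response's schema has a field for it; otherwise does nothing. */
    abstract void maybeSetThrottleTimeMs(int throttleTimeMs);
}

final class ThrottledResponse extends SketchResponse {
    private int throttleTimeMs = DEFAULT_THROTTLE_TIME;

    @Override
    int throttleTimeMs() { return throttleTimeMs; }

    @Override
    void maybeSetThrottleTimeMs(int throttleTimeMs) { this.throttleTimeMs = throttleTimeMs; }
}

final class UnthrottledResponse extends SketchResponse {
    // This schema has no throttle field, so there is nothing to store.
    @Override
    int throttleTimeMs() { return DEFAULT_THROTTLE_TIME; }

    @Override
    void maybeSetThrottleTimeMs(int throttleTimeMs) { /* intentionally ignored */ }
}
```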






[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260
 ] 

Matthew de Detrich edited comment on KAFKA-14221 at 9/12/22 9:49 PM:
-

Thanks, INFRA tickets created


was (Author: mdedetrich-aiven):
Thanks, INFRA ticket created






[jira] [Resolved] (KAFKA-14223) Update jdk_17_latest to adoptium 17.0.4+1

2022-09-12 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich resolved KAFKA-14223.

Resolution: Invalid

Created in the wrong project.

> Update jdk_17_latest to adoptium 17.0.4+1
> -
>
> Key: KAFKA-14223
> URL: https://issues.apache.org/jira/browse/KAFKA-14223
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> The current latest JDK 17 is outdated, and Apache Kafka appears to have hit a
> bug because of this (see https://issues.apache.org/jira/browse/KAFKA-14221).
> Would it be possible to update jdk_17_latest to the Adoptium 17.0.4+1 release?
>  





[GitHub] [kafka] mumrah merged pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-12 Thread GitBox


mumrah merged PR #12596:
URL: https://github.com/apache/kafka/pull/12596





[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603260#comment-17603260
 ] 

Matthew de Detrich commented on KAFKA-14221:


Thanks, INFRA ticket created






[jira] [Created] (KAFKA-14223) Update jdk_17_latest to adoptium 17.0.4+1

2022-09-12 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-14223:
--

 Summary: Update jdk_17_latest to adoptium 17.0.4+1
 Key: KAFKA-14223
 URL: https://issues.apache.org/jira/browse/KAFKA-14223
 Project: Kafka
  Issue Type: Task
Reporter: Matthew de Detrich


The current latest JDK 17 is outdated, and Apache Kafka appears to have hit a
bug because of this (see https://issues.apache.org/jira/browse/KAFKA-14221). Would
it be possible to update jdk_17_latest to the Adoptium 17.0.4+1 release?





[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968910809


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   I think it makes sense. We can probably compute the partitions during the
revocation, given the nature of the cooperative protocol.






[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-12 Thread GitBox


mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968903902


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
 try {
   _image = _delta.apply()
 } catch {
-  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+  case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   Hm, yeah, it does look like we can return `null`. In that case I believe we will see a
`NullPointerException`. The EventQueue should still keep going in this case.
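The following minimal Java sketch shows the behavior described above. When the expression in a `throw` statement evaluates to `null`, a `NullPointerException` is thrown instead. As a result, `throw handler.handleFault(...)` still aborts the current operation even if the handler returns `null`. `ThrowNullDemo.Handler` is a hypothetical stand-in for `FaultHandler`. The Scala code in the diff compiles to the same JVM `athrow` instruction, so it should behave the same way.

```java
// Hypothetical stand-in for the catch block in the diff: only the
// "handler returned null" behavior is being demonstrated here.
final class ThrowNullDemo {
    interface Handler {
        RuntimeException handleFault(String failureMessage, Throwable cause);
    }

    private ThrowNullDemo() { }

    /** Simulates a failing delta apply and returns the class of whatever ends up being thrown. */
    static Class<? extends Throwable> thrownBy(Handler handler) {
        try {
            try {
                throw new IllegalStateException("simulated delta.apply() failure");
            } catch (IllegalStateException t) {
                // If handleFault returns null, `throw null` raises NullPointerException.
                throw handler.handleFault("Error applying metadata delta", t);
            }
        } catch (Throwable thrown) {
            return thrown.getClass();
        }
    }
}
```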






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968897177


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+// Delay the partition revocation because we don't revoke the already owned partitions
+return partitions;
+}
+
+log.warn("Invalid protocol: {}. No partition will be revoked.", protocol);

Review Comment:
   I think this code is dead? Why don't we use a `switch` like we had before? 
Then the compiler can help us ensure we handle new cases.
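The following sketches the `switch`-based shape suggested above, under simplifying assumptions. Partitions are plain strings, `Protocol` mirrors `RebalanceProtocol`, and the cooperative case revokes nothing here, matching the diff's "delay the partition revocation" comment. With a classic `switch` statement, the `default` branch makes an unhandled constant fail loudly at runtime instead of silently revoking nothing. On Java 14+, a `switch` expression without `default` is also checked for exhaustiveness by the compiler.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, self-contained sketch; none of these types are Kafka's.
final class RevocationSketch {
    enum Protocol { EAGER, COOPERATIVE }

    private RevocationSketch() { }

    static SortedSet<String> partitionsToRevoke(Protocol protocol,
                                                boolean generationReset,
                                                Collection<String> assigned) {
        SortedSet<String> partitions = new TreeSet<>();
        if (generationReset) {
            // Generation/member id was reset: every owned partition is lost.
            partitions.addAll(assigned);
            return partitions;
        }
        switch (protocol) {
            case EAGER:
                // Eager: revoke all owned partitions up front.
                partitions.addAll(assigned);
                break;
            case COOPERATIVE:
                // Cooperative: revocation is delayed, so nothing is revoked here.
                break;
            default:
                // Reached only if a new protocol constant is added without a case.
                throw new IllegalStateException("Unexpected protocol: " + protocol);
        }
        return partitions;
    }
}
```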






[GitHub] [kafka] hachikuji commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-12 Thread GitBox


hachikuji commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1244352156

   @showuon Since Guozhang is out sick, I raised 
https://github.com/apache/kafka/pull/12626 which addresses your (and my) 
comments. Please take a look. I'll leave this open for now.





[GitHub] [kafka] hachikuji opened a new pull request, #12626: KAFKA-14208; Do not raise wakeup in consumer during asynchronous offset commits

2022-09-12 Thread GitBox


hachikuji opened a new pull request, #12626:
URL: https://github.com/apache/kafka/pull/12626

   Asynchronous offset commits may throw an unexpected `WakeupException` 
following https://github.com/apache/kafka/pull/11631 and 
https://github.com/apache/kafka/pull/12244. This patch fixes the problem by 
passing through a flag to `ensureCoordinatorReady` to indicate whether wakeups 
should be disabled.
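The following is a hypothetical model of the approach described above. It is not Kafka's `AbstractCoordinator`. Blocking callers keep raising pending wakeups, while an overload takes a flag that asynchronous callers can use to skip them. The names `CoordinatorSketch` and `disableWakeup` are illustrative, and the sketch does not model the network client or its poll loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: the public entry point keeps honoring wakeups, while the
// flagged overload lets asynchronous callers leave a pending wakeup for later.
final class CoordinatorSketch {
    static final class WakeupException extends RuntimeException { }

    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean coordinatorKnown = false;

    /** Another thread may call this to interrupt a blocking call. */
    void wakeup() { wakeupRequested.set(true); }

    /** Blocking callers keep the existing behavior: a pending wakeup is raised. */
    boolean ensureCoordinatorReady() {
        return ensureCoordinatorReady(false);
    }

    /** When disableWakeup is true, a pending wakeup is neither raised nor cleared. */
    boolean ensureCoordinatorReady(boolean disableWakeup) {
        if (!disableWakeup && wakeupRequested.getAndSet(false)) {
            throw new WakeupException();
        }
        // Stand-in for the coordinator lookup and network poll.
        coordinatorKnown = true;
        return coordinatorKnown;
    }

    boolean wakeupPending() { return wakeupRequested.get(); }
}
```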
   
   Note: this patch builds on top of https://github.com/apache/kafka/pull/12611.
   
   Co-Authored-By: Guozhang Wang 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-12 Thread GitBox


jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968872243


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
 try {
   _image = _delta.apply()
 } catch {
-  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+  case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   Looking at the documentation for `FaultHandler`, it looks like `handleFault` is allowed
to return `null`.






[GitHub] [kafka] jsancio opened a new pull request, #12625: KAFKA-14222; KRaft's memory pool should always allocate a buffer

2022-09-12 Thread GitBox


jsancio opened a new pull request, #12625:
URL: https://github.com/apache/kafka/pull/12625

   Because the snapshot writer sets a linger ms of Integer.MAX_VALUE, it is 
possible for the memory pool to run out of memory if the snapshot is greater 
than 5 * 8MB.
   
   This change allows the BatchMemoryPool to always allocate a buffer when 
requested. When a buffer is released, the memory pool frees that extra buffer if
the number of pooled buffers is greater than the configured maximum number of batches.
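The following minimal sketch illustrates the allocation policy described above. It assumes that "pooled buffers" means buffers sitting idle in the free list. `AlwaysAllocatePool` and its methods are hypothetical and are not the actual `BatchMemoryPool` code. Allocation never returns `null`, and `release` drops a buffer instead of pooling it once the free list already holds the configured maximum.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not KRaft's BatchMemoryPool: allocation always succeeds,
// and at most maxRetainedBatches idle buffers are kept for reuse.
final class AlwaysAllocatePool {
    private final int batchSize;
    private final int maxRetainedBatches;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    AlwaysAllocatePool(int maxRetainedBatches, int batchSize) {
        this.maxRetainedBatches = maxRetainedBatches;
        this.batchSize = batchSize;
    }

    synchronized ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes > batchSize) {
            throw new IllegalArgumentException("Cannot allocate " + sizeBytes + " bytes; batch size is " + batchSize);
        }
        ByteBuffer buffer = free.poll();
        // Reuse an idle buffer if there is one; otherwise allocate rather than return null.
        return buffer != null ? buffer : ByteBuffer.allocate(batchSize);
    }

    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        // Keep the buffer only while the pool is below its configured maximum;
        // otherwise drop it so the garbage collector can reclaim the extra memory.
        if (free.size() < maxRetainedBatches) {
            free.offer(buffer);
        }
    }

    synchronized int pooledBuffers() { return free.size(); }
}
```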
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] philipnee commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968822443


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+// Delay the partition revocation because we don't revoke the already owned partitions

Review Comment:
   Leaving this purely for documentation purposes.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r968778503


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
 throw fatalException;
 }
 final RequestFuture<Void> future = lookupCoordinator();
-client.poll(future, timer);
+
+// if we do not want to block on discovering coordinator at all,
+// then we should not try to poll in a loop, and should not throw wake-up exception either
+if (timer.timeoutMs() == 0L) {

Review Comment:
   Yeah, I feel it's a bit slippery to leave the logic in place with just a 
TODO somewhere to go back and fix it. Too many times, the follow-up never 
happens. Is it really that much additional effort to add an overload?
   
   ```java
   protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
       return ensureCoordinatorReady(timer, true);
   }
   
   private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean checkWakeup) {
   ...
   ```






[GitHub] [kafka] hachikuji commented on a diff in pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r968778503


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
 throw fatalException;
 }
 final RequestFuture<Void> future = lookupCoordinator();
-client.poll(future, timer);
+
+// if we do not want to block on discovering coordinator at all,
+// then we should not try to poll in a loop, and should not throw wake-up exception either
+if (timer.timeoutMs() == 0L) {

Review Comment:
   Yeah, I feel it's a bit slippery to leave the logic in place with just a 
TODO somewhere to go back and fix it. Too many times, the follow-up never 
happens. Is it really that much additional effort to add an overload?
   
   ```java
   protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
       return ensureCoordinatorReady(timer, true);
   }
   
   private synchronized boolean ensureCoordinatorReady(final Timer timer, boolean checkWakeup) {
   ...
   ```






[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-12 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r968754388


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -769,6 +773,7 @@ private static class TopicPartitionState {
 private Long logStartOffset; // the log start offset
 private Long lastStableOffset;
 private boolean paused;  // whether this partition has been paused by the user
+private boolean consumable;

Review Comment:
   How about calling this `pendingRevocation` or something like that? That 
might make the usage clearer.
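The following hypothetical sketch shows how a `pendingRevocation` flag could sit next to the user-controlled `paused` flag. Both flags block fetching, but a pending revocation does not change what the user sees as paused. `PartitionStateSketch` is illustrative only and is not `SubscriptionState.TopicPartitionState`.

```java
// Hypothetical per-partition state: a pending revocation stops fetching like a
// pause would, without overwriting the user's own paused flag.
final class PartitionStateSketch {
    private boolean paused;            // whether this partition has been paused by the user
    private boolean pendingRevocation; // set while the partition awaits revocation in a rebalance

    void pause() { paused = true; }

    void resume() { paused = false; }

    void markPendingRevocation() { pendingRevocation = true; }

    /** Fetching is allowed only if neither the user nor a pending revocation blocks it. */
    boolean isFetchable() { return !paused && !pendingRevocation; }

    boolean isPaused() { return paused; }
}
```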



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,94 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
+return partitions;
+}
+
+log.debug("Invalid protocol: {}. No partition will be revoked.", protocol);
+return partitions;
+}
+
+private Optional revokePartitions(SortedSet partitions, int generation, String memberId) {
+
 // the generation / member-id can possibly be reset by the heartbeat thread
 // upon getting errors or heartbeat timeouts; in this case whatever is previously
 // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
 // otherwise we can proceed normally and revoke the partitions depending on the protocol,
 // and in that case we should only change the assignment AFTER the revoke callback is triggered
 // so that users can still access the previously owned partitions to commit offsets etc.
-Exception exception = null;
-final SortedSet revokedPartitions = new TreeSet<>(COMPARATOR);
+Optional exception = Optional.empty();
 if (generation == Generation.NO_GENERATION.generationId ||
-memberId.equals(Generation.NO_GENERATION.memberId)) {
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-
-if (!revokedPartitions.isEmpty()) {
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+if (!partitions.isEmpty()) {
 log.info("Giving away all assigned partitions as lost since generation/memberID has been reset," +
-"indicating that consumer is in old state or no longer part of the group");
-exception = invokePartitionsLost(revokedPartitions);
-
+"indicating that consumer is in old state or no longer part of the group");
+exception = Optional.ofNullable(invokePartitionsLost(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 }
 } else {
 switch (protocol) {
 case EAGER:
-// revoke all partitions
-revokedPartitions.addAll(subscriptions.assignedPartitions());
-exception = invokePartitionsRevoked(revokedPartitions);
-
+exception = Optional.ofNullable(invokePartitionsRevoked(partitions));
 subscriptions.assignFromSubscribed(Collections.emptySet());
 
 break;
 
 case COOPERATIVE:
-// only revoke those partitions that are not in the subscription any more.
 Set 

[jira] [Created] (KAFKA-14222) Exhausted BatchMemoryPool

2022-09-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-14222:
--

 Summary: Exhausted BatchMemoryPool
 Key: KAFKA-14222
 URL: https://issues.apache.org/jira/browse/KAFKA-14222
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.3.0


For a large number of topics and partitions, the broker can encounter this issue:
{code:java}
[2022-09-12 14:14:42,114] ERROR [BrokerMetadataSnapshotter id=4] Unexpected error handling CreateSnapshotEvent (kafka.server.metadata.BrokerMetadataSnapshotter)
org.apache.kafka.raft.errors.BufferAllocationException: Append failed because we failed to allocate memory to write the batch
    at org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:161)
    at org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:112)
    at org.apache.kafka.snapshot.RecordsSnapshotWriter.append(RecordsSnapshotWriter.java:167)
    at kafka.server.metadata.RecordListConsumer.accept(BrokerMetadataSnapshotter.scala:49)
    at kafka.server.metadata.RecordListConsumer.accept(BrokerMetadataSnapshotter.scala:42)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:78)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:79)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:129)
    at kafka.server.metadata.BrokerMetadataSnapshotter$CreateSnapshotEvent.run(BrokerMetadataSnapshotter.scala:116)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173)
    at java.base/java.lang.Thread.run(Thread.java:829)
{code}
This can happen because the snapshot is larger than {{5 * 8MB}} (40 MB).
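To make the arithmetic concrete: five 8 MB batches give the pool 40 MB in total, so a writer that needs a sixth batch while the first five are still held has nowhere to get one. The sketch below is hypothetical. `BoundedBatchPool`, `tryAllocate`, and `release` are illustrative names that only mimic the failure mode and are not Kafka's `BatchMemoryPool` API. It also assumes, as a worst case, that the snapshot writer keeps every filled batch until the end.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a bounded batch-buffer pool; NOT Kafka's BatchMemoryPool API.
public class BoundedBatchPoolSketch {

    static final class BoundedBatchPool {
        private final int maxBatches;                        // e.g. 5
        private final int batchSize;                         // e.g. 8 MiB
        private final Deque<ByteBuffer> free = new ArrayDeque<>();
        private int allocated = 0;                           // buffers created so far

        BoundedBatchPool(int maxBatches, int batchSize) {
            this.maxBatches = maxBatches;
            this.batchSize = batchSize;
        }

        long capacityBytes() {
            return (long) maxBatches * batchSize;
        }

        /** Returns a buffer, or null if every buffer is already in use. */
        ByteBuffer tryAllocate(int sizeBytes) {
            if (sizeBytes > batchSize) {
                throw new IllegalArgumentException("request exceeds batch size");
            }
            ByteBuffer reused = free.pollFirst();
            if (reused != null) {
                reused.clear();
                return reused;
            }
            if (allocated >= maxBatches) {
                return null; // exhausted: the caller has to fail the append
            }
            allocated++;
            return ByteBuffer.allocate(batchSize);
        }

        /** Returns a buffer to the pool once its batch has been written out. */
        void release(ByteBuffer buffer) {
            free.addFirst(buffer);
        }
    }

    public static void main(String[] args) {
        int mib = 1024 * 1024;
        BoundedBatchPool pool = new BoundedBatchPool(5, 8 * mib);
        System.out.println("capacity: " + pool.capacityBytes() / mib + " MiB");

        // Assumed worst case: a writer that holds every batch until the snapshot is done.
        int held = 0;
        while (pool.tryAllocate(8 * mib) != null) {
            held++;
        }
        System.out.println("batches held before exhaustion: " + held);
    }
}
```

Because buffers are recycled through `release`, a writer that flushes and releases each batch as it goes could push more than 40 MB through this pool. In the sketch, exhaustion only occurs when more than five batches are held at once.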



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji opened a new pull request, #12624: KAFKA-14215; Ensure forwarded requests are applied to broker request quota

2022-09-12 Thread GitBox


hachikuji opened a new pull request, #12624:
URL: https://github.com/apache/kafka/pull/12624

   Currently, forwarded requests are not subject to any quota on either the 
controller or the broker. The controller-side throttling requires the 
controller to apply the quota changes from the log to the quota managers, which 
will be done separately. In this patch, we change the response logic on the 
broker side to also apply the broker's request quota. The enforced throttle 
time is the maximum of the throttle returned from the controller (which is 0 
until we fix the aforementioned issue) and the broker's request throttle time.
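   The throttle rule described above amounts to taking the maximum of two values. A minimal sketch follows; `ForwardedThrottleSketch` and `enforcedThrottleTimeMs` are hypothetical names for illustration and are not the code in this patch.

```java
// Hypothetical helper illustrating the rule described in the PR; not the actual broker code.
public class ForwardedThrottleSketch {

    /**
     * Throttle time to enforce on a forwarded request: the larger of the value
     * returned by the controller and the broker's own request-quota throttle.
     */
    static int enforcedThrottleTimeMs(int controllerThrottleMs, int brokerRequestThrottleMs) {
        return Math.max(controllerThrottleMs, brokerRequestThrottleMs);
    }

    public static void main(String[] args) {
        // The controller currently returns 0, so the broker's throttle wins.
        System.out.println(enforcedThrottleTimeMs(0, 250));   // 250
        // Once controller-side quotas are applied, a larger controller value would win.
        System.out.println(enforcedThrottleTimeMs(400, 250)); // 400
    }
}
```

Taking the maximum means the client is never throttled less than either side requires, and the rule keeps working unchanged once the controller starts returning non-zero values.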
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-09-12 Thread GitBox


jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r968682161


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
 } catch (ExecutionException e) {
 assertTrue(e.getCause() instanceof  TimeoutException);
 }
+
 runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
 assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-assertThrows(TimeoutException.class, commitResult::await);
+assertThrows(KafkaException.class, commitResult::await);
 
-assertTrue(transactionManager.hasAbortableError());
-assertTrue(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.hasFatalBumpableError());
+assertFalse(transactionManager.hasOngoingTransaction());
 assertFalse(transactionManager.isCompleting());
-assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-runUntil(abortResult::isCompleted);
-assertTrue(abortResult.isSuccessful());
-assertFalse(transactionManager.hasOngoingTransaction());
-assertFalse(transactionManager.transactionContainsPartition(tp0));
+assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I'm wondering, is there a way that we could mitigate this server side? Is it 
possible to prevent writing the late records after the abort marker? I might be 
missing something though, so let me know.






[GitHub] [kafka] C0urante commented on pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build

2022-09-12 Thread GitBox


C0urante commented on PR #12616:
URL: https://github.com/apache/kafka/pull/12616#issuecomment-1243995392

   @ijuma @jsancio I'm assuming we don't need to backport this to 3.3 since the 
immediate issues with Swagger deps and Connect OpenAPI docs generation have 
already been fixed on that branch; let me know if that's not the case, though.





[GitHub] [kafka] C0urante merged pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build

2022-09-12 Thread GitBox


C0urante merged PR #12616:
URL: https://github.com/apache/kafka/pull/12616





[GitHub] [kafka] C0urante commented on pull request #12616: KAFKA-14198: Define separate swagger configuration in Gradle build

2022-09-12 Thread GitBox


C0urante commented on PR #12616:
URL: https://github.com/apache/kafka/pull/12616#issuecomment-1243994114

   Failures appear unrelated, merging.





[jira] [Comment Edited] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603173#comment-17603173
 ] 

Ismael Juma edited comment on KAFKA-14221 at 9/12/22 4:21 PM:
--

It seems that we may also want to switch the reference in Apache Kafka's 
Jenkinsfile based on the latest updates to 
[https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix]


was (Author: ijuma):
Actually, it seems that we may want to switch the reference in Apache Kafka's 
Jenkinsfile based on the latest updates to 
https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix

> Update Apache Kafka JVM version in CI to latest versions
> 
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> In a recent test run (see 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
> the JVM crashed with the following stack trace:
>  
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
> {code}
> After some research online, I found a JDK bug filed for the same kind of 
> crash, see [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]
> This bug was fixed in JDK 17.0.2, which is newer than the version run in 
> Apache Kafka CI (17.0.1+12-LTS-39). Note that I cannot reproduce this crash 
> locally with the latest OpenJDK (specifically 17.0.4.1).
> We should update both JDK 11 and JDK 17 to the latest versions in CI to see 
> whether this solves the problem.
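The version comparison the report relies on (the fix for JDK-8270886 landed in 17.0.2, while CI runs 17.0.1+12-LTS-39) can be checked with the standard `java.lang.Runtime.Version` API. This is a minimal sketch, assuming a CI step that only cares about JDK 17 builds. `JdkFixCheckSketch` and `predatesFix` are hypothetical names. A vendor build that backports the fix without bumping the version number would not be detected.

```java
// Hypothetical CI guard; the 17.0.2 threshold is taken from the issue text (JDK-8270886).
public class JdkFixCheckSketch {

    static final Runtime.Version FIXED_IN = Runtime.Version.parse("17.0.2");

    /** True if the version is a JDK 17 build older than the release carrying the fix. */
    static boolean predatesFix(Runtime.Version v) {
        // compareToIgnoreOptional compares version numbers, pre-release and build,
        // ignoring trailing vendor info such as "LTS-39".
        return v.feature() == 17 && v.compareToIgnoreOptional(FIXED_IN) < 0;
    }

    public static void main(String[] args) {
        System.out.println(predatesFix(Runtime.Version.parse("17.0.1+12-LTS-39"))); // true
        System.out.println(predatesFix(Runtime.Version.parse("17.0.4.1")));         // false
        System.out.println("running on " + Runtime.version()
                + ", predates fix: " + predatesFix(Runtime.version()));
    }
}
```

Restricting the check to feature release 17 keeps JDK 11 builds, which the issue also mentions, from being flagged by a threshold that only applies to 17.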





[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603173#comment-17603173
 ] 

Ismael Juma commented on KAFKA-14221:
-

Actually, it seems that we may want to switch the reference in Apache Kafka's 
Jenkinsfile based on the latest updates to 
https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix






[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603170#comment-17603170
 ] 

Ismael Juma commented on KAFKA-14221:
-

We are running the latest JDK 17 build available, as can be seen here:

[https://github.com/apache/kafka/blob/trunk/Jenkinsfile#L218]

An INFRA ticket can be filed to request an update, see 
https://issues.apache.org/jira/browse/INFRA-22363 for an example.






[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-12 Thread GitBox


jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r968614919


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -325,7 +325,7 @@ class BrokerMetadataListener(
 try {
   _image = _delta.apply()
 } catch {
-  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)
+  case t: Throwable => throw metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t)

Review Comment:
   @mumrah and I spoke offline about this. How about documenting the state of 
the broker at this point?






[GitHub] [kafka] mdedetrich commented on pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-12 Thread GitBox


mdedetrich commented on PR #12524:
URL: https://github.com/apache/kafka/pull/12524#issuecomment-1243928456

   Thanks for the comments; I am still getting my head around the tests and what 
precisely is being tested.
   
   I forgot to mention earlier that `StreamTaskTest` needs more work (I'm doing 
this now and will push changes). Other fixes are also needed because of 
`MockitoJUnitRunner.StrictStubs`: some tests throw exceptions and expect that 
exception path never to be hit, and these need to be replaced with 
`verify(times(0))` or something similar.
   
   





[jira] [Resolved] (KAFKA-14216) Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc

2022-09-12 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14216.
--
Fix Version/s: 3.3
 Reviewer: Luke Chen
   Resolution: Fixed

> Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback 
> javadoc
> --
>
> Key: KAFKA-14216
> URL: https://issues.apache.org/jira/browse/KAFKA-14216
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, documentation
>Affects Versions: 3.3.0, 3.3
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.3
>
>






[jira] [Resolved] (KAFKA-14217) app-reset-tool.html should remove reference to --zookeeper flag that no longer exists

2022-09-12 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-14217.
--
Fix Version/s: 3.3
 Reviewer: Luke Chen
   Resolution: Fixed

> app-reset-tool.html should remove reference to --zookeeper flag that no 
> longer exists
> -
>
> Key: KAFKA-14217
> URL: https://issues.apache.org/jira/browse/KAFKA-14217
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, documentation
>Affects Versions: 3.3.0, 3.3
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.3
>
>
> app-reset-tool.html should remove reference to --zookeeper flag that no 
> longer exists





[GitHub] [kafka] cmccabe merged pull request #12617: KAFKA-14216: Remove ZK reference from org.apache.kafka.server.quota.ClientQuotaCallback javadoc

2022-09-12 Thread GitBox


cmccabe merged PR #12617:
URL: https://github.com/apache/kafka/pull/12617





[GitHub] [kafka] cmccabe merged pull request #12618: KAFKA-14217: app-reset-tool.html should remove reference to --zookeeper flag that no longer exists

2022-09-12 Thread GitBox


cmccabe merged PR #12618:
URL: https://github.com/apache/kafka/pull/12618





[jira] [Updated] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich updated KAFKA-14221:
---
Description: 
In a recent test run (see 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
the JVM crashed with the following stack trace:

 
{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}
After some research online, I found a JDK bug filed for the same kind of crash, see 
[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39). Note that I cannot reproduce this crash locally with the latest OpenJDK (specifically 17.0.4.1).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see whether this solves the problem.

  was:
In a recent test run (see 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
the JVM crashed with the following stack trace:

 
{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}
After some research online, I found a JDK bug filed for the same kind of crash, see 
[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see whether this solves the problem.







[jira] [Commented] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603151#comment-17603151
 ] 

Matthew de Detrich commented on KAFKA-14221:


[~ijuma] Maybe you have some idea of how to update the JDK on the CI runner?

> Update Apache Kafka JVM version in CI to latest versions
> 
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> In a recent test run (see 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
> the JVM crashed with the following stack trace:
>  
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
> {code}
> After some research online, I found a JDK bug filed for the same kind of 
> crash, see [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]
> This bug was fixed in JDK 17.0.2, which is newer than the version run in 
> Apache Kafka CI (17.0.1+12-LTS-39).
> We should update both JDK 11 and JDK 17 to the latest versions in CI to see 
> whether this solves the problem.





[jira] [Updated] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich updated KAFKA-14221:
---
Description: 
In a recent test run (see 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
the JVM crashed with the following stack trace:

 
{code:java}
[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
{code}
After some research online, I found a JDK bug filed for the same kind of crash, see 
[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see whether this solves the problem.

  was:
In a recent test run (see 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16]),
the JVM crashed with the following stack trace:


[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:

[2022-09-12T14:22:22.414Z] #

[2022-09-12T14:22:22.414Z] #  SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342

[2022-09-12T14:22:22.414Z] #

[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)

[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)

[2022-09-12T14:22:22.414Z] # Problematic frame:

[2022-09-12T14:22:22.415Z] # V  [libjvm.so+0xcc6b12]  PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
After some research online, I found a JDK bug filed for the same kind of crash, see 
[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 to the latest versions in CI to see whether this solves the problem.


> Update Apache Kafka JVM version in CI to latest versions
> 
>
> Key: KAFKA-14221
> URL: https://issues.apache.org/jira/browse/KAFKA-14221
> Project: Kafka
>  Issue Type: Task
>Reporter: Matthew de Detrich
>Priority: Major
>
> In a recent test run (see 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16])
> the JVM crashed with the following stack trace:
>  
> {code:java}
> [2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
> [2022-09-12T14:22:22.414Z] #
> [2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
> [2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
> [2022-09-12T14:22:22.414Z] # Problematic frame:
> [2022-09-12T14:22:22.415Z] # V [libjvm.so+0xcc6b12] PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52
> {code}
> After some research online, I found a JDK bug filed for the same kind of 
> crash, see 
> [https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]
> This bug was fixed in JDK 17.0.2, which is newer than the version run in 
> Apache Kafka CI (17.0.1+12-LTS-39).
> We should update both JDK 11 and JDK 17 in CI to their latest versions to 
> see whether this solves the problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14221) Update Apache Kafka JVM version in CI to latest versions

2022-09-12 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-14221:
--

 Summary: Update Apache Kafka JVM version in CI to latest versions
 Key: KAFKA-14221
 URL: https://issues.apache.org/jira/browse/KAFKA-14221
 Project: Kafka
  Issue Type: Task
Reporter: Matthew de Detrich


In a recent test run (see 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1215/pipeline/16])
the JVM crashed with the following stack trace:


[2022-09-12T14:22:22.414Z] # A fatal error has been detected by the Java Runtime Environment:
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] #  SIGSEGV (0xb) at pc=0x7f982a771b12, pid=6229, tid=7342
[2022-09-12T14:22:22.414Z] #
[2022-09-12T14:22:22.414Z] # JRE version: Java(TM) SE Runtime Environment (17.0.1+12) (build 17.0.1+12-LTS-39)
[2022-09-12T14:22:22.414Z] # Java VM: Java HotSpot(TM) 64-Bit Server VM (17.0.1+12-LTS-39, mixed mode, sharing, tiered, compressed oops, compressed class ptrs, parallel gc, linux-amd64)
[2022-09-12T14:22:22.414Z] # Problematic frame:
[2022-09-12T14:22:22.415Z] # V  [libjvm.so+0xcc6b12]  PhaseIdealLoop::spinup(Node*, Node*, Node*, Node*, Node*, small_cache*) [clone .part.0]+0x52

After some research online, I found a JDK bug filed for the same kind of 
crash, see 
[https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8270886]

This bug was fixed in JDK 17.0.2, which is newer than the version run in 
Apache Kafka CI (17.0.1+12-LTS-39).

We should update both JDK 11 and JDK 17 in CI to their latest versions to see 
whether this solves the problem.
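
For illustration only, the version reasoning above can be expressed with the JDK's own `Runtime.Version` API (Java 10+ for `feature()`). This is a sketch under stated assumptions. `JdkVersionCheck` and `predatesFix` are hypothetical names that do not exist in Kafka's build. The only input taken from this ticket is that the crash is fixed in 17.0.2. The ticket does not name a fixed JDK 11 release, so the sketch leaves JDK 11 out of scope.

```java
// Hypothetical helper, not part of Kafka's build: decides whether a JDK version
// string refers to a JDK 17 update older than 17.0.2 (the fix version per this ticket).
final class JdkVersionCheck {
    private static final Runtime.Version FIXED_IN = Runtime.Version.parse("17.0.2");

    private JdkVersionCheck() {}

    static boolean predatesFix(String versionString) {
        Runtime.Version v = Runtime.Version.parse(versionString);
        // Only feature release 17 is considered; the ticket names no fixed JDK 11 build.
        // compareToIgnoreOptional ignores the optional "-LTS-39" style suffix.
        return v.feature() == 17 && v.compareToIgnoreOptional(FIXED_IN) < 0;
    }
}
```

At CI start-up, a call such as `JdkVersionCheck.predatesFix(Runtime.version().toString())` could flag a runner that still uses a pre-17.0.2 JDK 17. For example, it returns true for the `17.0.1+12-LTS-39` build seen in the crash above.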





[GitHub] [kafka] cadonna commented on a diff in pull request #12524: KAFKA-14133: Replace EasyMock with Mockito in streams test

2022-09-12 Thread GitBox


cadonna commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r968474491


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##
@@ -308,29 +300,21 @@ public void shouldRestoreTimestampedStoreWithConverter() {
 @Test
 public void shouldUnregisterChangelogsDuringClose() {
 final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE);
-reset(storeMetadata);
-final StateStore store = EasyMock.createMock(StateStore.class);
-expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-expect(storeMetadata.store()).andStubReturn(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-
-context.uninitialize();
-store.init((StateStoreContext) context, store);
-replay(storeMetadata, context, store);
+final StateStore store = mock(StateStore.class);
+when(store.name()).thenReturn(persistentStoreName);
 
 stateMgr.registerStateStores(singletonList(store), context);
-verify(context, store);
+verify(context).uninitialize();
+verify(store).init((StateStoreContext) context, store);
 
 stateMgr.registerStore(store, noopStateRestoreCallback, null);
 assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
 reset(store);
-expect(store.name()).andStubReturn(persistentStoreName);
-store.close();
-replay(store);
+when(store.name()).thenReturn(persistentStoreName);

Review Comment:
   Are the `reset()` and the stubbing of `store.name()` still needed? The 
stubbing is the same as on line 304. Since we are not doing a new replay, I am 
not sure that we still need the `reset()` with Mockito. I think you can move 
all `verify()` calls to the end of the method. 



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -195,12 +196,11 @@ public void shouldTransitToRunningAfterInitialization() {
 
 assertEquals(RUNNING, task.state());
 
-EasyMock.verify(stateManager);
+verify(stateManager, atLeastOnce()).registerStateStores(any(), any());

Review Comment:
   ```suggestion
   verify(stateManager).registerStateStores(any(), any());
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
 stateManager.flush();
-EasyMock.expectLastCall().once();
+verify(stateManager, times(1)).flush();

Review Comment:
   Again, you make a call on the mock and then verify that you made that call. 
In EasyMock, you make calls on the mock to specify which calls you expect. The 
call to `replay()` ends the specification phase. When you call `verify()`, you 
verify that the call under test indeed called the methods you specified. 
   This is not the case in Mockito. In Mockito, the mocks record the calls and 
you verify what was called. Making calls on the mock directly in the test 
method does not make sense. The methods of the mocks should be called within 
the call under test.  
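
To make the record-then-verify model concrete without depending on Mockito, here is a hand-rolled sketch in plain Java. `StateManager`, `RecordingStateManager` and `StandbyTaskLike` are hypothetical stand-ins, not Kafka's real classes. `RecordingStateManager` only imitates the part of a Mockito mock that matters here: it logs the calls it receives so they can be checked after the call under test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical collaborator, loosely modelled on the state manager in StandbyTaskTest.
interface StateManager {
    void flush();
    void checkpoint();
}

// Hand-rolled stand-in for a Mockito mock: it records every call it receives.
class RecordingStateManager implements StateManager {
    final List<String> calls = new ArrayList<>();

    @Override
    public void flush() {
        calls.add("flush");
    }

    @Override
    public void checkpoint() {
        calls.add("checkpoint");
    }

    // Verification happens after the fact, similar in spirit to verify(mock, times(n)).
    long timesCalled(String method) {
        return calls.stream().filter(method::equals).count();
    }
}

// Hypothetical code under test: the only place that should touch the collaborator.
class StandbyTaskLike {
    private final StateManager stateManager;

    StandbyTaskLike(StateManager stateManager) {
        this.stateManager = stateManager;
    }

    void maybeCheckpoint(boolean enforce) {
        if (enforce) {
            stateManager.flush();
            stateManager.checkpoint();
        }
    }
}
```

A meaningful test only exercises the code under test, for example `maybeCheckpoint(true)`, and then checks that `timesCalled("flush") == 1`. If the test itself calls `flush()` on the recorder first, that check passes even when `maybeCheckpoint(false)` never touches the state manager. That is exactly the vacuous verification pointed out in this review.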



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -212,31 +212,29 @@ public void shouldThrowIfCommittingOnIllegalState() {
 @Test
 public void shouldAlwaysCheckpointStateIfEnforced() {
 stateManager.flush();
-EasyMock.expectLastCall().once();
+verify(stateManager, times(1)).flush();
 stateManager.checkpoint();
-EasyMock.expectLastCall().once();
-EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-EasyMock.replay(stateManager);
+verify(stateManager, times(1)).checkpoint();
+when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());

Review Comment:
   Again here.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##
@@ -177,10 +181,7 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
 @Test
 public void shouldTransitToRunningAfterInitialization() {
-EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
-EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-EasyMock.replay(stateManager);
+stateManager.registerStateStores(any(), any());

Review Comment:
   This is also a call on the mock, which makes the verification on line 199 
always pass, regardless of the call under test.



##

[GitHub] [kafka] divijvaidya commented on pull request #12591: MINOR: Replace usage of File.createTempFile() with TestUtils.tempFile()

2022-09-12 Thread GitBox


divijvaidya commented on PR #12591:
URL: https://github.com/apache/kafka/pull/12591#issuecomment-1243849390

   Thank you both for your review.
   
   Since we agree to pick up this incremental change, I assume no 
action is required from my side before you approve and merge?
   
   Note that the test failures are unrelated; I was able to run those tests 
successfully on my local machine.





[GitHub] [kafka] junrao commented on pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-12 Thread GitBox


junrao commented on PR #12570:
URL: https://github.com/apache/kafka/pull/12570#issuecomment-1243846932

   Thanks for the feedback, @Kaiserchen. Could you rerun your MM test after changing 
`last == null || last.isFull()` to `deque.size() > 1 || last == null || 
last.isFull()` and check whether it still works?




