[jira] [Commented] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest

2022-07-14 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17567091#comment-17567091
 ] 

Christo Lolov commented on KAFKA-14058:
---

Okay, thank you for the suggestion, I will have a look at them!

> Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
> --
>
> Key: KAFKA-14058
> URL: https://issues.apache.org/jira/browse/KAFKA-14058
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji opened a new pull request, #12411: KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica

2022-07-14 Thread GitBox


hachikuji opened a new pull request, #12411:
URL: https://github.com/apache/kafka/pull/12411

   After the fix for https://github.com/apache/kafka/pull/12150, if a follower 
receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH 
even if the leader epoch matches. We need to do leader/epoch validation first, 
before checking whether we have a valid replica.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14078) Replica fetches to follower should return NOT_LEADER error

2022-07-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14078:
---

 Summary: Replica fetches to follower should return NOT_LEADER error
 Key: KAFKA-14078
 URL: https://issues.apache.org/jira/browse/KAFKA-14078
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.3.0


After the fix for KAFKA-13837, if a follower receives a request from another 
replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. 
We need to do leader/epoch validation first, before checking whether we have a 
valid replica.
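The check ordering described above can be sketched as follows. This is an illustrative model only: the actual broker code is Scala, `validateFetch` and its parameters are hypothetical, and only the error names mirror Kafka's protocol errors.

```java
import java.util.Optional;

/** Sketch of the fetch-validation ordering discussed in KAFKA-14078 (names illustrative). */
class FetchValidation {
    static Optional<String> validateFetch(int requestEpoch, int leaderEpoch,
                                          boolean isLeader, boolean replicaKnown) {
        // 1. Leader/epoch validation must come first: a stale or newer epoch should
        //    surface an epoch error regardless of the replica state.
        if (requestEpoch < leaderEpoch) return Optional.of("FENCED_LEADER_EPOCH");
        if (requestEpoch > leaderEpoch) return Optional.of("UNKNOWN_LEADER_EPOCH");
        // 2. Only a leader should serve replica fetches; a follower returns NOT_LEADER.
        if (!isLeader) return Optional.of("NOT_LEADER_OR_FOLLOWER");
        // 3. Only then check that the fetching replica is known.
        if (!replicaKnown) return Optional.of("UNKNOWN_REPLICA"); // illustrative name
        return Optional.empty(); // fetch may proceed
    }
}
```

With this ordering, a matching epoch no longer produces UNKNOWN_LEADER_EPOCH just because the replica check happens to fail first.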





[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-14 Thread GitBox


showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1185155731

   @dajac @guozhangwang , do you want to have another look at this PR? You can 
also check this comment to know what we're trying to do in this PR: 
https://github.com/apache/kafka/pull/12349#pullrequestreview-1028116488. Thanks.





[jira] [Updated] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14024:
--
Description: 
Hi 

In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue 
where consumer#poll(duration) could return later than the provided duration. 
That happens because, if a rebalance is needed, we first try to commit the 
current offset synchronously before the rebalance, and if the offset commit 
takes too long, consumer#poll spends more time than the provided duration. To 
fix that, we changed the synchronous commit to an asynchronous commit before 
the rebalance (i.e. in onJoinPrepare).

 

However, in this ticket, we found that the async commit keeps sending a new 
commit request on each Consumer#poll, because the offset commit never completes 
in time. The impact is that the existing consumer gets kicked out of the group 
after the rebalance timeout without rejoining. That is, suppose we have 
consumer A in group G; when consumer B joins the group, after the rebalance 
only consumer B remains in the group.

 

The workaround for this issue is to switch back to an eager assignor, e.g. 
StickyAssignor or RoundRobinAssignor.

 

To fix the issue, we came up with 2 solutions:
 # we can explicitly wait for the async commit to complete in onJoinPrepare, but 
that would let the KAFKA-13310 issue happen again.
 # we can keep the async commit offset future inflight, so that on each 
Consumer#poll we are only waiting for that future to complete.

 

Besides, another bug was found while fixing this one. Before KAFKA-13310, we 
committed offsets synchronously within the rebalance timeout, retrying on 
retriable errors until the timeout expired. After KAFKA-13310, we thought we 
still retried, but the retry actually happens only after the partitions are 
revoked. That is, even if the retried offset commit succeeds, some partition 
offsets are left un-committed, and after the rebalance other consumers will 
consume overlapping records.

 

 

===

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]

 

we didn't wait for the client to receive the commit offset response here, so 
onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and 
the client will loop invoking onJoinPrepare.

I think EAGER mode doesn't have this problem because it revokes the partitions 
even if onJoinPrepareAsyncCommitCompleted=false and will not try to commit in 
the next round.

reproduce:
 * single-node Kafka version 3.2.0 && client version 3.2.0
 * topic1 has 5 partitions
 * start a consumer1 (cooperative rebalance)
 * start another consumer2 (same consumer group)
 * consumer1 will hang for a long time before re-joining
 * from the server log, consumer1 hits the rebalance timeout before joinGroup 
and re-joins with another memberId

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 
and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
(ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
offsets \{topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
(ConsumerCoordinator.java:1143)

 

and coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 56 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic 
members who haven't joined: 
Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx 
generation 57 (__consumer_offsets-30) with 3 members 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with 
unknown member id joins group xxx in CompletingRebalance state. Created a new 
member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the 
member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from 
leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for 
generation 57. The group has 3 members, 0 of which are static. 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in 

[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-14 Thread GitBox


showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r921765863


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-onJoinPrepareAsyncCommitCompleted = true;
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+}
+
+// wait for commit offset response before timer.
+if (autoCommitOffsetRequestFuture != null) {
+Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+   timer : joinPrepareTimer;
+client.poll(autoCommitOffsetRequestFuture, pollTimer);
 }
 
+// return false when:
+//   1. offset commit haven't done
+//   2. offset commit failed with retriable exception and joinPrepare 
haven't expired
+boolean onJoinPrepareAsyncCommitCompleted = true;
+if (autoCommitOffsetRequestFuture != null) {
+if (!autoCommitOffsetRequestFuture.isDone()) {
+onJoinPrepareAsyncCommitCompleted = false;
+} else if (autoCommitOffsetRequestFuture.failed() && 
autoCommitOffsetRequestFuture.isRetriable()) {
+onJoinPrepareAsyncCommitCompleted = 
joinPrepareTimer.isExpired();
+} else if (autoCommitOffsetRequestFuture.failed() && 
autoCommitOffsetRequestFuture.isRetriable()) {
+log.error("Asynchronous auto-commit of offsets failed: {}", 
autoCommitOffsetRequestFuture.exception().getMessage());

Review Comment:
   These 2 else if conditions are the same. Please fix it.
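The corrected branch structure the comment asks for might look like this sketch, with the future's state reduced to plain booleans (all names illustrative): the second branch must test the non-retriable case.

```java
/** Sketch of the intended completion check; a stand-in for the real RequestFuture logic. */
class CommitCheck {
    static boolean completed(boolean done, boolean failed, boolean retriable,
                             boolean joinPrepareExpired) {
        if (!done) return false;                            // commit still in flight
        if (failed && retriable) return joinPrepareExpired; // keep retrying until the timer expires
        if (failed && !retriable) return true;              // non-retriable: log the error and move on
        return true;                                        // commit succeeded
    }
}
```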



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-onJoinPrepareAsyncCommitCompleted = true;
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+}
+
+// wait for commit offset response before timer.
+if (autoCommitOffsetRequestFuture != null) {
+Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+   timer : joinPrepareTimer;
+client.poll(autoCommitOffsetRequestFuture, pollTimer);
 }
 
+// return false when:
+//   1. offset commit haven't done
+//   2. offset commit failed with retriable exception and joinPrepare 
haven't expired
+boolean onJoinPrepareAsyncCommitCompleted = true;
+if (autoCommitOffsetRequestFuture != null) {
+if (!autoCommitOffsetRequestFuture.isDone()) {
+onJoinPrepareAsyncCommitCompleted = false;

Review Comment:
   I think we should also check `joinPrepareTimer.isExpired();` here, right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +743,45 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-

[GitHub] [kafka] showuon commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-14 Thread GitBox


showuon commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1185110992

   > I've been able to get one green run of the test locally, but all other 
attempts have failed with timeouts, even when bumping the permitted duration 
for worker startup from one minute to five.
   > 
   > I also fixed a typo that would have broken the `test_bounce` case.
   
   Yes, that's what I saw when running it in my local env. I think we need to 
make sure it works well before we can merge it.
   
   @jsancio , this is the last PR for KIP-618. We'd like to get this into v3.3, 
but we need to make sure the newly added/updated system tests don't break any 
tests. Could you help run it and confirm? Thanks.





[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1185100303

   @junrao , thanks for your review! 
   @tombentley , do you want to have another look?
   Thanks.





[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-14 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1185094040

   I've been able to get one green run of the test locally, but all other 
attempts have failed with timeouts, even when bumping the permitted duration 
for worker startup from one minute to five.
   
   I also fixed a typo that would have broken the `test_bounce` case.





[GitHub] [kafka] C0urante opened a new pull request, #12410: MINOR: Remove unused ShutdownableThread class and ineffective ThreadedTest classes

2022-07-14 Thread GitBox


C0urante opened a new pull request, #12410:
URL: https://github.com/apache/kafka/pull/12410

   The `ShutdownableThread` class isn't used anywhere outside of tests, and the 
`ThreadedTest` class only works in cases where the logic that's being tested 
uses a `ShutdownableThread`. Both of these classes are removed, and the 
`DistributedHerderTest` class is updated to properly report unexpected 
exceptions that take place on other threads (which appears to be the original 
purpose of the `ThreadedTest` class, although it was not actually doing this 
anywhere).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] C0urante commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito

2022-07-14 Thread GitBox


C0urante commented on PR #12409:
URL: https://github.com/apache/kafka/pull/12409#issuecomment-1185021831

   @clolov since you've been working on the JUnit migration for Streams, would 
you be interested in reviewing this?





[GitHub] [kafka] C0urante opened a new pull request, #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito

2022-07-14 Thread GitBox


C0urante opened a new pull request, #12409:
URL: https://github.com/apache/kafka/pull/12409

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14058)
   
   Some notes:
   
   1. Introduced new `ConcurrencyUtils` and `MockitoUtils` classes for reusable 
testing logic that will likely be used in the near future for 
[KAFKA-14059](https://issues.apache.org/jira/browse/KAFKA-14059) and 
[KAFKA-14060](https://issues.apache.org/jira/browse/KAFKA-14060).
   2. Refactored a lot of common logic into dedicated methods, which reduces 
test size and should make tests easier to write.
   3. Doubled the default record batch size from 1 to 2. This provides better 
coverage and, after all the refactoring from step 2, required no modifications 
to mocking, verification, or assertion logic anywhere in the test suite.
   4. Stopped inheriting from the `ThreadedTest` class as it does nothing.
   5. Once this looks good enough to merge, I'll begin applying the same 
changes to the `WorkerSourceTaskTest` and possibly 
`AbstractWorkerSourceTaskTest` test suites.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-13888) KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag

2022-07-14 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13888:

Priority: Blocker  (was: Major)

> KIP-836: Addition of Information in DescribeQuorumResponse about Voter Lag
> --
>
> Key: KAFKA-13888
> URL: https://issues.apache.org/jira/browse/KAFKA-13888
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Niket Goel
>Assignee: lqjacklee
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Tracking issue for the implementation of KIP:836





[jira] [Created] (KAFKA-14077) KRaft should support recovery from failed disk

2022-07-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14077:
---

 Summary: KRaft should support recovery from failed disk
 Key: KAFKA-14077
 URL: https://issues.apache.org/jira/browse/KAFKA-14077
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
 Fix For: 3.3.0


If one of the nodes in the metadata quorum has a disk failure, there is no way 
currently to safely bring the node back into the quorum. When we lose disk 
state, we are at risk of losing committed data even if the failure only affects 
a minority of the cluster.

Here is an example. Suppose that a metadata quorum has 3 members: v1, v2, and 
v3. Initially, v1 is the leader and writes a record at offset 1. After v2 
acknowledges replication of the record, it becomes committed. Suppose that v1 
fails before v3 has a chance to replicate this record. As long as v1 remains 
down, the raft protocol guarantees that only v2 can become leader, so the 
record cannot be lost. The raft protocol expects that when v1 returns, it will 
still have that record, but what if there is a disk failure, the state cannot 
be recovered and v1 participates in leader election? Then we would have 
committed data on a minority of the voters. The main problem here concerns how 
we recover from this impaired state without risking the loss of this data.

Consider a naive solution which brings v1 back with an empty disk. Since the 
node has lost its prior knowledge of the state of the quorum, it will vote for 
any candidate that comes along. If v3 becomes a candidate, it will vote for 
itself and needs only the vote from v1 to become leader. If that happens, the 
committed data on v2 will be lost.

This is just one scenario. In general, the invariants that the raft protocol is 
designed to preserve go out the window when disk state is lost. For example, it 
is also possible to contrive a scenario where the loss of disk state leads to 
multiple leaders. There is a good reason why raft requires that any vote cast 
by a voter is written to disk since otherwise the voter may vote for different 
candidates in the same epoch.

Many systems solve this problem with a unique identifier which is generated 
automatically and stored on disk. This identifier is then committed to the raft 
log. If a disk changes, we would see a new identifier and we can prevent the 
node from breaking raft invariants. Then recovery from a failed disk requires a 
quorum reconfiguration. We need something like this in KRaft to make disk 
recovery possible.
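The unique-identifier idea can be sketched as follows; the `directory.id` file name and the recovery flow are illustrative assumptions, not the actual KRaft implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Sketch of a persisted directory identifier for detecting disk replacement. */
class DiskId {
    // Load the identifier persisted on this disk, generating one on first start.
    static UUID loadOrCreate(Path dir) throws IOException {
        Path idFile = dir.resolve("directory.id"); // illustrative file name
        if (Files.exists(idFile)) {
            return UUID.fromString(Files.readString(idFile).trim());
        }
        UUID id = UUID.randomUUID();
        Files.writeString(idFile, id.toString());
        return id;
    }

    // A voter whose on-disk id no longer matches the id committed to the raft
    // log must not vote or stand for election until it is reconfigured back in.
    static boolean mayParticipate(UUID onDisk, UUID committed) {
        return onDisk.equals(committed);
    }
}
```

A fresh disk produces a fresh identifier, so the quorum can detect the lost state and require reconfiguration instead of letting the node break raft's voting invariants.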

 

 





[jira] [Commented] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-07-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566989#comment-17566989
 ] 

Guozhang Wang commented on KAFKA-13846:
---

Hi Jose, the PR has actually been merged, so we should close this ticket. I 
will go ahead and do it.

> Add an overloaded metricOrElseCreate function in Metrics
> 
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
>
> The `Metrics` registry is often used by concurrent threads, however its 
> get/create APIs are not well suited for that. A common pattern from users 
> today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>   try {
> metrics.createMetric(..)
>   } catch (IllegalArgumentException e){
> // another thread may create the metric in the meantime
>   }
> } 
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get 
> the metric. However, the `createMetric` call itself does 
> synchronize internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which, similar to 
> createMetric, would return the already existing metric instead of throwing 
> an IllegalArgumentException within the internal synchronization block.
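A minimal sketch of the proposed get-or-create semantics, using a plain `ConcurrentHashMap` in place of the real `Metrics` registry's internal locking (the class and the `Object` metric type are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of metricOrElseCreate semantics; not the actual Metrics registry. */
class MetricsSketch {
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    // Returns the existing metric when another thread won the creation race,
    // instead of throwing as createMetric does.
    Object metricOrElseCreate(String name, Object candidate) {
        Object prior = metrics.putIfAbsent(name, candidate);
        return prior != null ? prior : candidate;
    }
}
```

Callers then never need the null-check/try/catch dance shown above: one call either registers the metric or hands back the one already registered.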





[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-07-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13846.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
> 
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
> Fix For: 3.3.0
>
>
> The `Metrics` registry is often used by concurrent threads, however it's 
> get/create APIs are not well suited for it. A common pattern from the user 
> today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>   try {
> metrics.createMetric(..)
>   } catch (IllegalArgumentException e){
> // another thread may create the metric at the mean time
>   }
> } 
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get 
> the metric. However, the `createMetric` function call itself indeed 
> synchronize internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to 
> createMetric, but instead of throwing an illegal argument exception within 
> the internal synchronization block, it would just return the already existing 
> metric.





[jira] [Commented] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-14 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566986#comment-17566986
 ] 

Jim Hughes commented on KAFKA-14076:


CloseOptions was introduced in 
https://github.com/apache/kafka/commit/9dc332f5ca34b80af369646f767c40c6b189f831.

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Priority: Major
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561])
> Notably, it needs to remove CGs per StreamThread.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-14 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r921566920


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {

Review Comment:
   I'm not wild about this IT as written.  I copied from the 
`AbstractResetIntegrationTest` and I'd be happy to hear a suggestion on how to 
make a more minimal test.






[GitHub] [kafka] jnh5y opened a new pull request, #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-14 Thread GitBox


jnh5y opened a new pull request, #12408:
URL: https://github.com/apache/kafka/pull/12408

   * Addresses issues with `KafkaStreams.close(CloseOptions)`.
   * Adds an integration test for this new functionality.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-14 Thread Jim Hughes (Jira)
Jim Hughes created KAFKA-14076:
--

 Summary: Fix issues with KafkaStreams.CloseOptions
 Key: KAFKA-14076
 URL: https://issues.apache.org/jira/browse/KAFKA-14076
 Project: Kafka
  Issue Type: Bug
Reporter: Jim Hughes


The new `close(CloseOptions)` function has a few bugs.  
([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561])

Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] peternied opened a new pull request, #12407: [BUG] Remove duplicate common.message.* from clients:test jar file

2022-07-14 Thread GitBox


peternied opened a new pull request, #12407:
URL: https://github.com/apache/kafka/pull/12407

   When consuming both `kafka-client:3.0.1` and `kafka-client:3.0.1:test`
   through Maven, a hygiene tool was detecting multiple instances of the same
   class loaded into the classpath.
   
   Verified this change by building locally with a before and after build with 
`./gradlew clients:publishToMavenLocal`, then used beyond compare to verify the 
contents.
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
  - Minor change to the existing build process; the duplicated Java classes were unused.
   - [X] Verify test coverage and CI build status
  - There should be no changes in test coverage and CI build status.
   - [X] Verify documentation (including upgrade notes)
  - No documentation updates need to be made





[jira] [Created] (KAFKA-14075) Consumer Group deletion does not delete pending transactional offset commits

2022-07-14 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-14075:


 Summary: Consumer Group deletion does not delete pending 
transactional offset commits
 Key: KAFKA-14075
 URL: https://issues.apache.org/jira/browse/KAFKA-14075
 Project: Kafka
  Issue Type: Bug
Reporter: Jeff Kim
Assignee: Jeff Kim


In 
[GroupMetadata.removeAllOffsets()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L729-L740]
 we pass in the offsets cache to delete pendingTransactionalOffsetCommits upon 
group deletion. So only transactional offset commits for topic partitions 
already in the offsets cache will be deleted.

However, we add a transactional offset commit to the offsets cache only after 
the commit/abort marker is written to the log in 
[GroupMetadata.completePendingTxnOffsetCommit()|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L692]

So even after a group deletion we can still have pending transactional offset 
commits for a group that's supposed to be deleted. The group metadata manager 
will throw an IllegalStateException 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L740]
 while loading group to memory. We will hit this exception on every load to 
group as long as the hanging transaction never completes. 

We should delete all pending transactional offset commits (instead of only 
topic partitions that exist in the offsets cache) when a group is deleted in 
GroupMetadata.removeOffsets()
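A hypothetical, much-simplified Java model of the bookkeeping described above (the real code is Scala in `GroupMetadata`; the class and field names here are illustrative, not the actual implementation). It shows how pruning pending commits only for partitions already in the offsets cache leaves a pending transactional commit behind:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for the group metadata state: committed offsets by
// partition, plus pending transactional commits keyed by producer id.
class GroupState {
    final Map<String, Long> offsets = new HashMap<>();
    final Map<Long, Set<String>> pendingTxnOffsets = new HashMap<>();

    // Current behavior (simplified): prune pending commits only for partitions
    // already present in the offsets cache, so a commit whose marker was never
    // written (partition not yet cached) survives group deletion.
    void removeOffsetsCachedOnly() {
        pendingTxnOffsets.values().forEach(p -> p.removeAll(offsets.keySet()));
        pendingTxnOffsets.values().removeIf(Set::isEmpty);
        offsets.clear();
    }

    // Proposed behavior: drop every pending transactional commit on deletion.
    void removeAllOffsets() {
        offsets.clear();
        pendingTxnOffsets.clear();
    }
}

public class PendingTxnOffsetDemo {
    public static void main(String[] args) {
        GroupState group = new GroupState();
        group.offsets.put("tp-0", 5L);
        group.pendingTxnOffsets.put(42L, new HashSet<>(Set.of("tp-0", "tp-1")));

        group.removeOffsetsCachedOnly();
        // tp-1 was never in the offsets cache, so its pending commit leaks:
        System.out.println(group.pendingTxnOffsets); // prints {42=[tp-1]}
    }
}
```

Under this model, the leaked entry for `tp-1` is what the group metadata manager would later trip over while loading the group; clearing `pendingTxnOffsets` outright on deletion avoids it.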





[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

2022-07-14 Thread GitBox


C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921456034


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   This is a nice improvement in readability, but are we certain it's 
necessary? I commented on the change to the plugin scanning logic in Connect 
because I'm familiar with that part of the code base; I don't have the same 
familiarity with Streams, though.
   
   I think it's fine to merge this change, but if this method isn't intended to 
be invoked concurrently, we should modify the PR title so that the commit 
message doesn't imply this is a bug fix and instead recognizes it as a cosmetic 
improvement.






[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

2022-07-14 Thread GitBox


C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   ~Does this change behavior? Potential concurrency bug aside (which I'm not 
sure is actually a bug, since it's unclear if we expect this method to be 
called concurrently), it looks like we're going from failing _before_ 
overwriting values to now failing _after_ overwriting them. Is there any 
fallout from that or is it a benign change?~
   
   Never mind, had to refresh my understanding of `Map::putIfAbsent`. This does 
not change behavior.
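   For reference, a small self-contained demonstration of the `Map::putIfAbsent` contract discussed above: it never overwrites an existing mapping, and its return value distinguishes the duplicate case (the map and key names here are made up for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> recorders = new ConcurrentHashMap<>();

        // First registration: key absent, value stored, null returned.
        String first = recorders.putIfAbsent("store-1", "recorderA");
        // Second registration: key present, map unchanged, existing value returned.
        String second = recorders.putIfAbsent("store-1", "recorderB");

        System.out.println(first);                    // prints null
        System.out.println(second);                   // prints recorderA
        System.out.println(recorders.get("store-1")); // prints recorderA
    }
}
```

   So replacing `containsKey` + `put` with a single `putIfAbsent` keeps the first value and still lets the caller detect (and throw on) a duplicate registration, without a check-then-act window between the two calls.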






[GitHub] [kafka] rajinisivaram opened a new pull request, #12406: MINOR: Fix options for old-style Admin.listConsumerGroupOffsets

2022-07-14 Thread GitBox


rajinisivaram opened a new pull request, #12406:
URL: https://github.com/apache/kafka/pull/12406

   This fixes the options set in commit 
https://github.com/apache/kafka/commit/beac86f049385932309158c1cb49c8657e53f45f 
for the old style API. Timeout was not being copied to the new options. The 
copy is error-prone, so changing to use the provided options directly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

2022-07-14 Thread GitBox


C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   Does this change behavior? Potential concurrency bug aside (which I'm not 
sure is valid, since it's unclear if we expect this method to be called 
concurrently), it looks like we're going from failing _before_ overwriting 
values to now failing _after_ overwriting them. Is there any fallout from that 
or is it a benign change?



##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   Does this change behavior? Potential concurrency bug aside (which I'm not 
sure is actually a bug, since it's unclear if we expect this method to be 
called concurrently), it looks like we're going from failing _before_ 
overwriting values to now failing _after_ overwriting them. Is there any 
fallout from that or is it a benign change?






[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets

2022-07-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566896#comment-17566896
 ] 

Chris Egerton commented on KAFKA-10000:
---

Hi [~jagsancio]! We can keep it. All of the functional changes for this ticket 
have been merged as well as integration tests for it; the only remaining PR is 
one to add system tests: [https://github.com/apache/kafka/pull/11783], which 
will likely be merged this week or next. If it makes for easier bookkeeping, I 
can file a separate non-blocker ticket for those system tests and mark this one 
done.

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.





[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries

2022-07-14 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566885#comment-17566885
 ] 

Justine Olshan commented on KAFKA-14074:


Thanks for bringing this up [~prestona]. This issue has been brought to my 
attention for ZK clusters as well, though not for KRaft. It seems KRaft was 
built to handle this issue better. I have been looking into ways to mitigate 
the issue in ZK clusters.

> Restarting a broker during re-assignment can leave log directory entries
> 
>
> Key: KAFKA-14074
> URL: https://issues.apache.org/jira/browse/KAFKA-14074
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.1.0
>Reporter: Adrian Preston
>Priority: Major
>
> Re-starting a broker while replicas are being assigned away from the broker 
> can result in topic partition directories being left in the broker’s log 
> directory. This can trigger further problems if such a topic is deleted and 
> re-created. These problems occur when replicas for the new topic are placed 
> on a broker that hosts a “stale” topic partition directory of the same name, 
> causing the on-disk topic partition state held by different brokers in the 
> cluster to diverge.
> We have also been able to re-produce variants of this problem using Kafka 2.8 
> and 3.1, as well as Kafka built from the head of the apache/kafka repository 
> (at the time of writing this is commit: 
> 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to 
> re-produce this problem with Kafka running in KRaft mode.
> A minimal re-create for topic directories being left on disk is as follows:
>  # Start ZooKeeper and a broker (both using the sample config)
>  # Create 100 topics: each with 1 partition, and with replication factor 1
>  # Add a second broker to the Kafka cluster (with minor edits to the sample 
> config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}})
>  # Issue a re-assignment that moves all of the topic partition replicas  from 
> the first broker to the second broker
>  # While this re-assignment is taking place shutdown the first broker (you 
> need to be quick with only two brokers and 100 topics…)
>  # Wait a few seconds for the re-assignment to stall
>  # Restart the first broker and wait for the re-assignment to complete and it 
> to remove any partially deleted topics (e.g. those with a “-delete” suffix).
> Inspecting the logs directory for the first broker should show directories 
> corresponding to topic partitions that are owned by the second broker. These 
> are not cleaned up when the re-assignment completes, and also remain in the 
> logs directory even if the first broker is restarted.  Deleting the topic 
> also does not clean up the topic partitions left behind on the first broker - 
> which leads to a second potential problem.
> For topics that have more than one replica: a new topic that has the same 
> name as a previously deleted topic might have replicas created on a broker 
> with “stale” topic partition directories. If this happens these topics will 
> remain in an under-replicated state.
> A minimal re-create for this is as follows:
>  # Create a three node Kafka cluster (backed by ZK) based off the sample 
> config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2)
>  # Create 100 topics: each with 1 partition, and with replication factor 2
>  # Submit a re-assignment to move all of the topic partition replicas to 
> kafka-0 and kafka-1,  and wait for it to complete
>  # Submit a re-assignment to move all of the topic partition replicas on 
> kafka-0 to kafka-2.
>  # While this re-assignment is taking place shutdown and re-start kafka-0.
>  # Wait for the re-assignment to complete, and check that there’s unexpected 
> topic partition directories in kafka-0’s logs directory
>  # Delete all 100 topics, and re-create 100 new topics with the same name and 
> configuration as the deleted topics.
> In this state kafka-1 and kafka-2 continually generate log messages similar 
> to:
> {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition 
> test-039-0. This error may be returned transiently when the partition is 
> being created or deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread)}}
> Topics that have had replicas created on kafka-0 are under-replicated with 
> kafka-0 missing from the ISR list. Performing a rolling restart of each 
> broker in turn does not resolve the problem, in fact more partitions are 
> listed as under-replicated, as before kafka-0 is missing from their ISR list.
> I also tried to re-create this with Kafka running in Kraft mode, but was 
> unable to do so. My test configuration was three 

[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries

2022-07-14 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566887#comment-17566887
 ] 

Justine Olshan commented on KAFKA-14074:


Is this the same issue as https://issues.apache.org/jira/browse/KAFKA-13972?

> Restarting a broker during re-assignment can leave log directory entries
> 
>
> Key: KAFKA-14074
> URL: https://issues.apache.org/jira/browse/KAFKA-14074
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.1.0
>Reporter: Adrian Preston
>Priority: Major
>
> Re-starting a broker while replicas are being assigned away from the broker 
> can result in topic partition directories being left in the broker’s log 
> directory. This can trigger further problems if such a topic is deleted and 
> re-created. These problems occur when replicas for the new topic are placed 
> on a broker that hosts a “stale” topic partition directory of the same name, 
> causing the on-disk topic partition state held by different brokers in the 
> cluster to diverge.
> We have also been able to re-produce variants of this problem using Kafka 2.8 
> and 3.1, as well as Kafka built from the head of the apache/kafka repository 
> (at the time of writing this is commit: 
> 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to 
> re-produce this problem with Kafka running in KRaft mode.
> A minimal re-create for topic directories being left on disk is as follows:
>  # Start ZooKeeper and a broker (both using the sample config)
>  # Create 100 topics: each with 1 partition, and with replication factor 1
>  # Add a second broker to the Kafka cluster (with minor edits to the sample 
> config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}})
>  # Issue a re-assignment that moves all of the topic partition replicas  from 
> the first broker to the second broker
>  # While this re-assignment is taking place shutdown the first broker (you 
> need to be quick with only two brokers and 100 topics…)
>  # Wait a few seconds for the re-assignment to stall
>  # Restart the first broker and wait for the re-assignment to complete and it 
> to remove any partially deleted topics (e.g. those with a “-delete” suffix).
> Inspecting the logs directory for the first broker should show directories 
> corresponding to topic partitions that are owned by the second broker. These 
> are not cleaned up when the re-assignment completes, and also remain in the 
> logs directory even if the first broker is restarted.  Deleting the topic 
> also does not clean up the topic partitions left behind on the first broker - 
> which leads to a second potential problem.
> For topics that have more than one replica: a new topic that has the same 
> name as a previously deleted topic might have replicas created on a broker 
> with “stale” topic partition directories. If this happens these topics will 
> remain in an under-replicated state.
> A minimal re-create for this is as follows:
>  # Create a three node Kafka cluster (backed by ZK) based off the sample 
> config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2)
>  # Create 100 topics: each with 1 partition, and with replication factor 2
>  # Submit a re-assignment to move all of the topic partition replicas to 
> kafka-0 and kafka-1,  and wait for it to complete
>  # Submit a re-assignment to move all of the topic partition replicas on 
> kafka-0 to kafka-2.
>  # While this re-assignment is taking place shutdown and re-start kafka-0.
>  # Wait for the re-assignment to complete, and check that there’s unexpected 
> topic partition directories in kafka-0’s logs directory
>  # Delete all 100 topics, and re-create 100 new topics with the same name and 
> configuration as the deleted topics.
> In this state kafka-1 and kafka-2 continually generate log messages similar 
> to:
> {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition 
> test-039-0. This error may be returned transiently when the partition is 
> being created or deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread)}}
> Topics that have had replicas created on kafka-0 are under-replicated with 
> kafka-0 missing from the ISR list. Performing a rolling restart of each 
> broker in turn does not resolve the problem, in fact more partitions are 
> listed as under-replicated, as before kafka-0 is missing from their ISR list.
> I also tried to re-create this with Kafka running in Kraft mode, but was 
> unable to do so. My test configuration was three brokers configured based on 
> /config/kraft/server.properties. All three brokers were part of the 
> controller quorum. Interestingly I see log lines like the following when 
> 

[GitHub] [kafka] ocadaruma opened a new pull request, #12405: KAFKA-13572 Fix negative preferred replica imbalanced count metric

2022-07-14 Thread GitBox


ocadaruma opened a new pull request, #12405:
URL: https://github.com/apache/kafka/pull/12405

   * Currently, the `preferredReplicaImbalanceCount` calculation has a race that 
can make the count go negative when topic deletions are initiated simultaneously.
   * Please refer [KAFKA-13572's 
comment](https://issues.apache.org/jira/browse/KAFKA-13572?focusedCommentId=17566872=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17566872)
 for the details
   * This PR addresses the problem by fixing 
`cleanPreferredReplicaImbalanceMetric` to be called only once per 
topic-deletion procedure
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13572) Negative value for 'Preferred Replica Imbalance' metric

2022-07-14 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566872#comment-17566872
 ] 

Haruki Okada commented on KAFKA-13572:
--

We experienced a similar phenomenon in our Kafka cluster, and we found that the 
following scenario can cause a negative metric.

Let's say there are topic-A, topic-B.

 
 # Initiate topic deletion of topic-A
 ** TopicDeletionManager#enqueueTopicsForDeletion is called with argument 
Set(topic-A)
 *** 
[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/KafkaController.scala#L1771]
 # During topic-A's deletion procedure, all of topic-A's partitions are marked 
as Offline (Leader = -1)
 ** 
[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L368]
 # Before topic-A's deletion procedure completes, initiate topic deletion of 
topic-B
 ** Since topic-A's ZK delete-topic node still exists, 
TopicDeletionManager#enqueueTopicsForDeletion is called with argument 
Set(topic-A, topic-B)
 ** ControllerContext#cleanPreferredReplicaImbalanceMetric is called for both 
topic-A, topic-B
 *** 
[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/controller/ControllerContext.scala#L496]
 *** Since topic-A is now NoLeader, `!hasPreferredLeader(replicaAssignment, 
leadershipInfo)` evaluates to true, then `preferredReplicaImbalanceCount` is 
decremented unexpectedly
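A hypothetical toy model of the double-decrement in the scenario above (plain Java; names are illustrative and this is not the actual controller code): running the cleanup a second time for a topic whose deletion was already accounted for drives the gauge negative, while guarding the cleanup to run at most once per topic-deletion procedure (as the fix proposes) keeps it at zero.

```java
import java.util.HashSet;
import java.util.Set;

public class ImbalanceGaugeDemo {
    static class ImbalanceGauge {
        int preferredReplicaImbalanceCount = 0;
        private final Set<String> cleanedTopics = new HashSet<>();

        void markImbalanced(String topic) { preferredReplicaImbalanceCount++; }

        // Unguarded cleanup: re-enqueueing topic-A (step 3 above) triggers a
        // second decrement once topic-A's partitions are NoLeader.
        void cleanupUnguarded(String topic) { preferredReplicaImbalanceCount--; }

        // Guarded cleanup: decrement at most once per topic-deletion procedure.
        void cleanupOnce(String topic) {
            if (cleanedTopics.add(topic)) {
                preferredReplicaImbalanceCount--;
            }
        }
    }

    public static void main(String[] args) {
        ImbalanceGauge unguarded = new ImbalanceGauge();
        unguarded.markImbalanced("topic-A");
        unguarded.cleanupUnguarded("topic-A"); // topic-A deletion enqueued
        unguarded.cleanupUnguarded("topic-A"); // re-enqueued with topic-B
        System.out.println(unguarded.preferredReplicaImbalanceCount); // prints -1

        ImbalanceGauge guarded = new ImbalanceGauge();
        guarded.markImbalanced("topic-A");
        guarded.cleanupOnce("topic-A");
        guarded.cleanupOnce("topic-A");
        System.out.println(guarded.preferredReplicaImbalanceCount); // prints 0
    }
}
```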

> Negative value for 'Preferred Replica Imbalance' metric
> ---
>
> Key: KAFKA-13572
> URL: https://issues.apache.org/jira/browse/KAFKA-13572
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Siddharth Ahuja
>Priority: Major
> Attachments: 
> kafka_negative_preferred-replica-imbalance-count_jmx_2.JPG
>
>
> A negative value (-822) for the metric - 
> {{kafka_controller_kafkacontroller_preferredreplicaimbalancecount}} has been 
> observed - please see the attached screenshot and the output below:
> {code:java}
> $ curl -s http://localhost:9101/metrics | fgrep 
> 'kafka_controller_kafkacontroller_preferredreplicaimbalancecount'
> # HELP kafka_controller_kafkacontroller_preferredreplicaimbalancecount 
> Attribute exposed for management (kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value)
> # TYPE kafka_controller_kafkacontroller_preferredreplicaimbalancecount gauge
> kafka_controller_kafkacontroller_preferredreplicaimbalancecount -822.0
> {code}
> The issue has appeared after an operation where the number of partitions for 
> some topics were increased, and some topics were deleted/created in order to 
> decrease the number of their partitions.
> Ran the following command to check if there is/are any instance/s where the 
> preferred leader (1st broker in the Replica list) is not the current Leader:
>  
> {code:java}
> % grep ".*Topic:.*Partition:.*Leader:.*Replicas:.*Isr:.*Offline:.*" 
> kafka-topics_describe.out | awk '{print $6 " " $8}' | cut -d "," -f1 | awk 
> '{print $0, ($1==$2?_:"NOT") "MATCHED"}'|grep NOT | wc -l
>  0
> {code}
> but could not find any such instances.
> {{leader.imbalance.per.broker.percentage=2}} is set for all the brokers in 
> the cluster which means that we are allowed to have an imbalance of up to 2% 
> for preferred leaders. This seems to be a valid value, as such, this setting 
> should not contribute towards a negative metric.
> The metric seems to be getting subtracted in the code 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerContext.scala#L474-L503]
>  , however it is not clear when it can become -ve (i.e. subtracted more than 
> added) in absence of any comments or debug/trace level logs in the code. 
> However, one thing is for sure, you either have no imbalance (0) or have 
> imbalance (> 0), it doesn’t make sense for the metric to be < 0. 
> FWIW, no other anomalies besides this have been detected.
> Considering these metrics get actively monitored, we should look at adding 
> DEBUG/TRACE logging around the addition/subtraction of these metrics (and 
> elsewhere where appropriate) to identify any potential issues.





[GitHub] [kafka] dajac commented on pull request #12348: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance

2022-07-14 Thread GitBox


dajac commented on PR #12348:
URL: https://github.com/apache/kafka/pull/12348#issuecomment-1184491780

   @aiquestion Any update on this one?





[GitHub] [kafka] dengziming commented on pull request #12318: MINOR: Support --release in FeatureCommand

2022-07-14 Thread GitBox


dengziming commented on PR #12318:
URL: https://github.com/apache/kafka/pull/12318#issuecomment-1184476081

   @mumrah Thank you for your reminder, I added `--release` support for 
describing and added a unit test for it.





[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1184476011

   @junrao , PR updated in this commit: 
https://github.com/apache/kafka/commit/53f7ebe3ec519bd1f4c876f21228dfce7ce42403 
. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921131155


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -638,6 +641,221 @@ class LogManagerTest {
 assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
+    val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+    val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
+      5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs)
+
+    val numMessages = 20
+    try {
+      for (_ <- 0 until numMessages) {
+        log.appendAsLeader(createRecords, leaderEpoch = 0)
+      }
+
+      assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+    } finally {
+      log.close()
+    }
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingLogsToRecover` metrics
+    val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    assertEquals(expectedParams.size, logMetrics.size)
+
+    val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
+
+    // Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here
+    val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+    verify(spyLogManager, times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), capturedNumRemainingLogs.capture());
+
+    val paths = capturedPath.getAllValues
+    val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+    // expected the end value is 0
+    logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    expectedParams.foreach {
+      case (path, totalLogs) =>
+        // make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir
+        var expectedCurRemainingLogs = totalLogs + 1
+        for (i <- 0 until paths.size()) {
+          if (paths.get(i).contains(path)) {
+            expectedCurRemainingLogs -= 1
+            assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+          }
+        }
+        assertEquals(0, expectedCurRemainingLogs)
+    }
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+                                                     logDirs: Seq[File],
+                                                     recoveryThreadsPerDataDir: Int,
+                                                     mockMap: ConcurrentHashMap[String, Int],
+                                                     expectedParams: Map[String, Int]): Unit = {
+    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    // get all `remainingSegmentsToRecover` metrics
+    val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+      .map { case (_, gauge) => gauge }
+      .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+    // expected each log dir has 2 metrics for each thread
+    assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size)
+
+    val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
+    val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
+
+    // Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here
+    val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+    verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), capturedNumRemainingSegments.capture());
+
+    // expected the end value is 0
+    logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+    val threadNames = capturedThreadName.getAllValues
+    val numRemainingSegments = capturedNumRemainingSegments.getAllValues
+
+    expectedParams.foreach {
+      case (threadName, totalSegments) =>
+        // make sure we update the numRemainingSegments from totalSegments to 
0 in order for each thread
+var 

[GitHub] [kafka] dengziming commented on a diff in pull request #12318: MINOR: Support --release in FeatureCommand

2022-07-14 Thread GitBox


dengziming commented on code in PR #12318:
URL: https://github.com/apache/kafka/pull/12318#discussion_r921176571


##
core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala:
##
@@ -107,4 +116,17 @@ class FeatureCommandTest extends BaseRequestTest {
   assertTrue(downgradeDescribeOutput.contains(expectedOutput))
 }
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft"))
+  def testUpdateFeatureByReleaseVersion(quorum: String): Unit = {
+val initialDescribeOutput = 
TestUtils.grabConsoleOutput(FeatureCommand.mainNoExit(Array("--bootstrap-server",
 bootstrapServers(), "describe")))
+assertTrue(initialDescribeOutput.contains("Feature: 
metadata.version\tSupportedMinVersion: 1\tSupportedMaxVersion: 
7\tFinalizedVersionLevel: 1"))
+
+FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), 
"upgrade", "--release", IBP_3_3_IV3.version()))

Review Comment:
   I moved the original `FeatureCommandTest` to `FeatureCommandIntegrationTest` 
and added a new `FeatureCommandTest` for these unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921173647


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -363,28 +390,32 @@ class LogManager(logDirs: Seq[File],
 
 val logsToLoad = 
Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
   logDir.isDirectory && 
UnifiedLog.parseTopicPartitionName(logDir).topic != 
KafkaRaftServer.MetadataTopic)
-val numLogsLoaded = new AtomicInteger(0)
 numTotalLogs += logsToLoad.length
+numRemainingLogs.put(dir.getAbsolutePath, new 
AtomicInteger(logsToLoad.length))
 
 val jobsForDir = logsToLoad.map { logDir =>
   val runnable: Runnable = () => {
+debug(s"Loading log $logDir")
+var log = None: Option[UnifiedLog]
+val logLoadStartMs = time.hiResClockMs()
 try {
-  debug(s"Loading log $logDir")
-
-  val logLoadStartMs = time.hiResClockMs()
-  val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
-defaultConfig, topicConfigOverrides)
-  val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-  val currentNumLoaded = numLogsLoaded.incrementAndGet()
-
-  info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
-s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+  log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
+defaultConfig, topicConfigOverrides, numRemainingSegments))
 } catch {
   case e: IOException =>
 handleIOException(logDirAbsolutePath, e)
   case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
 // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
 // And while converting IOException to KafkaStorageException, 
we've already handled the exception. So we can ignore it here.
+} finally {
+  val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+  val remainingLogs = decNumRemainingLogs(numRemainingLogs, 
dir.getAbsolutePath)
+  val currentNumLoaded = logsToLoad.length - remainingLogs
+  log match {
+case Some(loadedLog) => info(s"Completed load of $loadedLog 
with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
+  s"($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
+case None => info(s"Error while loading logs in $logDir in 
${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")

Review Comment:
   The log output might not be in order, e.g.: 
   ... (11/100 completed in /tmp/kafkaLogs)
   ... (10/100 completed in /tmp/kafkaLogs)
   ... (12/100 completed in /tmp/kafkaLogs)
   
   but I think that's less important. Otherwise, we would need a lock in the 
`finally` block, which I think would affect log recovery performance. That 
said, since we can make sure the metric result is in the correct order, the 
out-of-order log output should be tolerable. WDYT?
   
   One thing to add: the out-of-order log output behavior was already there 
before my change. FYI









[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921168895


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -307,6 +309,27 @@ class LogManager(logDirs: Seq[File],
 log
   }
 
+  // factory class for naming the log recovery threads used in metrics
+  class LogRecoveryThreadFactory(val dirPath: String) extends ThreadFactory {
+val threadNum = new AtomicInteger(0)
+
+override def newThread(runnable: Runnable): Thread = {
+  KafkaThread.nonDaemon(logRecoveryThreadName(dirPath, 
threadNum.getAndIncrement()), runnable)
+}
+  }
+
+  // create a unique log recovery thread name for each log dir as the format: 
prefix-dirPath-threadNum, ex: "log-recovery-/tmp/kafkaLogs-0"
+  private def logRecoveryThreadName(dirPath: String, threadNum: Int, prefix: 
String = "log-recovery"): String = s"$prefix-$dirPath-$threadNum"
+
+  /*
+   * decrement the number of remaining logs
+   * @return the number of remaining logs after decremented 1
+   */
+  private[log] def decNumRemainingLogs(numRemainingLogs: ConcurrentMap[String, 
AtomicInteger], path: String): Int = {
+require(path != null, "path cannot be null to update remaining logs 
metric.")
+numRemainingLogs.get(path).decrementAndGet()

Review Comment:
   Decrement the `AtomicInteger` by 1 when a log finishes loading, so that the 
metric reflects the correct remaining count.
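The counter scheme discussed above can be sketched outside Kafka. This is a 
minimal Java analogue of the Scala helper (the class name and standalone shape 
here are hypothetical, not Kafka's actual API): one `AtomicInteger` per log 
directory in a `ConcurrentMap`, decremented as each log finishes loading, so a 
gauge backed by the map always reads a consistent remaining count without a 
lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RemainingLogsSketch {
    // Hypothetical standalone analogue of decNumRemainingLogs: decrement the
    // per-directory counter and return the number of logs still to recover.
    static int decNumRemainingLogs(ConcurrentMap<String, AtomicInteger> remaining,
                                   String path) {
        if (path == null) {
            throw new IllegalArgumentException("path cannot be null to update remaining logs metric.");
        }
        return remaining.get(path).decrementAndGet();
    }

    public static void main(String[] args) {
        ConcurrentMap<String, AtomicInteger> remaining = new ConcurrentHashMap<>();
        // pretend this log dir starts with 3 logs to recover
        remaining.put("/tmp/kafkaLogs", new AtomicInteger(3));
        System.out.println(decNumRemainingLogs(remaining, "/tmp/kafkaLogs")); // 2
        System.out.println(decNumRemainingLogs(remaining, "/tmp/kafkaLogs")); // 1
    }
}
```

Because `decrementAndGet` is atomic, concurrent recovery threads for the same 
directory never lose an update, which is why no explicit lock is needed.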






[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921159244


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -638,6 +641,221 @@ class LogManagerTest {
 assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, 
partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: 
Int): Unit = {
+def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = time.milliseconds)
+val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
+  5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+
+val numMessages = 20
+try {
+  for (_ <- 0 until numMessages) {
+log.appendAsLeader(createRecords, leaderEpoch = 0)
+  }
+
+  assertEquals(expectedSegmentsPerLog, log.numberOfSegments)

Review Comment:
   > how do we enforce the expected number of segments?
   
   We can guarantee the number of segments because we set `segment.bytes=1024` 
and each dummy record is 72 bytes, so we can compute how many segments will be 
created. I've updated the test and added comments.
   
   > should we explicitly call log.roll()?
   
   No, I don't think we need `log.roll()` here, because we only need log 
segments filled with records for recovery. Besides, we don't want to update the 
recovery checkpoint and affect the remaining-segments metric results.
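The segment count implied by the numbers above can be checked with quick 
arithmetic. This is a rough Java sketch, assuming each append is one ~72-byte 
batch (the size quoted in the review comment) and a segment rolls when the 
next batch would push it past `segment.bytes`; this is a simplification of 
Kafka's actual roll logic, not the real implementation.

```java
public class SegmentCountSketch {
    // Count segments produced by numBatches appends of batchBytes each,
    // rolling to a new segment when the next batch would exceed segmentBytes.
    static int segmentsFor(int numBatches, int batchBytes, int segmentBytes) {
        int segments = 1;
        int used = 0;
        for (int i = 0; i < numBatches; i++) {
            if (used + batchBytes > segmentBytes) {
                segments++;   // roll to a fresh segment
                used = 0;
            }
            used += batchBytes;
        }
        return segments;
    }

    public static void main(String[] args) {
        // 14 batches of 72 bytes fill 1008 of 1024 bytes; the 15th forces a roll,
        // so 20 appends produce 2 segments under these assumptions.
        System.out.println(segmentsFor(20, 72, 1024)); // 2
    }
}
```

This is how a test author could derive a deterministic `expectedSegmentsPerLog` 
from the configured segment size and record size.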









[jira] [Created] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries

2022-07-14 Thread Adrian Preston (Jira)
Adrian Preston created KAFKA-14074:
--

 Summary: Restarting a broker during re-assignment can leave log 
directory entries
 Key: KAFKA-14074
 URL: https://issues.apache.org/jira/browse/KAFKA-14074
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0, 2.8.0
Reporter: Adrian Preston


Re-starting a broker while replicas are being assigned away from the broker can 
result in topic partition directories being left in the broker’s log directory. 
This can trigger further problems if such a topic is deleted and re-created. 
These problems occur when replicas for the new topic are placed on a broker 
that hosts a “stale” topic partition directory of the same name, causing the 
on-disk topic partition state held by different brokers in the cluster to 
diverge.

We have also been able to reproduce variants of this problem using Kafka 2.8 
and 3.1, as well as Kafka built from the head of the apache/kafka repository 
(at the time of writing this is commit: 
94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to reproduce 
this problem with Kafka running in KRaft mode.

A minimal re-create for topic directories being left on disk is as follows:
 # Start ZooKeeper and a broker (both using the sample config)
 # Create 100 topics: each with 1 partition, and with replication factor 1
 # Add a second broker to the Kafka cluster (with minor edits to the sample 
config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}})
 # Issue a re-assignment that moves all of the topic partition replicas  from 
the first broker to the second broker
 # While this re-assignment is taking place shutdown the first broker (you need 
to be quick with only two brokers and 100 topics…)
 # Wait a few seconds for the re-assignment to stall
 # Restart the first broker and wait for the re-assignment to complete and it 
to remove any partially deleted topics (e.g. those with a “-delete” suffix).

Inspecting the logs directory for the first broker should show directories 
corresponding to topic partitions that are owned by the second broker. These 
are not cleaned up when the re-assignment completes, and also remain in the 
logs directory even if the first broker is restarted.  Deleting the topic also 
does not clean up the topic partitions left behind on the first broker - which 
leads to a second potential problem.

For topics that have more than one replica: a new topic that has the same name 
as a previously deleted topic might have replicas created on a broker with 
“stale” topic partition directories. If this happens these topics will remain 
in an under-replicated state.

A minimal re-create for this is as follows:
 # Create a three node Kafka cluster (backed by ZK) based off the sample config 
(to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2)
 # Create 100 topics: each with 1 partition, and with replication factor 2
 # Submit a re-assignment to move all of the topic partition replicas to 
kafka-0 and kafka-1,  and wait for it to complete
 # Submit a re-assignment to move all of the topic partition replicas on 
kafka-0 to kafka-2.
 # While this re-assignment is taking place shutdown and re-start kafka-0.
 # Wait for the re-assignment to complete, and check that there are unexpected 
topic partition directories in kafka-0’s logs directory
 # Delete all 100 topics, and re-create 100 new topics with the same name and 
configuration as the deleted topics.

In this state kafka-1 and kafka-2 continually generate log messages similar to:
{{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition 
test-039-0. This error may be returned transiently when the partition is being 
created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread)}}

Topics that have had replicas created on kafka-0 are under-replicated, with 
kafka-0 missing from the ISR list. Performing a rolling restart of each broker 
in turn does not resolve the problem; in fact, more partitions are listed as 
under-replicated, with kafka-0, as before, missing from their ISR list.

I also tried to re-create this with Kafka running in KRaft mode, but was unable 
to do so. My test configuration was three brokers configured based on 
/config/kraft/server.properties. All three brokers were part of the controller 
quorum. Interestingly, I see log lines like the following when re-starting the 
broker that I stopped mid-reassignment:

{{[2022-07-14 13:44:42,705] INFO Found stray log dir 
Log(dir=/tmp/kraft-2/test-029-0, topicId=DMGA3zxyQqGUfeV6cmkcmg, 
topic=test-029, partition=0, highWatermark=0, lastStableOffset=0, 
logStartOffset=0, logEndOffset=0): the current replica assignment [I@530d4c70 
does not contain the local brokerId 2. 
(kafka.server.metadata.BrokerMetadataPublisher$)}}

With later log lines showing the topic 

[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921131155


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -638,6 +641,221 @@ class LogManagerTest {
 assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, 
partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: 
Int): Unit = {
+def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = time.milliseconds)
+val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
+  5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+
+val numMessages = 20
+try {
+  for (_ <- 0 until numMessages) {
+log.appendAsLeader(createRecords, leaderEpoch = 0)
+  }
+
+  assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+} finally {
+  log.close()
+}
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, 
expectedParams: Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingLogsToRecover` metrics
+val logMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+assertEquals(expectedParams.size, logMetrics.size)
+
+val capturedPath: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+val capturedNumRemainingLogs: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+// Since we'll update numRemainingLogs from totalLogs to 0 for each log 
dir, so we need to add 1 here
+val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), 
capturedNumRemainingLogs.capture());
+
+val paths = capturedPath.getAllValues
+val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+// expected the end value is 0
+logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+expectedParams.foreach {
+  case (path, totalLogs) =>
+// make sure we update the numRemainingLogs from totalLogs to 0 in 
order for each log dir
+var expectedCurRemainingLogs = totalLogs + 1
+for (i <- 0 until paths.size()) {
+  if (paths.get(i).contains(path)) {
+expectedCurRemainingLogs -= 1
+assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+  }
+}
+assertEquals(0, expectedCurRemainingLogs)
+}
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+ logDirs: Seq[File],
+ 
recoveryThreadsPerDataDir: Int,
+ mockMap: 
ConcurrentHashMap[String, Int],
+ expectedParams: 
Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingSegmentsToRecover` metrics
+val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+// expected each log dir has 2 metrics for each thread
+assertEquals(recoveryThreadsPerDataDir * logDirs.size, 
logSegmentMetrics.size)
+
+val capturedThreadName: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+val capturedNumRemainingSegments: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+// Since we'll update numRemainingSegments from totalSegments to 0 for 
each thread, so we need to add 1 here
+val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+verify(mockMap, 
times(expectedCallTimes)).put(capturedThreadName.capture(), 
capturedNumRemainingSegments.capture());
+
+// expected the end value is 0
+logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+val threadNames = capturedThreadName.getAllValues
+val numRemainingSegments = capturedNumRemainingSegments.getAllValues
+
+expectedParams.foreach {
+  case (threadName, totalSegments) =>
+// make sure we update the numRemainingSegments from totalSegments to 
0 in order for each thread
+var 

[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921127854


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -638,6 +641,221 @@ class LogManagerTest {
 assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, 
partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: 
Int): Unit = {
+def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = time.milliseconds)
+val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
+  5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+
+val numMessages = 20
+try {
+  for (_ <- 0 until numMessages) {
+log.appendAsLeader(createRecords, leaderEpoch = 0)
+  }
+
+  assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+} finally {
+  log.close()
+}
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, 
expectedParams: Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingLogsToRecover` metrics
+val logMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+assertEquals(expectedParams.size, logMetrics.size)
+
+val capturedPath: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+val capturedNumRemainingLogs: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+// Since we'll update numRemainingLogs from totalLogs to 0 for each log 
dir, so we need to add 1 here
+val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), 
capturedNumRemainingLogs.capture());
+
+val paths = capturedPath.getAllValues
+val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+// expected the end value is 0
+logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+expectedParams.foreach {
+  case (path, totalLogs) =>
+// make sure we update the numRemainingLogs from totalLogs to 0 in 
order for each log dir
+var expectedCurRemainingLogs = totalLogs + 1
+for (i <- 0 until paths.size()) {
+  if (paths.get(i).contains(path)) {
+expectedCurRemainingLogs -= 1
+assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+  }
+}
+assertEquals(0, expectedCurRemainingLogs)
+}
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+ logDirs: Seq[File],
+ 
recoveryThreadsPerDataDir: Int,
+ mockMap: 
ConcurrentHashMap[String, Int],
+ expectedParams: 
Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingSegmentsToRecover` metrics
+val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+// expected each log dir has 2 metrics for each thread

Review Comment:
   Good catch! Updated.






[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921127366


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -638,6 +641,221 @@ class LogManagerTest {
 assertTrue(logManager.partitionsInitializing.isEmpty)
   }
 
+  private def appendRecordsToLog(time: MockTime, parentLogDir: File, 
partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: 
Int): Unit = {
+def createRecords = TestUtils.singletonRecords(value = "test".getBytes, 
timestamp = time.milliseconds)
+val tpFile = new File(parentLogDir, s"$name-$partitionId")
+
+val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, 
time.scheduler, time, 0, 0,
+  5 * 60 * 1000, 60 * 60 * 1000, 
LogManager.ProducerIdExpirationCheckIntervalMs)
+
+val numMessages = 20
+try {
+  for (_ <- 0 until numMessages) {
+log.appendAsLeader(createRecords, leaderEpoch = 0)
+  }
+
+  assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
+} finally {
+  log.close()
+}
+  }
+
+  private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, 
expectedParams: Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingLogsToRecover` metrics
+val logMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+assertEquals(expectedParams.size, logMetrics.size)
+
+val capturedPath: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+val capturedNumRemainingLogs: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+// Since we'll update numRemainingLogs from totalLogs to 0 for each log 
dir, so we need to add 1 here
+val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+verify(spyLogManager, 
times(expectedCallTimes)).updateNumRemainingLogs(any, capturedPath.capture(), 
capturedNumRemainingLogs.capture());
+
+val paths = capturedPath.getAllValues
+val numRemainingLogs = capturedNumRemainingLogs.getAllValues
+
+// expected the end value is 0
+logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+expectedParams.foreach {
+  case (path, totalLogs) =>
+// make sure we update the numRemainingLogs from totalLogs to 0 in 
order for each log dir
+var expectedCurRemainingLogs = totalLogs + 1
+for (i <- 0 until paths.size()) {
+  if (paths.get(i).contains(path)) {
+expectedCurRemainingLogs -= 1
+assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i))
+  }
+}
+assertEquals(0, expectedCurRemainingLogs)
+}
+  }
+
+  private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
+ logDirs: Seq[File],
+ 
recoveryThreadsPerDataDir: Int,
+ mockMap: 
ConcurrentHashMap[String, Int],
+ expectedParams: 
Map[String, Int]): Unit = {
+val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+// get all `remainingSegmentsToRecover` metrics
+val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+  .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+  .map { case (_, gauge) => gauge }
+  .asInstanceOf[ArrayBuffer[Gauge[Int]]]
+
+// expected each log dir has 2 metrics for each thread
+assertEquals(recoveryThreadsPerDataDir * logDirs.size, 
logSegmentMetrics.size)
+
+val capturedThreadName: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String])
+val capturedNumRemainingSegments: ArgumentCaptor[Int] = 
ArgumentCaptor.forClass(classOf[Int])
+
+// Since we update numRemainingSegments from totalSegments down to 0 for 
each thread, we need to add 1 here
+val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum
+verify(mockMap, 
times(expectedCallTimes)).put(capturedThreadName.capture(), 
capturedNumRemainingSegments.capture());
+
+// expected the end value is 0
+logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
+
+val threadNames = capturedThreadName.getAllValues
+val numRemainingSegments = capturedNumRemainingSegments.getAllValues
+
+expectedParams.foreach {
+  case (threadName, totalSegments) =>
+// make sure we update the numRemainingSegments from totalSegments to 
0 in order for each thread
+var 

[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-14 Thread GitBox


showuon commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r921125134


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -366,25 +392,31 @@ class LogManager(logDirs: Seq[File],
 val numLogsLoaded = new AtomicInteger(0)
 numTotalLogs += logsToLoad.length
 
+updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, 
logsToLoad.length)
+
 val jobsForDir = logsToLoad.map { logDir =>
   val runnable: Runnable = () => {
+debug(s"Loading log $logDir")
+var log = None: Option[UnifiedLog]
+val logLoadStartMs = time.hiResClockMs()
 try {
-  debug(s"Loading log $logDir")
-
-  val logLoadStartMs = time.hiResClockMs()
-  val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
-defaultConfig, topicConfigOverrides)
-  val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
-  val currentNumLoaded = numLogsLoaded.incrementAndGet()
-
-  info(s"Completed load of $log with ${log.numberOfSegments} 
segments in ${logLoadDurationMs}ms " +
-s"($currentNumLoaded/${logsToLoad.length} loaded in 
$logDirAbsolutePath)")
+  log = Some(loadLog(logDir, hadCleanShutdown, recoveryPoints, 
logStartOffsets,
+defaultConfig, topicConfigOverrides, numRemainingSegments))
 } catch {
   case e: IOException =>
 handleIOException(logDirAbsolutePath, e)
   case e: KafkaStorageException if 
e.getCause.isInstanceOf[IOException] =>
 // KafkaStorageException might be thrown, ex: during writing 
LeaderEpochFileCache
 // And while converting IOException to KafkaStorageException, 
we've already handled the exception. So we can ignore it here.
+} finally {
+  val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
+  val currentNumLoaded = numLogsLoaded.incrementAndGet()
+  updateNumRemainingLogs(numRemainingLogs, dir.getAbsolutePath, 
logsToLoad.length - currentNumLoaded)

Review Comment:
   Good point! I've changed the type in the map from `Map[String, Int]` to 
`Map[String, AtomicInteger]`, and we decrement it by one each time we update 
`numRemainingLogs`, so that we can make sure the metric is decremented in the 
correct order. Thanks.
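The switch to per-directory `AtomicInteger` counters can be sketched stand-alone (the directory path, class name, and counts below are illustrative, not Kafka's actual values): `decrementAndGet` is atomic, so each completed log load observes a unique remaining count even when recovery threads race, which is what keeps the gauge decreasing in order.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RemainingLogsGaugeDemo {
    public static void main(String[] args) throws InterruptedException {
        final int totalLogs = 20;
        final String dir = "/tmp/kafka-logs"; // illustrative log dir path
        // One counter per log dir, mirroring the Map[String, AtomicInteger] in the PR.
        ConcurrentHashMap<String, AtomicInteger> remaining = new ConcurrentHashMap<>();
        remaining.put(dir, new AtomicInteger(totalLogs));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<Integer> observed = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < totalLogs; i++) {
            pool.submit(() -> {
                // decrementAndGet is atomic: each completed load sees a unique
                // remaining count, even with several recovery threads racing.
                observed.add(remaining.get(dir).decrementAndGet());
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        // The gauge ends at 0, and every intermediate value 0..totalLogs-1
        // was reported exactly once.
        if (remaining.get(dir).get() != 0) throw new AssertionError();
        if (observed.size() != totalLogs) throw new AssertionError();
        System.out.println("remaining=" + remaining.get(dir).get());
    }
}
```

A plain `Int` read-modify-write could report the same remaining count twice under concurrency; the atomic decrement cannot.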



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram merged pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching

2022-07-14 Thread GitBox


rajinisivaram merged PR #10964:
URL: https://github.com/apache/kafka/pull/10964


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching

2022-07-14 Thread GitBox


rajinisivaram commented on PR #10964:
URL: https://github.com/apache/kafka/pull/10964#issuecomment-1184403492

   @dajac Thanks for the reviews and changes. Merging to trunk and 3.3.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-14 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566808#comment-17566808
 ] 

Daniel Urban commented on KAFKA-14053:
--

I understand that increasing the epoch on the client side probably violates 
the contract in the protocol.

Refactored my change so that client-side timeouts (both delivery and request 
timeout) become fatal errors in transactional producers, resulting in a last, 
best-effort epoch bump.

> Transactional producer should bump the epoch when a batch encounters delivery 
> timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is 
> still in-flight. Due to underlying infra issues, it is possible that an 
> EndTxnRequest and a WriteTxnMarkerRequest is processed before the in-flight 
> batch is processed on the leader. This can cause transactional batches to be 
> appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even 
> violate processing guarantees, as the out-of-order batch can become part of 
> the next transaction.
> Because of this, the producer should skip aborting the partition, and bump 
> the epoch to fence the in-flight requests.
>  
> More detail can be found here: 
> [https://lists.apache.org/thread/8d2oblsjtdv7740glc37v79f0r7p99dp]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-13868:
-
External issue URL:   (was: 
https://lists.apache.org/thread/8drsbn0hgdhq4g1qgvm9g8pb5t4x42px)

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-13868:
-
External issue URL: 
https://lists.apache.org/thread/8drsbn0hgdhq4g1qgvm9g8pb5t4x42px

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching

2022-07-14 Thread GitBox


rajinisivaram commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r920997757


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws 
Exception {
 }
 }
 
+@Test
+public void testBatchedListConsumerGroupOffsets() throws Exception {
+Cluster cluster = mockCluster(1, 0);
+Time time = new MockTime();
+Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+verifyListOffsetsForMultipleGroups(groupSpecs, result);
+}
+}
+
+@Test
+public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() 
throws Exception {

Review Comment:
   @dajac Thank you! I realize now that I was only going through the path where 
we were looking up the coordinator. But you are right, that isn't sufficient. 
Your changes look good. Thank you for checking this out and adding the 
implementation and tests!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lkokhreidze commented on pull request #12404: [MINOR] Fix QueryResult Javadocs

2022-07-14 Thread GitBox


lkokhreidze commented on PR #12404:
URL: https://github.com/apache/kafka/pull/12404#issuecomment-1184249327

   Call for review @vvcephei @cadonna @mjsax


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lkokhreidze opened a new pull request, #12404: [MINOR] Fix QueryResult Javadocs

2022-07-14 Thread GitBox


lkokhreidze opened a new pull request, #12404:
URL: https://github.com/apache/kafka/pull/12404

   Fixes the `QueryResult` javadocs. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching

2022-07-14 Thread GitBox


dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r920973675


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws 
Exception {
 }
 }
 
+@Test
+public void testBatchedListConsumerGroupOffsets() throws Exception {
+Cluster cluster = mockCluster(1, 0);
+Time time = new MockTime();
+Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+verifyListOffsetsForMultipleGroups(groupSpecs, result);
+}
+}
+
+@Test
+public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() 
throws Exception {

Review Comment:
   @rajinisivaram As I had to build the code to prove my theory, I pushed the 
commit so you can reuse it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashwinpankaj commented on a diff in pull request #8690: KAFKA-9965: Fix uneven distribution in RoundRobinPartitioner

2022-07-14 Thread GitBox


ashwinpankaj commented on code in PR #8690:
URL: https://github.com/apache/kafka/pull/8690#discussion_r920869174


##
clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java:
##
@@ -65,12 +65,20 @@ public int partition(String topic, Object key, byte[] 
keyBytes, Object value, by
 }
 
 private int nextValue(String topic) {
-AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
-return new AtomicInteger(0);
-});
+AtomicInteger counter = topicCounterMap.
+computeIfAbsent(topic, k -> new AtomicInteger(0));
 return counter.getAndIncrement();
 }
 
+@Override
+public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+// After onNewBatch is called, we will call partition() again.
+// So 'rewind' the counter for this topic.
+AtomicInteger counter = topicCounterMap.
+computeIfAbsent(topic, k -> new AtomicInteger(0));
+counter.getAndDecrement();

Review Comment:
   I feel that the fix lies in `RecordAccumulator`, as it currently always 
returns `abortForNewBatch=true` from append() for a partition which does not 
yet have a Deque created.
   
   If a partition does not have a deque, 
[accumulator.getOrCreateDeque()](https://github.com/apache/kafka/blob/94d4fdeb28b3cd4d474d943448a7ef653eaa145d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L940)
 simply creates an empty ArrayDeque.
   When accumulator tries to append a new record, [tryAppend() 
](https://github.com/apache/kafka/blob/94d4fdeb28b3cd4d474d943448a7ef653eaa145d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L383)
 will return null since ProducerBatch has not been created. 
   
   Here is the sequence of events when the record key is not set:
   
1. partitioner.partition() is invoked - partition id for topic is 
incremented
1. recordaccumulator.append() is invoked with the `abortOnNewBatch` arg set 
to true. The accumulator is unable to append the record to a batch, so it 
returns a RecordAppendResult with abortForNewBatch set to true. 
1. partitioner.onNewBatch() is invoked
1. partitioner.partition() is invoked again - partition id for topic is 
incremented
1. recordaccumulator.append() is invoked again with the `abortOnNewBatch` arg 
set to false. This time the accumulator allocates a new ProducerBatch and 
appends the record.
   
   Probable fix:
In accumulator.getOrCreateDeque(), in addition to creating a Deque, we 
should also initialize an empty ProducerBatch for the topicPartition.
   
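The rewind discussed in this thread can be illustrated with a stripped-down counter (a sketch, not the real `RoundRobinPartitioner`): each `partition()` call that gets aborted for a new batch is undone by `onNewBatch`, so the follow-up `partition()` call lands on the same partition and the round-robin sequence stays even. With 2 partitions and no rewind, the double increment per record would route every record to partition 1.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinRewindDemo {
    // Simplified stand-in for RoundRobinPartitioner's per-topic counter.
    static final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    static int partition(String topic, int numPartitions) {
        int next = counters.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndIncrement();
        return next % numPartitions;
    }

    static void onNewBatch(String topic) {
        // Rewind the counter so the follow-up partition() call picks the same partition.
        counters.get(topic).getAndDecrement();
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        int[] counts = new int[numPartitions];
        for (int i = 0; i < 8; i++) {
            partition("t", numPartitions);           // first call: no batch yet, append aborts
            onNewBatch("t");                         // rewind the counter
            counts[partition("t", numPartitions)]++; // second call actually appends
        }
        // With the rewind, the 8 records split evenly across both partitions.
        if (counts[0] != 4 || counts[1] != 4) {
            throw new AssertionError("uneven: " + Arrays.toString(counts));
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Dropping the `onNewBatch` rewind in this sketch sends all 8 records to partition 1, which is the uneven distribution the JIRA describes.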



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics

2022-07-14 Thread Emmanuel Brard (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566714#comment-17566714
 ] 

Emmanuel Brard commented on KAFKA-14069:


I think it might be a bug: I don't see any Delete requests being processed in 
the Kafka cluster logs for topics matching the pattern 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-".

> Allow custom configuration of foreign key join internal topics
> --
>
> Key: KAFKA-14069
> URL: https://issues.apache.org/jira/browse/KAFKA-14069
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Emmanuel Brard
>Priority: Minor
>
> Internal topics supporting foreign key joins (-subscription-registration-topic 
> and -subscription-response-topic) are automatically created with _infinite 
> retention_ (retention.ms=-1, retention.bytes=-1).
> As far as I understand, those topics are used for communication between tasks 
> that are involved in the FK join; the intermediate result, though, is 
> persisted in a compacted topic (-subscription-store-changelog).
> This means, if I understood right, that during normal operation of the stream 
> application, once a message is read from the registration/subscription topic, 
> it will not be read again, even in case of recovery (the position in those 
> topics is committed).
> Because we have very large tables being joined this way with a very high 
> change frequency, we end up with FK internal topics in the order of 1 or 2 
> TB. This is complicated to maintain, especially in terms of disk space.
> I was wondering if:
> - this infinite retention is really a required configuration, and if not
> - this infinite retention could be replaced with a configurable one (for 
> example 1 week, meaning that I accept that in case of failure I must restart 
> my app within one week)
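For reference, a retention cap like the one proposed could be applied manually today with `kafka-configs.sh`. This is a sketch only: the topic name below is hypothetical (substitute the actual internal topic reported by `kafka-topics.sh --list`), and overriding configs on Streams-managed internal topics is at your own risk.

```shell
# Hypothetical topic name: replace with your application.id plus the actual
# -SUBSCRIPTION-REGISTRATION- suffix. Caps retention at 7 days / 100 GiB.
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics \
  --entity-name my-app-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic \
  --add-config retention.ms=604800000,retention.bytes=107374182400
```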



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics

2022-07-14 Thread Emmanuel Brard (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566695#comment-17566695
 ] 

Emmanuel Brard commented on KAFKA-14069:


One thing I did not mention yet is that we use version 2.8.0. I guess those API 
calls are triggered by the streaming application itself; should we run it in 
debug log mode to see the calls?

> Allow custom configuration of foreign key join internal topics
> --
>
> Key: KAFKA-14069
> URL: https://issues.apache.org/jira/browse/KAFKA-14069
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Emmanuel Brard
>Priority: Minor
>
> Internal topics supporting foreign key joins (-subscription-registration-topic 
> and -subscription-response-topic) are automatically created with _infinite 
> retention_ (retention.ms=-1, retention.bytes=-1).
> As far as I understand, those topics are used for communication between tasks 
> that are involved in the FK join; the intermediate result, though, is 
> persisted in a compacted topic (-subscription-store-changelog).
> This means, if I understood right, that during normal operation of the stream 
> application, once a message is read from the registration/subscription topic, 
> it will not be read again, even in case of recovery (the position in those 
> topics is committed).
> Because we have very large tables being joined this way with a very high 
> change frequency, we end up with FK internal topics in the order of 1 or 2 
> TB. This is complicated to maintain, especially in terms of disk space.
> I was wondering if:
> - this infinite retention is really a required configuration, and if not
> - this infinite retention could be replaced with a configurable one (for 
> example 1 week, meaning that I accept that in case of failure I must restart 
> my app within one week)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14069) Allow custom configuration of foreign key join internal topics

2022-07-14 Thread Emmanuel Brard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emmanuel Brard updated KAFKA-14069:
---
Affects Version/s: 2.8.0

> Allow custom configuration of foreign key join internal topics
> --
>
> Key: KAFKA-14069
> URL: https://issues.apache.org/jira/browse/KAFKA-14069
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Emmanuel Brard
>Priority: Minor
>
> Internal topics supporting foreign key joins (-subscription-registration-topic 
> and -subscription-response-topic) are automatically created with _infinite 
> retention_ (retention.ms=-1, retention.bytes=-1).
> As far as I understand, those topics are used for communication between tasks 
> that are involved in the FK join; the intermediate result, though, is 
> persisted in a compacted topic (-subscription-store-changelog).
> This means, if I understood right, that during normal operation of the stream 
> application, once a message is read from the registration/subscription topic, 
> it will not be read again, even in case of recovery (the position in those 
> topics is committed).
> Because we have very large tables being joined this way with a very high 
> change frequency, we end up with FK internal topics in the order of 1 or 2 
> TB. This is complicated to maintain, especially in terms of disk space.
> I was wondering if:
> - this infinite retention is really a required configuration, and if not
> - this infinite retention could be replaced with a configurable one (for 
> example 1 week, meaning that I accept that in case of failure I must restart 
> my app within one week)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching

2022-07-14 Thread GitBox


dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r920841796


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws 
Exception {
 }
 }
 
+@Test
+public void testBatchedListConsumerGroupOffsets() throws Exception {
+Cluster cluster = mockCluster(1, 0);
+Time time = new MockTime();
+Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+verifyListOffsetsForMultipleGroups(groupSpecs, result);
+}
+}
+
+@Test
+public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() 
throws Exception {

Review Comment:
   @rajinisivaram I was a bit puzzled by this downgrade logic, so I spent a bit 
more time understanding it. In short, it seems that the current implementation 
does not work. You can make it fail by failing the first offset fetch response 
with COORDINATOR_LOAD_IN_PROGRESS, for instance. When the api driver retries, 
it groups together all the groups targeting the same coordinator. The current 
test only seems to work because the driver executes steps sequentially.
   
   The right approach is likely to make `ListConsumerGroupOffsetsHandler` 
implement `AdminApiHandler` instead of `Batched`. Then in `buildRequest`, we 
can look at the `batch` flag in the `lookupStrategy`. Ideally, this should be 
decoupled from the `lookupStrategy`, but that might be enough in our case. 
Otherwise, we need to add another flag in the 
`ListConsumerGroupOffsetsHandler`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org