Re: [PR] KAFKA-15645 ReplicationQuotasTestRig rewritten in java [kafka]

2023-11-22 Thread via GitHub


nizhikov commented on PR #14588:
URL: https://github.com/apache/kafka/pull/14588#issuecomment-1823869599

   Hello @mimaison 
   Are you able to review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]

2023-11-22 Thread via GitHub


cmccabe merged PR #14807:
URL: https://github.com/apache/kafka/pull/14807


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 closed pull request #14632: KAFKA-15681: Add support of 
client-metrics in kafka-configs.sh (KIP-714)
URL: https://github.com/apache/kafka/pull/14632


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 closed pull request #14699: KAFKA-15778 & KAFKA-15779: Implement 
metrics manager (KIP-714)
URL: https://github.com/apache/kafka/pull/14699


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8575:
---
Fix Version/s: 4.0.0

> Investigate removing EAGER protocol &  cleaning up task suspension in Streams 
> rebalancing
> -
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 4.0.0
>
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up and going a step further to remove the EAGER protocol from Streams 
> entirely.
> Plan to remove this in 3.1/4.0, whichever comes after 3.0. This will make 3.0 
> a bridge release for users upgrading from any version below 2.4, but they 
> will still be able to do so in the usual two rolling bounces.
>  
> *The upgrade path from 2.3 and below, to any \{to_version} higher than 3.1 
> will be:*
> 1. During the first rolling bounce, upgrade the jars to a version between 2.4 
> - 3.1 and add the UPGRADE_FROM config for whichever version you are upgrading 
> from
> 2. During the second rolling bounce, upgrade the jars to the desired 
> \{to_version} and remove the UPGRADE_FROM config
>  
> EAGER will be effectively deprecated in 3.0 but not removed until the next 
> version.
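
A minimal sketch of the two-bounce path above as Kafka Streams configuration, assuming the standard `StreamsConfig` keys; the application id and bootstrap servers are placeholders, not values from the ticket:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    // Bounce 1: run the new jars (a version between 2.4 and 3.1) with upgrade.from
    // set to whichever version is being upgraded from.
    public static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");           // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
        return props;
    }

    // Bounce 2: run the jars of the desired to_version with upgrade.from removed again.
    public static Properties secondBounceConfig() {
        Properties props = firstBounceConfig();
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        return props;
    }
}
{code}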



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-8088) Deprecate `WindowStoreIterator` interface

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8088:
---
Fix Version/s: 4.0.0

> Deprecate `WindowStoreIterator` interface
> -
>
> Key: KAFKA-8088
> URL: https://issues.apache.org/jira/browse/KAFKA-8088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 4.0.0
>
>
> The `WindowStore` interface has multiple methods to fetch() data. However, 
> the return types are mixed up. Two methods return `WindowStoreIterator` while 
> all others return `KeyValueIterator`.
> We should align the return types and replace `WindowStoreIterator` with 
> `KeyValueIterator`. For backward compatibility reasons we can only deprecate 
> the interface for now and remove it only later.
> KIP-439: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-439%3A+Deprecate+Interface+WindowStoreIterator]
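
For illustration, a sketch of the mixed return types KIP-439 wants to align; the signatures are paraphrased from the public `WindowStore` API, and the store handle is assumed to come from the application's state stores:

{code:java}
import java.time.Instant;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class MixedIteratorTypesExample {
    // Single-key fetch returns the to-be-deprecated WindowStoreIterator<V>,
    // whose keys are plain window-start timestamps.
    static void scanSingleKey(final WindowStore<String, Long> store) {
        try (WindowStoreIterator<Long> iter = store.fetch("key", Instant.EPOCH, Instant.now())) {
            while (iter.hasNext()) {
                final KeyValue<Long, Long> entry = iter.next();
            }
        }
    }

    // Range fetch already returns KeyValueIterator<Windowed<K>, V>, the type the KIP standardizes on.
    static void scanRange(final WindowStore<String, Long> store) {
        try (KeyValueIterator<Windowed<String>, Long> iter =
                 store.fetch("a", "z", Instant.EPOCH, Instant.now())) {
            while (iter.hasNext()) {
                final KeyValue<Windowed<String>, Long> entry = iter.next();
            }
        }
    }
}
{code}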



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12281) Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12281:

Labels: beginner needs-kip newbie  (was: needs-kip)

> Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException
> -
>
> Key: KAFKA-12281
> URL: https://issues.apache.org/jira/browse/KAFKA-12281
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: beginner, needs-kip, newbie
>
> It's been 3 years since commit 234ec8a got rid of the usage of BrokerNotFoundException. 
> Hence, it is time to deprecate BrokerNotFoundException.
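
An illustrative sketch of what the deprecation itself could look like (not a committed change, and per the labels it still needs a KIP; the constructor list is abridged):

{code:java}
package org.apache.kafka.streams.errors;

/**
 * No longer thrown by Kafka Streams since commit 234ec8a removed its usage.
 *
 * @deprecated with no replacement, as it is no longer used.
 */
@Deprecated
public class BrokerNotFoundException extends StreamsException {

    public BrokerNotFoundException(final String message) {
        super(message);
    }
}
{code}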



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2023-11-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15848:
-

Assignee: Kirk True

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
> {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their 
> use and interpretation of the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
> success of its operations _before_ checking the timer:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{NetworkClient.poll()}}
>  # Check for result
>  ## If successful, return success
>  ## If fatal failure, return failure
>  # Check timer
>  ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
>  # Submit operation asynchronously
>  # Wait for operation to complete using {{Future.get()}}
>  ## If operation timed out, {{Future.get()}} will throw a timeout error
>  # Check for result
>  ## If successful, return success
>  ## Otherwise, return failure
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via any 
> of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} 
> API. Here's a bit of code that illustrates the difference between the two 
> approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
> manner similar to this:
> {code:java}
> public int getCount(Timer timer) {
>     do {
>         final RequestFuture<Integer> future = sendSomeRequest(partitions);
>         client.poll(future, timer);
>         if (future.isDone())
>             return future.get();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private int getCount(Timer timer) {
>     try {
>         CompletableFuture<Integer> future = new CompletableFuture<>();
>         applicationEventQueue.add(future);
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) {
>         return -1;
>     }
> }
> {code}
> The call to {{add}} enqueues the network operation, but it then _immediately_ 
> invokes {{Future.get()}} with the timeout to implement a time-bounded 
> blocking call. Since this method is being called with a timeout of 0, it 
> _immediately_ throws a {{{}TimeoutException{}}}. 
> h3. Suggested fix
> TBD :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2023-11-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Description: 
The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
{{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their use 
and interpretation of the {{Timer}} that is supplied.
h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} seems to give a little wiggle room.

{{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
success of its operations _before_ checking the timer:
 # Submit operation asynchronously
 # Wait for operation to complete using {{NetworkClient.poll()}}
 # Check for result
 ## If successful, return success
 ## If fatal failure, return failure
 # Check timer
 ## If timer expired, return failure

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
 # Submit operation asynchronously
 # Wait for operation to complete using {{Future.get()}}
 ## If operation timed out, {{Future.get()}} will throw a timeout error
 # Check for result
 ## If successful, return success
 ## Otherwise, return failure

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via any of 
the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. 
Here's a bit of code that illustrates the difference between the two approaches.

{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
manner similar to this:
{code:java}
public int getCount(Timer timer) {
    do {
        final RequestFuture<Integer> future = sendSomeRequest(partitions);
        client.poll(future, timer);

        if (future.isDone())
            return future.get();
    } while (timer.notExpired());

    return -1;
}
{code}

{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private int getCount(Timer timer) {
    try {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        applicationEventQueue.add(future);
        return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        return -1;
    }
}
{code}
The call to {{add}} enqueues the network operation, but it then _immediately_ 
invokes {{Future.get()}} with the timeout to implement a time-bounded blocking 
call. Since this method is being called with a timeout of 0, it _immediately_ 
throws a {{{}TimeoutException{}}}. 
h3. Suggested fix

TBD :(

  was:
The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
{{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their use 
and interpretation of the {{Timer}} that is supplied.
h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} seems to give a little wiggle room.

{{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
success of its operations _before_ checking the timer:
 # Submit operation asynchronously
 # Wait for operation to complete using {{NetworkClient.poll()}}
 # Check for result
 ## If successful, return success
 ## If fatal failure, return failure
 # Check timer
 ## If timer expired, return failure

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
 # Submit operation asynchronously
 # Wait for operation to complete using {{Future.get()}}
 ## If operation timed out, {{Future.get()}} will throw a timeout error
 # Check for result
 ## If successful, return success
 ## Otherwise, return failure

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via any of 
the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. 
Here's a bit of code that illustrates the difference between the two approaches.

{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
manner similar to this:
{code:java}
public boolean doSomeWork(Timer timer) {
    do {
        final RequestFuture future = sendSomeRequest(partitions);
        client.poll(future, timer);

        if (future.isDone())
            return true;
    } while (timer.notExpired());

    return false;
}
{code}
{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
{code:java}
private boolean doSomeWork(Timer timer) {
    Set<TopicPartition> partitions = subscriptions.initializingPartitions();

    try {
        CompletableApplicationEvent event = . . .

        CompletableFuture future = event.future();
        applicationEventHandler.add(event);
        future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}
The call to {{add}} enqueues the network operation, but it then _immediately_ 
invokes {{Future.get()}} with the 

[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2023-11-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Description: 
The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and 
{{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their use 
and interpretation of the {{Timer}} that is supplied.
h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} seems to give a little wiggle room.

{{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
success of its operations _before_ checking the timer:
 # Submit operation asynchronously
 # Wait for operation to complete using {{NetworkClient.poll()}}
 # Check for result
 ## If successful, return success
 ## If fatal failure, return failure
 # Check timer
 ## If timer expired, return failure

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
 # Submit operation asynchronously
 # Wait for operation to complete using {{Future.get()}}
 ## If operation timed out, {{Future.get()}} will throw a timeout error
 # Check for result
 ## If successful, return success
 ## Otherwise, return failure

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via any of 
the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. 
Here's a bit of code that illustrates the difference between the two approaches.

{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
manner similar to this:
{code:java}
public boolean doSomeWork(Timer timer) {
    do {
        final RequestFuture future = sendSomeRequest(partitions);
        client.poll(future, timer);

        if (future.isDone())
            return true;
    } while (timer.notExpired());

    return false;
}
{code}
{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
{code:java}
private boolean doSomeWork(Timer timer) {
    Set<TopicPartition> partitions = subscriptions.initializingPartitions();

    try {
        CompletableApplicationEvent event = . . .

        CompletableFuture future = event.future();
        applicationEventHandler.add(event);
        future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}
The call to {{add}} enqueues the network operation, but it then _immediately_ 
invokes {{Future.get()}} with the timeout to implement a time-bounded blocking 
call. Since this method is being called with a timeout of 0, it _immediately_ 
throws a {{{}TimeoutException{}}}. 
h3. Suggested fix

TBD :(

  was:
The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations have a 
fundamental difference in their timing when it relates to their use of the 
{{Timer}} that is supplied.

h3. tl;dr

For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:

# Attempt the operation
# If successful, return result
# Check timer, return if expired
# Go to top

{{AsyncKafkaConsumer}} effectively does the inverse:

# Check timer, return if expired
# Attempt the operation
# If successful, return result
# Go to top

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} seems to give a little wiggle room.

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via the 
{{KafkaConsumerTest}} unit tests, e.g. 
{{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test 
invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length 
timeout.

As part of the {{poll()}} logic, the consumer may need to refresh offsets. To 
accomplish this, {{LegacyKafkaConsumer}} uses the 
{{ConsumerCoordinator.fetchCommittedOffsets()}} method which is structured like 
this:

{code:java}
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
    do {
        final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        . . .
        client.poll(future, timer);
        . . .
        return future.value();
    } while (timer.notExpired());

    return null;
}
{code}

The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
    Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();

    try {
        OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
        . . .
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}

The call to {{addAndGet}} enqueues the operation on the queue for the network 
I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout to 
implement a time-bounded 

[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations

2023-11-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15848:
--
Summary: Consumer API timeout inconsistent between ConsumerDelegate 
implementations  (was: Consumer API timeout inconsistent between 
LegacyKafkaConsumer and AsyncKafkaConsumer)

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
>
> The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations have a 
> fundamental difference in their timing when it relates to their use of the 
> {{Timer}} that is supplied.
> h3. tl;dr
> For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:
> # Attempt the operation
> # If successful, return result
> # Check timer, return if expired
> # Go to top
> {{AsyncKafkaConsumer}} effectively does the inverse:
> # Check timer, return if expired
> # Attempt the operation
> # If successful, return result
> # Go to top
> {{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
> {{LegacyKafkaConsumer}} seems to give a little wiggle room.
> h3. How to reproduce
> This causes subtle timing issues, but they can be easily reproduced via the 
> {{KafkaConsumerTest}} unit tests, e.g. 
> {{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test 
> invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length 
> timeout.
> As part of the {{poll()}} logic, the consumer may need to refresh offsets. To 
> accomplish this, {{LegacyKafkaConsumer}} uses the 
> {{ConsumerCoordinator.fetchCommittedOffsets()}} method which is structured 
> like this:
> {code:java}
> public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
>     do {
>         final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
>         . . .
>         client.poll(future, timer);
>         . . .
>         return future.value();
>     } while (timer.notExpired());
>     return null;
> }
> {code}
> The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
> {code:java}
> private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
>     Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
>     try {
>         OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
>         Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
>         . . .
>         return true;
>     } catch (TimeoutException e) {
>         return false;
>     }
> }
> {code}
> The call to {{addAndGet}} enqueues the operation on the queue for the network 
> I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout 
> to implement a time-bounded blocking call. {{Future.get()}} will be passed 
> the value of {{0}} (from the above call to {{poll(0)}}, and _immediately_ 
> throw a {{TimeoutException}}.
> h3. Suggested fix
> TBD :(



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-11-22 Thread via GitHub


ocadaruma opened a new pull request, #14242:
URL: https://github.com/apache/kafka/pull/14242

   JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-15046
   
   - While any blocking operation performed while holding UnifiedLog.lock can lead to serious performance (even availability) issues, there are currently several paths that call fsync(2) inside the lock
     * While the lock is held, all subsequent produces against the partition may block
     * On bad disk performance, this easily keeps all request handlers busy
     * Even worse, when a disk experiences a glitch of tens of seconds (not rare with spinning drives), the broker becomes unable to process any requests while remaining unfenced from the cluster (i.e. a "zombie"-like status)
   - This PR gets rid of 4 cases of essentially unnecessary fsync(2) calls performed under the lock (a generic sketch of the pattern follows the table below):
     * (1) ProducerStateManager.takeSnapshot at UnifiedLog.roll
       - I moved the fsync(2) call to the scheduler thread as part of the existing "flush-log" job (before incrementing the recovery point)
       - Since it is still ensured that the snapshot is flushed before the recovery point is incremented, this change shouldn't cause any problem
     * (2) ProducerStateManager.removeAndMarkSnapshotForDeletion as part of log segment deletion
       - This method internally calls `Utils.atomicMoveWithFallback` with `needFlushParentDir = true`, which calls fsync.
       - I changed it to call `Utils.atomicMoveWithFallback` with `needFlushParentDir = false` (consistent with index file deletion, which also doesn't flush the parent dir)
       - This change shouldn't cause problems either.
     * (3) LeaderEpochFileCache.truncateFromStart when incrementing the log-start-offset
       - This path is called from deleteRecords on request-handler threads.
       - Here, we don't actually need fsync(2) either.
       - On unclean shutdown, a few leader epochs might remain in the file, but they will be [handled by LogLoader](https://github.com/apache/kafka/blob/3f4816dd3eafaf1a0636d3ee689069f897c99e28/core/src/main/scala/kafka/log/LogLoader.scala#L185) on start-up, so this is not a problem
     * (4) LeaderEpochFileCache.truncateFromEnd as part of log truncation
       - Likewise, we don't need fsync(2) here, since any epochs left untruncated on unclean shutdown will be handled by the log loading procedure
   - Please refer to the JIRA ticket for further details and the performance experiment results
   
   To check that these changes don't cause a problem, the consistency-expectation table below will be helpful:
   
   | No | File | Consistency expectation | Description | Note |
   |:---|:-----|:------------------------|:------------|:-----|
   | 1 | ProducerStateSnapshot | Snapshot files on the disk before the recovery point should be consistent with the log segments | - On restart after unclean shutdown, Kafka will skip the snapshot recovery procedure before the recovery point. - If the snapshot content before the recovery point is not consistent with the log, it will cause a problem like an idempotency violation due to the missing producer state. | Hence, inconsistency after the recovery point is acceptable because it will be recovered to a consistent state by the log loading procedure |
   | 2 | ProducerStateSnapshot | Deleted snapshot files on the disk should be eventually consistent with log segments | - On log segment deletion for any reason (e.g. retention, topic deletion), corresponding snapshot files will be deleted. - Even when the broker crashes by power 
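   
   A generic sketch of the "write under the lock, flush on the scheduler" pattern from case (1) above. This is not the PR's code: class and method names are illustrative, and only the ordering guarantee (flush before the recovery point advances) is taken from the description.
   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;

   public class DeferredFsyncSketch {
       private final Object logLock = new Object();
       private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

       // Request-handler path: write the snapshot while holding the lock, but do not fsync here.
       public void takeSnapshot(Path snapshotFile, byte[] snapshotBytes) throws IOException {
           synchronized (logLock) {
               try (FileChannel ch = FileChannel.open(snapshotFile,
                       StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                   ch.write(ByteBuffer.wrap(snapshotBytes));
               }
           }
           // Deferred flush, analogous to folding the fsync into the existing "flush-log" job;
           // it must complete before the recovery point is advanced past this snapshot.
           scheduler.execute(() -> flush(snapshotFile));
       }

       private void flush(Path snapshotFile) {
           try (FileChannel ch = FileChannel.open(snapshotFile, StandardOpenOption.WRITE)) {
               ch.force(true); // the fsync(2) that used to run under the lock
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```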

Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-11-22 Thread via GitHub


ocadaruma commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1823612335

   Closing once to trigger a rebuild.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]

2023-11-22 Thread via GitHub


kirktrue commented on PR #14768:
URL: https://github.com/apache/kafka/pull/14768#issuecomment-1823610187

   > Even if there was a KIP for this change, we cannot put it into a minor release; we need to wait for 4.0.
   
   Are there any preparatory steps we need to take now (i.e. in the 3.7 release time frame) to make sure we are free to make this change in 4.0?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]

2023-11-22 Thread via GitHub


kirktrue commented on code in PR #14768:
URL: https://github.com/apache/kafka/pull/14768#discussion_r1402784590


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -150,11 +150,9 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         LogContext logContext = createLogContext(config, groupRebalanceConfig);
         this.log = logContext.logger(getClass());
         boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
-        groupId.ifPresent(groupIdStr -> {
-            if (groupIdStr.isEmpty()) {
-                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-            }
-        });
+
+        if (this.groupId.isPresent() && this.groupId.get().isEmpty())
+            throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace");

Review Comment:
   Fair enough. I'll back out the error case for `LegacyKafkaConsumer`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]

2023-11-22 Thread via GitHub


mjsax commented on PR #14768:
URL: https://github.com/apache/kafka/pull/14768#issuecomment-1823600766

   Even if there was a KIP for this change, we cannot put it into a minor release; we need to wait for 4.0.
   
   This change would have been OK for 3.0, but that ship has sailed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]

2023-11-22 Thread via GitHub


mjsax commented on code in PR #14768:
URL: https://github.com/apache/kafka/pull/14768#discussion_r1402777990


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java:
##
@@ -150,11 +150,9 @@ public class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         LogContext logContext = createLogContext(config, groupRebalanceConfig);
         this.log = logContext.logger(getClass());
         boolean enableAutoCommit = config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
-        groupId.ifPresent(groupIdStr -> {
-            if (groupIdStr.isEmpty()) {
-                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-            }
-        });
+
+        if (this.groupId.isPresent() && this.groupId.get().isEmpty())
+            throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace");

Review Comment:
   I don't think we can make this change in a minor release; we need to wait for 4.0. It breaks compatibility, and we are not allowed to break compatibility in a minor release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id [kafka]

2023-11-22 Thread via GitHub


mjsax commented on code in PR #14768:
URL: https://github.com/apache/kafka/pull/14768#discussion_r1402777467


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -163,11 +163,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         LogContext logContext = createLogContext(config, groupRebalanceConfig);
         this.log = logContext.logger(getClass());
-        groupId.ifPresent(groupIdStr -> {
-            if (groupIdStr.isEmpty()) {
-                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-            }
-        });
+
+        if (this.groupId.isPresent() && this.groupId.get().isEmpty())
+            throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace");

Review Comment:
   Does this change apply to the old protocol? If yes, I don't think we can make this change in a minor release; we need to wait for 4.0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-22 Thread via GitHub


soarez commented on PR #14820:
URL: https://github.com/apache/kafka/pull/14820#issuecomment-1823599261

   @cmccabe @pprovenzano please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15768:

Description: 
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has no means to avoid getting the underlying 
error otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)

  was:
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)


> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Hanyu Zheng
>Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the 
> result is SuccessQueryResult or FailedQueryResult, but only check if there is 
> a single result or not. (The user has no means to avoid getting the 
> underlying error otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)
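
An illustrative sketch (not the Streams implementation) of the semantics argued for above: pick the single partition result by counting entries rather than calling `getResult`, so a FailedQueryResult is handed back to the caller instead of triggering its exception. It assumes `QueryResult` from org.apache.kafka.streams.query and a java.util.Map of partition results:
{code:java}
static <R> QueryResult<R> onlyPartitionResult(final Map<Integer, QueryResult<R>> partitionResults) {
    // Deliberately avoid getResult(): a FailedQueryResult throws from it.
    if (partitionResults.size() != 1) {
        throw new IllegalArgumentException(
            "Expected exactly one partition result but found " + partitionResults.size());
    }
    // May be a SucceededQueryResult or a FailedQueryResult; the caller inspects it.
    return partitionResults.values().iterator().next();
}
{code}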



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15768:

Description: 
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)

  was:
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if the any result is a `FailedQueryResult` (and even 
if there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)


> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Hanyu Zheng
>Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the 
> result is SuccessQueryResult or FailedQueryResult, but only check if there is 
> a single result or not. (The user has not means to avoid getting an exception 
> otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available

2023-11-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15817:
--
Fix Version/s: 3.7.0
   3.6.1

> Avoid reconnecting to the same IP address if multiple addresses are available
> -
>
> Key: KAFKA-15817
> URL: https://issues.apache.org/jira/browse/KAFKA-15817
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Bob Barrett
>Assignee: Bob Barrett
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS 
> resolution behavior for clients to re-resolve DNS after disconnecting from a 
> broker, rather than wait until we iterated over all addresses from a given 
> resolution. This is useful when the IP addresses have changed between the 
> connection and disconnection.
> However, with the behavior change, this does mean that clients could 
> potentially reconnect immediately to the same IP they just disconnected from, 
> if the IPs have not changed. In cases where the disconnection happened 
> because that IP was unhealthy (such as a case where a load balancer has 
> instances in multiple availability zones and one zone is unhealthy, or a case 
> where an intermediate component in the network path is going through a 
> rolling restart), this will delay the client successfully reconnecting. To 
> address this, clients should remember the IP they just disconnected from and 
> skip that IP when reconnecting, as long as the address resolved to multiple 
> addresses.
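
An illustrative sketch (not the actual client networking code) of the reconnect policy described above, assuming java.net.InetAddress and a java.util.List of resolved addresses:
{code:java}
static InetAddress chooseReconnectAddress(final List<InetAddress> resolved,
                                          final InetAddress lastDisconnected) {
    if (resolved.size() > 1 && lastDisconnected != null) {
        for (final InetAddress candidate : resolved) {
            if (!candidate.equals(lastDisconnected)) {
                return candidate;   // skip the IP we just disconnected from
            }
        }
    }
    // Single resolved address (or every candidate matched): fall back to the first entry.
    return resolved.get(0);
}
{code}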



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix broken method link in DistributedHerder::writeToConfigTopicAsLeader Javadoc [kafka]

2023-11-22 Thread via GitHub


yashmayya merged PR #14824:
URL: https://github.com/apache/kafka/pull/14824


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14552) Remove no longer required server protocol versions in Kafka 4.0

2023-11-22 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788891#comment-17788891
 ] 

Colin McCabe commented on KAFKA-14552:
--

I could go either way. I think most of the configuration key removals are 
"implied" by other KIPs (or sometimes stated directly there) but I thought it 
would be good to gather them somewhere.

> Remove no longer required server protocol versions in Kafka 4.0
> ---
>
> Key: KAFKA-14552
> URL: https://issues.apache.org/jira/browse/KAFKA-14552
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Kafka 4.0 will remove support for zk mode and kraft mode became production 
> ready in Kafka 3.3. Furthermore, migration from zk mode to kraft mode will 
> require upgrading to the bridge release first (likely 3.5, but could also be 
> 3.6).
> This provides an opportunity to remove exclusively server side protocols 
> versions that only exist to allow direct upgrades from versions older than 
> 3.n where n is either 0 (KRaft preview), 3 (KRaft production ready) or 5 
> (bridge release). We should decide on the right `n` and make the change as 
> part of 4.0.
> Note that this is complementary to the protocols that will be completely 
> removed as part of zk mode removal. Step one would be to create a list of 
> protocols that will be completely removed due to zk mode removal and the list 
> of exclusively server side protocols remaining after that (one example is 
> ControlledShutdown).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15888: DistributedHerder log context should not use the same client ID for each Connect worker by default [kafka]

2023-11-22 Thread via GitHub


C0urante commented on PR #14825:
URL: https://github.com/apache/kafka/pull/14825#issuecomment-1823468350

   Would it be worth it to keep the `connect-` prefix?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]

2023-11-22 Thread via GitHub


scholzj commented on PR #14756:
URL: https://github.com/apache/kafka/pull/14756#issuecomment-1823431704

   Thanks for the review comments @viktorsomogyi. They should now be fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 opened a new pull request, #14632:
URL: https://github.com/apache/kafka/pull/14632

   The PR adds support for altering and describing client-metrics configs as defined in 
[KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-kafka-configs.sh)
   
   Below are the results of the commands:
   
    Help section adds details for `client-metrics`:
   ```
   ./bin/kafka-configs.sh --help
   
   This tool helps to manipulate and describe entity config for a topic, 
client, user, broker, ip or client-metrics
   .
   .
   .
   --add-config   Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':
.
   .
   .
   controller_mutation_rate
producer_byte_rate
request_percentage
  For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
  For entity-type 'ips':
connection_creation_rate
  For entity-type 'client-metrics':
interval.ms
match
metrics
  Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
   
   --entity-type  Type of entity

(topics/clients/users/brokers/broker-
loggers/ips/client-metrics)
   
   
   --entity-name  Name of entity (topic name/client
id/user principal name/broker
id/ip/client metrics subscription
name)
   ```
   
    Incorrect entity type:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
client-metrics1 --describe --entity-name METRICSUB
   
   Invalid entity type client-metrics1, the entity type must be one of topics, 
clients, users, brokers, ips, client-metrics, broker-loggers with a 
--bootstrap-server or --bootstrap-controller argument
   ```
   
     Describe without entity name:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
client-metrics --describe
   
   an entity name must be specified with --describe of client-metrics
   ```
   
    Describe with blank entity name:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
client-metrics --describe --entity-name ""
   
   an entity name must be specified with --describe of client-metrics
   ```
   
     Invalid entity name. The tool deliberately does not throw an exception here, as the describe 
response is needed later by alter to determine whether the subscription should be added or 
updated.
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
client-metrics --describe --entity-name "random"
   
   Dynamic configs for client-metric random are:
   ```
   
    Successful alter of `client-metrics`:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--entity-type client-metrics --entity-name METRICSUB --add-config 
"metrics=org.apache.kafka.consumer.,interval.ms=6,match=[client_software_name=kafka.python,client_software_version=1\.2\..*]"
   
   Completed updating config for client-metric METRICSUB.
   ```
   
    Successful describe of `client-metrics`:
   ```
   ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type 
client-metrics --describe --entity-name METRICSUB
   
   Dynamic configs for client-metric METRICSUB are:
 interval.ms=6 sensitive=false synonyms={}
 match=client_software_name=kafka.python,client_software_version=1\.2\..* 
sensitive=false synonyms={}
 metrics=org.apache.kafka.consumer. 


Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-22 Thread via GitHub


jolshan commented on PR #14789:
URL: https://github.com/apache/kafka/pull/14789#issuecomment-1823374923

   I realized I didn't cherry-pick to 3.6.1. I can do that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15888: DistributedHerder log context should not use the same client ID for each Connect worker by default [kafka]

2023-11-22 Thread via GitHub


yashmayya opened a new pull request, #14825:
URL: https://github.com/apache/kafka/pull/14825

   - https://issues.apache.org/jira/browse/KAFKA-15888
   - By default, if there is no `client.id` configured on a Connect worker 
running in distributed mode, the same client ID (`connect-1`) will be used in 
the log context for the `DistributedHerder` class in every single worker in the 
Connect cluster.
   - This default is quite confusing and obviously not very useful.
   - Further, based on how this default is configured - 
https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299
   it seems like this might have been an unintentional bug (the static 
`AtomicInteger` is incremented in the `DistributedHerder`'s constructor, but 
we're only going to initialize a single `DistributedHerder` per worker JVM 
process).
   - This patch changes the default to simply use the `workerId` (the 
advertised host name and port of the worker) instead, which should be unique 
for each worker in a cluster.
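   
   A minimal sketch of the new default (not the actual patch): fall back to the `workerId` when no `client.id` is configured, so each worker's `DistributedHerder` log context is distinct. `LogContext` is `org.apache.kafka.common.utils.LogContext`; the method and parameter names are illustrative.
   ```java
   static LogContext herderLogContext(String configuredClientId, String workerId) {
       // workerId is the worker's advertised host:port (e.g. "worker-1.example.com:8083"),
       // which is unique per worker, unlike the previous static "connect-<n>" counter default.
       String clientId = (configuredClientId == null || configuredClientId.isEmpty())
               ? workerId
               : configuredClientId;
       return new LogContext("[Worker clientId=" + clientId + "] ");
   }
   ```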
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15888) DistributedHerder log context should not use the same client ID for each Connect worker by default

2023-11-22 Thread Yash Mayya (Jira)
Yash Mayya created KAFKA-15888:
--

 Summary: DistributedHerder log context should not use the same 
client ID for each Connect worker by default
 Key: KAFKA-15888
 URL: https://issues.apache.org/jira/browse/KAFKA-15888
 Project: Kafka
  Issue Type: Bug
  Components: connect, KafkaConnect
Reporter: Yash Mayya
Assignee: Yash Mayya


By default, if there is no "client.id" configured on a Connect worker 
running in distributed mode, the same client ID ("connect-1") will be used in 
the log context for the DistributedHerder class in every single worker in the 
Connect cluster. This default is quite confusing and obviously not very useful. 
Further, based on how this default is configured 
([ref|https://github.com/apache/kafka/blob/150b0e8290cda57df668ba89f6b422719866de5a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L299]),
 it seems like this might have been an unintentional bug. We could simply use 
the workerId (the advertised host name and port of the worker) by default 
instead, which should be unique for each worker in a cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Fix broken method link in DistributedHerder::writeToConfigTopicAsLeader Javadoc [kafka]

2023-11-22 Thread via GitHub


yashmayya opened a new pull request, #14824:
URL: https://github.com/apache/kafka/pull/14824

   - The `ConfigBackingStore::putConnectorConfig` method was modified in 
https://github.com/apache/kafka/pull/14704 to take in a third parameter 
(`TargetState`).
   - This breaks the method link in the doc comment for 
`DistributedHerder::writeToConfigTopicAsLeader`. It doesn't break the build 
since we don't generate public Javadocs for this class, but it does affect 
things like code navigation in IDEs.
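   
   For illustration, the shape of the repaired reference (not the actual diff; the first two parameter types are assumptions, only the added `TargetState` argument comes from the description above):
   ```java
   /**
    * Writes records to the config topic as the leader; see
    * {@link ConfigBackingStore#putConnectorConfig(String, java.util.Map, TargetState)}.
    */
   ```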
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15887) Autocommit during close consistently fails with exception in background thread

2023-11-22 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-15887:
--

 Summary: Autocommit during close consistently fails with exception 
in background thread
 Key: KAFKA-15887
 URL: https://issues.apache.org/jira/browse/KAFKA-15887
 Project: Kafka
  Issue Type: Sub-task
Reporter: Lucas Brutschy
Assignee: Philip Nee


When I run {{AsyncKafkaConsumerTest}}, I get this every time I call close:

{code:java}
java.lang.IndexOutOfBoundsException: Index: 0
    at java.base/java.util.Collections$EmptyList.get(Collections.java:4483)
    at java.base/java.util.Collections$UnmodifiableList.get(Collections.java:1310)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.findCoordinatorSync(ConsumerNetworkThread.java:302)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.ensureCoordinatorReady(ConsumerNetworkThread.java:288)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.maybeAutoCommitAndLeaveGroup(ConsumerNetworkThread.java:276)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.cleanup(ConsumerNetworkThread.java:257)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:101)
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]

2023-11-22 Thread via GitHub


cmccabe commented on code in PR #14807:
URL: https://github.com/apache/kafka/pull/14807#discussion_r1402476918


##
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##
@@ -205,4 +207,22 @@ private static List 
getImageRecords(ClusterImage image) {
 image.write(writer, new ImageWriterOptions.Builder().build());
 return writer.records();
 }
+
+@Test
+public void testHandleLossOfControllerRegistrations() {
+ClusterImage testImage = new ClusterImage(Collections.emptyMap(),
+Collections.singletonMap(1000, new 
ControllerRegistration.Builder().
+setId(1000).
+setIncarnationId(Uuid.fromString("9ABu6HEgRuS-hjHLgC4cHw")).
+setListeners(Collections.singletonMap("PLAINTEXT",
+new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"localhost", 19092))).
+setSupportedFeatures(Collections.emptyMap()).build()));
+RecordListWriter writer = new RecordListWriter();
+final AtomicReference lossString = new AtomicReference<>("");
+testImage.write(writer, new ImageWriterOptions.Builder().
+setMetadataVersion(MetadataVersion.IBP_3_6_IV2).
+setLossHandler(loss -> lossString.compareAndSet("", loss.loss())).
+build());
+assertEquals("controller registration data", lossString.get());

Review Comment:
   Both of the unit tests I added in this PR verify that a `ClusterImage` with controller registration generates a snapshot with `RegisterControllerRecords` if the metadata version is `3.7-IV0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]

2023-11-22 Thread via GitHub


jsancio commented on code in PR #14807:
URL: https://github.com/apache/kafka/pull/14807#discussion_r1402453079


##
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##
@@ -205,4 +207,22 @@ private static List 
getImageRecords(ClusterImage image) {
 image.write(writer, new ImageWriterOptions.Builder().build());
 return writer.records();
 }
+
+@Test
+public void testHandleLossOfControllerRegistrations() {
+ClusterImage testImage = new ClusterImage(Collections.emptyMap(),
+Collections.singletonMap(1000, new 
ControllerRegistration.Builder().
+setId(1000).
+setIncarnationId(Uuid.fromString("9ABu6HEgRuS-hjHLgC4cHw")).
+setListeners(Collections.singletonMap("PLAINTEXT",
+new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"localhost", 19092))).
+setSupportedFeatures(Collections.emptyMap()).build()));
+RecordListWriter writer = new RecordListWriter();
+final AtomicReference lossString = new AtomicReference<>("");
+testImage.write(writer, new ImageWriterOptions.Builder().
+setMetadataVersion(MetadataVersion.IBP_3_6_IV2).
+setLossHandler(loss -> lossString.compareAndSet("", loss.loss())).
+build());
+assertEquals("controller registration data", lossString.get());

Review Comment:
   Thanks for fixing this test. I think we are still missing a test that shows 
that a `ClusterImage` with controller registration generates a snapshot with 
`RegisterControllerRecords` if the metadata version is `3.7-IV0`. Can we add 
that test, if you agree? If not, can you point me to that test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]

2023-11-22 Thread via GitHub


viktorsomogyi commented on code in PR #14756:
URL: https://github.com/apache/kafka/pull/14756#discussion_r1400948591


##
clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java:
##
@@ -185,6 +188,18 @@ private SslEngineFactory 
createNewSslEngineFactory(Map newConfigs) {
 }
 }
 
+private static boolean getBoolean(final Map configs, final 
String key, final boolean defaultValue) {
+final Object value = configs.getOrDefault(key, defaultValue);
+if (value instanceof Boolean) {
+return (boolean) value;
+} else if (value instanceof String) {
+return Boolean.parseBoolean((String) value);
+} else {
+log.warn("Invalid value (" + value + ") on configuration '" + key 
+ "'. Please specify a true/false value.");

Review Comment:
   Please also mention that in this case the default value is used (and include the default value itself); otherwise the message reads a bit incomplete.
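   Something along these lines, perhaps (just a sketch of the wording; it assumes the surrounding class's `log` field, a `Map<String, Object>` config type, and that the method falls back to `defaultValue`):

```java
private static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
    final Object value = configs.getOrDefault(key, defaultValue);
    if (value instanceof Boolean) {
        return (boolean) value;
    } else if (value instanceof String) {
        return Boolean.parseBoolean((String) value);
    } else {
        // Tell the user both that the value was invalid and what will be used instead.
        log.warn("Invalid value {} for configuration '{}'. Please specify a true/false value. Falling back to the default value: {}",
                value, key, defaultValue);
        return defaultValue;
    }
}
```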



##
clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java:
##
@@ -185,6 +188,18 @@ private SslEngineFactory 
createNewSslEngineFactory(Map newConfigs) {
 }
 }
 
+private static boolean getBoolean(final Map configs, final 
String key, final boolean defaultValue) {

Review Comment:
   This is more of a utility method; would you please move it to `org.apache.kafka.common.utils.ConfigUtils` for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]

2023-11-22 Thread via GitHub


mimaison commented on PR #14823:
URL: https://github.com/apache/kafka/pull/14823#issuecomment-1823191335

   cc @ijuma @cmccabe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]

2023-11-22 Thread via GitHub


hanyuzheng7 commented on code in PR #14821:
URL: https://github.com/apache/kafka/pull/14821#discussion_r1402442067


##
streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java:
##
@@ -77,6 +79,12 @@ public QueryResult getOnlyPartitionResult() {
 "The query did not return exactly one partition result: " + 
partitionResults
 );
 } else {
+if (nonempty.isEmpty() && partitionResults.size() != 0 && 
partitionResults.get(0).isFailure()) {
+FailureReason failureReason = 
partitionResults.get(0).getFailureReason();
+if (failureReason.equals(FailureReason.UNKNOWN_QUERY_TYPE)) {
+return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type");
+}

Review Comment:
   I think we can use a for loop to check the failure reasons; if all of them are `UNKNOWN_QUERY_TYPE`, we can return `QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type")` at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]

2023-11-22 Thread via GitHub


mimaison commented on PR #14823:
URL: https://github.com/apache/kafka/pull/14823#issuecomment-1823186116

   The Java 8 Scala 2.12 build has been broken for 9 days since this backport 
https://github.com/apache/kafka/commit/1de84590c476d3ea0577cde2b4cd12e6584b8fe9
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/3.6/112/pipeline/9/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] HOTFIX: Fix compilation error with Scala 2.12 [kafka]

2023-11-22 Thread via GitHub


mimaison opened a new pull request, #14823:
URL: https://github.com/apache/kafka/pull/14823

   Fixes [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka_3.6/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:2646:51:
 the result type of an implicit conversion must be more specific than Object
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]

2023-11-22 Thread via GitHub


hanyuzheng7 commented on code in PR #14821:
URL: https://github.com/apache/kafka/pull/14821#discussion_r1402442067


##
streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java:
##
@@ -77,6 +79,12 @@ public QueryResult getOnlyPartitionResult() {
 "The query did not return exactly one partition result: " + 
partitionResults
 );
 } else {
+if (nonempty.isEmpty() && partitionResults.size() != 0 && 
partitionResults.get(0).isFailure()) {
+FailureReason failureReason = 
partitionResults.get(0).getFailureReason();
+if (failureReason.equals(FailureReason.UNKNOWN_QUERY_TYPE)) {
+return 
QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type");
+}

Review Comment:
   I think we can use a for loop to check the failure reasons; if all of them are `UNKNOWN_QUERY_TYPE`, we can return `QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "unknown query type")` at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15876: Introduce RemoteStorageNotReadyException retryable error [kafka]

2023-11-22 Thread via GitHub


kamalcph opened a new pull request, #14822:
URL: https://github.com/apache/kafka/pull/14822

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]

2023-11-22 Thread via GitHub


hanyuzheng7 opened a new pull request, #14821:
URL: https://github.com/apache/kafka/pull/14821

   Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect `IllegalArgumentException` if any result is a `FailedQueryResult` (even if there is only a single FailedQueryResult).
   
   The issue is the internal `filter(r -> r.getResult() != 0)` step, which blindly (and incorrectly) calls `getResult`.
   
   Given the semantics of `getOnlyPartitionResult`, we should not care whether the result is a SuccessQueryResult or a FailedQueryResult, but only check whether there is a single result or not. (The user has no means to avoid getting an exception otherwise.)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15886) Always specify directories for new partition registrations

2023-11-22 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15886:
---

Assignee: Igor Soarez

> Always specify directories for new partition registrations
> --
>
> Key: KAFKA-15886
> URL: https://issues.apache.org/jira/browse/KAFKA-15886
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> When creating partition registrations, directories must always be defined.
> If creating a partition from a PartitionRecord or PartitionChangeRecord from 
> an older version that does not support directory assignments, then 
> DirectoryId.MIGRATING is assumed.
> If creating a new partition, or triggering a change in assignment, 
> DirectoryId.UNASSIGNED should be specified, unless the target broker has a 
> single online directory registered, in which case the replica should be 
> assigned directly to that single directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]

2023-11-22 Thread via GitHub


soarez opened a new pull request, #14820:
URL: https://github.com/apache/kafka/pull/14820

   When creating partition registrations, directories must always be defined.
   
   If creating a partition from a PartitionRecord or PartitionChangeRecord from 
an older version that does not support directory assignments, then 
DirectoryId.MIGRATING is assumed.
   
   If creating a new partition, or triggering a change in assignment, 
DirectoryId.UNASSIGNED should be specified, unless the target broker has a 
single online directory registered, in which case the replica should be 
assigned directly to that single directory.
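   
   A minimal sketch of those selection rules (the helper below is purely illustrative, not the controller code; the `DirectoryId` constants are the ones named above, and the import location is an assumption):

```java
import java.util.List;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.DirectoryId;   // assumed location of the MIGRATING/UNASSIGNED constants

public class DirectoryAssignmentSketch {
    static Uuid pickDirectory(boolean recordPredatesDirectoryAssignments, List<Uuid> onlineDirsOnTargetBroker) {
        if (recordPredatesDirectoryAssignments) {
            return DirectoryId.MIGRATING;            // older PartitionRecord/PartitionChangeRecord versions
        }
        if (onlineDirsOnTargetBroker.size() == 1) {
            return onlineDirsOnTargetBroker.get(0);  // single online directory: assign directly to it
        }
        return DirectoryId.UNASSIGNED;               // otherwise leave the choice open for now
    }
}
```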
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-11-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788830#comment-17788830
 ] 

Mickael Maison commented on KAFKA-15620:


Removed 3.6.1 from the fix version list as the release script does not properly 
handle Jiras resolved as duplicates. The original Jira, KAFKA-15479, has 3.6.1 
listed in its fix versions.

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> 

[jira] [Updated] (KAFKA-15876) Introduce Remote Storage Not Ready Exception

2023-11-22 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15876:
-
Labels: kip  (was: )

> Introduce Remote Storage Not Ready Exception
> 
>
> Key: KAFKA-15876
> URL: https://issues.apache.org/jira/browse/KAFKA-15876
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> When tiered storage is enabled on the cluster, the Kafka broker has to build 
> the remote log metadata on node restart for all the partitions for which it is 
> either the leader or a follower. The remote log metadata is built in an 
> asynchronous fashion and does not interfere with the broker startup path. Once 
> the broker becomes online, it cannot handle the client requests (FETCH and 
> LIST_OFFSETS) that access remote storage until the metadata gets built for 
> those partitions. Currently, we are returning a ReplicaNotAvailable exception 
> back to the client so that it will retry after some time.
> [ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java]
>  is applicable when a reassignment is in progress, and it was effectively 
> deprecated by the NotLeaderOrFollowerException 
> ([PR#8979|https://github.com/apache/kafka/pull/8979]). It's good to introduce 
> an appropriate retriable exception for remote storage errors to denote that 
> the broker is not ready to accept these client requests yet.
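
A minimal sketch of what such a retriable exception could look like (the class name matches the ticket title; the exact package and class body are assumptions, not the final implementation):

{code:java}
package org.apache.kafka.common.errors;   // assumed package, alongside the other client-facing errors

// Retriable: clients that see this error should back off and retry the
// FETCH / LIST_OFFSETS request once the broker has built the remote log metadata.
public class RemoteStorageNotReadyException extends RetriableException {

    public RemoteStorageNotReadyException(String message) {
        super(message);
    }

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}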



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15886) Always specify directories for new partition registrations

2023-11-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-15886:
---

 Summary: Always specify directories for new partition registrations
 Key: KAFKA-15886
 URL: https://issues.apache.org/jira/browse/KAFKA-15886
 Project: Kafka
  Issue Type: Sub-task
Reporter: Igor Soarez


When creating partition registrations, directories must always be defined.

If creating a partition from a PartitionRecord or PartitionChangeRecord from an 
older version that does not support directory assignments, then 
DirectoryId.MIGRATING is assumed.

If creating a new partition, or triggering a change in assignment, 
DirectoryId.UNASSIGNED should be specified, unless the target broker has a 
single online directory registered, in which case the replica should be 
assigned directly to that single directory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-11-22 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15620:
---
Fix Version/s: (was: 3.6.1)

> Duplicate remote log DELETE_SEGMENT metadata is generated when there are 
> multiple leader epochs in the segment
> --
>
> Key: KAFKA-15620
> URL: https://issues.apache.org/jira/browse/KAFKA-15620
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.6.0
>Reporter: Henry Cai
>Priority: Major
> Fix For: 3.7.0
>
>
> Use the newly released 3.6.0, turn on tiered storage feature: 
> {*}remote.log.storage.system.enable{*}=true
> 1. Set up topic tier5 to be remote storage enabled.  Adding some data to the 
> topic and the data is copied to remote storage.  After a few days when the 
> log segment is removed from remote storage due to log retention expiration, 
> noticed the following warnings in the server log:
> [2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: 
> [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, 
> eventTimestampMs=1697005926358, brokerId=1043}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
> [2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log 
> segment. 
> (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> No remote log segment metadata found for 
> :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id=YFNCaWjPQFSKCngQ1QcKpA}
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
>         at 
> org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  
> 2. After some debugging, realized the problem is *there are 2 sets of 
> DELETE_SEGMENT_STARTED/FINISHED pairs* in the remote metadata topic for this 
> segment.  The DELETE_SEGMENT_FINISHED in the first set remove the segment 
> from the metadata cache and this caused the above exception when the 
> DELETE_SEGMENT_STARTED from the second set needs to be processed.
>  
> 3. And traced the log to where the log retention kicked in and saw *there 
> were two delete log segment generated* at that time:
> ```
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> [2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 
> partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment 
> RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, 
> id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach 
> based on the largest record timestamp in the segment 
> (kafka.log.remote.RemoteLogManager$RLMTask)
> ```
> 4. And dumped out the content of the original COPY_SEGMENT_STARTED for this 
> segment (which triggers the generation of the later DELETE_SEGMENT metadata):
> [2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: 
> [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
> , startOffset=6387830, endOffset=9578660, brokerId=1043, 
> maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, 
> segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] 
> (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
>  
> You can see there are 2 leader epochs in this segment: 
> *segmentLeaderEpochs=\{5=6387830, 6=6721329}*
>  
> 5. From the remote log retention code 
> 

Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-22 Thread via GitHub


kamalcph commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1823109337

   > The first one is that I no longer see a point in segment.bytes and 
segment.ms (and by extension log.segment.bytes and log.segment.ms) with respect 
to tiered topics. If a person says "hey, I only want 4GB of data or data from 
the last 10 minutes around" then why would they ever need to configure how 
often a segment should be closed? If this is the case shouldn't this be 
followed by ignoring those two properties for tiered topics?
   
   `segment.bytes` will take effect when the topic has a continuous inflow of 
data. If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day 
and the partition has a bytes-in load of 1 MB/sec, then the segment gets filled 
in roughly 17 minutes and gets rotated to passive.
   
   `segment.ms` will take effect when the topic has only a low inflow of data. 
If the user configures `segment.bytes` to 1 GB and `segment.ms` to 1 day and 
the partition has a bytes-in load of 100 bytes/sec, then the segment won't be 
filled and gets rotated to passive after 1 day.
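   
   A quick back-of-the-envelope check of the two examples above (the numbers are only the assumptions stated in this comment, not measurements):

```java
public class SegmentRollMath {
    public static void main(String[] args) {
        long segmentBytes = 1L << 30;            // segment.bytes = 1 GiB
        long segmentMs = 24L * 60 * 60 * 1000;   // segment.ms = 1 day

        long fastBytesPerSec = 1L << 20;         // ~1 MiB/s inflow
        long slowBytesPerSec = 100L;             // ~100 bytes/s inflow

        System.out.println("~1 MiB/s: fills in ~" + segmentBytes / fastBytesPerSec / 60
                + " minutes, so segment.bytes triggers the roll");                      // ~17 minutes
        System.out.println("~100 B/s: would need ~" + segmentBytes / slowBytesPerSec / 86400
                + " days to fill, so segment.ms (" + segmentMs + " ms) triggers the roll"); // >100 days
    }
}
```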
   
   In this patch, we are trying to handle the case where the topic has some 
data in the active segment but doesn't have continuous inflow of data. So, both 
the `segment.ms` and `segment.bytes` configs are applicable for tiered storage 
topics.
   
   > The second one is that you will be changing the definition of 
local.retention. Prior to this change it meant that closed segments will be 
served from local disk for at most this much size or time as long as they have 
been moved to tiered storage. Now it will mean that anything beyond this size 
and time will be found only in tiered storage. Isn't this a "public facing 
change" and thus requiring some announcements?
   
   This was the original plan and is in line with how local-log segments behave. 
If a topic is deprecated and won't receive any more data in the future, the user 
will expect all the data in that topic to be removed once the retention time 
passes; otherwise it can fail to meet certain compliance requirements.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-22 Thread via GitHub


soarez commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1402362044


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -231,13 +238,12 @@ public void run() throws Exception {
 } else {
 failedAttempts = 0;
 AssignReplicasToDirsResponseData data = 
((AssignReplicasToDirsResponse) response.responseBody()).data();
+
 Set failed = filterFailures(data, inflight);
+Set completed = Utils.diff(HashSet::new, 
inflight.values().stream().collect(Collectors.toSet()), failed);
+completed.forEach(assignmentEvent -> 
assignmentEvent.callback.run());
+
 log.warn("Re-queueing assignments: {}", failed);
-if (!failed.isEmpty()) {
-for (AssignmentEvent event : failed) {
-pending.put(event.partition, event);
-}
-}

Review Comment:
   I think we need to re-queue these; did this not break the tests in 
AssignmentsManagerTest?
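   
   For reference, a minimal sketch of the re-queueing being asked about, using the names from the hunk above (exact placement relative to the callback handling is up to you):

```java
// Put failed assignment events back into the pending map so they are retried.
if (!failed.isEmpty()) {
    for (AssignmentEvent event : failed) {
        pending.put(event.partition, event);
    }
}
```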



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))
+}
+})
+  case Some(DirectoryEventRequestState.COMPLETED) =>
+// Promote future replica if controller accepted the request and the 
replica caught-up with the original log.
+if (partition.maybeReplaceCurrentWithFutureReplica()) {
+  removePartitions(Set(topicPartition))
+  assignmentRequestStates.remove(topicPartition)
+}
+  case _ =>

Review Comment:
   Looks better. Thanks



##
server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java:
##
@@ -19,13 +19,16 @@
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.function.Consumer;

Review Comment:
   Looks like this is unused. Did you run checkstyle?



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +83,69 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  override def removePartitions(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, PartitionFetchState] = {
+// Schedule assignment request to revert any queued request before 
cancelling
+for {
+  topicPartition <- topicPartitions
+  partitionState <- partitionAssignmentRequestState(topicPartition)
+  if partitionState == QUEUED
+  partition = replicaMgr.getPartitionOrException(topicPartition)
+  topicId <- partition.topicId
+  logId <- partition.logDirectoryId()


Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]

2023-11-22 Thread via GitHub


kamalcph commented on PR #14787:
URL: https://github.com/apache/kafka/pull/14787#issuecomment-1823088481

   @satishd 
   
   Addressed your review comments. PTAL. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix a few reconciliation bugs [kafka]

2023-11-22 Thread via GitHub


dajac opened a new pull request, #14819:
URL: https://github.com/apache/kafka/pull/14819

   Requires changes to make the reconciliation logic in the async consumer 
work. With this, I was able to join a consumer group with a few members.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-22 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402217667


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -829,21 +902,50 @@ private 
CoordinatorResult consumerGr
 
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberValidationFails(groupId, instanceId, member, 
memberEpoch, memberId);
+if (member == null) {
+member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+}
+throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {}, Instance-Id {} joins the 
consumer group.", groupId, memberId, instanceId);
+}
+staticMemberReplaced = staticMemberReplaced(memberEpoch, member);
+if (staticMemberReplaced) {
+removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+// The replacing member gets the same set of assignments as 
the departed member.
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions())
+
.setPartitionsPendingAssignment(member.partitionsPendingAssignment())
+
.setPartitionsPendingRevocation(member.partitionsPendingRevocation());

Review Comment:
   If I don't call `setAssignedPartitions`, then the replacing static member 
doesn't get its assignments back. I added the pending-assignments part just to 
ensure all assignments from the previous member are carried over to the new member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-11-22 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1402215662


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -898,31 +1000,45 @@ private 
CoordinatorResult consumerGr
 group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
 }
 
-// 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The
-// delta between the existing and the new target assignment is 
persisted to the partition.
+// 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+// replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
 int targetAssignmentEpoch = group.assignmentEpoch();
 Assignment targetAssignment = group.targetAssignment(memberId);
-if (groupEpoch > targetAssignmentEpoch) {
+if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
 String preferredServerAssignor = 
group.computePreferredServerAssignor(
 member,
 updatedMember
 ).orElse(defaultAssignor.name());
 
 try {
-TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult =
-new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+TargetAssignmentBuilder assignmentResultBuilder =
+new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor));
+TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+// A new static member is replacing an older one with the same 
subscriptions.
+// We just need to remove the older member and add the newer 
one. The new member can
+// reuse the target assignment of the older member.
+if (staticMemberReplaced && groupEpoch == 
targetAssignmentEpoch) {

Review Comment:
   Actually, `groupEpoch == targetAssignmentEpoch` is not needed. I was just 
trying to ensure that the group epoch and the target assignment epoch are the 
same, which is what happens when a static member is replaced. So, in a way, it's 
redundant. I will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1402179874


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";
+
+private static final Map PRODUCER_CONFIG_MAPPING = new 
HashMap<>();
+private static final Map CONSUMER_CONFIG_MAPPING = new 
HashMap<>();
+
+private Resource resource = null;
+private Map config = null;
+
+static {
+PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
ClientTelemetryProvider.TRANSACTIONAL_ID);
+
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, 
ClientTelemetryProvider.GROUP_ID);
+CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
ClientTelemetryProvider.GROUP_INSTANCE_ID);
+}
+
+@Override
+public synchronized void configure(Map configs) {
+this.config = configs;
+}
+
+/**
+ * Validate that all the data required for generating correct metrics is 
present. The provider
+ * will be disabled if validation fails.
+ *
+ * @param metricsContext {@link MetricsContext}
+ * @return false if all the data required for generating correct metrics 
is missing, true
+ * otherwise.
+ */
+public boolean validate(MetricsContext metricsContext, Map 
config) {
+// metric collection will be disabled for clients without a client id 
(e.g. transient admin clients)
+return ClientTelemetryUtils.validateResourceLabel(config, 
CommonClientConfigs.CLIENT_ID_CONFIG) &&

Review Comment:
   Though the property `client.id` is optional, ProducerConfig and 
ConsumerConfig generate a client.id if it is not defined, which is not the case 
for AdminClient. Hence an admin client created from the CLI (e.g. 
kafka-configs.sh, etc.) should not report any metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15885) Reduce lock contention when cleaning topics

2023-11-22 Thread Krzysztof Piecuch (Jira)
Krzysztof Piecuch created KAFKA-15885:
-

 Summary: Reduce lock contention when cleaning topics
 Key: KAFKA-15885
 URL: https://issues.apache.org/jira/browse/KAFKA-15885
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner
Reporter: Krzysztof Piecuch


Somewhat similar to KAFKA-14213, there are a couple of subroutines that 
require the same lock, which throttles compaction speed and limits 
parallelism.

 

There are a couple of problems here:
 # LogCleanerManager.grabFilthiestCompactedLog - iterates through a list of 
partitions multiple times, all of this while holding a lock
 # LogCleanerManager.grabFilthiestCompactedLog doesn't cache anything and 
returns only 1 item at a time - the method is invoked every time a cleaner 
thread asks for a new partition to compact
 # LogCleanerManager.checkCleaningAborted - a quick check which:

 ## shares a lock with grabFilthiestCompactedLog
 ## is executed every time a LogCleaner reads bufsize bytes of data to compact
 # LogCleaner's bufsize is limited to 1G / (number of log cleaner threads)

 

Here's the scenario where this design falls short:
 * I have 15k partitions
 * all of which need to be compacted fairly often, but it doesn't take a lot 
of time to compact them
 * Most of the CPU time spent by cleaner threads is spent on 
grabFilthiestCompactedLog
 ** so the other cleaners can't do anything, since they need to acquire a lock 
to read data to compact as per 3.1. and 3.2.
 ** because of 4., log cleaners run out of work to do as soon as 
grabFilthiestLog is called
 * Negative performance scaling - increasing the number of log cleaner threads 
decreases each log cleaner's bufsize, which makes them hammer the lock 
mentioned in 3.1. and 3.2. more often

 

I suggest:
 * making LogCleanerManager use more fine-grained locking (e.g. an RW lock for 
the checkCleaningAborted data structures) to decrease the effect of negative 
performance scaling (see the sketch below)
 * making LogCleanerManager.grabFilthiestLog faster on average:
 ** we don't need grabFilthiestLog to be 100% accurate
 ** we can try caching candidates for the "filthiest log" and re-calculating 
the cache every minute or so
 ** change the algorithm to probabilistic sampling (get 100 topics and pick 
the worst one?) or even round-robin
 * Alternatively, we could allow LogCleaner's bufsize to be set to higher values
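
A minimal sketch of the finer-grained locking idea above (purely illustrative; the class and method shapes are assumptions, not the actual LogCleanerManager code): the hot-path abort check takes a shared read lock, while state changes take the exclusive write lock.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CleaningStateSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<String> abortedPartitions = new HashSet<>();

    // Hot path: called for every bufsize-sized read, only needs the shared read lock.
    public void checkCleaningAborted(String topicPartition) {
        lock.readLock().lock();
        try {
            if (abortedPartitions.contains(topicPartition)) {
                throw new IllegalStateException("Cleaning aborted for " + topicPartition);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    // Rare path: state changes take the exclusive write lock.
    public void abortCleaning(String topicPartition) {
        lock.writeLock().lock();
        try {
            abortedPartitions.add(topicPartition);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}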



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15884) Reduce lock contention when cleaning topics

2023-11-22 Thread Krzysztof Piecuch (Jira)
Krzysztof Piecuch created KAFKA-15884:
-

 Summary: Reduce lock contention when cleaning topics
 Key: KAFKA-15884
 URL: https://issues.apache.org/jira/browse/KAFKA-15884
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner
Reporter: Krzysztof Piecuch


Somewhat similar to KAFKA-14213, there are a couple of subroutines that 
require the same lock, which throttles compaction speed and limits 
parallelism.

 

There are a couple of problems here:
 # LogCleanerManager.grabFilthiestCompactedLog - iterates through a list of 
partitions multiple times, all of this while holding a lock
 # LogCleanerManager.grabFilthiestCompactedLog doesn't cache anything and 
returns only 1 item at a time - the method is invoked every time a cleaner 
thread asks for a new partition to compact
 # LogCleanerManager.checkCleaningAborted - a quick check which:

 ## shares a lock with grabFilthiestCompactedLog
 ## is executed every time a LogCleaner reads bufsize bytes of data to compact
 # LogCleaner's bufsize is limited to 1G / (number of log cleaner threads)

 

Here's the scenario where this design falls short:
 * I have 15k partitions
 * all of which need to be compacted fairly often, but it doesn't take a lot 
of time to compact them
 * Most of the CPU time spent by cleaner threads is spent on 
grabFilthiestCompactedLog
 ** so the other cleaners can't do anything, since they need to acquire a lock 
to read data to compact as per 3.1. and 3.2.
 ** because of 4., log cleaners run out of work to do as soon as 
grabFilthiestLog is called
 * Negative performance scaling - increasing the number of log cleaner threads 
decreases each log cleaner's bufsize, which makes them hammer the lock 
mentioned in 3.1. and 3.2. more often

 

I suggest:
 * making LogCleanerManager use more fine-grained locking (e.g. an RW lock for 
the checkCleaningAborted data structures) to decrease the effect of negative 
performance scaling
 * making LogCleanerManager.grabFilthiestLog faster on average:
 ** we don't need grabFilthiestLog to be 100% accurate
 ** we can try caching candidates for the "filthiest log" and re-calculating 
the cache every minute or so
 ** change the algorithm to probabilistic sampling (get 100 topics and pick 
the worst one?) or even round-robin
 * Alternatively, we could allow LogCleaner's bufsize to be set to higher values



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15663, KAFKA-15794: Telemetry reporter and request handling (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on code in PR #14724:
URL: https://github.com/apache/kafka/pull/14724#discussion_r1402159178


##
clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.resource.v1.Resource;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.metrics.MetricsContext;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClientTelemetryProvider implements Configurable {
+
+public static final String DOMAIN = "org.apache.kafka";
+// Client metrics tags
+public static final String CLIENT_RACK = "client_rack";
+public static final String GROUP_ID = "group_id";
+public static final String GROUP_INSTANCE_ID = "group_instance_id";
+public static final String TRANSACTIONAL_ID = "transactional_id";

Review Comment:
   The KIP says `group member id (if any, consumer)`. I treated `if any` as 
meaning the label is not required, since the member id is not present as a 
standard consumer property; rather, I think the broker generates it. Also, I do 
not find `group member id` exposed as a label for consumer metrics. 
@AndrewJSchofield Please let me know if my understanding is incorrect or we can 
do better here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson updated KAFKA-7631:

Fix Version/s: 2.7.0

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Fix For: 2.7.0
>
> Attachments: KAFKA-7631.patch
>
>
> When user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to 
> broker's JAAS configuration, a null pointer exception is thrown on broker 
> side and the connection is closed.
> Meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Olson resolved KAFKA-7631.
-
Resolution: Fixed

Marking as resolved since I believe it is.

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Fix For: 2.7.0
>
> Attachments: KAFKA-7631.patch
>
>
> When user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to 
> broker's JAAS configuration, a null pointer exception is thrown on broker 
> side and the connection is closed.
> Meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15830: Add telemetry API handling (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on code in PR #14767:
URL: https://github.com/apache/kafka/pull/14767#discussion_r1402135807


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -913,6 +915,13 @@ class DynamicMetricReporterState(brokerId: Int, config: 
KafkaConfig, metrics: Me
 reporters.forEach { reporter =>
   metrics.addReporter(reporter)
   currentReporters += reporter.getClass.getName -> reporter
+  val clientTelemetryReceiver = reporter match {

Review Comment:
   These changes are not required now, and the file will be removed when merged with 
the open PR: https://github.com/apache/kafka/pull/14699



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-7631) NullPointerException when SCRAM is allowed but ScramLoginModule is not in broker's jaas.conf

2023-11-22 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788786#comment-17788786
 ] 

Andrew Olson commented on KAFKA-7631:
-

This looks like a duplicate of KAFKA-10556 (fixed in 2.7.0).

> NullPointerException when SCRAM is allowed but ScramLoginModule is not in 
> broker's jaas.conf
> ---
>
> Key: KAFKA-7631
> URL: https://issues.apache.org/jira/browse/KAFKA-7631
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 2.0.0, 2.5.0
>Reporter: Andras Beni
>Priority: Minor
> Attachments: KAFKA-7631.patch
>
>
> When user wants to use delegation tokens and lists {{SCRAM}} in 
> {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to 
> broker's JAAS configuration, a null pointer exception is thrown on broker 
> side and the connection is closed.
> Meaningful error message should be logged and sent back to the client.
> {code}
> java.lang.NullPointerException
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376)
> at 
> org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]

2023-11-22 Thread via GitHub


dongnuo123 commented on code in PR #14800:
URL: https://github.com/apache/kafka/pull/14800#discussion_r1402122588


##
core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala:
##
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.message.SyncGroupRequestData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.Collections
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class SyncGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testSyncGroup()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testSyncGroup()
+  }
+
+  private def testSyncGroup(): Unit = {
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- 0 to 
ApiKeys.SYNC_GROUP.latestVersion(isUnstableApiEnabled)) {
+  // Sync with unknown group id.
+  syncGroupWithOldProtocol(
+groupId = "grp-unknown",
+memberId = "member-id",
+generationId = -1,
+expectedProtocolType = null,
+expectedProtocolName = null,
+expectedError = Errors.UNKNOWN_MEMBER_ID,
+version = version.toShort
+  )
+
+  val metadata = ConsumerProtocol.serializeSubscription(
+new 
ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
+  ).array
+
+  // Join a dynamic member without member id.
+  // Prior to JoinGroup version 4, a new member is immediately added if it 
sends a join group request with UNKNOWN_MEMBER_ID.
+  val joinLeaderResponseData = sendJoinRequest(
+groupId = "grp",
+metadata = metadata
+  )
+  val leaderMemberId = joinLeaderResponseData.memberId
+
+  // Rejoin the group with the member id.
+  sendJoinRequest(
+groupId = "grp",
+memberId = leaderMemberId,
+metadata = metadata
+  )
+
+  if (version >= 5) {
+// Sync the leader with unmatched protocolName.
+syncGroupWithOldProtocol(
+  groupId = "grp",
+  memberId = leaderMemberId,
+  generationId = 1,
+  protocolName = "unmatched",
+  assignments = List(new 
SyncGroupRequestData.SyncGroupRequestAssignment()
+.setMemberId(leaderMemberId)

[jira] [Created] (KAFKA-15883) Implement RemoteCopyLagBytes

2023-11-22 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-15883:
-

 Summary: Implement RemoteCopyLagBytes
 Key: KAFKA-15883
 URL: https://issues.apache.org/jira/browse/KAFKA-15883
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov
Assignee: Christo Lolov






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Update LICENSE-binary for 3.6.1 [kafka]

2023-11-22 Thread via GitHub


mimaison merged PR #14812:
URL: https://github.com/apache/kafka/pull/14812


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1822777968

   > @apoorvmittal10 :
   > 
   > 1. Do you know why JDK 11 and Scala 2.13 didn't build?
   > 2. For getting green build, it would be useful to help triage the new test 
failures. If we could identify the PR that introduced the failure, we could 
ping the author for a fix.
   
   I'll try to find out some details regarding the failing tests. The JDK 11 build 
seems stuck in publishing test results; the build ran for more than 7 hours.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15882) Scheduled nightly github actions workflow for CVE reports on published docker images

2023-11-22 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-15882:
--

 Summary: Scheduled nightly github actions workflow for CVE reports 
on published docker images
 Key: KAFKA-15882
 URL: https://issues.apache.org/jira/browse/KAFKA-15882
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


This scheduled github actions workflow will check supported published docker 
images for CVEs and generate reports.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15881) Update release.py script to include docker image

2023-11-22 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-15881:
--

 Summary: Update release.py script to include docker image
 Key: KAFKA-15881
 URL: https://issues.apache.org/jira/browse/KAFKA-15881
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


Make changes in release.py to include build and push of RC docker image to RM's 
dockerhub account and include the details in VOTE email template.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15878: KIP-768 - Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER [kafka]

2023-11-22 Thread via GitHub


jcme opened a new pull request, #14818:
URL: https://github.com/apache/kafka/pull/14818

   # Overview
   * This change pertains to [SASL/OAUTHBEARER 
](https://kafka.apache.org/documentation/#security_sasl_oauthbearer)  mechanism 
of Kafka authentication. 
   * Kafka clients can use [SASL/OAUTHBEARER 
](https://kafka.apache.org/documentation/#security_sasl_oauthbearer)   
mechanism by overriding the [custom call back 
handlers](https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod)
 . 
   * 
[KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575)
 available from v3.1  further extends the mechanism with a production grade 
implementation. 
   * Kafka's [SASL/OAUTHBEARER 
](https://kafka.apache.org/documentation/#security_sasl_oauthbearer)  mechanism 
currently **rejects the non-JWT (i.e. opaque) tokens**. This is because of a 
more restrictive set of characters than what 
[RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1) 
recommends. 
   * This JIRA can be considered an extension of 
[KIP-768](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575)
 to support opaque tokens in addition to JWT tokens.

   # Solution
   * Have updated the regex in the offending class to be compliant with 
[RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1), roughly as sketched below
   * Have provided a supporting test case that includes the possible character 
set defined in 
[RFC-6750](https://datatracker.ietf.org/doc/html/rfc6750#section-2.1)
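   
   For reference, a minimal sketch of a pattern that accepts exactly this character set (the class and method names below are illustrative and not the ones touched by this PR):
   
   ```java
   import java.util.regex.Pattern;
   
   public class Rfc6750TokenCheck {
       // RFC 6750 b64token: 1*( ALPHA / DIGIT / "-" / "." / "_" / "~" / "+" / "/" ) *"="
       private static final Pattern B64_TOKEN = Pattern.compile("[A-Za-z0-9\\-._~+/]+=*");
   
       public static boolean isValid(String token) {
           return token != null && B64_TOKEN.matcher(token).matches();
       }
   
       public static void main(String[] args) {
           System.out.println(isValid("eyJhbGciOi.JIUzI1NiJ9.payload")); // JWT-style token: true
           System.out.println(isValid("opaque~token_+/=="));             // opaque token: true
           System.out.println(isValid("has space"));                     // space not allowed: false
       }
   }
   ```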


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15880) Add github actions workflow for promoting RC docker image

2023-11-22 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-15880:
--

 Summary: Add github actions workflow for promoting RC docker image
 Key: KAFKA-15880
 URL: https://issues.apache.org/jira/browse/KAFKA-15880
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


RC docker image needs to be pulled and pushed to apache/kafka through this 
github actions workflow.

We need to ensure that only PMC members can access this workflow.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15879) Add documentation for the Docker image

2023-11-22 Thread Vedarth Sharma (Jira)
Vedarth Sharma created KAFKA-15879:
--

 Summary: Add documentation for the Docker image
 Key: KAFKA-15879
 URL: https://issues.apache.org/jira/browse/KAFKA-15879
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vedarth Sharma
Assignee: Vedarth Sharma


Update the quickstart with Docker image details and add a Docker section to the 
getting started section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]

2023-11-22 Thread via GitHub


lucasbru opened a new pull request, #14817:
URL: https://github.com/apache/kafka/pull/14817

   To align the new consumer completely with the legacy consumer,
   we need to update the latest seen leader epoch in the metadata
   both when we commit offsets with a leader epoch using
   `commitAsync` and when we fetch the latest committed offsets
   using `committed`.
   
   We add unit tests to `AsyncKafkaConsumer` to test that
   metadata is correctly updated. We also add a corresponding test
   for `commitSync`, which already had the leader epoch logic.
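   
   A minimal sketch of the behaviour being aligned (the helper name and wiring are
   assumed for illustration; the PR's actual code may differ):
   
   ```java
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
   import org.apache.kafka.common.TopicPartition;
   
   import java.util.Map;
   
   final class EpochUpdater {
       private final ConsumerMetadata metadata;
   
       EpochUpdater(ConsumerMetadata metadata) {
           this.metadata = metadata;
       }
   
       // Applied to every entry passed to commitAsync() and to every entry
       // returned by committed(), mirroring the legacy consumer.
       void updateLastSeenEpochIfNewer(Map<TopicPartition, OffsetAndMetadata> offsets) {
           offsets.forEach((tp, om) -> {
               if (om != null) {
                   om.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(tp, epoch));
               }
           });
       }
   }
   ```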
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15803) Update last seen epoch during commit

2023-11-22 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788757#comment-17788757
 ] 

Lucas Brutschy commented on KAFKA-15803:


From what I can see, what we seem to be missing is mostly updating the epoch in 
`commitAsync` and `committed`.

Otherwise, both implementations seem to behave the same.

> Update last seen epoch during commit
> 
>
> Key: KAFKA-15803
> URL: https://issues.apache.org/jira/browse/KAFKA-15803
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>
> At the time we implemented commitAsync in the prototypeAsyncConsumer, 
> metadata was not there. The ask here is to investigate if we need to add the 
> following function to the commit code:
>  
> private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, 
> OffsetAndMetadata offsetAndMetadata) {
> if (offsetAndMetadata != null)
> offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER

2023-11-22 Thread Anuj Sharma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anuj Sharma updated KAFKA-15878:

Description: 
{code:java}
// code placeholder
{code}
h1. Overview
 * This issue pertains to 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
 mechanism of Kafka authentication. 
 * Kafka clients can use [SASL/OAUTHBEARER  
|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism by 
overriding the [custom call back 
handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]
 . 
 * 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 available from v3.1  further extends the mechanism with a production grade 
implementation. 
 * Kafka's 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
  mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is 
because of a more restrictive set of characters than what 
[RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] 
recommends. 
 * This JIRA can be considered an extension of 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 to support the opaque tokens as well apart from the JWT tokens.

 

In summary the following character set should be supported as per the RFC - 
{code:java}
1*( ALPHA / DIGIT /
   "-" / "." / "_" / "~" / "+" / "/" ) *"="
{code}
 

 

 

  was:
{code:java}
// code placeholder
{code}
h1. Overview
 * This issue pertains to 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
 mechanism of Kafka authentication. 
 * Kafka clients can use [SASL/OAUTHBEARER  
|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism by 
overriding the [custom call back 
handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]
 . 
 * 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 available from v3.1  further extends the mechanism with a production grade 
implementation. 
 * Kafka's 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
  mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is 
because of a more restrictive set of characters than what 
[RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] 
recommends. 
 * This JIRA can be considered an extension of 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 to support the opaque tokens as well apart from the JWT tokens.

In summary the following character set should be supported as per the RFC - 
{code:java}
1*( ALPHA / DIGIT /
   "-" / "." / "_" / "~" / "+" / "/" ) *"="
{code}
 

 

 


> KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER
> 
>
> Key: KAFKA-15878
> URL: https://issues.apache.org/jira/browse/KAFKA-15878
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Anuj Sharma
>Priority: Major
>
> {code:java}
> // code placeholder
> {code}
> h1. Overview
>  * This issue pertains to 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>  mechanism of Kafka authentication. 
>  * Kafka clients can use [SASL/OAUTHBEARER  
> |https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism 
> by overriding the [custom call back 
> handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]
>  . 
>  * 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
>  available from v3.1  further extends the mechanism with a production grade 
> implementation. 
>  * Kafka's 
> [SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
>   mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is 
> because of a more restrictive set of characters than what 
> [RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] 
> recommends. 
>  * This JIRA can be considered an extension of 
> [KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
>  to support the opaque tokens as well apart from the JWT tokens.
>  
> In summary the following character set should be supported as per the RFC - 
> {code:java}
> 1*( ALPHA / DIGIT /
>"-" / "." / "_" / "~" / "+" / "/" ) *"="
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15878) KIP-768: Extend support for opaque (i.e. non-JWT) tokens in SASL/OAUTHBEARER

2023-11-22 Thread Anuj Sharma (Jira)
Anuj Sharma created KAFKA-15878:
---

 Summary: KIP-768: Extend support for opaque (i.e. non-JWT) tokens 
in SASL/OAUTHBEARER
 Key: KAFKA-15878
 URL: https://issues.apache.org/jira/browse/KAFKA-15878
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Anuj Sharma


{code:java}
// code placeholder
{code}
h1. Overview
 * This issue pertains to 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
 mechanism of Kafka authentication. 
 * Kafka clients can use [SASL/OAUTHBEARER  
|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]mechanism by 
overriding the [custom call back 
handlers|https://kafka.apache.org/documentation/#security_sasl_oauthbearer_prod]
 . 
 * 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 available from v3.1  further extends the mechanism with a production grade 
implementation. 
 * Kafka's 
[SASL/OAUTHBEARER|https://kafka.apache.org/documentation/#security_sasl_oauthbearer]
  mechanism currently {*}rejects the non-JWT (i.e. opaque) tokens{*}. This is 
because of a more restrictive set of characters than what 
[RFC-6750|https://datatracker.ietf.org/doc/html/rfc6750#section-2.1] 
recommends. 
 * This JIRA can be considered an extension of 
[KIP-768|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575]
 to support the opaque tokens as well apart from the JWT tokens.

In summary the following character set should be supported as per the RFC - 
{code:java}
1*( ALPHA / DIGIT /
   "-" / "." / "_" / "~" / "+" / "/" ) *"="
{code}
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1822604515

   > @apoorvmittal10 : Thanks for the updated PR. LGTM. Are the 34 test 
failures related?
   
   Thanks @junrao, the test failures are not related and I'll trigger a re-build to see 
if they persist. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on PR #14620:
URL: https://github.com/apache/kafka/pull/14620#issuecomment-1822599777

   @xvrl Can you please re-review and let me know if fixing the temporality change 
by resetting the ledger in a follow-up PR makes sense?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-22 Thread via GitHub


apoorvmittal10 commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1401906005


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 metrics: a rate and 
a count. For eg:
+ * org.apache.kafka.common.network.Selector has Meter metric for 
"connection-close" but it has to be
+ * created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it 

[jira] [Created] (KAFKA-15877) Support change of temporality in Java client

2023-11-22 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15877:
-

 Summary: Support change of temporality in Java client 
 Key: KAFKA-15877
 URL: https://issues.apache.org/jira/browse/KAFKA-15877
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Details: https://github.com/apache/kafka/pull/14620#discussion_r1401554867



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15876) Introduce Remote Storage Not Ready Exception

2023-11-22 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-15876:


 Summary: Introduce Remote Storage Not Ready Exception
 Key: KAFKA-15876
 URL: https://issues.apache.org/jira/browse/KAFKA-15876
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash


When tiered storage is enabled on the cluster, the Kafka broker has to build the 
remote log metadata on node restart for all the partitions for which it is either 
leader or follower. The remote log metadata is built asynchronously and does not 
interfere with the broker startup path. Once the broker comes online, it cannot 
handle client requests (FETCH and LIST_OFFSETS) that access remote storage until 
the metadata has been built for those partitions. Currently, we return a 
ReplicaNotAvailable exception to the client so that it will retry after some time.

[ReplicaNotAvailableException|https://sourcegraph.com/github.com/apache/kafka@254335d24ab6b6d13142dcdb53fec3856c16de9e/-/blob/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java]
 is applicable when a reassignment is in progress and was effectively deprecated by 
the NotLeaderOrFollowerException 
([PR#8979|https://github.com/apache/kafka/pull/8979]). It would be good to introduce 
an appropriate retriable exception for remote storage errors to denote that the 
broker is not ready to accept these client requests yet.
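
A minimal sketch of what such an exception could look like (the name comes from the ticket title; the final class shape and error-code wiring are assumptions, not a committed design):
{code:java}
package org.apache.kafka.common.errors;

/**
 * Sketch only: a retriable error indicating that the broker has not yet
 * finished building the remote log metadata for the requested partitions.
 */
public class RemoteStorageNotReadyException extends RetriableException {

    public RemoteStorageNotReadyException(String message) {
        super(message);
    }

    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}
Because clients treat subclasses of RetriableException as retriable, FETCH and LIST_OFFSETS callers would back off and retry instead of failing hard.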



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-22 Thread via GitHub


dajac commented on PR #14758:
URL: https://github.com/apache/kafka/pull/14758#issuecomment-1822554162

   @AndrewJSchofield We are about to merge 
https://github.com/apache/kafka/pull/14781. Is it possible to build this one on 
top of it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Update LICENSE-binary for 3.6.1 [kafka]

2023-11-22 Thread via GitHub


showuon commented on code in PR #14812:
URL: https://github.com/apache/kafka/pull/14812#discussion_r1401875110


##
LICENSE-binary:
##
@@ -205,7 +205,7 @@
 This project bundles some components that are also licensed under the Apache
 License Version 2.0:
 
-audience-annotations-0.13.0
+audience-annotations-0.12.0

Review Comment:
   Yes, it's quite surprising that we need to downgrade it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]

2023-11-22 Thread via GitHub


cadonna commented on code in PR #14758:
URL: https://github.com/apache/kafka/pull/14758#discussion_r1401760510


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -277,8 +294,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  @Test
-  def testAutoCommitOnCloseAfterWakeup(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // close() is not committing 
offsets in consumer group protocol

Review Comment:
   This is just a transient state, isn't it? At least 
https://issues.apache.org/jira/browse/KAFKA-15327 says that committing on close 
is planned. If KAFKA-15327 is still valid, can we formulate this comment 
accordingly like `close() is not committing offsets in consumer group protocol 
for now but it should when implementation is complete`.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol

Review Comment:
   I guess the `ConsumerRebalanceListener` is also something that will be 
supported in the future by the consumer group protocol. 



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -537,33 +574,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 awaitAssignment(consumer, shrunkenAssignment)
   }
 
-  @Test
-  def testPartitionsFor(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsFor(groupProtocol: String): Unit = {
 val numParts = 2
 createTopic("part-test", numParts, 1)
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 val parts = consumer.partitionsFor("part-test")
 assertNotNull(parts)
 assertEquals(2, parts.size)
   }
 
-  @Test
-  def testPartitionsForAutoCreate(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol
+  def testPartitionsForAutoCreate(groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 val consumer = createConsumer()
 // First call would create the topic
 consumer.partitionsFor("non-exist-topic")
 val partitions = consumer.partitionsFor("non-exist-topic")
 assertFalse(partitions.isEmpty)
   }
 
-  @Test
-  def testPartitionsForInvalidTopic(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // partitionsFor not supported for 
consumer group protocol

Review Comment:
   See my comment above about the functionality being supported in future.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 runMultiConsumerSessionTimeoutTest(false)
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnClose(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnClose(groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 runMultiConsumerSessionTimeoutTest(true)
   }
 
-  @Test
-  def testInterceptors(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // Consumer interceptors not 
implemented for consumer group protocol

Review Comment:
   Will interceptors also be supported?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1137,18 +1202,23 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
-  @Test
-  def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic")) // ConsumerRebalanceListener not 
supported for consumer group protocol
+  def testMultiConsumerSessionTimeoutOnStopPolling(groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol)
 

Re: [PR] KAFKA-15875 Make Snapshot public [kafka]

2023-11-22 Thread via GitHub


jlprat commented on code in PR #14816:
URL: https://github.com/apache/kafka/pull/14816#discussion_r1401847506


##
server-common/src/main/java/org/apache/kafka/timeline/Snapshot.java:
##
@@ -24,10 +24,10 @@
  * A snapshot of some timeline data structures.
  * 
  * The snapshot contains historical data for several timeline data structures.
- * We use an IdentityHashMap to store this data.  This way, we can easily drop 
all of
+ * We use an IdentityHashMap to store this data.  This way, we can easily drop 
all
  * the snapshot data.
  */
-class Snapshot {
+public class Snapshot {

Review Comment:
   Class is now public, but only read methods are public, so no class from 
outside the package can tamper with the data in this data structure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15875 Make Snapshot public [kafka]

2023-11-22 Thread via GitHub


jlprat opened a new pull request, #14816:
URL: https://github.com/apache/kafka/pull/14816

   Makes the Snapshot, Revertable and Delta classes public. Only the read methods are 
made public as well.
   
   Snapshot, Revertable and Delta are package protected but they potentially 
leak to classes in other packages 
(org.apache.kafka.controller.OffsetControlManager).
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]

2023-11-22 Thread via GitHub


dajac commented on code in PR #14800:
URL: https://github.com/apache/kafka/pull/14800#discussion_r1401815563


##
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##
@@ -37,6 +37,12 @@ public JoinGroupResponse(JoinGroupResponseData data, short 
version) {
 if (version < 7 && data.protocolName() == null) {
 data.setProtocolName("");
 }
+
+// If nullable string for the protocol name is supported,
+// we set empty string to be null to ensure compliance.
+if (version >= 7 && data.protocolName() != null && 
data.protocolName().isEmpty()) {
+data.setProtocolName(null);
+}

Review Comment:
   Should we add a unit test in JoinGroupResponseTest to cover this change?



##
core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala:
##
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.message.SyncGroupRequestData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.Collections
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class SyncGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testSyncGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testSyncGroup()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testSyncGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testSyncGroup()
+  }
+
+  private def testSyncGroup(): Unit = {
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- 0 to 
ApiKeys.SYNC_GROUP.latestVersion(isUnstableApiEnabled)) {
+  // Sync with unknown group id.
+  syncGroupWithOldProtocol(
+groupId = "grp-unknown",
+memberId = "member-id",
+generationId = -1,
+expectedProtocolType = null,
+expectedProtocolName = null,
+expectedError = Errors.UNKNOWN_MEMBER_ID,
+version = version.toShort
+  )
+
+  val metadata = ConsumerProtocol.serializeSubscription(
+new 
ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
+  ).array
+
+  // Join a dynamic member without member id.
+  // Prior to JoinGroup version 4, a new member is immediately added if it 
sends a join group request with UNKNOWN_MEMBER_ID.
+  val joinLeaderResponseData = sendJoinRequest(
+

Re: [PR] KAFKA-15756: [1/2] Migrate existing integration tests to run old protocol in new coordinator [kafka]

2023-11-22 Thread via GitHub


dajac commented on code in PR #14781:
URL: https://github.com/apache/kafka/pull/14781#discussion_r1401788044


##
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##
@@ -67,6 +67,10 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 if (isZkMigrationTest()) {
   cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true"))
 }
+if (isNewGroupCoordinatorEnabled()) {
+  cfgs.foreach(_.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, 
"true"))

Review Comment:
   We could actually remove this now that 
https://github.com/apache/kafka/commit/7826d5fc8ab695a5ad927338469ddc01b435a298 
was merged.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -547,23 +564,29 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 assertEquals(2, parts.size)
   }
 
-  @Test
-  def testPartitionsForAutoCreate(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft", "kraft+kip848"))
+  def testPartitionsForAutoCreate(quorum: String): Unit = {
 val consumer = createConsumer()
 // First call would create the topic
 consumer.partitionsFor("non-exist-topic")
-val partitions = consumer.partitionsFor("non-exist-topic")
-assertFalse(partitions.isEmpty)
+var partitions = consumer.partitionsFor("non-exist-topic")
+TestUtils.waitUntilTrue(() => {
+  partitions = consumer.partitionsFor("non-exist-topic")
+  !partitions.isEmpty
+}, s"Timed out while awaiting non empty partitions.")

Review Comment:
   nit: We don't really need `var partitions` here. How about?
   
   ```
   TestUtils.waitUntilTrue(() => {
   !consumer.partitionsFor("non-exist-topic").isEmpty
   }, s"Timed out while awaiting non empty partitions.")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2023-11-22 Thread via GitHub


appchemist opened a new pull request, #14815:
URL: https://github.com/apache/kafka/pull/14815

   KRaft support in LeaderEpochIntegrationTest
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15869) Document semantics of nullable nested API entities

2023-11-22 Thread Anton Agestam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788698#comment-17788698
 ] 

Anton Agestam commented on KAFKA-15869:
---

Sorry for spamming here, I found the thread now in the non-Pony UI: 
https://www.mail-archive.com/dev@kafka.apache.org/msg127971.html

> Document semantics of nullable nested API entities
> --
>
> Key: KAFKA-15869
> URL: https://issues.apache.org/jira/browse/KAFKA-15869
> Project: Kafka
>  Issue Type: Wish
>Reporter: Anton Agestam
>Priority: Minor
>
> The initial version of ConsumerGroupHeartbeatResponse [introduced the first 
> field across the protocol that is a nullable nested 
> entity|https://github.com/dajac/kafka/blob/3acd87a3e82e1d2fd4c07218d362e7665b99c547/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json#L48].
>  As the implementor of a third-party schema parser it is not clear how to 
> handle this field, where such fields are allowed, and how null is represented 
> for such fields.
> As far as I can tell, the [protocol 
> guide|https://kafka.apache.org/protocol.html#The_Messages_ConsumerGroupHeartbeat]
>  does not mention the nullability at all.
> The reason I ask where such fields are allowed is because if the answer to 
> how null is represented here is just omitting writing any bytes, then I 
> suspect the only unambiguous place for such field to appear would be as the 
> last field of a top-level entity. Even then, how is it discriminated from 
> tagged fields?
> Is it possible this field was made nullable by mistake?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15869) Document semantics of nullable nested API entities

2023-11-22 Thread Anton Agestam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788694#comment-17788694
 ] 

Anton Agestam commented on KAFKA-15869:
---

[~dajac] The discussion link on that KIP looks wrong (presumably a left-over 
from a KIP template?), and neither Google nor the mail archive search is being 
helpful. Do you happen to know if there's a discussion thread for it?

> Document semantics of nullable nested API entities
> --
>
> Key: KAFKA-15869
> URL: https://issues.apache.org/jira/browse/KAFKA-15869
> Project: Kafka
>  Issue Type: Wish
>Reporter: Anton Agestam
>Priority: Minor
>
> The initial version of ConsumerGroupHeartbeatResponse [introduced the first 
> field across the protocol that is a nullable nested 
> entity|https://github.com/dajac/kafka/blob/3acd87a3e82e1d2fd4c07218d362e7665b99c547/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json#L48].
>  As the implementor of a third-party schema parser it is not clear how to 
> handle this field, where such fields are allowed, and how null is represented 
> for such fields.
> As far as I can tell, the [protocol 
> guide|https://kafka.apache.org/protocol.html#The_Messages_ConsumerGroupHeartbeat]
>  does not mention the nullability at all.
> The reason I ask where such fields are allowed is because if the answer to 
> how null is represented here is just omitting writing any bytes, then I 
> suspect the only unambiguous place for such field to appear would be as the 
> last field of a top-level entity. Even then, how is it discriminated from 
> tagged fields?
> Is it possible this field was made nullable by mistake?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]

2023-11-22 Thread via GitHub


dajac merged PR #14582:
URL: https://github.com/apache/kafka/pull/14582


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15699) MirrorConnectorsIntegrationBaseTest is flaky

2023-11-22 Thread Ashwin Pankaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Pankaj reassigned KAFKA-15699:
-

Assignee: Ashwin Pankaj

> MirrorConnectorsIntegrationBaseTest is flaky
> 
>
> Key: KAFKA-15699
> URL: https://issues.apache.org/jira/browse/KAFKA-15699
> Project: Kafka
>  Issue Type: Bug
>Reporter: Calvin Liu
>Assignee: Ashwin Pankaj
>Priority: Major
>
> It may relate to inappropriate test timeout
> testReplicateSourceDefault()
> {code:java}
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 
> different, because it's in exclude filter! ==> expected: not equal but was: 
> <8640>
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testReplicateSourceDefault$9(MirrorConnectorsIntegrationBaseTest.java:826){code}
> testOffsetSyncsTopicsOnTarget() 
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
> deadlineMs=1698275006778, tries=1, nextAllowedTryMs=1698275715972) timed out 
> at 1698275715878 after 1 attempt(s)at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149)
>   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15875) Snapshot class is package protected but returned in public methods

2023-11-22 Thread Josep Prat (Jira)
Josep Prat created KAFKA-15875:
--

 Summary: Snapshot class is package protected but returned in 
public methods
 Key: KAFKA-15875
 URL: https://issues.apache.org/jira/browse/KAFKA-15875
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.6.0
Reporter: Josep Prat
Assignee: Josep Prat


org.apache.kafka.timeline.Snapshot class is package protected but it is part of 
the public API of org.apache.kafka.timeline.SnapshotRegistry. This might cause 
compilation errors if we ever try to assign the returned object of these 
methods to a variable.

org.apache.kafka.controller.OffsetControlManager is calling SnapshotRegistry's 
methods that return a Snapshot and OffsetControlManager is in another package.

 

The SnapshotRegistry class seems to not be public API so I don't think this 
needs a KIP.
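
A minimal illustration of the problem from a class outside org.apache.kafka.timeline (illustrative only; the method names exist today in SnapshotRegistry):
{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
// import org.apache.kafka.timeline.Snapshot;  // not importable: Snapshot is package-private

public class SnapshotVisibilityExample {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        // The call itself is allowed, but the return value cannot be named here:
        registry.getOrCreateSnapshot(100L);
        // Snapshot snapshot = registry.getOrCreateSnapshot(100L);  // would not compile today
    }
}
{code}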



--
This message was sent by Atlassian Jira
(v8.20.10#820010)