[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-08 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1222556934


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon
     }
 
     public synchronized void transitionToUninitialized(RuntimeException exception) {
-        transitionTo(State.UNINITIALIZED);
+        transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
   I think not: the exception will be ignored because the target state is 
UNINITIALIZED. Snippet from `transitionTo`:
   ```
   else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
       if (error == null)
           throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
       lastError = error;
   }
   ```
   If it's ignored, do we need to log it somewhere?
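
To make the question above concrete, here is a minimal sketch of a state holder that only stores the exception for error states and logs it otherwise. `TransitionSketch`, its `State` enum, and the `logLines` list are hypothetical stand-ins, not the real `TransactionManager`; clearing `lastError` for non-error targets is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the state handling discussed above;
// it is not the real TransactionManager.
class TransitionSketch {
    enum State { UNINITIALIZED, READY, ABORTABLE_ERROR, FATAL_ERROR }

    private State current = State.READY;
    private RuntimeException lastError;
    // Stand-in for a logger so the example stays dependency-free.
    final List<String> logLines = new ArrayList<>();

    void transitionTo(State target, RuntimeException error) {
        if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
            if (error == null)
                throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
            lastError = error;
        } else {
            if (error != null) {
                // The exception is not stored for non-error targets; record it so it is not dropped silently.
                logLines.add("Ignoring exception on transition to " + target + ": " + error.getMessage());
            }
            // Assumption of this sketch: non-error targets clear any previous error.
            lastError = null;
        }
        current = target;
    }

    State current() { return current; }

    RuntimeException lastError() { return lastError; }
}
```

With this shape, a transition to `UNINITIALIZED` that carries an exception leaves `lastError()` empty but still produces one log line, which is one possible answer to "do we need to log it somewhere?".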



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-06-08 Thread Steven Booke (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730436#comment-17730436
 ] 

Steven Booke commented on KAFKA-14995:
--

[~vvcephei] Hello John, this will be my first time contributing and I would 
like to assign myself to this ticket but I am unable to do so. Could you assign 
it to me please?

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
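
Steps 1 to 3 of the quoted process could be scripted roughly as follows. This is only a sketch: it assumes the `git shortlog --email --numbered --summary` output has already been read into a list of lines, and that a set of committer email addresses is available (the ticket notes that this mapping is not published today). The class and method names are hypothetical, and the mapping from author to GitHub username (step 4) is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class CollaboratorCandidates {
    // Parses lines shaped like "<count><whitespace><name> <email>" and returns up to `limit`
    // "name <email>" entries whose email is not in `committerEmails` (expected in lower case).
    // Input order is preserved, so with --numbered the most active authors come first.
    static List<String> topNonCommitters(List<String> shortlogLines, Set<String> committerEmails, int limit) {
        List<String> result = new ArrayList<>();
        for (String line : shortlogLines) {
            if (result.size() >= limit) break;
            String trimmed = line.trim();
            if (trimmed.isEmpty()) continue;
            // Split the leading commit count from the author part.
            String[] parts = trimmed.split("\\s+", 2);
            if (parts.length < 2) continue;
            String author = parts[1];
            int open = author.lastIndexOf('<');
            int close = author.lastIndexOf('>');
            if (open < 0 || close < open) continue; // no email found, skip the line
            String email = author.substring(open + 1, close).toLowerCase();
            if (committerEmails.contains(email)) continue; // step 2: drop committers
            result.add(author);
        }
        return result;
    }
}
```

Calling `topNonCommitters(lines, committerEmails, 20)` would give the candidate list for step 3; filtering by email rather than by name is a choice made for this sketch.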



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-08 Thread via GitHub


showuon commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1222617381


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2291,6 +2289,8 @@ public void testDeleteRecords() throws Exception {
 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
 env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));

Review Comment:
   Before looking into the implementation: I don't think this test is testing 
what we expected. The original comment refers to these lines:
   
https://github.com/apache/kafka/pull/7296/files#diff-5422d10d9a7f4776c6538ae3aea27f24e94cf4ecf5e752040125aca6edc795d3R3671-R3682
   
   And it said that when the metadata response (mr) contains an error, we just 
fail the future without retrying. 
   
   The point is, we want to retry `metadataResponse`, not 
`deleteRecordResponse`. What I expected is tests like this:
   
   
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L5431C11-L5433
   
   Please update the test and make sure it passes. 
   Thanks.






[GitHub] [kafka] showuon commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-08 Thread via GitHub


showuon commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1222617381


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -2291,6 +2289,8 @@ public void testDeleteRecords() throws Exception {
 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
 env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));

Review Comment:
   Before looking into the implementation: I don't think this test is testing 
what we expected. The original comment refers to these lines:
   
https://github.com/apache/kafka/pull/7296/files#diff-5422d10d9a7f4776c6538ae3aea27f24e94cf4ecf5e752040125aca6edc795d3R3671-R3682
   
   And it said that when the metadata response (mr) contains an error, we just 
fail the future without retrying. 
   
   And here, the existing metadataResponse handling also fails without a retry: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2969-L2972
   
   The point is, we want to retry `metadataResponse`, not 
`deleteRecordResponse`. What I expected is tests like this:
   
   
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L5431C11-L5433
   
   Please update the test and make sure it passes. 
   Thanks.
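
The behavior the requested test should exercise (first metadata lookup fails with a retriable error, a later one succeeds, and only then the delete proceeds) can be sketched without any Kafka classes. `MetadataRetrySketch`, `RetriableLookupException`, and `lookupWithRetry` are hypothetical names for illustration, not part of `KafkaAdminClient`.

```java
import java.util.function.Supplier;

class MetadataRetrySketch {
    // Hypothetical marker for lookup failures that are worth retrying.
    static class RetriableLookupException extends RuntimeException {
        RetriableLookupException(String message) { super(message); }
    }

    // Calls `lookup` until it succeeds or `maxAttempts` is used up.
    // Only RetriableLookupException is retried; any other exception propagates immediately.
    static <T> T lookupWithRetry(Supplier<T> lookup, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        RetriableLookupException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RetriableLookupException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

A test in the spirit of the review comment would prepare one failing metadata response followed by a successful one and assert that the lookup was attempted twice before the delete request is sent.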






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1222638839


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {
+public final MockScheduler scheduler;
 
-  val scheduler = new MockScheduler(this)
+public MockTime() {
+this(System.currentTimeMillis(), System.nanoTime());

Review Comment:
   please consider using `Time.SYSTEM`



##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
   My preference would be to rename this class. Having two classes named 
MockTime is super confusing. 



##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
   Should this implement AutoCloseable so that we can shut down the scheduler 
correctly?
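
As an illustration of the `AutoCloseable` suggestion, the sketch below shows a mock clock that owns a scheduler and shuts it down in `close()`. Both classes are hypothetical, dependency-free stand-ins; they are not the Kafka `MockTime` or `MockScheduler`, and the real scheduler's shutdown behavior may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mock clock with an associated scheduler.
class SchedulerBackedTimeSketch implements AutoCloseable {
    static class SketchScheduler {
        private final List<Runnable> pending = new ArrayList<>();
        private boolean shutDown = false;

        void schedule(Runnable task) {
            if (shutDown) throw new IllegalStateException("scheduler is shut down");
            pending.add(task);
        }

        void shutdown() {
            pending.clear(); // drop tasks that never ran
            shutDown = true;
        }

        boolean isShutDown() { return shutDown; }
    }

    final SketchScheduler scheduler = new SketchScheduler();
    private long nowMs;

    SchedulerBackedTimeSketch(long startMs) { this.nowMs = startMs; }

    long milliseconds() { return nowMs; }

    // Advances the mock clock; no real waiting takes place.
    void sleep(long ms) { nowMs += ms; }

    @Override
    public void close() {
        scheduler.shutdown();
    }
}
```

Tests could then use `try (SchedulerBackedTimeSketch time = new SchedulerBackedTimeSketch(0L)) { ... }` so the scheduler is released even when an assertion fails.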






[GitHub] [kafka] showuon commented on a diff in pull request #13650: KAFKA-14709: Move content in connect/mirror/README.md to the docs

2023-06-08 Thread via GitHub


showuon commented on code in PR #13650:
URL: https://github.com/apache/kafka/pull/13650#discussion_r1222665546


##
connect/mirror/README.md:
##
@@ -1,297 +0,0 @@
-
-# MirrorMaker 2.0
-
-MM2 leverages the Connect framework to replicate topics between Kafka
-clusters. MM2 includes several new features, including:
-
- - both topics and consumer groups are replicated
- - topic configuration and ACLs are replicated
- - cross-cluster offsets are synchronized
- - partitioning is preserved
-
-## Replication flows
-
-MM2 replicates topics and consumer groups from upstream source clusters
-to downstream target clusters. These directional flows are notated
-`A->B`.
-
-It's possible to create complex replication topologies based on these
-`source->target` flows, including:
-
- - *fan-out*, e.g. `K->A, K->B, K->C`
- - *aggregation*, e.g. `A->K, B->K, C->K`
- - *active/active*, e.g. `A->B, B->A`
-
-Each replication flow can be configured independently, e.g. to replicate
-specific topics or groups:
-
-A->B.topics = topic-1, topic-2
-A->B.groups = group-1, group-2
-
-By default, all topics and consumer groups are replicated (except
-excluded ones), across all enabled replication flows. Each
-replication flow must be explicitly enabled to begin replication:
-
-A->B.enabled = true
-B->A.enabled = true
-
-## Starting an MM2 process
-
-You can run any number of MM2 processes as needed. Any MM2 processes
-which are configured to replicate the same Kafka clusters will find each
-other, share configuration, load balance, etc.
-
-To start an MM2 process, first specify Kafka cluster information in a
-configuration file as follows:
-
-# mm2.properties
-clusters = us-west, us-east
-us-west.bootstrap.servers = host1:9092
-us-east.bootstrap.servers = host2:9092
-
-You can list any number of clusters this way.
-
-Optionally, you can override default MirrorMaker properties:
-
-topics = .*
-groups = group1, group2
-emit.checkpoints.interval.seconds = 10
-
-These will apply to all replication flows. You can also override default
-properties for specific clusters or replication flows:
-
-# configure a specific cluster
-us-west.offset.storage.topic = mm2-offsets
-
-# configure a specific source->target replication flow
-us-west->us-east.emit.heartbeats = false
-
-Next, enable individual replication flows as follows:
-
-us-west->us-east.enabled = true # disabled by default
-
-Finally, launch one or more MirrorMaker processes with the 
`connect-mirror-maker.sh`
-script:
-
-$ ./bin/connect-mirror-maker.sh mm2.properties
-
-## Multicluster environments
-
-MM2 supports replication between multiple Kafka clusters, whether in the
-same data center or across multiple data centers. A single MM2 cluster
-can span multiple data centers, but it is recommended to keep MM2's producers
-as close as possible to their target clusters. To do so, specify a subset
-of clusters for each MM2 node as follows:
-
-# in west DC:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-This signals to the node that the given clusters are nearby, and prevents the
-node from sending records or configuration to clusters in other data centers.
-
-### Example
-
-Say there are three data centers (west, east, north) with two Kafka
-clusters in each data center (west-1, west-2 etc). We can configure MM2
-for active/active replication within each data center, as well as cross data
-center replication (XDCR) as follows:
-
-# mm2.properties
-clusters: west-1, west-2, east-1, east-2, north-1, north-2
-
-west-1.bootstrap.servers = ...
----%<---
-
-# active/active in west
-west-1->west-2.enabled = true
-west-2->west-1.enabled = true
-
-# active/active in east
-east-1->east-2.enabled = true
-east-2->east-1.enabled = true
-
-# active/active in north
-north-1->north-2.enabled = true
-north-2->north-1.enabled = true
-
-# XDCR via west-1, east-1, north-1
-west-1->east-1.enabled = true
-west-1->north-1.enabled = true
-east-1->west-1.enabled = true
-east-1->north-1.enabled = true
-north-1->west-1.enabled = true
-north-1->east-1.enabled = true
-
-Then, launch MM2 in each data center as follows:
-
-# in west:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
-
-# in east:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
-
-# in north:
-$ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
-
-With this configuration, records produced to any cluster will be replicated
-within the data center, as well as across to other data centers. By providing
-the `--clusters` parameter, we ensure that each node only produces records to
-nearby clusters.
-
-N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between

[GitHub] [kafka] urbandan commented on pull request #13796: KAFKA-14034 Idempotent producer should wait for preceding in-flight b…

2023-06-08 Thread via GitHub


urbandan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1582188829

   > I guess I just need to clarify what retried batches are here -- is the 
idea that we wait for inflight batches to return a response or time out? What 
if the response triggers another retry? Would we prevent that from sending out?
   
   The core idea is that we let each of the in-flight batches complete, even if 
they need multiple retries. This would allow the producer to
   1. Avoid inconsistency - by letting in-flight batches finish, we do not run 
the risk of overwriting their sequence number while we are still not sure if 
they were appended or not.
   2. Operate with best-effort - when using an idempotent producer, and 
encountering an error, it is costly to verify if a message was appended to the 
log or not (I think the "official" suggestion is to consume the topic to 
verify). By letting the in-flight batches finish, the idempotent producer will 
report fewer false positive errors.
   
   
   > I'm also wondering the benefit of preserving the previous batches if there 
is an error. How does the system recover differently if we allow those batches 
to "complete". I think we could run into cases where the error causes the 
inflight batches to be unable to be written. Do we prefer to fail them (what we 
may do with this change) and start clean or try to write them with new 
sequences? I can see both scenarios causing issues.
   
   I believe that produce errors should be handled separately, and should not 
cascade to other batches. I think most errors do not really predict the result 
of other produce requests.
   
   > I guess it boils down to availability of writes (rewriting the sequences 
allows us to continue writing) or idempotency correctness (trying to wait for 
them to complete with their old sequences). The sticking point I'm running into 
is why getting those extra inflight requests (potentially) written is better if 
we've hit a non-retriable error.
   
   My understanding is that here correctness beats availability. Are you 
suggesting that we should just cancel in-flight batches when encountering an 
error?
   
   > Maybe I just need an example :)
   
   I will try to write up some examples, and also write more unit tests to 
demonstrate those scenarios.
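
Until such examples exist, the consistency argument in point 1 can be illustrated with a deliberately simplified model of sequence checking on a single partition. `SequenceLogSketch` is hypothetical and much simpler than the broker's real bookkeeping; it only shows why retrying with the original sequence is safe while re-sending under a new sequence can duplicate a record.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of per-partition sequence checking; not broker code.
class SequenceLogSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> log = new ArrayList<>();
    private int nextExpectedSequence = 0;

    Result append(int sequence, String record) {
        if (sequence < nextExpectedSequence) {
            // Already written earlier: acknowledge without appending again.
            return Result.DUPLICATE;
        }
        if (sequence > nextExpectedSequence) {
            // A gap in the sequence: reject so ordering is preserved.
            return Result.OUT_OF_ORDER;
        }
        log.add(record);
        nextExpectedSequence++;
        return Result.APPENDED;
    }

    List<String> records() { return log; }
}
```

If a batch with sequence 0 was appended but its response was lost, retrying it with sequence 0 yields `DUPLICATE` and the log keeps one copy. Sending the same record again under sequence 1 would be accepted as new data, which is the kind of inconsistency that letting in-flight batches finish is meant to avoid.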
   
   





[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-08 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ?? when a producer publishes 100W 
records to partition 4 within several minutes and a consumer consumes the data 
through @KafkaListener; a single consumer consumes the data from the partition.

 

 

  was:
I  got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ??when i consumer kafka through 
@KafkaListener in that case producer publish 100W data to partition 4 in 
several minutes, and one consumer consume the data from the partition.

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got ?? "Fetch position FetchPosition{offset=42574305, 
> offsetEpoch=Optional[2214], 
> currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
> cn-north-1d)], epoch=2214}} is out of range for partition 
> vcc.hdmap.tile.delivery-4, resetting offset " ?? when a producer publishes 100W 
> records to partition 4 within several minutes and a consumer consumes the data 
> through @KafkaListener; a single consumer consumes the data from the partition.
>  
>  





[jira] [Updated] (KAFKA-15074) offset out of range for partition xxx, resetting offset

2023-06-08 Thread YaYun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YaYun Wang updated KAFKA-15074:
---
Description: 
I got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ?? when a producer publishes 100W 
records to partition 4 within several minutes and a consumer consumes the data 
through @KafkaListener; only one consumer exists.

 

 

  was:
I  got ?? "Fetch position FetchPosition{offset=42574305, 
offsetEpoch=Optional[2214], 
currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
cn-north-1d)], epoch=2214}} is out of range for partition 
vcc.hdmap.tile.delivery-4, resetting offset " ??when producer publish 100W data 
to partition 4 in several minutes and consumer consume data through 
@KafkaListener , and one consumer consume the data from the partition.

 

 


> offset out of range for partition xxx, resetting offset
> ---
>
> Key: KAFKA-15074
> URL: https://issues.apache.org/jira/browse/KAFKA-15074
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.2
>Reporter: YaYun Wang
>Priority: Major
>
> I got ?? "Fetch position FetchPosition{offset=42574305, 
> offsetEpoch=Optional[2214], 
> currentLeader=LeaderAndEpoch{leader=Optional[host:port (id: 2 rack: 
> cn-north-1d)], epoch=2214}} is out of range for partition 
> vcc.hdmap.tile.delivery-4, resetting offset " ?? when a producer publishes 100W 
> records to partition 4 within several minutes and a consumer consumes the data 
> through @KafkaListener; only one consumer exists.
>  
>  





[GitHub] [kafka] ashwinpankaj commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-08 Thread via GitHub


ashwinpankaj commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1222706232


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);
+
+AtomicReference secondaryStoreTombstoneWriteError = new 
AtomicReference<>();
+
+// If there are tombstone offsets, then the failure to write to 
secondary store will
+// not be ignored. Also, for tombstone records, we first write to 
secondary store and
+// then to primary stores.
+if (secondaryStore != null && containsTombstones) {
+secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+try (LoggingContext context = loggingContext()) {
+if (secondaryWriteError != null) {
+log.warn("Failed to write offsets with tombstone 
records to secondary backing store", secondaryWriteError);
+secondaryStoreTombstoneWriteError.compareAndSet(null, 
secondaryWriteError);
+} else {
+log.debug("Successfully flushed tombstone offsets to 
secondary backing store");
+}
+}
+});
+}
+
 return primaryStore.set(values, (primaryWriteError, ignored) -> {
-if (secondaryStore != null) {
+// Secondary store writes have already happened for tombstone 
records

Review Comment:
   +1 we can ensure that the secondary write has already been attempted via a 
`CompletableFuture` set in the callback of `secondaryStore.set()` 



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
+boolean containsTombstones = values.entrySet()
+.stream()
+.anyMatch(offset -> offset.getValue() == null);

Review Comment:
   We should do this only if `connectorStore.isPresent()` else we will always 
end up scanning the map for tombstones.
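
Both review comments above can be sketched together: a `CompletableFuture` completed in the secondary store's callback guarantees the secondary write was attempted before the primary write starts, and a short-circuit check avoids scanning the map when no secondary store exists. `SketchStore` and `TombstoneWriteSketch` are hypothetical; skipping the primary write after a secondary failure is a choice made for this sketch, not necessarily what the PR does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class TombstoneWriteSketch {
    // Hypothetical minimal store: set() reports completion through the callback (error is null on success).
    interface SketchStore {
        void set(Map<String, String> values, BiConsumer<Throwable, Void> callback);
    }

    // Returns a future that completes once the writes have been attempted in order:
    // secondary first (only when tombstones are present), then primary.
    // Assumes `values` permits null values, as HashMap does.
    static CompletableFuture<Void> write(SketchStore primary, SketchStore secondary, Map<String, String> values) {
        CompletableFuture<Void> secondaryDone = new CompletableFuture<>();
        // Short-circuit: the map is scanned for tombstones only when a secondary store exists.
        if (secondary != null && values.containsValue(null)) {
            secondary.set(values, (error, ignored) -> {
                if (error != null) secondaryDone.completeExceptionally(error);
                else secondaryDone.complete(null);
            });
        } else {
            secondaryDone.complete(null);
        }

        CompletableFuture<Void> primaryDone = new CompletableFuture<>();
        secondaryDone.whenComplete((unused, secondaryError) -> {
            if (secondaryError != null) {
                // Sketch choice: a failed tombstone write to the secondary store fails the whole operation.
                primaryDone.completeExceptionally(secondaryError);
                return;
            }
            primary.set(values, (primaryError, unused2) -> {
                if (primaryError != null) primaryDone.completeExceptionally(primaryError);
                else primaryDone.complete(null);
            });
        });
        return primaryDone;
    }
}
```

Because the primary write is chained on `secondaryDone`, there is no window in which the primary callback runs before the secondary write has reported its outcome.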






[GitHub] [kafka] mimaison commented on a diff in pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-08 Thread via GitHub


mimaison commented on code in PR #13760:
URL: https://github.com/apache/kafka/pull/13760#discussion_r1222667563


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.admin.DeletedRecords;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import 
org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.Batched;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.DeleteRecordsRequestData;
+import org.apache.kafka.common.message.DeleteRecordsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public final class DeleteRecordsHandler extends Batched {
+
+private final Map recordsToDelete;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public DeleteRecordsHandler(
+Map recordsToDelete,
+LogContext logContext
+) {
+this.recordsToDelete = recordsToDelete;
+this.log = logContext.logger(DeleteRecordsHandler.class);
+this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+}
+
+@Override
+public String apiName() {
+return "deleteRecords";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return this.lookupStrategy;
+}
+
+public static SimpleAdminApiFuture 
newFuture(
+Collection topicPartitions
+) {
+return AdminApiFuture.forKeys(new HashSet<>(topicPartitions));
+}
+
+@Override
+public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
+Map 
deletionsForTopic = new HashMap<>();
+for (Map.Entry entry: 
recordsToDelete.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.get(topicPartition.topic());
+if (deleteRecords == null) {
+deleteRecords = new 
DeleteRecordsRequestData.DeleteRecordsTopic()
+.setName(topicPartition.topic());
+deletionsForTopic.put(topicPartition.topic(), deleteRecords);
+}
+deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
+.setPartitionIndex(topicPartition.partition())
+.setOffset(entry.getValue().beforeOffset()));
+
+System.out.println("Partitions: " + deleteRecords.partitions());

Review Comment:
   Let's remove this debugging statement



##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Lic

[GitHub] [kafka] jlprat commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-08 Thread via GitHub


jlprat commented on code in PR #13827:
URL: https://github.com/apache/kafka/pull/13827#discussion_r1222732234


##
.github/workflows/stale.yml:
##
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: 'Close stale issues and PRs'
+on:
+  #schedule:
+  #  - cron: '30 3 * * *'
+  workflow_dispatch:
+    inputs:
+      dryRun:
+        description: 'Dry Run'
+        required: true
+        default: true
+        type: boolean
+      operationsPerRun:
+        description: 'Max GitHub API operations'
+        required: true
+        default: 30
+        type: number
+
+permissions:
+  issues: write
+  pull-requests: write
+
+jobs:
+  stale:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/stale@v8
+        with:
+          debug-only: ${{ inputs.dryRun }}
+          operations-per-run: ${{ inputs.operationsPerRun }}
+          days-before-stale: 90
+          days-before-close: -1
+          stale-issue-label: 'stale'

Review Comment:
   I would probably add the message there:
   `stale-pr-message: 'This PR is stale because it has been open 90 days with 
no activity.'`






[GitHub] [kafka] showuon commented on pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-08 Thread via GitHub


showuon commented on PR #13807:
URL: https://github.com/apache/kafka/pull/13807#issuecomment-1582296785

   @satishd , FYI
   
   
   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(String).quorum=zk
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.PlaintextAdminIntegrationTest.testCreateDeleteTopics()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testAbortTransactionTimeout(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 8 and Scala 2.12 / 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testDescribeQuorumRequestToBrokers()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] 
Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, 
Security=PLAINTEXT
   ```





[GitHub] [kafka] showuon opened a new pull request, #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-08 Thread via GitHub


showuon opened a new pull request, #13828:
URL: https://github.com/apache/kafka/pull/13828

   Add the "remote.log.metadata.manager.listener.name" config to RLMM to allow 
the producer/consumer to connect to the server. Also add tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] tinaselenge commented on pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-08 Thread via GitHub


tinaselenge commented on PR #13760:
URL: https://github.com/apache/kafka/pull/13760#issuecomment-1582411061

   @divijvaidya @showuon @mimaison Thank you very much for reviewing the PR!
   
   I believe I have addressed the comments now. Please let me know if I have 
missed anything. Thanks. 





[GitHub] [kafka] cadonna opened a new pull request, #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


cadonna opened a new pull request, #13829:
URL: https://github.com/apache/kafka/pull/13829

   Fixes a bug in the state updater code path: tasks that experience 
corruption during restoration are passed from the state updater to the stream 
thread for closing and reviving, but the revived tasks are then not re-added to 
the state updater.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


lucasbru commented on code in PR #13829:
URL: https://github.com/apache/kafka/pull/13829#discussion_r1222946279


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 
 task.addPartitionsForOffsetReset(assignedToPauseAndReset);
 }
+if (stateUpdater != null) {
+tasks.removeTask(task);

Review Comment:
   Any particular reason why we remove the task from `tasks` before reviving 
it? It would seem cleaner to me to basically remove from stream thread / add 
back to state updater directly in one block, which makes it easier to track 
ownership.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1970,6 +1970,29 @@ public void 
shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
 assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
 }
 
+@Test
+public void shouldReAddRevivedTasksToStateUpdater() {
+final StreamTask corruptedActiveTask = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId03Partitions).build();
+final StandbyTask corruptedStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+.inState(State.RUNNING)
+.withInputPartitions(taskId02Partitions).build();
+final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+when(tasks.task(taskId03)).thenReturn( corruptedActiveTask);
+when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask);
+expect(consumer.assignment()).andReturn(emptySet());
+replay(consumer);
+
+taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), 
corruptedStandbyTask.id()));
+
+Mockito.verify(tasks).removeTask(corruptedActiveTask);

Review Comment:
   Do we want to validate the Task state as well?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1970,6 +1970,29 @@ public void 
shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
 assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
 }
 
+@Test
+public void shouldReAddRevivedTasksToStateUpdater() {
+final StreamTask corruptedActiveTask = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId03Partitions).build();
+final StandbyTask corruptedStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+.inState(State.RUNNING)
+.withInputPartitions(taskId02Partitions).build();
+final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+when(tasks.task(taskId03)).thenReturn( corruptedActiveTask);

Review Comment:
   There is an awkward space after `(` here and in the line below. Maybe use 
automatic formatting for the changed code.






[GitHub] [kafka] cadonna commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


cadonna commented on code in PR #13829:
URL: https://github.com/apache/kafka/pull/13829#discussion_r1222962171


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 
 task.addPartitionsForOffsetReset(assignedToPauseAndReset);
 }
+if (stateUpdater != null) {
+tasks.removeTask(task);

Review Comment:
   Yes! The reason is that `removeTask()` verifies that the state of the task 
is `CLOSED` and throws an exception when it is not. I did not want to change 
the check and lose that guard.
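
The ordering constraint described here can be sketched in isolation. This is a minimal Java sketch, not the Kafka Streams implementation: `TaskRegistrySketch`, `Task`, and `State` are hypothetical stand-ins, and only the guard itself (removal throws unless the task is `CLOSED`) is taken from the comment above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task registry; not the Kafka Streams TasksRegistry.
final class TaskRegistrySketch {
    enum State { CREATED, RESTORING, RUNNING, CLOSED }

    // Hypothetical minimal task with an id and a mutable state.
    static final class Task {
        final String id;
        State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    private final Map<String, Task> tasks = new HashMap<>();

    void addTask(Task task) {
        tasks.put(task.id, task);
    }

    // The guard: a task may only leave the registry once it is CLOSED.
    void removeTask(Task task) {
        if (task.state != State.CLOSED) {
            throw new IllegalStateException(
                "Attempted to remove task " + task.id + " in state " + task.state);
        }
        tasks.remove(task.id);
    }

    boolean contains(String id) {
        return tasks.containsKey(id);
    }
}
```

With a guard like this, the removal has to happen while the task is still `CLOSED`, which is presumably why it is placed before the revive step.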






[GitHub] [kafka] cadonna commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


cadonna commented on code in PR #13829:
URL: https://github.com/apache/kafka/pull/13829#discussion_r1222976522


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1970,6 +1970,29 @@ public void 
shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
 assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
 }
 
+@Test
+public void shouldReAddRevivedTasksToStateUpdater() {
+final StreamTask corruptedActiveTask = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId03Partitions).build();
+final StandbyTask corruptedStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+.inState(State.RUNNING)
+.withInputPartitions(taskId02Partitions).build();
+final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+when(tasks.task(taskId03)).thenReturn( corruptedActiveTask);
+when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask);
+expect(consumer.assignment()).andReturn(emptySet());
+replay(consumer);
+
+taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), 
corruptedStandbyTask.id()));
+
+Mockito.verify(tasks).removeTask(corruptedActiveTask);

Review Comment:
   The task is a mock. The state of the task is specified by the test. However, 
I added verifications for the method calls that change the state of the task. 






[GitHub] [kafka] cadonna commented on a diff in pull request #13712: KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito

2023-06-08 Thread via GitHub


cadonna commented on code in PR #13712:
URL: https://github.com/apache/kafka/pull/13712#discussion_r1222986401


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3694,12 +3694,12 @@ public Map 
prepareCommit() {
 
 @Test
 public void shouldSendPurgeData() {
-resetToStrict(adminClient);
-expect(adminClient.deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(5L
-.andReturn(new DeleteRecordsResult(singletonMap(t1p1, 
completedFuture(;
-expect(adminClient.deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(17L
-.andReturn(new DeleteRecordsResult(singletonMap(t1p1, 
completedFuture(;
-replay(adminClient);
+when(adminClient.deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(5L
+.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, 
completedFuture(;
+when(adminClient.deleteRecords(singletonMap(t1p1, 
RecordsToDelete.beforeOffset(17L
+.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, 
completedFuture(;
+
+InOrder inOrder = Mockito.inOrder(adminClient);

Review Comment:
   ```suggestion
   final InOrder inOrder = Mockito.inOrder(adminClient);
   ```






[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


lucasbru commented on code in PR #13829:
URL: https://github.com/apache/kafka/pull/13829#discussion_r1222992405


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1970,6 +1970,29 @@ public void 
shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
 assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
 }
 
+@Test
+public void shouldReAddRevivedTasksToStateUpdater() {
+final StreamTask corruptedActiveTask = statefulTask(taskId03, 
taskId03ChangelogPartitions)
+.inState(State.RESTORING)
+.withInputPartitions(taskId03Partitions).build();
+final StandbyTask corruptedStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+.inState(State.RUNNING)
+.withInputPartitions(taskId02Partitions).build();
+final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+when(tasks.task(taskId03)).thenReturn( corruptedActiveTask);
+when(tasks.task(taskId02)).thenReturn( corruptedStandbyTask);
+expect(consumer.assignment()).andReturn(emptySet());
+replay(consumer);
+
+taskManager.handleCorruption(mkSet(corruptedActiveTask.id(), 
corruptedStandbyTask.id()));
+
+Mockito.verify(tasks).removeTask(corruptedActiveTask);

Review Comment:
   Yes, that's what I meant. Thanks






[GitHub] [kafka] lucasbru commented on a diff in pull request #13829: KAFKA-10199: Re-add revived tasks to the state updater after handling

2023-06-08 Thread via GitHub


lucasbru commented on code in PR #13829:
URL: https://github.com/apache/kafka/pull/13829#discussion_r1222994406


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -294,7 +297,13 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 
 task.addPartitionsForOffsetReset(assignedToPauseAndReset);
 }
+if (stateUpdater != null) {
+tasks.removeTask(task);

Review Comment:
   Got it. Could make sense to add a little comment, but I won't insist






[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-08 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1222869378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, 
Map connector
 connector = plugins.newConnector(connectorClassOrAlias);
 if (ConnectUtils.isSinkConnector(connector)) {
 log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
-alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+modifySinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
 } else {
 log.debug("Altering offsets for source connector: {}", 
connName);
-alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+modifySourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Reset a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be reset
+ * @param connectorConfig the connector's configurations
+ * @param cb callback to invoke upon completion
+ */
+public void resetConnectorOffsets(String connName, Map 
connectorConfig, Callback cb) {

Review Comment:
   > If we're worried about accidentally introducing a nasty bug where an 
empty-bodied alter request causes an unintentional reset, we can add an 
integration test for that case.
   
   Yeah, this was exactly my worry and the reason why I'd kept them separate. 
Based on your feedback, I've added a new integration test and also moved the 
second-level check to `AbstractHerder::alterConnectorOffsets` (so that we can 
consolidate the two methods in the `Worker`). While we could in theory do a 
similar consolidation for the `Herder` methods, I think it's probably a better 
idea to keep cleaner, more well-defined interface methods there.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223073346


##
server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SystemTimer implements Timer {
+// timeout timer
+private final ExecutorService taskExecutor;
+private final DelayQueue delayQueue;
+private final AtomicInteger taskCounter;
+private final TimingWheel timingWheel;
+
+// Locks used to protect data structures while ticking
+private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock();
+private final ReentrantReadWriteLock.ReadLock readLock = 
readWriteLock.readLock();
+private final ReentrantReadWriteLock.WriteLock writeLock = 
readWriteLock.writeLock();
+
+public SystemTimer(String executorName) {
+this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
+}
+
+public SystemTimer(
+String executorName,
+long tickMs,
+int wheelSize,
+long startMs
+) {
+this.taskExecutor = Executors.newFixedThreadPool(1,
+runnable -> KafkaThread.nonDaemon("executor-" + executorName, 
runnable));
+this.delayQueue = new DelayQueue<>();
+this.taskCounter = new AtomicInteger(0);
+this.timingWheel = new TimingWheel(
+tickMs,
+wheelSize,
+startMs,
+taskCounter,
+delayQueue
+);
+}
+
+public void add(TimerTask timerTask) {
+readLock.lock();

Review Comment:
   We are now using this locking pattern very often (it is also used in the TS-related 
code for the leader epoch snapshot). Could we extract it into a utility that takes a 
lambda and executes it while holding the lock, similar to 
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/utils/CoreUtils.scala#L182
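
A utility along the lines suggested here could look roughly like the following. This is only a sketch under assumptions: the class name `LockUtils` and the method names `inLock` and `runInLock` are hypothetical and do not refer to an existing Kafka Java API.

```java
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

// Hypothetical helper; the names are illustrative, not existing Kafka API.
final class LockUtils {
    private LockUtils() { }

    // Runs the action while holding the lock and always releases the lock,
    // even when the action throws.
    static <T> T inLock(Lock lock, Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    // Variant for actions that do not return a value.
    static void runInLock(Lock lock, Runnable action) {
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

A call site such as `add(TimerTask)` could then read `LockUtils.runInLock(readLock, () -> ...)` instead of repeating the `lock()` / `try` / `finally` / `unlock()` sequence.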



##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;
+// timestamp in millisecond
+public final long delayMs;
+
+public TimerTask(long delayMs) {
+this.delayMs = delayMs;
+}
+
+public void cancel() {
+synchronized (this) {

Review Comment:
   Could you move `synchronized` to the method signature above?
   ```
   public synchronized void a() {
   }
   ```
   is equivalent to 
   ```
   public void a() {
       synchronized (this) {
       }
   }
   ```



##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for addition

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {
+rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment:
   please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {

Review Comment:
   The `cluster.id` property is missing, as per 
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -280,7 +281,8 @@ class KafkaServer(
 _brokerState = BrokerState.RECOVERY
 logManager.startup(zkClient.getAllTopicsInCluster())
 
-remoteLogManager = createRemoteLogManager(config)
+val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

Review Comment:
   s/remoteLogManager/remoteLogManagerOpt



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -504,6 +506,13 @@ class KafkaServer(
   new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
 KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
+remoteLogManager.foreach(rlm => {
+  val listenerName = 
ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+  val endpoint = brokerInfo.broker.endPoints.find(e => 
e.listenerName.equals(listenerName))
+.orElse(Some(brokerInfo.broker.endPoints.head))

Review Comment:
   This means that the endpoint will never be optional (since we pick the 
first broker endpoint when it is not configured), right? In that case, can we 
please make it mandatory in RemoteLogManager?
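
The point about the fallback can be illustrated with a small Java sketch. It assumes a non-empty endpoint list, and `EndpointSelection` and `Endpoint` are hypothetical stand-ins rather than Kafka classes.

```java
import java.util.List;

// Hypothetical sketch; Endpoint is a minimal stand-in, not Kafka's EndPoint.
final class EndpointSelection {
    static final class Endpoint {
        final String listenerName;
        final String host;
        final int port;

        Endpoint(String listenerName, String host, int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }
    }

    private EndpointSelection() { }

    // Returns the endpoint for the given listener, falling back to the first
    // endpoint. Assumes the list is non-empty, so a value is always returned.
    static Endpoint select(List<Endpoint> endpoints, String listenerName) {
        return endpoints.stream()
            .filter(e -> e.listenerName.equals(listenerName))
            .findFirst()
            .orElseGet(() -> endpoints.get(0));
    }
}
```

Because the fallback always produces a value, the receiving side can accept a plain `Endpoint` instead of an `Optional`.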



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {
+rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
+rlmmProps.put("security.protocol", e.securityProtocol().name);

Review Comment:
   please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
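
As a rough illustration of the two "use the constant" comments, here is a minimal Java sketch. To stay compilable without Kafka on the classpath it declares local stand-in constants with the same names and values as `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG` and `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`. The class `RlmmPropsSketch` and the method `endpointProps` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; in the PR the constants would come from CommonClientConfigs.
final class RlmmPropsSketch {
    // Local stand-ins mirroring the values of the CommonClientConfigs constants.
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";

    private RlmmPropsSketch() { }

    // Builds the endpoint-related properties using constants instead of string literals.
    static Map<String, Object> endpointProps(String host, int port, String securityProtocol) {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
        props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
        return props;
    }
}
```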



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -280,7 +281,8 @@ class KafkaServer(
 _brokerState = BrokerState.RECOVERY
 logManager.startup(zkClient.getAllTopicsInCluster())
 
-remoteLogManager = createRemoteLogManager(config)
+val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   should we create this only when 
`remoteLogManagerConfig.enableRemoteStorageSystem()`?






[GitHub] [kafka] mumrah commented on pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-08 Thread via GitHub


mumrah commented on PR #13827:
URL: https://github.com/apache/kafka/pull/13827#issuecomment-1582676925

   Thanks for taking a look @jlprat! 
   
   For the more complex workflow, we could use 
https://github.com/actions/github-script. This basically lets you do anything 
that the GitHub API allows (kind of like using Groovy in a Gradle build 😄).
   
   I also found 
https://github.com/marketplace/actions/auto-label-merge-conflicts which would 
add a label to PRs with conflicts. This could be used to create two separate 
workflows of the "stale" action. 
   
   I think there's a lot of flexibility with the actions available.
   
   Do you agree that we can scope this PR to "marking old PRs as stale" for the 
sake of clearing our backlog?





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223142144


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -197,7 +199,8 @@ class BrokerServer(
   logManager = LogManager(config, initialOfflineDirs, metadataCache, 
kafkaScheduler, time,
 brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = 
true)
 
-  remoteLogManager = createRemoteLogManager(config)
+  val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   Correct me if I am wrong here but we already have RemoteLogManagerConfig. It 
is used here:
   
   
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/log/LogManager.scala#L1405
   
   Hence, we can simply do 
`createRemoteLogManager(config.remoteLogManagerConfig)` here.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13795: KAFKA-14462; [17/N] Add CoordinatorRuntime

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13795:
URL: https://github.com/apache/kafka/pull/13795#discussion_r1223158227


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -0,0 +1,1040 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.runtime;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.deferred.DeferredEvent;
+import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
+ * or the transaction coordinator.
+ *
+ * The runtime framework maps each underlying partition (e.g. __consumer_offsets) that the broker is a
+ * leader of to a coordinator replicated state machine. A replicated state machine holds the hard and soft
+ * state of all the objects (e.g. groups or offsets) assigned to the partition. The hard state is stored in
+ * timeline data structures backed by a SnapshotRegistry. The runtime supports two types of operations
+ * on state machines: (1) Writes and (2) Reads.
+ *
+ * (1) A write operation, aka a request, can read the full and potentially **uncommitted** state from the state
+ * machine to handle the operation. A write operation typically generates a response and a list of
+ * records. The records are applied to the state machine and persisted to the partition. The response
+ * is parked until the records are committed and delivered when they are.
+ *
+ * (2) A read operation, aka a request, can only read the committed state from the state machine to handle
+ * the operation. A read operation typically generates a response that is immediately completed.
+ *
+ * The runtime framework exposes an asynchronous, future based, API to the world. All the operations
+ * are executed by a CoordinatorEventProcessor. The processor guarantees that operations for a
+ * single partition or state machine are not processed concurrently.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public class CoordinatorRuntime<S extends Coordinator<U>, U> {
+
+/**
+ * Builder to create a CoordinatorRuntime.
+ *
+ * @param <S> The type of the state machine.
+ * @param <U> The type of the record.
+ */
+public static class Builder<S extends Coordinator<U>, U> {
+private LogContext logContext;
+private CoordinatorEventProcessor eventProcessor;
+private PartitionWriter partitionWriter;
+private CoordinatorLoader loader;
+private CoordinatorBuilderSupplier coordinatorBuilderSupplier;
+
+public Builder withLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+public Builder withEventProcessor(CoordinatorEventProcessor 
eventProcessor) {
+this.eventProcessor = eventProcessor;
+return this;
+}
+
+public Builder withPartitionWriter(PartitionWriter 
partitionWriter) {
+this.partitionWriter = partitionWriter;
+return this;
+}
+
+public Builder withLoader(CoordinatorLoader loader) {
+this.loader = loader;
+return this;
+}
+
+public Builder 
withCoordinatorBuilderSupplier(CoordinatorBuilderSupplier 
coordinatorBuilderSupplier) {
+this.coordinatorBuilderSupplier = coordinatorBuilderSupplier;
+return this;
+}
+
+public CoordinatorRuntime build() {
+if (logContext == null)
+logContext = new LogContext();
+if (eventProcessor == null)
+throw new IllegalAr
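
The write/read contract described in the class Javadoc quoted above can be illustrated with a toy model. This is only a sketch under stated assumptions, not code from the PR: `RuntimeSketch`, `increment`, `read` and `commitUpTo` are hypothetical names, the state machine is a plain counter, and `synchronized` stands in for the event processor's guarantee that operations on one partition never run concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model; none of these names come from the PR.
final class RuntimeSketch {
    // Record i holds the counter value produced by write i (the "partition").
    private final List<Integer> log = new ArrayList<>();
    // One parked response per record, completed once the record is committed.
    private final List<CompletableFuture<Integer>> parked = new ArrayList<>();
    // Number of records that are committed so far.
    private int committedOffset = 0;

    // Write: reads the latest, possibly uncommitted, state, appends a record,
    // and parks the response until that record is committed.
    synchronized CompletableFuture<Integer> increment() {
        int latest = log.isEmpty() ? 0 : log.get(log.size() - 1);
        log.add(latest + 1);
        CompletableFuture<Integer> response = new CompletableFuture<>();
        parked.add(response);
        return response;
    }

    // Read: sees only committed state and completes immediately.
    synchronized CompletableFuture<Integer> read() {
        int value = committedOffset == 0 ? 0 : log.get(committedOffset - 1);
        return CompletableFuture.completedFuture(value);
    }

    // Simulates the commit point advancing; delivers the parked responses.
    synchronized void commitUpTo(int offset) {
        while (committedOffset < offset && committedOffset < log.size()) {
            parked.get(committedOffset).complete(log.get(committedOffset));
            committedOffset++;
        }
    }
}
```

In this model two back-to-back writes both see the uncommitted counter, while a read issued in between still returns the last committed value.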

[GitHub] [kafka] C0urante commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2023-06-08 Thread via GitHub


C0urante commented on PR #12307:
URL: https://github.com/apache/kafka/pull/12307#issuecomment-1582800062

   Test failures appear unrelated; merging...





[GitHub] [kafka] C0urante merged pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2023-06-08 Thread via GitHub


C0urante merged PR #12307:
URL: https://github.com/apache/kafka/pull/12307





[jira] [Commented] (KAFKA-15051) docs: add missing connector plugin endpoint to documentation

2023-06-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730611#comment-17730611
 ] 

ASF GitHub Bot commented on KAFKA-15051:


C0urante merged PR #520:
URL: https://github.com/apache/kafka-site/pull/520




> docs: add missing connector plugin endpoint to documentation
> 
>
> Key: KAFKA-15051
> URL: https://issues.apache.org/jira/browse/KAFKA-15051
> Project: Kafka
>  Issue Type: Task
>  Components: docs, documentation
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Minor
>
> GET /plugin/config endpoint added in 
> [KIP-769|https://cwiki.apache.org/confluence/display/KAFKA/KIP-769%3A+Connect+APIs+to+list+all+connector+plugins+and+retrieve+their+configuration+definitions]
>  is not included in the connect documentation page: 
> https://kafka.apache.org/documentation/#connect_rest





[GitHub] [kafka] cadonna commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-08 Thread via GitHub


cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1223073798


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private FullChangeSerde valueSerde;
+private final String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.MAX_VALUE;
+numRecords = 0;
+bufferSize = 0;
+this.topic = topic;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void setSerdesIfNull(final SerdeGetter getter) {
+keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
+valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) 
getter.valueSerde()) : valueSerde;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void init(final StateStoreContext context, final StateStore root) {
+wrapped().init(context, wrapped());
+}
+
+@Override
+public void evictWhile(final Supplier predicate, final 
Consumer> callback) {
+KeyValue keyValue;
+
+if (predicate.get()) {
+try (final KeyValueIterator iterator = wrapped()
+.fetchAll(0, wrapped().observedStreamTime - gracePeriod)) {
+if (iterator.hasNext()) {
+keyValue = iterator.next();
+} else {
+if (numRecords() == 0) {

Review Comment:
   Why do you not directly access the field?



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.st

[GitHub] [kafka] viktorsomogyi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-08 Thread via GitHub


viktorsomogyi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1582838059

   I have some context on the replica fetcher area (mostly from reading and 
   debugging), so I hope I can help.
   
   First, since the conversation is a bit long, let me summarize what I 
   understand:
   - The problem is that disk A reaches its capacity limit
   - The solution is to move partition X-1 to disk B
   - During the reassignment, log cleaning is disabled on X-1 (which can 
   therefore fill disk A)
   - The reassignment of X-1 fails; it is left in a failed state on B, and 
   X-1 on A keeps growing
   
   Is this correct?
   
   If it is, we may need to separate the deletion and compaction cases. I think 
resuming deletion is safe, however resuming compaction might not be, since 
compaction alters the log. If an operator somehow resumes B and lets 
replication continue, then the history of X-1 in A and B might be different 
(I'm still working on a local test case that reproduces this). What do you 
think?
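
The difference between the two cleanup modes mentioned above can be illustrated with a toy model. This is a minimal sketch, assuming a log is just a list of record keys indexed by offset; `CleanupSketch` and its methods are hypothetical names and do not reflect Kafka's log cleaner. Deletion only trims a prefix, so the surviving offsets stay a contiguous suffix of the original log, whereas compaction can remove records from the middle, which is one way two independently cleaned copies could end up with different histories.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model; not Kafka's cleaner implementation.
final class CleanupSketch {
    // Retention-style deletion: keep every offset at or above newStartOffset.
    static List<Integer> survivorsAfterDeletion(List<String> keys, int newStartOffset) {
        List<Integer> out = new ArrayList<>();
        for (int offset = 0; offset < keys.size(); offset++) {
            if (offset >= newStartOffset) out.add(offset);
        }
        return out;
    }

    // Compaction: keep only the last offset at which each key appears.
    static List<Integer> survivorsAfterCompaction(List<String> keys) {
        Map<String, Integer> lastOffset = new HashMap<>();
        for (int offset = 0; offset < keys.size(); offset++) {
            lastOffset.put(keys.get(offset), offset);
        }
        List<Integer> out = new ArrayList<>();
        for (int offset = 0; offset < keys.size(); offset++) {
            if (lastOffset.get(keys.get(offset)) == offset) out.add(offset);
        }
        return out;
    }

    // True if the offsets are consecutive and end at logEnd - 1.
    static boolean isContiguousSuffix(List<Integer> offsets, int logEnd) {
        int expected = logEnd - offsets.size();
        for (int offset : offsets) {
            if (offset != expected++) return false;
        }
        return true;
    }
}
```

For the keys `a, b, b, c`, deleting below offset 2 leaves offsets 2 and 3, while compaction leaves offsets 0, 2 and 3 with a hole at offset 1.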





[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-08 Thread via GitHub


viktorsomogyi commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1219754042


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -447,11 +447,11 @@ class Partition(val topicPartition: TopicPartition,
   private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
 def updateHighWatermark(log: UnifiedLog): Unit = {
   val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, 
topicPartition).getOrElse {
-info(s"No checkpointed highwatermark is found for partition 
$topicPartition")
+info(s"No checkpointed highwatermark is found for ${if 
(isFutureReplica) "Future partition" else "partition"} $topicPartition")

Review Comment:
   nit: "future" not "Future"?






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13812: KAFKA-14462; [18/N] Add GroupCoordinatorService

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13812:
URL: https://github.com/apache/kafka/pull/13812#discussion_r1223233148


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidFetchSizeException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.message.HeartbeatRequestData;
+import org.apache.kafka.common.message.HeartbeatResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsRequestData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
+import org.apache.kafka.common.message.OffsetDeleteRequestData;
+import org.apache.kafka.common.message.OffsetDeleteResponseData;
+import org.apache.kafka.common.message.OffsetFetchRequestData;
+import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.SyncGroupRequestData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.FutureUtils;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+
+/**
+ * The group coordinator service.
+ */
+public class GroupCoordinatorService implements GroupCoordinator {
+
+public static class Builder {
+private final int nodeId;
+private final GroupCoordinatorConfig config;
+private PartitionWriter writer;
+private CoordinatorLoader loader;
+
+public Builder(
+int nodeId,
+GroupCoordinatorConfig config
+) {
+this.nodeId = nodeId;
+this.config = config;
+}
+
+public Builder withWriter(PartitionWriter writer) {
+this.writer = writer;
+return this;
+}
+
+

[GitHub] [kafka] mumrah commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-08 Thread via GitHub


mumrah commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1223265946


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet 
interests, TopicVisitor
 (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
 writer.handleSnapshot(image, consumer);
 assertEquals(1, opCounts.remove("CreateTopic"));
-assertEquals(1, opCounts.remove("UpdatePartition"));
+assertEquals(1, opCounts.remove("UpdatePartitions"));
 assertEquals(1, opCounts.remove("UpdateTopic"));
 assertEquals(0, opCounts.size());
 assertEquals("bar", topicClient.createdTopics.get(0));
 }
+
+@Test
+public void testDeleteTopicFromSnapshot() {
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+visitor.visitTopic("spam", Uuid.randomUuid(), 
Collections.emptyMap());
+}
+};
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+Map opCounts = new HashMap<>();
+KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+(logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+
+opCounts.clear();
+topicClient.reset();
+writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(2, opCounts.remove("CreateTopic"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+}
+
+@FunctionalInterface
+interface TopicVerifier {
+void verify(Uuid topicId, TopicsImage topicsImage, 
CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+}
+
+void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+// Set up a topic with two partitions in ZK (via iterateTopics) and a 
KRaft TopicsImage, then run the given verifier
+Uuid topicId = Uuid.randomUuid();
+Map partitionMap = new HashMap<>();
+partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new 
int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, 
-1));
+partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new 
int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, 
-1));
+
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+Map> assignments = new HashMap<>();
+assignments.put(0, Arrays.asList(2, 3, 4));
+assignments.put(1, Arrays.asList(3, 4, 5));
+visitor.visitTopic("spam", topicId, assignments);
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 0)), partitionMap.get(0));
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 1)), partitionMap.get(1));
+}
+};
+
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 
0).message());
+delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 
1).message());
+TopicsImage image = delta.apply();
+
+verifier.verify(topicId, image, topicClient, writer);
+}
+
+@Test
+public void testUpdatePartitionsFromSnapshot() {
+setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+ 

[GitHub] [kafka] hudeqi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-08 Thread via GitHub


hudeqi commented on PR #13719:
URL: https://github.com/apache/kafka/pull/13719#issuecomment-1582994469

   @viktorsomogyi Hi, this minor PR is also about the replica fetcher thread. 
   Could you please help review it? Thanks! It seems @dajac has no time.





[GitHub] [kafka] jolshan commented on a diff in pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-06-08 Thread via GitHub


jolshan commented on code in PR #13787:
URL: https://github.com/apache/kafka/pull/13787#discussion_r1223292555


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1006,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state.
+  // This guarantees that transactional records are never written to the log outside of the transaction coordinator's knowledge of an open transaction on
+  // the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
+  // There are two phases -- the first append to the log and subsequent appends.
+  //
+  // 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
+  // given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
+  // to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
+  // have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
+  // start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
+  // result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
+  //
+  // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
+  // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
+  // ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
+  // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
+  if (!hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))

Review Comment:
   I think it is safe since we are just checking the producer state entry (and 
   if it is missing, we will return false).
   But I can place the explicit "is transactional" check first if that seems 
   clearer.
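
The two-phase check described in the quoted code comment can be sketched as a toy model. This is a minimal illustration under stated assumptions, not the UnifiedLog code from the PR: `VerificationGuardSketch`, `ProducerEntry`, `startVerification`, `writeAbortMarker` and `tryAppend` are hypothetical names, and a boolean stands in for the populated `currentTxnFirstOffset`. The point it shows is that comparing guards by object identity lets a stale verification, issued before an abort marker, be rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model; names and structure are assumptions.
final class VerificationGuardSketch {
    static final class ProducerEntry {
        Object guard;        // unique per verification attempt, null if none pending
        boolean ongoingTxn;  // stands in for "currentTxnFirstOffset is populated"
    }

    private final Map<Long, ProducerEntry> entries = new HashMap<>();

    // Phase 1 start: create (or reuse) the guard before asking the coordinator.
    Object startVerification(long producerId) {
        ProducerEntry entry = entries.computeIfAbsent(producerId, id -> new ProducerEntry());
        if (entry.guard == null) entry.guard = new Object();
        return entry.guard;
    }

    // An abort marker ends the transaction and discards any pending guard.
    void writeAbortMarker(long producerId) {
        ProducerEntry entry = entries.get(producerId);
        if (entry != null) {
            entry.guard = null;
            entry.ongoingTxn = false;
        }
    }

    // Returns true if the append is allowed.
    boolean tryAppend(long producerId, Object requestGuard) {
        ProducerEntry entry = entries.get(producerId);
        if (entry == null) return false;
        if (entry.ongoingTxn) return true;  // phase 2: transaction already open
        // Phase 1: the request must carry the exact guard object that is pending.
        if (requestGuard == null || entry.guard != requestGuard) return false;
        entry.ongoingTxn = true;
        entry.guard = null;  // guard is removed after the first append
        return true;
    }
}
```

In the ABA sequence from the comment, the guard obtained before the abort marker no longer matches the guard created for the new verification, so the first append with the stale guard is refused.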






[GitHub] [kafka] hudeqi commented on pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-06-08 Thread via GitHub


hudeqi commented on PR #13696:
URL: https://github.com/apache/kafka/pull/13696#issuecomment-1582997213

   This minor PR is also about the replica fetcher thread. Please help review 
   it, thanks! @viktorsomogyi 





[GitHub] [kafka] wcarlson5 opened a new pull request, #13830: KAFKA-14936: Change Time Ordered Buffer to not require Change<> 0/N

2023-06-08 Thread via GitHub


wcarlson5 opened a new pull request, #13830:
URL: https://github.com/apache/kafka/pull/13830

   Make the time-ordered buffer work without requiring a Change record. There 
   are now two types for the value: one for the storage type and another for the 
   value type. They can be the same if you don't want to use the change value. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-08 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1223321590


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final Duration gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRec;
+private Serde keySerde;
+private FullChangeSerde valueSerde;

Review Comment:
   @vcrfxia @cadonna I made the changes in 
https://github.com/apache/kafka/pull/13830. I will rebase this one once that is 
merged.






[GitHub] [kafka] junrao commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-06-08 Thread via GitHub


junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1222183119


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -945,4 +1176,27 @@ public void close() {
 }
 }
 
+private static class RetentionSizeData {
+private final long retentionSize;
+private final long remainingBreachedSize;
+
+public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+this.retentionSize = retentionSize;
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+}
+
+private static class RetentionTimeData {
+
+private final long retentionMs;
+private final long cleanupUntilMs;
+
+public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+this.retentionMs = retentionMs;
+this.cleanupUntilMs = cleanupUntilMs;
+}
+

Review Comment:
   extra new line



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -945,4 +1176,27 @@ public void close() {
 }
 }
 
+private static class RetentionSizeData {
+private final long retentionSize;
+private final long remainingBreachedSize;
+
+public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+this.retentionSize = retentionSize;
+this.remainingBreachedSize = remainingBreachedSize;
+}
+

Review Comment:
   extra new line



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -945,4 +1176,27 @@ public void close() {
 }
 }
 
+private static class RetentionSizeData {
+private final long retentionSize;
+private final long remainingBreachedSize;
+
+public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+this.retentionSize = retentionSize;
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+}
+
+private static class RetentionTimeData {
+
+private final long retentionMs;
+private final long cleanupUntilMs;
+
+public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+this.retentionMs = retentionMs;
+this.cleanupUntilMs = cleanupUntilMs;
+}
+
+}
+

Review Comment:
   extra new line



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +625,230 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+if (isLeader()) {
+logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+}
+
+class RemoteLogRetentionHandler {
+
+private final Optional retentionSizeData;
+private final Optional retentionTimeData;
+
+private long remainingBreachedSize;
+
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(Optional 
retentionSizeData, Optional retentionTimeData) {
+this.retentionSizeData = retentionSizeData;
+this.retentionTimeData = retentionTimeData;
+remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+if (!retentionSizeData.isPresent()) {
+return false;
+}
+
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size >= 0
+if (retentionSizeData.get().remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+}
+return isSegmentDeleted;
+}
+
+public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+if (!rete

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-06-08 Thread via GitHub


wcarlson5 commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1223326402


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {

Review Comment:
   The segments are not particularly critical. What I needed was the 
time-ordered part, which had already been implemented with the segment store.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer extends 
WrappedStateStore implements TimeOrderedKeyValueBuffer {
+
+private final long gracePeriod;
+private long bufferSize;
+private long minTimestamp;
+private int numRecords;
+private Serde keySerde;
+private FullChangeSerde valueSerde;
+private final String topic;
+
+public RocksDBTimeOrderedKeyValueBuffer(final 
RocksDBTimeOrderedKeyValueSegmentedBytesStore store,
+final Duration gracePeriod,
+final String topic) {
+super(store);
+this.gracePeriod = gracePeriod.toMillis();
+minTimestamp = Long.M

[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-08 Thread via GitHub


novosibman commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1223373702


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -430,11 +428,19 @@ public Optional lastEntry(long 
producerId) {
  * Take a snapshot at the current end offset if one does not already exist.
  */
 public void takeSnapshot() throws IOException {
+takeSnapshot(null);
+}
+
+/**
+ * Take a snapshot at the current end offset if one does not already exist.
+ * Flush the snapshot asynchronously if scheduler != null
+ */
+public void takeSnapshot(Scheduler scheduler) throws IOException {

Review Comment:
   An IOException will still be thrown on open/write/close operations. Only the 
force (flush) operation, which the scheduler runs in a separate thread, will 
log a warning instead of throwing.
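
   The split described above can be sketched roughly as follows. This is only 
an illustration, not the code in the PR: it assumes a plain `ExecutorService` 
in place of Kafka's `Scheduler`, and `SnapshotWriterSketch`, `writeSnapshot` 
and `flushQuietly` are hypothetical names (the latter stands in for 
`Utils.flushFileQuietly`).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class SnapshotWriterSketch {

    // Hypothetical stand-in for a "quiet" flush: a failure is logged as a
    // warning and reported through the return value, never rethrown.
    static boolean flushQuietly(Path path) {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.force(true);
            return true;
        } catch (IOException e) {
            System.err.println("WARN: failed to flush " + path + ": " + e);
            return false;
        }
    }

    // Open/write/close failures propagate to the caller as IOException.
    // Only the flush is deferred; a null executor means "flush on this thread".
    static Future<Boolean> writeSnapshot(Path path, byte[] data, ExecutorService executor)
            throws IOException {
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
        }
        if (executor == null) {
            return CompletableFuture.completedFuture(flushQuietly(path));
        }
        Callable<Boolean> flushTask = () -> flushQuietly(path);
        return executor.submit(flushTask);
    }
}
```

   In this sketch the caller still sees write errors synchronously, while a 
failed background flush is only visible in the log (or through the returned 
future, which the real code may not expose).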






[GitHub] [kafka] novosibman commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-08 Thread via GitHub


novosibman commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1223374923


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -681,7 +687,12 @@ private static void writeSnapshot(File file, Map entri
 
 try (FileChannel fileChannel = FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
 fileChannel.write(buffer);
-fileChannel.force(true);
+}
+
+if (scheduler != null) {
+scheduler.scheduleOnce("flush-producer-snapshot", () -> 
Utils.flushFileQuietly(file.toPath(), "producer-snapshot"));
+} else {
+Utils.flushFileQuietly(file.toPath(), "producer-snapshot");

Review Comment:
   Open/close changes done






[GitHub] [kafka] novosibman commented on pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

2023-06-08 Thread via GitHub


novosibman commented on PR #13782:
URL: https://github.com/apache/kafka/pull/13782#issuecomment-1583094847

   The open/close changes are done. 
   Also fixed the style check issue (in task ':storage:checkstyleMain').





[GitHub] [kafka] ahuang98 commented on a diff in pull request #13802: MINOR: Improve KRaftMigrationZkWriter test coverage

2023-06-08 Thread via GitHub


ahuang98 commented on code in PR #13802:
URL: https://github.com/apache/kafka/pull/13802#discussion_r1223387331


##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriterTest.java:
##
@@ -195,9 +235,658 @@ public void iterateTopics(EnumSet 
interests, TopicVisitor
 (logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
 writer.handleSnapshot(image, consumer);
 assertEquals(1, opCounts.remove("CreateTopic"));
-assertEquals(1, opCounts.remove("UpdatePartition"));
+assertEquals(1, opCounts.remove("UpdatePartitions"));
 assertEquals(1, opCounts.remove("UpdateTopic"));
 assertEquals(0, opCounts.size());
 assertEquals("bar", topicClient.createdTopics.get(0));
 }
+
+@Test
+public void testDeleteTopicFromSnapshot() {
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+visitor.visitTopic("spam", Uuid.randomUuid(), 
Collections.emptyMap());
+}
+};
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+Map opCounts = new HashMap<>();
+KRaftMigrationOperationConsumer consumer = 
KRaftMigrationDriver.countingOperationConsumer(opCounts,
+(logMsg, operation) -> 
operation.apply(ZkMigrationLeadershipState.EMPTY));
+writer.handleTopicsSnapshot(TopicsImage.EMPTY, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+
+opCounts.clear();
+topicClient.reset();
+writer.handleTopicsSnapshot(TopicsImageTest.IMAGE1, consumer);
+assertEquals(1, opCounts.remove("DeleteTopic"));
+assertEquals(1, opCounts.remove("DeleteTopicConfig"));
+assertEquals(2, opCounts.remove("CreateTopic"));
+assertEquals(0, opCounts.size());
+assertEquals(Collections.singletonList("spam"), 
topicClient.deletedTopics);
+assertEquals(Arrays.asList("foo", "bar"), topicClient.createdTopics);
+}
+
+@FunctionalInterface
+interface TopicVerifier {
+void verify(Uuid topicId, TopicsImage topicsImage, 
CapturingTopicMigrationClient topicClient, KRaftMigrationZkWriter writer);
+}
+
+void setupTopicWithTwoPartitions(TopicVerifier verifier) {
+// Set up a topic with two partitions in ZK (via iterateTopics) and a 
KRaft TopicsImage, then run the given verifier
+Uuid topicId = Uuid.randomUuid();
+Map partitionMap = new HashMap<>();
+partitionMap.put(0, new PartitionRegistration(new int[]{2, 3, 4}, new 
int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, LeaderRecoveryState.RECOVERED, 0, 
-1));
+partitionMap.put(1, new PartitionRegistration(new int[]{3, 4, 5}, new 
int[]{3, 4, 5}, new int[]{}, new int[]{}, 3, LeaderRecoveryState.RECOVERED, 0, 
-1));
+
+CapturingTopicMigrationClient topicClient = new 
CapturingTopicMigrationClient() {
+@Override
+public void iterateTopics(EnumSet interests, 
TopicVisitor visitor) {
+Map> assignments = new HashMap<>();
+assignments.put(0, Arrays.asList(2, 3, 4));
+assignments.put(1, Arrays.asList(3, 4, 5));
+visitor.visitTopic("spam", topicId, assignments);
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 0)), partitionMap.get(0));
+visitor.visitPartition(new TopicIdPartition(topicId, new 
TopicPartition("spam", 1)), partitionMap.get(1));
+}
+};
+
+CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder()
+.setBrokersInZk(0)
+.setTopicMigrationClient(topicClient)
+.build();
+KRaftMigrationZkWriter writer = new 
KRaftMigrationZkWriter(migrationClient);
+
+TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+delta.replay(new TopicRecord().setTopicId(topicId).setName("spam"));
+delta.replay((PartitionRecord) partitionMap.get(0).toRecord(topicId, 
0).message());
+delta.replay((PartitionRecord) partitionMap.get(1).toRecord(topicId, 
1).message());
+TopicsImage image = delta.apply();
+
+verifier.verify(topicId, image, topicClient, writer);
+}
+
+@Test
+public void testUpdatePartitionsFromSnapshot() {
+setupTopicWithTwoPartitions((topicId, topicsImage, topicClient, 
writer) -> {
+   

[jira] [Commented] (KAFKA-10337) Wait for pending async commits in commitSync() even if no offsets are specified

2023-06-08 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730665#comment-17730665
 ] 

Erik van Oosten commented on KAFKA-10337:
-

Thanks for your PR [~thomaslee]. It has now been merged with minor changes.

> Wait for pending async commits in commitSync() even if no offsets are 
> specified
> ---
>
> Key: KAFKA-10337
> URL: https://issues.apache.org/jira/browse/KAFKA-10337
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Tom Lee
>Assignee: Erik van Oosten
>Priority: Major
> Fix For: 3.6.0
>
>
> The JavaDoc for commitSync() states the following:
> {quote}Note that asynchronous offset commits sent previously with the
> {@link #commitAsync(OffsetCommitCallback)}
>  (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {quote}
> But should we happen to call the method with an empty offset map
> (i.e. commitSync(Collections.emptyMap())) the callbacks for any incomplete 
> async commits will not be invoked because of an early return in 
> ConsumerCoordinator.commitOffsetsSync() when the input map is empty.
> If users are doing manual offset commits and relying on commitSync as a 
> barrier for in-flight async commits prior to a rebalance, this could be an 
> important (though somewhat implementation-dependent) detail.
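
The early-return problem described above can be illustrated with a toy model. 
This is a sketch under simplifying assumptions, not the consumer's actual 
code: `CommitBarrierSketch` and its methods are hypothetical names, and the 
queue of `Runnable`s merely stands in for callbacks of async commits that have 
not been invoked yet.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

final class CommitBarrierSketch {

    // Toy model: callbacks of earlier async commits that still need to run.
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    void commitAsync(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    private void invokePendingCallbacks() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    // Models the reported behavior: an empty offset map returns early,
    // so pending callbacks are never invoked.
    boolean commitSyncWithEarlyReturn(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            return true;
        }
        invokePendingCallbacks();
        return true;
    }

    // Models the requested behavior: the sync commit acts as a barrier
    // for pending callbacks even when there is nothing to commit.
    boolean commitSyncAsBarrier(Map<String, Long> offsets) {
        invokePendingCallbacks();
        return true;
    }
}
```

With one pending callback and an empty map, the first variant leaves the 
callback uninvoked while the second one runs it before returning.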



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jlprat commented on pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-08 Thread via GitHub


jlprat commented on PR #13827:
URL: https://github.com/apache/kafka/pull/13827#issuecomment-1583155790

   > I also found 
https://github.com/marketplace/actions/auto-label-merge-conflicts which would 
add a label to PRs with conflicts. This could be used to create two separate 
workflows of the "stale" action.
   
   This is a good one. I think we can combine these two actions to accomplish 
what we need: label PRs that have merge conflicts, and then run the stale job 
twice, once excluding the conflict label and once only for the conflict 
label.
   
   > Do you agree that we can scope this PR to "marking old PRs as stale" for 
the sake of clearing our backlog?
   
   Yes this makes sense. I'll work on the other one.





[GitHub] [kafka] jlprat commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-08 Thread via GitHub


jlprat commented on code in PR #13827:
URL: https://github.com/apache/kafka/pull/13827#discussion_r1223425372


##
.github/workflows/stale.yml:
##
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: 'Close stale issues and PRs'
+on:
+  #schedule:
+  #  - cron: '30 3 * * *'
+  workflow_dispatch:
+inputs:
+  dryRun:
+description: 'Dry Run'
+required: true
+default: true
+type: boolean
+  operationsPerRun:
+description: 'Max GitHub API operations'
+required: true
+default: 30
+type: number
+
+permissions:
+  issues: write
+  pull-requests: write
+
+jobs:
+  stale:
+runs-on: ubuntu-latest
+steps:
+  - uses: actions/stale@v8
+with:
+  debug-only: ${{ inputs.dryRun }}
+  operations-per-run: ${{ inputs.operationsPerRun }}
+  days-before-stale: 90

Review Comment:
   I think 90 days is a long time but looking at our queue, I think it's a 
realistic value to start with.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223454138


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent 
event) {
 manager.addOffsetFetchRequest(event.partitions);
 return true;
 }
+
+private boolean process(final MetadataUpdateApplicationEvent event) {
+metadata.requestUpdateForNewTopics();

Review Comment:
   Changed `MetadataUpdateApplicationEvent` to 
`NewTopicsMetadataUpdateRequestEvent` for clarity.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/MetadataUpdateApplicationEvent.java:
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class MetadataUpdateApplicationEvent extends ApplicationEvent {
+
+private final long timestamp;

Review Comment:
   Removed the unused variable.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223460709


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent 
event) {
 manager.addOffsetFetchRequest(event.partitions);
 return true;
 }
+
+private boolean process(final MetadataUpdateApplicationEvent event) {
+metadata.requestUpdateForNewTopics();
+return true;
+}
+
+private boolean process(final UnsubscribeApplicationEvent event) {
+/*
+this.coordinator.onLeavePrepare();
+this.coordinator.maybeLeaveGroup("the consumer unsubscribed 
from all topics");
+ */

Review Comment:
   Removed those lines, as they are for unsubscribe, which isn't baked enough 
for this PR.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223460895


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher

Review Comment:
   Yes. We don't have the Fetcher ready to merge into AK yet.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461234


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());
+
+log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
+if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+   updateMetadata(time.milliseconds());

Review Comment:
   Does the latest rename and refactor address your concerns?






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461556


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) {
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+// 
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());

Review Comment:
   Backed out the unsubscribe logic as it wasn't implemented in any meaningful 
way.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-06-08 Thread via GitHub


kirktrue commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1223461888


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());

Review Comment:
   I'll take a closer look at this.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java:
##
@@ -166,7 +208,7 @@ private DefaultBackgroundThread mockBackgroundThread() {
 applicationEventsQueue,
 backgroundEventsQueue,
 this.errorEventHandler,
-processor,
+applicationEventProcessor,

Review Comment:
   Fixed.






[jira] [Assigned] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-06-08 Thread Steven Booke (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Booke reassigned KAFKA-14995:


Assignee: Steven Booke

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html])
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
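Steps 1 to 3 of the process quoted above could be sketched roughly as follows. This is only an illustration, not the script the ticket asks for: the class `CollaboratorCandidates` and its method are hypothetical, and it assumes a set of committer emails is already available, which is exactly the mapping the ticket says is still missing. The lookup of GitHub usernames (step 4) is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Kafka repository.
final class CollaboratorCandidates {
    // One line of `git shortlog --email --numbered --summary`: "<count>\t<name> <<email>>".
    private static final Pattern SHORTLOG_LINE =
        Pattern.compile("^\\s*(\\d+)\\s+(.*?)\\s+<([^>]*)>\\s*$");

    /**
     * Returns up to `limit` "name <email>" entries whose email is not in
     * `committerEmails` (assumed lower case). Input order is preserved, so
     * lines already sorted by commit count stay ranked.
     */
    static List<String> topNonCommitters(List<String> shortlogLines,
                                         Set<String> committerEmails,
                                         int limit) {
        List<String> result = new ArrayList<>();
        for (String line : shortlogLines) {
            if (result.size() >= limit) {
                break;
            }
            Matcher m = SHORTLOG_LINE.matcher(line);
            if (!m.matches()) {
                continue; // skip blank or unexpected lines
            }
            String email = m.group(3).toLowerCase(Locale.ROOT);
            if (committerEmails.contains(email)) {
                continue; // step 2: drop authors who are committers
            }
            result.add(m.group(2) + " <" + m.group(3) + ">");
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data standing in for real shortlog output.
        List<String> lines = List.of(
            "   120\tAlice Example <alice@example.org>",
            "    95\tBob Example <bob@example.org>",
            "    40\tCarol Example <carol@example.org>");
        System.out.println(topNonCommitters(lines, Set.of("bob@example.org"), 20));
    }
}
```

Matching on email alone is a simplification; an author who commits under several addresses would need the richer "Committers" page structure suggested in the ticket.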



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp commented on pull request #13662: MINOR: Upgrade Jackson dependencies to version 2.15.0

2023-06-08 Thread via GitHub


bmscomp commented on PR #13662:
URL: https://github.com/apache/kafka/pull/13662#issuecomment-1583266979

   @showuon Yes, I'll do that. Thanks so much @divijvaidya for the reviews. I am 
back from holidays :) and will continue working on this. I'll go through all 
the comments one by one and answer each as best I can.
   
   Thanks again for all reviews 
   





[GitHub] [kafka] lianetm commented on pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic

2023-06-08 Thread via GitHub


lianetm commented on PR #13815:
URL: https://github.com/apache/kafka/pull/13815#issuecomment-1583285348

   Thanks @junrao. Just addressed the comment and checked unrelated failing 
tests.





[GitHub] [kafka] junrao merged pull request #13815: KAFKA-14966: Extract OffsetFetcher reusable logic

2023-06-08 Thread via GitHub


junrao merged PR #13815:
URL: https://github.com/apache/kafka/pull/13815





[jira] [Resolved] (KAFKA-14966) Extract reusable common logic from OffsetFetcher

2023-06-08 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14966.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

Merged the PR to trunk.

> Extract reusable common logic from OffsetFetcher
> 
>
> Key: KAFKA-14966
> URL: https://issues.apache.org/jira/browse/KAFKA-14966
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.6.0
>
>
> The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, 
> validate and reset positions. 
> For the new consumer based on a refactored threading model, similar 
> functionality will be needed by the ListOffsetsRequestManager component. 
> This task aims at identifying and extracting the OffsetFetcher functionality 
> that can be reused by the new consumer implementation.





[GitHub] [kafka] ijuma commented on a diff in pull request #13679: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse

2023-06-08 Thread via GitHub


ijuma commented on code in PR #13679:
URL: https://github.com/apache/kafka/pull/13679#discussion_r1223558292


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -112,4 +157,8 @@ class DefaultApiVersionManager(
   zkMigrationEnabled
 )
   }
+
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
+throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")

Review Comment:
   I think @cmccabe submitted a PR to fix this here 
https://github.com/apache/kafka/pull/13826






[GitHub] [kafka] bogao007 opened a new pull request, #13831: KAFKA-15053: Use case insensitive validator for security.protocol config

2023-06-08 Thread via GitHub


bogao007 opened a new pull request, #13831:
URL: https://github.com/apache/kafka/pull/13831

   Fixes a regression described in 
[KAFKA-15053](https://issues.apache.org/jira/browse/KAFKA-15053) where 
security.protocol only accepts uppercase values such as PLAINTEXT, SSL, 
SASL_PLAINTEXT, SASL_SSL. With this fix, both lowercase and uppercase values 
are supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, 
ssl, sasl_plaintext, sasl_ssl).
   
   Added a new unit test to cover case-insensitive values.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
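
The behaviour described in this PR could look roughly like the sketch below. It is not the code from the PR: `CaseInsensitiveProtocolValidator` is a hypothetical stand-in that does not use Kafka's config classes, and its error message is only modelled on the one quoted in KAFKA-15053.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for a config validator; not Kafka's actual validation code.
final class CaseInsensitiveProtocolValidator {
    static final List<String> ALLOWED =
        List.of("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    /** Returns the canonical upper-case name, or throws if the value is not allowed. */
    static String validate(String value) {
        if (value != null) {
            // Locale.ROOT keeps the conversion independent of the JVM default locale
            // (for example, the Turkish locale maps 'i' to a dotted capital I).
            String upper = value.toUpperCase(Locale.ROOT);
            if (ALLOWED.contains(upper)) {
                return upper;
            }
        }
        // Illustrative message only, modelled on the error quoted in the Jira issue.
        throw new IllegalArgumentException("Invalid value " + value
            + " for configuration security.protocol: String must be one of"
            + " (case insensitive): " + String.join(", ", ALLOWED));
    }

    public static void main(String[] args) {
        System.out.println(validate("sasl_ssl"));
        System.out.println(validate("PLAINTEXT"));
    }
}
```

Normalising before the membership check keeps the list of allowed values in one place, so lower case and upper case input share the same validation path.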
   





[jira] [Commented] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-08 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767
 ] 

Bo Gao commented on KAFKA-15053:


Hi [~ChrisEgerton] , just created a pull request for the fix 
[https://github.com/apache/kafka/pull/13831.]

I also have a quick question for the release process: imagine this fix is 
merged and released, to consume this fix, do I need to upgrade to the latest 
Kafka version? Or is there a way to include this fix in a minor release? Thanks!

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Priority: Major
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, config 
> {{security.protocol}} now only allows upper case values such as PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values such as 
> sasl_ssl and ssl were also supported; there is even case-insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle the lower case values.
> I think we should treat this as a regression bug, since lower case values 
> have not been supported since 3.3.0. From 3.3.0 onward, we get an error like 
> this when using the lower case value sasl_ssl:
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}





[jira] [Comment Edited] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-08 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767
 ] 

Bo Gao edited comment on KAFKA-15053 at 6/8/23 11:44 PM:
-

Hi [~ChrisEgerton] , just created a pull request for the fix 
[https://github.com/apache/kafka/pull/13831|https://github.com/apache/kafka/pull/13831.]

I also have a quick question for the release process: imagine this fix is 
merged and released, to consume this fix, do I need to upgrade to the latest 
Kafka version? Or is there a way to include this fix in a minor release? Thanks!


was (Author: JIRAUSER300429):
Hi [~ChrisEgerton] , just created a pull request for the fix 
[https://github.com/apache/kafka/pull/13831.]

I also have a quick question for the release process: imagine this fix is 
merged and released, to consume this fix, do I need to upgrade to the latest 
Kafka version? Or is there a way to include this fix in a minor release? Thanks!






[jira] [Comment Edited] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-08 Thread Bo Gao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730767#comment-17730767
 ] 

Bo Gao edited comment on KAFKA-15053 at 6/8/23 11:45 PM:
-

Hi [~ChrisEgerton] , just created a pull request for the fix 
[https://github.com/apache/kafka/pull/13831].

I also have a quick question for the release process: imagine this fix is 
merged and released, to consume this fix, do I need to upgrade to the latest 
Kafka version? Or is there a way to include this fix in a minor release? Thanks!


was (Author: JIRAUSER300429):
Hi [~ChrisEgerton] , just created a pull request for the fix 
[https://github.com/apache/kafka/pull/13831|https://github.com/apache/kafka/pull/13831.]

I also have a quick question for the release process: imagine this fix is 
merged and released, to consume this fix, do I need to upgrade to the latest 
Kafka version? Or is there a way to include this fix in a minor release? Thanks!






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13830: KAFKA-14936: Change Time Ordered Buffer to not require Change<> 0/N

2023-06-08 Thread via GitHub


vcrfxia commented on code in PR #13830:
URL: https://github.com/apache/kafka/pull/13830#discussion_r1223656443


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java:
##
@@ -27,14 +26,14 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public interface TimeOrderedKeyValueBuffer extends StateStore {
+public interface TimeOrderedKeyValueBuffer extends StateStore {

Review Comment:
   Can we add a quick javadoc to clarify that `T` here is the buffer type? 
Should already be clear to anyone who reads the code, but a javadoc will help 
make it so readers won't need to read the code if all they want to know is what 
the type is.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java:
##
@@ -65,7 +65,7 @@
 import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
 import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
 
-public final class InMemoryTimeOrderedKeyValueBuffer implements 
TimeOrderedKeyValueBuffer {
+public final class InMemoryTimeOrderedKeyValueBuffer implements 
TimeOrderedKeyValueBuffer> {

Review Comment:
   Could be good to rename this to `InMemoryTimeOrderedKeyValueChangeBuffer` to 
contrast with `RocksDBTimeOrderedKeyValueBuffer` (not a change buffer) in your 
next PR, but I don't feel too strongly.






[GitHub] [kafka] showuon merged pull request #13807: KAFKA-15040: trigger onLeadershipChange under KRaft mode

2023-06-08 Thread via GitHub


showuon merged PR #13807:
URL: https://github.com/apache/kafka/pull/13807





[GitHub] [kafka] KarboniteKream commented on pull request #13762: MINOR: Do not print an empty line when no topics exist

2023-06-08 Thread via GitHub


KarboniteKream commented on PR #13762:
URL: https://github.com/apache/kafka/pull/13762#issuecomment-1583888214

   Sure, I understand the concerns.
   
   > you might want to align your PR with that PR?
   
   Do you mean waiting for that PR to get merged, then rebase my changes? Or 
add my change to that PR?





[GitHub] [kafka] danielgospodinow commented on a diff in pull request #13827: KAFKA-15073: Add a Github action to mark PRs as stale

2023-06-08 Thread via GitHub


danielgospodinow commented on code in PR #13827:
URL: https://github.com/apache/kafka/pull/13827#discussion_r1223881556


##
.github/workflows/stale.yml:
##
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: 'Close stale issues and PRs'
+on:
+  #schedule:

Review Comment:
   Maybe we can add an additional comment here to explain that there's a plan 
to use this cron schedule for nightly running of the workflow. 






[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1223906254


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {
+public final MockScheduler scheduler;

Review Comment:
   It would break a bunch of Java usages, which is why I kept it like this.






[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1223909537


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {
+public final MockScheduler scheduler;
 
-  val scheduler = new MockScheduler(this)
+public MockTime() {
+this(System.currentTimeMillis(), System.nanoTime());

Review Comment:
   I don't see any benefits of using it here. Am I missing something?






[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1223914389


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
   I agree that the name is bad. As said earlier, my intent is to just move it 
for now. We could consider renaming it separately.






[GitHub] [kafka] dajac commented on a diff in pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac commented on code in PR #13823:
URL: https://github.com/apache/kafka/pull/13823#discussion_r1223913820


##
server-common/src/test/java/org/apache/kafka/server/util/MockTime.java:
##
@@ -27,15 +23,21 @@
  * 1. This has an associated scheduler instance for managing background tasks 
in a deterministic way.
  * 2. This doesn't support the `auto-tick` functionality as it interacts badly 
with the current implementation of `MockScheduler`.
  */
-class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends 
JMockTime(0, currentTimeMs, currentHiResTimeNs) {
-
-  def this() = this(System.currentTimeMillis(), System.nanoTime())
+public class MockTime extends org.apache.kafka.common.utils.MockTime {

Review Comment:
   Hum... Implementing AutoCloseable alone does not help. We would need to 
change all the usages to call `close` to make it work, right?
   
   My goal with this patch is to move it from core to server-common without 
changing it. I think that we could consider doing this later, if it brings 
anything.
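
The point about `AutoCloseable` can be illustrated with a small sketch. `CloseTrackingResource` is a hypothetical class used only for this example and is unrelated to `MockTime`; it shows that implementing the interface has no effect until call sites use try-with-resources or call `close` explicitly.

```java
// Hypothetical resource used only for illustration; not Kafka's MockTime.
final class CloseTrackingResource implements AutoCloseable {
    private boolean closed = false;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }

    /** Existing-style usage: the resource is created and used, but close() is never called. */
    static CloseTrackingResource useWithoutClosing() {
        CloseTrackingResource resource = new CloseTrackingResource();
        // ... use the resource ...
        return resource;
    }

    /** Updated usage: try-with-resources calls close() when the block exits. */
    static CloseTrackingResource useWithTryWithResources() {
        CloseTrackingResource resource = new CloseTrackingResource();
        try (CloseTrackingResource scoped = resource) {
            // ... use `scoped` here ...
        }
        return resource;
    }

    public static void main(String[] args) {
        System.out.println(useWithoutClosing().isClosed());
        System.out.println(useWithTryWithResources().isClosed());
    }
}
```

Only the second helper leaves the resource closed, which is why every existing usage would have to change for the interface to bring any benefit.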






[GitHub] [kafka] dajac commented on pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac commented on PR #13823:
URL: https://github.com/apache/kafka/pull/13823#issuecomment-1584066362

   I will merge it as it is based on @mumrah's +1. I will follow up based on 
@divijvaidya's replies if necessary.





[GitHub] [kafka] dajac merged pull request #13823: MINOR: Move MockTime to server-common

2023-06-08 Thread via GitHub


dajac merged PR #13823:
URL: https://github.com/apache/kafka/pull/13823





[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

2023-06-08 Thread via GitHub


dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1223802381


##
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##
@@ -584,4 +584,12 @@ class ZkMetadataCache(
   def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
 featuresAndEpoch
   }
+
+  override def versionContext(): MetadataVersionContext = {

Review Comment:
   The name is confusing. The context includes both features and the metadata 
version, so FeaturesContext would be a better name, since the metadata version 
is also a feature.



##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -447,16 +447,9 @@ class ControllerApis(val requestChannel: RequestChannel,
 requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
-  val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-  controller.finalizedFeatures(context).handle { (result, exception) =>
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-  if (exception != null) {
-apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-  } else {
-apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-  }
-})
-  }
+  requestHelper.sendResponseMaybeThrottle(request,

Review Comment:
   Since we are changing back to the original code, we can make it the same as 
KafkaApis, like this:
   ```
   def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
     val apiVersionRequest = request.body[ApiVersionsRequest]
     if (apiVersionRequest.hasUnsupportedRequestVersion) {
       apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
     } else if (!apiVersionRequest.isValid) {
       apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
     } else {
       apiVersionManager.apiVersionResponse(requestThrottleMs)
     }
   }
   requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
   ```


