[GitHub] [kafka] vcrfxia commented on pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


vcrfxia commented on PR #13264:
URL: https://github.com/apache/kafka/pull/13264#issuecomment-1451404037

   > There are test failures. Could they be related to this PR?
   
   Good catch. These failures only appeared when I switched from the original 
try-catch approach for casting stores to either TimestampedKeyValueStore or 
VersionedKeyValueStore, to fetching the store as a generic StateStore and then 
performing `instanceof` checks. The failures happened because the unit tests 
use a mock context which returns null for the state store, which failed the 
`instanceof` checks. I'm pretty sure in the actual code (non-mocked contexts) 
it's not possible for `context.getStateStore()` to return null and therefore 
this "issue" is limited to unit tests only, but I've updated the PR to return 
to the original try-catch approach for casting in order to be safe (and in 
light of your other comment above, regarding type casts on every `put/get` 
call).
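
   A minimal standalone sketch of why the two approaches diverge on a null store (illustrative only; the interfaces below are stand-ins for the real Kafka types):

   ```java
   interface StateStore { }
   interface TimestampedKVStore extends StateStore { }

   public class NullStoreCheck {
       public static void main(String[] args) {
           StateStore store = null; // what a mock context might return

           // Cast-based approach: casting null never throws, so construction
           // "succeeds" and the wrapper simply holds a null store.
           TimestampedKVStore casted = (TimestampedKVStore) store;
           System.out.println("cast result: " + casted); // cast result: null

           // instanceof-based approach: null is never an instance of anything, so
           // a wrapper doing these checks concludes the store is neither type and throws.
           System.out.println(store instanceof TimestampedKVStore); // false
       }
   }
   ```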


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


vcrfxia commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122676573


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private VersionedKeyValueStore<K, V> versionedStore = null;
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        try {
+            // first try timestamped store
+            timestampedStore = context.getStateStore(storeName);
+            return;
+        } catch (final ClassCastException e) {
+            // ignore since could be versioned store instead
+        }
+
+        try {
+            // next try versioned store
+            versionedStore = context.getStateStore(storeName);
+        } catch (final ClassCastException e) {
+            throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore.");
+        }
+    }
+
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public StateStore getStore() {

Review Comment:
   Yeah, your point about casting overhead is valid. I had hoped it'd be fairly 
minimal, but that's moot now since I went ahead and updated the PR to use your 
proposal (three store pointers) in response to your comment about test failures 
(see below).
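
   For readers of the thread, a rough sketch of that three-pointer layout (hypothetical names and simplified types, not the merged code): one generic reference for type-agnostic calls, plus two typed references of which exactly one is non-null, so get/put can dispatch without a per-call cast.

   ```java
   public class WrapperSketch<K, V> {
       interface TimestampedStore<K, V> { V get(K key); }
       interface VersionedStore<K, V> { V get(K key); }

       private final Object store;                        // always set; for init/flush/close
       private final TimestampedStore<K, V> timestamped;  // non-null iff store is timestamped
       private final VersionedStore<K, V> versioned;      // non-null iff store is versioned

       @SuppressWarnings("unchecked")
       WrapperSketch(final Object store) {
           this.store = store;
           this.timestamped = store instanceof TimestampedStore ? (TimestampedStore<K, V>) store : null;
           this.versioned = store instanceof VersionedStore ? (VersionedStore<K, V>) store : null;
           if (timestamped == null && versioned == null) {
               throw new IllegalStateException("store must be timestamped or versioned");
           }
       }

       Object store() {
           return store; // used where the concrete store type doesn't matter
       }

       V get(final K key) {
           // No per-call cast: dispatch on whichever typed reference was set up front.
           return timestamped != null ? timestamped.get(key) : versioned.get(key);
       }
   }
   ```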



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-03-01 Thread via GitHub


showuon merged PR #13172:
URL: https://github.com/apache/kafka/pull/13172


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-03-01 Thread via GitHub


showuon commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1451363118

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KafkaServerKRaftRegistrationTest.[1] Type=ZK, 
Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest.testPrepareJoinAndRejoinAfterFailedRebalance()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorManyStandbys
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety

2023-03-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695451#comment-17695451
 ] 

Luke Chen commented on KAFKA-14768:
---

For (1), could you explain what "warmup" messages mean? How could you send messages before getting the broker's metadata (e.g., the broker address)?

For (2) and (3), they make sense to me. There was a KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update) that tried to fix a similar issue. It didn't get accepted. Maybe you can check the discussion thread in that KIP for reference.

> proposal to reduce the first message's send time cost and max block time for 
> safety 
> 
>
> Key: KAFKA-14768
> URL: https://issues.apache.org/jira/browse/KAFKA-14768
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.3.1, 3.3.2
>Reporter: fujian
>Assignee: hzh0425
>Priority: Major
>  Labels: needs-kip, performance
>
> Hi, Team:
>  
> Nice to meet you!
>  
> In our business, we found two types of issues which needed improvement:
>  
> *(1) The first message takes much time to send*
> Sometimes we found that users' interactions took a lot of time. Eventually we figured out the root cause: after we deploy or restart the servers, the first message delivered by the Kafka client on each application server takes much time.
> So, we tried to find a solution to improve it.
>  
> After analyzing the source code for the first send, we found the time cost is caused by getting metadata before sending. Later sends won't take as much time thanks to the cached metadata. That logic is right and necessary. Still, we want to improve the experience of the first message's send (the user's first interaction).
>  
> *(2) Can't reduce the send message's block time to the wanted value.*
> Sometimes our application's threads block for max.block.ms to send a message. When we try to reduce max.block.ms to reduce the blocking time, it sometimes can't satisfy the time needed to get metadata. The root cause is that the configured max.block.ms is shared between the "get metadata" operation and the "send message" operation. Refer to the following table:
> |*where to block*|*when it is blocked*|*how long it will be blocked?*|
> |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first request, which needs to load the metadata from kafka| |
> |org.apache.kafka.clients.producer.internals.RecordAccumulator#append|at peak time for business, if the network can't send messages in a short time| |
> What's the solution for the above two issues?
> I thought about the current logic and came up with the following possible solutions:
> (1) send one "warmup" message -- but we can't send any fake message.
> (2) provide one extra time configuration dedicated to getting metadata -- but this would break the definition of max.block.ms.
> (3) add one method to call waitOnMetadata with its own timeout setting, without using max.block.ms (PR: [KAFKA-14768: provide new method to warmup first record's sending and reduce the max.block.ms safely by jiafu1115 · Pull Request #13320 · apache/kafka|https://github.com/apache/kafka/pull/13320])
>  
> _note: org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_
> ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs)
>  
> After the change, we can call it before the service is marked as ready. After readiness, it won't block to get metadata, thanks to the cache, and we can then safely reduce max.block.ms to a lower value to reduce the threads' blocking time.
>  
> After adopting solution 3, we solved the above issues. For example, we reduced the first message's send time by about 4 seconds. See the following log:
> _warmup test_topic at phase phase 2: get metadata from mq start_
> _warmup test_topic at phase phase 2: get metadata from mq end consume *4669ms*_
> And after the change, we reduced max.block.ms from 10s to 2s without worrying about failing to get metadata.
>  
> *So what are your thoughts on these two issues and the solution I proposed?* I hope to get your feedback and thoughts on the issues.
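
As a rough illustration of the warm-up idea (this is not the proposed new API; KafkaProducer#partitionsFor is an existing call that blocks until metadata for a topic is available, bounded by max.block.ms; broker address and topic name below are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerWarmup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Blocks (up to max.block.ms) until metadata for the topic is cached.
            // Doing this before the service is marked ready means later send()
            // calls hit the metadata cache instead of blocking.
            producer.partitionsFor("test_topic");
        }
    }
}
{code}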



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-03-01 Thread via GitHub


satishd commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1121995162


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {

Review Comment:
   Right. This is the reason I did not add it earlier. But I am not strongly 
opinionated about that. 
   I am fine adding them with the latest commit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695434#comment-17695434
 ] 

Luke Chen commented on KAFKA-14771:
---

[~pierDipi], the suggestion makes sense to me. Feel free to submit a PR for it. 
Thanks.

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method, a ConcurrentModificationException is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.
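
A minimal sketch of what the enriched message could look like (hypothetical message format, modeled on the snippet above; the real acquire() lives inside KafkaConsumer):

{code:java}
import java.util.concurrent.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

public class AcquireSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    public void acquire() {
        final long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            // Including both ids identifies the competing threads directly from the message.
            throw new ConcurrentModificationException(
                "KafkaConsumer is not safe for multi-threaded access: "
                    + "requesting thread id " + threadId
                    + ", thread currently holding the consumer: " + currentThread.get());
        }
    }
}
{code}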



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-03-01 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14729.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to 
> the abnormal exit of the heartbeat thread
> -
>
> Key: KAFKA-14729
> URL: https://issues.apache.org/jira/browse/KAFKA-14729
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.3.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.5.0
>
> Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
>
> h2. case situation:
> 1. The business program occupies a large amount of memory, causing the `run()` method of the kafkaConsumer's HeartbeatThread to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer 
> clientId=consumer-5, groupId=*_dev_VA] Heartbeat thread failed due to 
> unexpected error java.lang.OutOfMemoryError: Java heap space {code}
> 2. The `finally` block of the heartbeat thread's `run()` method only prints a log, but does not update the value of `AbstractCoordinator.state`.
> 3. For kafkaConsumer with the groupRebalance mechanism enabled, in the 
> `kafkaConsumer#pollForFetches(timer)` method, pollTimeout may eventually take 
> the value `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will 
> never be updated again.
> And the `AbstractCoordinator.state` field value will always be *STABLE*,
> so the `timeToNextHeartbeat(long now)` method will return 0.
> 0 will be passed to the underlying `networkClient#poll` method.
>  
> In the end, the user calls the `poll(duration)` method in an endless loop, and the `kafkaConsumer#pollForFetches(timer)` method will always return very quickly, taking up a lot of cpu.
>  
> h2. solution:
> 1. Refer to the note of `MemberState.STABLE` :
> {code:java}
> the client has joined and is sending heartbeats.{code}
> When the heartbeat thread exits, in the `finally` block, we should add this code:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new condition: `heartbeatThread.hasFailed()`
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
> return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
>  
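
Putting the two pieces together, a self-contained sketch of the guarded method (stand-in interfaces and fields; not the merged patch):

{code:java}
public class HeartbeatGuardSketch {
    interface HeartbeatThread { boolean hasFailed(); }
    interface Heartbeat { long timeToNextHeartbeat(long now); }

    private final boolean notJoined; // stand-in for state.hasNotJoinedGroup()
    private final HeartbeatThread heartbeatThread;
    private final Heartbeat heartbeat;

    public HeartbeatGuardSketch(boolean notJoined, HeartbeatThread thread, Heartbeat heartbeat) {
        this.notJoined = notJoined;
        this.heartbeatThread = thread;
        this.heartbeat = heartbeat;
    }

    // If the heartbeat thread died, report an effectively infinite delay so the
    // consumer's pollTimeout is not pinned at 0 (the busy loop described above).
    public long timeToNextHeartbeat(long now) {
        if (notJoined || heartbeatThread.hasFailed())
            return Long.MAX_VALUE;
        return heartbeat.timeToNextHeartbeat(now);
    }
}
{code}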



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-03-01 Thread via GitHub


showuon merged PR #13270:
URL: https://github.com/apache/kafka/pull/13270


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] CalvinConfluent opened a new pull request, #13323: Add ReplicaState to FetchRequest.

2023-03-01 Thread via GitHub


CalvinConfluent opened a new pull request, #13323:
URL: https://github.com/apache/kafka/pull/13323

   As the first part of the 
[KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR),
 it updates the FetchRequest:
   - Deprecate the ReplicaId field
   - Create a new tagged field ReplicaState with ReplicaId and ReplicaEpoch
   - Bump the FetchRequest version to 15
   - Bump metadata version to 3.5-IV1
   
   https://issues.apache.org/jira/browse/KAFKA-14617
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

2023-03-01 Thread via GitHub


kirktrue commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1122535670


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record 
lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated 
to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {

Review Comment:
   These should come from here: 
https://kafka.apache.org/documentation/#consumer_fetch_monitoring



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

2023-03-01 Thread via GitHub


kirktrue commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1122517751


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   All fields are either package `protected` or `private` and are grouped into 
`mutable` and `final` sections, for a total of four sections.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import 

[GitHub] [kafka] mjsax commented on pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on PR #13264:
URL: https://github.com/apache/kafka/pull/13264#issuecomment-1451176639

   There are test failures. Could they be related to this PR?
   ```
   
org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoinTest.shouldLogAndMeterSkippedRecordsDueToNullLeftKey
   java.lang.NullPointerException: Cannot invoke "Object.getClass()" because 
"this.store" is null
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525554


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private VersionedKeyValueStore<K, V> versionedStore = null;
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        try {
+            // first try timestamped store
+            timestampedStore = context.getStateStore(storeName);
+            return;
+        } catch (final ClassCastException e) {
+            // ignore since could be versioned store instead
+        }
+
+        try {
+            // next try versioned store
+            versionedStore = context.getStateStore(storeName);
+        } catch (final ClassCastException e) {
+            throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore.");
+        }
+    }
+
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public StateStore getStore() {

Review Comment:
   My proposal was actually to have three members: the previously existing `KeyValueStore keyValueStore`, `VersionedStore versionStore`, and a new `Store store`. That way we simplify the code for the case where we don't care about the store type, and when we do care we check whether `keyValueStore != null` or `versionStore != null` and do the right thing.
   
   Sorry for not explaining it well enough -- using an enum works, too, I guess, but I also tend to agree it's a little overkill. But you made the change already... So maybe it's also not worth reverting it again to use `KeyValueStore keyValueStore` and `VersionedStore versionStore` as you did originally.
   
   The only question (might be premature optimization): we now need to cast on every single get/put call when using the enum. Sounds like more overhead than using `KeyValueStore keyValueStore` / 

[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122527645


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private final StateStore store;
+    private final StoreType storeType;
+
+    private enum StoreType {
+        TIMESTAMPED,
+        VERSIONED;
+    }
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        store = context.getStateStore(storeName);
+
+        if (store instanceof TimestampedKeyValueStore) {

Review Comment:
   I think this is fine. -- Even if we had the cast as in the original code, I don't think it would be safer. The compile-time check cannot verify the generic types anyway, because `getStateStore` does not carry any information about them -- and at runtime the generic types are gone, so the actual "cast", if necessary, from `Object` to `K` (or `V`) would happen somewhere else in the code.
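
   A standalone illustration of the erasure point (not Streams code): the parameterized cast is erased, so any ClassCastException is deferred to wherever a value is actually used with the concrete type.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ErasureDemo {
       @SuppressWarnings("unchecked")
       static <T> T getAs(final Object o) {
           return (T) o; // erased: no runtime check happens here
       }

       public static void main(String[] args) {
           List<Integer> ints = new ArrayList<>();
           ints.add(42);

           // The "cast" to List<String> succeeds: generic types are gone at runtime.
           List<String> strings = getAs(ints);
           System.out.println(strings.size()); // 1, no exception yet

           try {
               String first = strings.get(0); // implicit checkcast Object -> String
               System.out.println(first);
           } catch (ClassCastException e) {
               System.out.println("CCE deferred to the use site");
           }
       }
   }
   ```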



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122527645


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private final StateStore store;
+    private final StoreType storeType;
+
+    private enum StoreType {
+        TIMESTAMPED,
+        VERSIONED;
+    }
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        store = context.getStateStore(storeName);
+
+        if (store instanceof TimestampedKeyValueStore) {

Review Comment:
   I think this is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525901


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java:
##
@@ -33,7 +32,7 @@ public TimestampedKeyValueStoreMaterializer(final 
MaterializedInternal> materialize() {
+public StoreBuilder materialize() {

Review Comment:
   If we need it later, it's fine to do it right away. Maybe a little cleaner from a "commit history" POV to only do it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525901


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java:
##
@@ -33,7 +32,7 @@ public TimestampedKeyValueStoreMaterializer(final 
MaterializedInternal> materialize() {
+public StoreBuilder materialize() {

Review Comment:
   If we need it later, it's fine to do it right away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores

2023-03-01 Thread via GitHub


mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525554


##
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private VersionedKeyValueStore<K, V> versionedStore = null;
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        try {
+            // first try timestamped store
+            timestampedStore = context.getStateStore(storeName);
+            return;
+        } catch (final ClassCastException e) {
+            // ignore since could be versioned store instead
+        }
+
+        try {
+            // next try versioned store
+            versionedStore = context.getStateStore(storeName);
+        } catch (final ClassCastException e) {
+            throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore.");
+        }
+    }
+
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public StateStore getStore() {

Review Comment:
   My proposal was actually to have three members: the previously existing `KeyValueStore keyValueStore`, `VersionedStore versionStore`, and a new `Store store`. That way we simplify the code for the case where we don't care about the store type, and when we do care we check whether `keyValueStore != null` or `versionStore != null` and do the right thing.
   
   Sorry for not explaining it well enough -- using an enum works, too, I guess, but I also tend to agree it's a little overkill. But you made the change already... So maybe it's also not worth reverting it again to use `KeyValueStore keyValueStore` and `VersionedStore versionStore` as you did originally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-03-01 Thread via GitHub


guozhangwang commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1451145318

   Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-03-01 Thread via GitHub


guozhangwang merged PR #13319:
URL: https://github.com/apache/kafka/pull/13319


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-03-01 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1119167627


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+

Review Comment:
   Could we remove the extra new line?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is an LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote storage for every fetch call.
+ */
+public class RemoteIndexCache implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
+
+    public static final String DIR_NAME = "remote-log-index-cache";
+
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    private final File cacheDir;
+    private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
+    private final Object lock = new Object();
+    private final RemoteStorageManager remoteStorageManager;
+    private final Map<Uuid, Entry> entries;
+    private final ShutdownableThread cleanerThread;
+
+    private volatile boolean closed = false;
+
+    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+        this(1024, remoteStorageManager, logDir);
+    }
+
+    /**
+     * Creates RemoteIndexCache with the given configs.
+     *
+     * @param maxSize              maximum number of segment index entries to be cached.
+     * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
+     * @param logDir               log directory
+     */
+    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {

[GitHub] [kafka] guozhangwang merged pull request #13288: MINOR: fix rerun-tests for unit test

2023-03-01 Thread via GitHub


guozhangwang merged PR #13288:
URL: https://github.com/apache/kafka/pull/13288


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13288: MINOR: fix rerun-tests for unit test

2023-03-01 Thread via GitHub


guozhangwang commented on PR #13288:
URL: https://github.com/apache/kafka/pull/13288#issuecomment-1451098190

   Ah thanks @chia7712, my bad for missing that change in the PR. As for the solution, I don't have a preference either way, and I agree that both should not involve recompilation. Let's just follow up with `rerun-tests` to avoid another change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-01 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1122473180


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   Perhaps we can also consider this a failure of the signature of 
Herder::requestTaskReconfiguration. The DistributedHerder makes this 
asynchronous, but provides no future or callback to confirm the progress of the 
request.
   Arguably StandaloneHerder is implementing the function signature correctly 
as a request that either succeeds or fails.
   
   It also makes me think that a connector which repeatedly calls 
requestTaskReconfiguration (and then always fails in generateTaskConfigs) could 
spam the herder with retried restart requests. This is such a messy situation 
that the old function signatures hid from us :)
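
   For illustration, one possible future-returning shape (purely hypothetical; not an existing Connect API):

   ```java
   import java.util.concurrent.CompletableFuture;

   interface HerderSketch {
       // Hypothetical: return a future so callers can observe the outcome of an
       // asynchronous reconfiguration instead of fire-and-forget.
       CompletableFuture<Void> requestTaskReconfiguration(String connName);
   }

   class Caller {
       static void reconfigure(HerderSketch herder) {
           herder.requestTaskReconfiguration("my-connector") // hypothetical connector name
                 .whenComplete((v, err) -> {
                     if (err != null) {
                         System.err.println("reconfiguration failed: " + err);
                     }
                 });
       }
   }
   ```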



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-01 Thread via GitHub


gharris1727 commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1122467555


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String 
connName) {
 log.error("Task that requested reconfiguration does not exist: 
{}", connName);
 return;
 }
-updateConnectorTasks(connName);
+try {
+updateConnectorTasks(connName);
+} catch (Exception e) {
+log.error("Unable to generate task configs for {}", connName, e);
+}

Review Comment:
   Yes, this is a change in behavior.
   
   There is precedent for throwing ConnectException from ConnectorContext::requestTaskReconfiguration, so perhaps wrapping this in a ConnectException and propagating it would be the better behavior. I can move this to HerderConnectorContext, except it would only be effective for the standalone herder.
   
   We can also see this as an opportunity to improve the StandaloneHerder by handling reconfigurations asynchronously and retrying them in the background, rather than 500'ing the REST API or dropping the failure silently.
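
   A minimal sketch of the wrap-and-propagate alternative, assuming the surrounding StandaloneHerder context (`updateConnectorTasks` is the existing method from the diff above):
   
   ```
   import org.apache.kafka.connect.errors.ConnectException;
   
   // Sketch: wrap the failure so callers of requestTaskReconfiguration see it,
   // rather than only finding it in the log. updateConnectorTasks is the
   // existing herder method shown in the diff above.
   private void updateConnectorTasksOrThrow(String connName) {
       try {
           updateConnectorTasks(connName);
       } catch (Exception e) {
           throw new ConnectException("Unable to generate task configs for " + connName, e);
       }
   }
   ```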



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-03-01 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1122411269


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && 

[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-01 Thread via GitHub


OneCricketeer commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1122385195


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map<String, String> envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) {
+}

Review Comment:
   > a protected REST API does prevent untrusted users
   
   Sure, but authorized/trusted does not imply _non-malicious_. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jeffkbkim commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122374811


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -163,6 +163,20 @@ object Defaults {
   val GroupInitialRebalanceDelayMs = 3000
   val GroupMaxSize: Int = Int.MaxValue
 
+  /** New group coordinator configs */
+  val NewGroupCoordinatorEnable = false
+  val GroupCoordinatorNumThreads = 1
+
+  /** Consumer group configs */
+  val ConsumerGroupSessionTimeoutMs = 45000
+  val ConsumerGroupMinSessionTimeoutMs = 45000
+  val ConsumerGroupMaxSessionTimeoutMs = 60000
+  val ConsumerGroupHeartbeatIntervalMs = 5000
+  val ConsumerGroupMinHeartbeatInternalMs = 5000
+  val ConsumerGroupMaxHeartbeatInternalMs = 15000
+  val ConsumerGroupMaxSize = Int.MaxValue
+  val ConsumerGroupAssignors = ""

Review Comment:
   KIP-848 lists
   ```
   org.apache.kafka.server.group.consumer.UniformAssignor, 
org.apache.kafka.server.group.consumer.RangeAssignor
   ```
   as the default. Are we waiting until these two are implemented?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -163,6 +163,20 @@ object Defaults {
   val GroupInitialRebalanceDelayMs = 3000
   val GroupMaxSize: Int = Int.MaxValue
 
+  /** New group coordinator configs */
+  val NewGroupCoordinatorEnable = false
+  val GroupCoordinatorNumThreads = 1
+
+  /** Consumer group configs */
+  val ConsumerGroupSessionTimeoutMs = 45000
+  val ConsumerGroupMinSessionTimeoutMs = 45000
+  val ConsumerGroupMaxSessionTimeoutMs = 60000
+  val ConsumerGroupHeartbeatIntervalMs = 5000
+  val ConsumerGroupMinHeartbeatInternalMs = 5000
+  val ConsumerGroupMaxHeartbeatInternalMs = 15000

Review Comment:
   should these be "Interval"?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -483,11 +497,27 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = 
"controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+
   /** * Group coordinator configuration ***/
   val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms"
   val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms"
   val GroupInitialRebalanceDelayMsProp = "group.initial.rebalance.delay.ms"
   val GroupMaxSizeProp = "group.max.size"
+
+  /** New group coordinator configs */
+  val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable"
+  val GroupCoordinatorNumThreadsProp = "group.coordinator.threads"
+
+  /** Consumer group configs */
+  val ConsumerGroupSessionTimeoutMsProp = "group.consumer.session.timeout.ms"
+  val ConsumerGroupMinSessionTimeoutMsProp = 
"group.consumer.min.session.timeout.ms"
+  val ConsumerGroupMaxSessionTimeoutMsProp = 
"group.consumer.max.session.timeout.ms"
+  val ConsumerGroupHeartbeatIntervalMsProp = 
"group.consumer.heartbeat.interval.ms"
+  val ConsumerGroupMinHeartbeatInternalMsProp = 
"group.consumer.min.heartbeat.interval.ms"
+  val ConsumerGroupMaxHeartbeatInternalMsProp 
="group.consumer.max.heartbeat.interval.ms"

Review Comment:
   should these also be "Interval"?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)
+  .defineInternal(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
+
+  /** Consumer groups configs */
+  // All properties are kept internal until KIP-848 is released.
+  .defineInternal(ConsumerGroupSessionTimeoutMsProp, INT, 
Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, 
ConsumerGroupSessionTimeoutMsDoc)

Review Comment:
   How does the operator use the ConsumerGroupSessionTimeoutMs and ConsumerGroupMin/MaxSessionTimeoutMs configs?
   
   Is the idea to set min/max once, then configure the base timeout which the min/max would enforce?
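
   For context, a sketch of the usual broker-side pattern for a default plus min/max triple, under the assumption that it works like the classic `group.min/max.session.timeout.ms` pair (names here are illustrative, not from the PR):
   
   ```
   // Illustrative sketch: the broker-wide default applies when a member does not
   // provide its own timeout, and the min/max bounds reject values outside range.
   static int effectiveSessionTimeoutMs(Integer requestedMs, int defaultMs, int minMs, int maxMs) {
       int timeoutMs = requestedMs != null ? requestedMs : defaultMs;
       if (timeoutMs < minMs || timeoutMs > maxMs)
           throw new IllegalArgumentException("Session timeout " + timeoutMs
               + "ms is outside the allowed range [" + minMs + ", " + maxMs + "]");
       return timeoutMs;
   }
   ```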



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 

[GitHub] [kafka] jolshan merged pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-03-01 Thread via GitHub


jolshan merged PR #13078:
URL: https://github.com/apache/kafka/pull/13078


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-03-01 Thread via GitHub


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1450918075

   Tests look unrelated. Going to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14451) Make range assignor rack-aware if consumer racks are configured

2023-03-01 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14451.

Fix Version/s: 3.5.0
 Reviewer: David Jacot
   Resolution: Fixed

> Make range assignor rack-aware if consumer racks are configured
> ---
>
> Key: KAFKA-14451
> URL: https://issues.apache.org/jira/browse/KAFKA-14451
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.5.0
>
>
> See KIP-881 for details



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-03-01 Thread via GitHub


cmccabe commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1450843896

   > I'm confused by this comment. There is a 
./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java in the 
review. It is the last file in the review.
   
   Sorry, my mistake. I see it now. I don't know how I missed it.
   
   We still need the MetadataVersion fixes and a test for that in the image 
test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram merged pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram merged PR #12990:
URL: https://github.com/apache/kafka/pull/12990


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-03-01 Thread via GitHub


pprovenzano commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1450839352

   > We need a test like 
`./metadata/src/test/java/org/apache/kafka/image/ScramImageTesst.java` , 
similar to the tests for the other Image classes.
   
   I'm confused by this comment. There is a 
`./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java` in the 
review. It is the last file in the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450838774

   @dajac Thanks for the reviews. Test failures not related (have verified them 
locally), merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1122285463


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Apologies for the delay - would you have any JMH benchmark for this change? 
E.g. something like in #13312.
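
   For reference, a minimal sketch of what such a JMH benchmark could look like, comparing the old byte-by-byte loop against the `Channels.newChannel` approach for a direct (non-array) buffer; sizes and setup are illustrative, not taken from the PR:
   
   ```
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   
   @State(Scope.Thread)
   public class WriteToBenchmark {
       private ByteBuffer buffer;
       private DataOutputStream out;
   
       @Setup(Level.Invocation)
       public void setup() {
           buffer = ByteBuffer.allocateDirect(4096); // direct buffer: hasArray() is false
           buffer.put(new byte[4096]);
           buffer.flip();
           out = new DataOutputStream(new ByteArrayOutputStream(4096));
       }
   
       @Benchmark
       public void byteByByteLoop() throws IOException { // old implementation
           for (int i = buffer.position(); i < buffer.limit(); i++)
               out.writeByte(buffer.get(i));
       }
   
       @Benchmark
       public void channelWrite() throws IOException { // new implementation
           Channels.newChannel(out).write(buffer);
       }
   }
   ```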



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122271424


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,43 +128,53 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
-public static class Builder {
-OffsetCommitResponseData data = new OffsetCommitResponseData();
-HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+public short version() {
+return version;
+}
 
-private OffsetCommitResponseTopic getOrCreateTopic(
-String topicName
-) {
-OffsetCommitResponseTopic topic = byTopicName.get(topicName);
-if (topic == null) {
-topic = new OffsetCommitResponseTopic().setName(topicName);
-data.topics().add(topic);
-byTopicName.put(topicName, topic);
-}
-return topic;
+public static Builder newBuilder(TopicResolver topicResolver, short 
version) {
+if (version >= 9) {
+return new Builder<>(topicResolver, new ByTopicId(), version);

Review Comment:
   Note - this duplicated invocation of the `Builder` constructor is to allow 
the resolution of the parameter type as either `Uuid` or `String`. Not graceful 
but...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122265994


##
core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala:
##
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.common.message.OffsetCommitRequestData
+import 
org.apache.kafka.common.message.OffsetCommitRequestData.{OffsetCommitRequestPartition,
 OffsetCommitRequestTopic}
+import org.apache.kafka.common.requests.{OffsetCommitRequest, 
OffsetCommitResponse}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.util.Collections.singletonList
+import java.util.Properties
+
+class OffsetCommitRequestTest extends BaseRequestTest {

Review Comment:
   Adding more tests to this class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-03-01 Thread via GitHub


jsancio commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1450774546

   > @jsancio @hachikuji Was there a reason to have this field or was it added 
accidentally?
   
   @ijuma I think this is just an artifact that `quorum-state` was implemented 
before `KIP-631` added cluster id to `meta.properties`. In that change we 
missed removing cluster id from `quorum-state`.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file

2023-03-01 Thread via GitHub


jsancio commented on PR #13102:
URL: https://github.com/apache/kafka/pull/13102#issuecomment-1450767005

   Thanks for the change and review. I think it is safe to remove the cluster 
id from `quorum-state`. The cluster id is stored and read from the file 
`meta.properties` in `KafkaConfig.metadataLogDir`. See 
https://github.com/apache/kafka/blob/510e99e1a2636f9a7035020f682ab7df8530986b/core/src/main/scala/kafka/raft/RaftManager.scala#L244


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13318: [DO NOT MERGE] KAFKA-14533: Re-enable state-updater in SmokeTestDriverIntegrationTest

2023-03-01 Thread via GitHub


guozhangwang commented on PR #13318:
URL: https://github.com/apache/kafka/pull/13318#issuecomment-1450748832

   Re-starting new checks as `SmokeTestDriverIntegrationTest` did not fail 
again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122188628


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -166,20 +187,27 @@ public <P> Builder addPartitions(
 return this;
 }
 
-public Builder merge(
-OffsetCommitResponseData newData
-) {
+public Builder merge(OffsetCommitResponseData newData) {
+if (version >= 9) {
+// This method is called after the group coordinator committed 
the offsets. The group coordinator
+// provides the OffsetCommitResponseData it built in the 
process. As of now, this data does
+// not contain topic ids, so we resolve them here.
+newData.topics().forEach(
+topic -> 
topic.setTopicId(topicResolver.getTopicId(topic.name()).orElse(Uuid.ZERO_UUID)));

Review Comment:
   At this point, topic ids should always be resolvable. However, if some aren't, we should fall back to adding the topic "as is" to the response, to avoid caching `ZERO_UUID` with the risk of overwrites.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122175488


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -119,43 +130,62 @@ public boolean shouldClientThrottle(short version) {
 return version >= 4;
 }
 
-public static class Builder {
-OffsetCommitResponseData data = new OffsetCommitResponseData();
-HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+public short version() {
+return version;
+}
 
-private OffsetCommitResponseTopic getOrCreateTopic(
-String topicName
-) {
-OffsetCommitResponseTopic topic = byTopicName.get(topicName);
-if (topic == null) {
-topic = new OffsetCommitResponseTopic().setName(topicName);
-data.topics().add(topic);
-byTopicName.put(topicName, topic);
-}
-return topic;
+public static Builder newBuilder(TopicResolver topicResolver, short 
version) {
+if (version >= 9) {
+return new Builder<>(topicResolver, new ByTopicId(), version);
+} else {
+return new Builder<>(topicResolver, new ByTopicName(), version);
 }
+}
 
-public Builder addPartition(
-String topicName,
-int partitionIndex,
-Errors error
-) {
-final OffsetCommitResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+public static final class Builder {
+private final TopicResolver topicResolver;
+private final TopicClassifier topicClassifier;
+private final short version;
 
-topicResponse.partitions().add(new OffsetCommitResponsePartition()
-.setPartitionIndex(partitionIndex)
-.setErrorCode(error.code()));
+private OffsetCommitResponseData data = new OffsetCommitResponseData();
+private final Map topics = new 
HashMap<>();
+
+protected Builder(TopicResolver topicResolver, TopicClassifier 
topicClassifier, short version) {
+this.topicResolver = topicResolver;
+this.topicClassifier = topicClassifier;
+this.version = version;
+}
+
+public Builder addPartition(String topicName, Uuid topicId, int 
partitionIndex, Errors error) {
+Uuid resolvedTopicId = maybeResolveTopicId(topicName, topicId);
+
+if (version >= 9 && Uuid.ZERO_UUID.equals(resolvedTopicId)) {
+Errors reported = error != Errors.NONE ? error : 
Errors.UNKNOWN_TOPIC_ID;

Review Comment:
   This case shouldn't be reachable because once we have proceeded with 
constructing the response via `addPartition` all topic ids are supposed to have 
been resolved successfully. Here, we choose to add the topic to the response 
with the error code `UNKNOWN_TOPIC_ID` if no error is already set. Any existing 
error is not overwritten.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-03-01 Thread via GitHub


C0urante commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1122150546


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map<String, String> envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) {
+}

Review Comment:
   > This isn't stopped by a protected REST API.
   
   I don't really agree with this assessment, since a protected REST API does 
prevent untrusted users from submitting connector configurations.
   
   Generally speaking we've only tried to take unfettered access to Connect 
clusters into account for features that are enabled by default. Given that this 
feature is disabled by default, it's not necessary to take steps to guard it 
against malicious connector configurations.
   
   It's also worth noting that, if you're running a Connect cluster where 
non-trusted users are able to submit connector configurations, a REST extension 
or a fork of this config provider are both reasonable alternatives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on PR #13322:
URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450638812

   Got it. So the distinction is "publishing" and that is done when the config 
is not internal. Thanks for clarifying.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


dajac commented on PR #13322:
URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450613278

   > Just a general question about configs -- do we have any guidelines for 
adding and removing configs between releases? Just curious about compatibility 
and typical guidelines around that.
   
   Configs are considered public APIs, so we treat them as such. This is why I keep all of them internal for now. They won't be published until we remove this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on PR #13322:
URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450610551

   Just a general question about configs -- do we have any guidelines for 
adding and removing configs between releases? Just curious about compatibility 
and typical guidelines around that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122123411


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)

Review Comment:
   nit: you can leave "null" out (same with the other prop below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122121405


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -976,6 +976,20 @@ class KafkaConfigTest {
 case RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
 case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
 
+/** New group coordinator configs */
+case KafkaConfig.NewGroupCoordinatorEnableProp => // ignore string

Review Comment:
   nit: this one isn't a string -- some of the other configs just said "ignore"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122120725


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -976,6 +976,20 @@ class KafkaConfigTest {
 case RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
 case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
 
+/** New group coordinator configs */
+case KafkaConfig.NewGroupCoordinatorEnableProp => // ignore string
+case KafkaConfig.GroupCoordinatorNumThreadsProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+
+/** Consumer groups configs */
+case KafkaConfig.ConsumerGroupSessionTimeoutMsProp => // ignore string

Review Comment:
   Should this one assert the same as the ones below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


jolshan commented on code in PR #13322:
URL: https://github.com/apache/kafka/pull/13322#discussion_r1122119145


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1267,6 +1314,24 @@ object KafkaConfig {
   .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc)
   .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
 
+  /** New group coordinator configs */
+  // All properties are kept internal until KIP-848 is released.
+  // This property meant to be here only during the development of 
KIP-848. It will
+  // be replaced by a metadata version before releasing it.
+  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc)
+  .defineInternal(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
+
+  /** Consumer groups configs */
+  // All properties are kept internal until KIP-848 is released.
+  .defineInternal(ConsumerGroupSessionTimeoutMsProp, INT, 
Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, 
ConsumerGroupSessionTimeoutMsDoc)
+  .defineInternal(ConsumerGroupMinSessionTimeoutMsProp, INT, 
Defaults.ConsumerGroupMinSessionTimeoutMs, atLeast(1), MEDIUM, 
ConsumerGroupMinSessionTimeoutMsDoc)
+  .defineInternal(ConsumerGroupMaxSessionTimeoutMsProp, INT, 
Defaults.ConsumerGroupMaxSessionTimeoutMs, atLeast(1), MEDIUM, 
ConsumerGroupMaxSessionTimeoutMsDoc)

Review Comment:
   Do we want to enforce that the min value is less than or equal to the max 
value?
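
   A sketch of the cross-field check being suggested (illustrative, not from the PR); per-property validators like `atLeast(1)` see one value at a time, so a min <= default <= max relationship would have to be verified after the whole config is parsed, similar in spirit to KafkaConfig's validateValues step:
   
   ```
   static void checkConsumerGroupSessionTimeouts(int minMs, int defaultMs, int maxMs) {
       if (minMs > maxMs)
           throw new IllegalArgumentException("group.consumer.min.session.timeout.ms (" + minMs
               + ") must not exceed group.consumer.max.session.timeout.ms (" + maxMs + ")");
       if (defaultMs < minMs || defaultMs > maxMs)
           throw new IllegalArgumentException("group.consumer.session.timeout.ms (" + defaultMs
               + ") must lie within [" + minMs + ", " + maxMs + "]");
   }
   ```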



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-03-01 Thread via GitHub


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1450588812

   Seems like there was a build failure related to `rat`. I restarted the build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-01 Thread via GitHub


gharris1727 commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1122112042


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -419,7 +423,14 @@ private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
 Collection<PluginDesc<T>> result = new ArrayList<>();
 try {
 ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-for (T pluginImpl : serviceLoader) {
+for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
+T pluginImpl;
+try {
+pluginImpl = iterator.next();
+} catch (ServiceConfigurationError t) {
+log.error("Unable to instantiate plugin{}", 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   I changed this and the previous log message so that it wasn't confusing that we were instantiating a SinkConnector.
   It is unfortunate that on this branch we don't get to see the `Class<T>` after a failure occurs, and have to rely on the ServiceConfigurationError message to report which specific plugin had an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-01 Thread via GitHub


gharris1727 commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1122110705


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -439,6 +457,22 @@ public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
 versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : 
UNDEFINED_VERSION;
 }
 
+private static String reflectiveErrorDescription(Throwable t) {
+if (t instanceof NoSuchMethodException) {
+return ": Plugin class must have a default constructor, and cannot 
be a non-static inner class";
+} else if (t instanceof SecurityException) {
+return ": Security settings must allow reflective instantiation of 
plugin classes";
+} else if (t instanceof IllegalAccessException) {
+return ": Plugin class default constructor must be public";
+} else if (t instanceof ExceptionInInitializerError) {
+return ": Plugin class should not throw exception during static 
initialization";
+} else if (t instanceof InvocationTargetException) {
+return ": Constructor must complete without throwing an exception";
+} else {
+return "";

Review Comment:
   I think that actionable error messages are better than non-actionable ones, especially when they can save the novice user a trip to the internet or the risk of chasing the wrong problem. They also shouldn't be misleading enough to get in the way of an advanced user.
   
   I do agree that those particular messages were a bit weak though, and those inner messages did not really do a good job of explaining what went wrong. I now know what an ExceptionInInitializerError means, but I do remember being quite confused by it 2-3 years ago when I first encountered it.
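
   A tiny self-contained illustration (not from the PR) of that failure mode; a class whose static initializer throws surfaces to the loading code as `ExceptionInInitializerError`, with the original exception available via `getCause()`:
   
   ```
   public class InitErrorDemo {
       static class BadPlugin {
           static {
               // "if (true)" keeps the compiler happy; the initializer always throws
               if (true) throw new RuntimeException("boom during static initialization");
           }
       }
   
       public static void main(String[] args) throws ClassNotFoundException {
           try {
               // forName with initialize=true runs the static initializer
               Class.forName(InitErrorDemo.class.getName() + "$BadPlugin",
                             true, InitErrorDemo.class.getClassLoader());
           } catch (ExceptionInInitializerError e) {
               System.out.println("cause: " + e.getCause()); // the original RuntimeException
           }
       }
   }
   ```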



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes

2023-03-01 Thread via GitHub


gharris1727 commented on code in PR #13182:
URL: https://github.com/apache/kafka/pull/13182#discussion_r1122107145


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##
@@ -111,20 +115,62 @@ public enum TestPlugin {
 /**
  * A plugin which shares a jar file with {@link 
TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE}
  */
-MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo");
+MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", 
"test.plugins.ThingTwo"),
+/**
+ * A plugin which is incorrectly packaged, and is missing a superclass 
definition.
+ */
+FAIL_TO_INITIALIZE_MISSING_SUPERCLASS("fail-to-initialize", 
"test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is packaged with other incorrectly packaged plugins, 
but itself has no issues loading.
+ */
+FAIL_TO_INITIALIZE_CO_LOCATED("fail-to-initialize", 
"test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER),
+/**
+ * A connector which is incorrectly packaged, and throws during static 
initialization.
+ */
+
FAIL_TO_INITIALIZE_STATIC_INITIALIZER_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER),
+/**
+ * A plugin which is incorrectly packaged, which throws an exception 
from the {@link Versioned#version()} method.
+ */
+
FAIL_TO_INITIALIZE_VERSION_METHOD_THROWS_CONNECTOR("fail-to-initialize", 
"test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER),

Review Comment:
   This is not a description of the behavior of the plugin, but how it is 
packaged. It is packaged with the other connectors which fail during plugin 
discovery. And it is packaged this way because before this patch, throwing from 
the version method caused other plugins to be shadowed (such as 
CoLocatedPlugin).
   
   I think that the name of this group of plugins could change though. Are any 
of `FAIL_DURING_DISCOVERY`, `PACKAGED_WITH_FAILING_PLUGINS`, or `BAD_PACKAGING` 
better than `FAIL_TO_INITIALIZE`?
   Alternatively I could remove the prefix from these constants and keep or 
change the `fail-to-initialize` directory name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-03-01 Thread via GitHub


guozhangwang commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450565316

   I checked the latest failure on `shouldPauseStandbyTaskAndNotTransit..` and found that the test condition itself is timing-dependent. What it wants to verify is the following sequence:
   
   `task1 added, task2 added, and then task1 paused`. In this case `transitToUpdateStandby` should be called once.
   
   But it is possible for the following sequence to happen as well:
   
   `task1 added, task1 paused, task2 added` (because the updater thread can interleave its runOnce with the `addTask` call), in which case `transitToUpdateStandby` could be called twice.
   
   I've relaxed that condition, and I think this test should not fail anymore.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1122096890


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -149,13 +161,15 @@ public Builder addPartition(
 return this;
 }
 
-public <P> Builder addPartitions(
-String topicName,
-List<P> partitions,
-Function<P, Integer> partitionIndex,
-Errors error
+public final <P> Builder addPartitions(

Review Comment:
   There is still a problem here if `topicName` and `topicId` are both undefined, in which case we should do what was done before and add to the response without caching.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -425,35 +426,59 @@ class KafkaApis(val requestChannel: RequestChannel,
   requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
+  val topicResolver = 
TopicResolver.fromTopicIds(metadataCache.topicNamesToIds())

Review Comment:
   Move the `TopicResolver` into the `MetadataCache`, or create it without copying the map of topic ids, as the copy is costly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-03-01 Thread via GitHub


guozhangwang commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450512377

   Honestly I do not quite understand how mockito is misbehaving here.. take `shouldPauseActiveTaskAndTransitToUpdateStandby` for example: after task1 (named topologyA) and task2 (named topologyB) are updating, there's the line `when(topologyMetadata.isPaused("A")).thenReturn(true);`. In the passing cases, task1 is indeed paused and everything works just fine; however in the flaky failing cases, task2 is paused instead, indicating `topologyMetadata.isPaused("B")` returns true but `topologyMetadata.isPaused("A")` returns false... I honestly have no idea why that could happen, but after changing that to `when(topologyMetadata.isPaused(task1.id().topologyName())).thenReturn(false).thenReturn(true);` (i.e. let it return false on the first call when adding the task, and then return true on the second call to pause) I did not see the test failing any more. I checked mockito's docs but still do not see how `when` could mess up the return values, so I kinda fixed it but do not know exactly why the fix works..
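
   A minimal illustration of the consecutive stubbing used in the fix (`TopologyMetadata` here is a stand-in interface for the example, not the real class): the first call returns false, and every later call returns true:
   
   ```
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   public class ConsecutiveStubbingExample {
       // Stand-in for the real TopologyMetadata class (assumption for the example).
       interface TopologyMetadata { boolean isPaused(String topologyName); }
   
       public static void main(String[] args) {
           TopologyMetadata metadata = mock(TopologyMetadata.class);
           // Consecutive stubbing: false on the first call, true on every call after.
           when(metadata.isPaused("A")).thenReturn(false).thenReturn(true);
   
           System.out.println(metadata.isPaused("A")); // false (first call, task being added)
           System.out.println(metadata.isPaused("A")); // true  (second call, topology paused)
           System.out.println(metadata.isPaused("A")); // true  (last stubbed value repeats)
       }
   }
   ```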


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc

2023-03-01 Thread via GitHub


gharris1727 commented on PR #13315:
URL: https://github.com/apache/kafka/pull/13315#issuecomment-1450445379

   It appears that the mustRunAfter directive has fixed the build, and now it's just normal flaky tests. I won't be reverting the fixups, and this should be ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-03-01 Thread via GitHub


satishd commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1121995162


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {

Review Comment:
   This is the reason I did not add it earlier. But I am not strongly 
opinionated about that. 
   I added them with the latest commit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-03-01 Thread via GitHub


satishd commented on PR #13304:
URL: https://github.com/apache/kafka/pull/13304#issuecomment-1450425363

   Thanks @junrao @ijuma for your review. Addressed the review comments with 
the latest commit. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1450362232

   Please note that, as mentioned by David above, in the current state topic ids are not provided by the group coordinator when constructing the response, and hence are not returned with the `OffsetCommitResponse`. A change to the PR will be made to address this shortly.





[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-03-01 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450247692

   @C0urante ready for your review. I have increased the timeout to 2 minutes and also added a wait for MirrorMaker to become ready. In a separate JIRA we should discuss how to make the startup synchronous; I have created one such JIRA at https://issues.apache.org/jira/browse/KAFKA-14773





[jira] [Created] (KAFKA-14773) Make MirrorMaker startup synchronous

2023-03-01 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14773:


 Summary: Make MirrorMaker startup synchronous
 Key: KAFKA-14773
 URL: https://issues.apache.org/jira/browse/KAFKA-14773
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya


Currently, MirrorMaker is started using the `./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if the shell command has exited and a log line `Kafka MirrorMaker started` has been printed, it is likely that the underlying connectors and tasks have not been configured yet.

This task aims to make the MirrorMaker startup synchronous, either by waiting for connectors & tasks to move to the RUNNING state before exiting the `MirrorMaker#start()` function or by blocking completion of `main()`. A rough sketch of the blocking approach is shown below.

This was discussed in [https://github.com/apache/kafka/pull/13284]
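As a rough illustration of the blocking approach, here is a minimal sketch. Note that the {{connectorStatus()}} accessor and the set of expected connector names are assumptions for illustration, not existing MirrorMaker API:

{code:java}
// Hypothetical sketch: block after MirrorMaker.start() until every expected
// connector reports RUNNING, or give up after a deadline. connectorStatus()
// is an assumed accessor, not an existing API.
public static void awaitStartup(MirrorMaker mirrorMaker, Set<String> expectedConnectors)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
    while (System.currentTimeMillis() < deadline) {
        boolean allRunning = expectedConnectors.stream()
                .allMatch(name -> "RUNNING".equals(mirrorMaker.connectorStatus(name)));
        if (allRunning) {
            return; // startup is now effectively synchronous
        }
        Thread.sleep(500);
    }
    throw new IllegalStateException("Connectors and tasks did not start within the timeout");
}
{code}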





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

2023-03-01 Thread via GitHub


divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1121820892


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror.integration;

Review Comment:
   Note for reviewers: this was required so that we can access 
isConfiguredAndRunning from the test without having to make the method public.






[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-03-01 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1121810375


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   ping @ijuma / @Hangleton , are there any blockers to getting this merged?
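   For context, a minimal standalone sketch of the bulk-write approach under discussion. Note that WritableByteChannel.write(ByteBuffer) writes buffer.remaining() bytes and advances the position, so the duplicate-with-limit handling below is an assumption to preserve the caller's position and the `length` contract, not necessarily what this PR does:
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   public class BulkWriteSketch {
       // Copies `length` bytes from `buffer` to `out` without a per-byte loop.
       static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
           if (buffer.hasArray()) {
               // Heap buffer: write directly from the backing array.
               out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
           } else {
               // Direct buffer: bulk-copy via a channel; use a duplicate so the
               // caller's position/limit are left untouched.
               ByteBuffer slice = buffer.duplicate();
               slice.limit(slice.position() + length);
               Channels.newChannel(out).write(slice);
           }
       }
   
       public static void main(String[] args) throws IOException {
           ByteBuffer direct = ByteBuffer.allocateDirect(16);
           direct.put(new byte[]{1, 2, 3, 4}).flip();
           ByteArrayOutputStream sink = new ByteArrayOutputStream();
           writeTo(new DataOutputStream(sink), direct, 4);
           System.out.println(sink.size()); // prints 4
       }
   }
   ```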






[jira] [Created] (KAFKA-14772) Add ConsumerGroupHeartbeat API to AuthorizerIntegrationTest

2023-03-01 Thread David Jacot (Jira)
David Jacot created KAFKA-14772:
---

 Summary: Add ConsumerGroupHeartbeat API to 
AuthorizerIntegrationTest
 Key: KAFKA-14772
 URL: https://issues.apache.org/jira/browse/KAFKA-14772
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot


Once the new group coordinator can be enabled, we need to update AuthorizerIntegrationTest to cover the new APIs.





[GitHub] [kafka] satishd commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-03-01 Thread via GitHub


satishd commented on PR #13067:
URL: https://github.com/apache/kafka/pull/13067#issuecomment-1450196726

   > @satishd can you help review this?
   
   @ijuma Sure, I will have cycles after this week. I will review it by next 
Monday. 
   
   
   
   





[GitHub] [kafka] dajac opened a new pull request, #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)

2023-03-01 Thread via GitHub


dajac opened a new pull request, #13322:
URL: https://github.com/apache/kafka/pull/13322

   This patch adds the new server configurations introduced in KIP-848. All of them are kept as internal configurations for now; we will make them public when the KIP is ready. It also adds an internal configuration named `group.coordinator.new.enable` that we will use during the development of the KIP. This one will be removed later on and replaced by an IBP/feature flag.
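   For illustration, internal configs are typically declared with `ConfigDef.defineInternal` so they stay out of the generated public documentation. A rough sketch (the exact type/default/importance used by the patch may differ):
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   
   public class NewCoordinatorConfigSketch {
       // defineInternal keeps the config out of the public docs, so it can be
       // flipped during development and dropped later without breaking users.
       static final ConfigDef CONFIG = new ConfigDef()
               .defineInternal("group.coordinator.new.enable",
                       ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM);
   }
   ```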
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on PR #12990:
URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450159413

   @dajac Thanks for the reviews, I have addressed your comments.





[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121733442


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
-  def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = {
-consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
-
consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
classOf[RackAwareAssignor].getName)
-val consumer = createConsumer()
-consumer.subscribe(Set(topic).asJava)
-awaitAssignment(consumer, Set(tp, tp2))
-  }
-}
+  def testRackAwareRangeAssignor(): Unit = {

Review Comment:
   Ah, I missed that one, I think it is one of the new tests that was added 
when fixing FFF issues. I moved this test to that class. It uses 2 brokers 
instead of 3, but that seems ok for this test. Thanks.






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());

Review Comment:
   Good idea, updated.






[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


rajinisivaram commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121728688


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+private void assignWithRackMatching(Collection 
assignmentStates,
+

[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1450155072

   Thanks David, I have included all your comments in the PR. I am now working on:
   - How to limit the highest version of the OffsetCommit API to 8 in the admin client;
   - The integration test mentioned in a prior review.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121724366


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -50,14 +52,21 @@
 public class OffsetCommitResponse extends AbstractResponse {
 
 private final OffsetCommitResponseData data;
+private final short version;

Review Comment:
   Adding the version to the response seems to be an anti-pattern, as I haven't seen a similar use in other responses. Semantically it should be OK, because the response instance is supposed to be built against a given version. If another approach is advisable, I will remove it.



##
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java:
##
@@ -435,17 +435,32 @@ public void testOffsetCommitRequestVersions() throws 
Exception {
 int partition = 2;
 int offset = 100;
 
-testAllMessageRoundTrips(new OffsetCommitRequestData()
- .setGroupId(groupId)
- .setTopics(Collections.singletonList(
- new OffsetCommitRequestTopic()
- .setName(topicName)
- 
.setPartitions(Collections.singletonList(
- new 
OffsetCommitRequestPartition()
- 
.setPartitionIndex(partition)
- 
.setCommittedMetadata(metadata)
- 
.setCommittedOffset(offset)
- );
+OffsetCommitRequestData byTopicName = new OffsetCommitRequestData()
+.setGroupId(groupId)
+.setTopics(Collections.singletonList(
+new OffsetCommitRequestTopic()
+.setName(topicName)
+.setPartitions(Collections.singletonList(
+new OffsetCommitRequestPartition()
+.setPartitionIndex(partition)
+.setCommittedMetadata(metadata)
+.setCommittedOffset(offset)
+;
+
+OffsetCommitRequestData byTopicId = new OffsetCommitRequestData()
+.setGroupId(groupId)
+.setTopics(Collections.singletonList(
+new OffsetCommitRequestTopic()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Collections.singletonList(
+new OffsetCommitRequestPartition()
+.setPartitionIndex(partition)
+.setCommittedMetadata(metadata)
+.setCommittedOffset(offset)
+;
+
+testAllMessageRoundTripsBeforeVersion((short) 9, byTopicName, 
byTopicName);

Review Comment:
   Note: is it OK to break the message round trip between versions < 9 and >= 9?



##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -188,12 +199,62 @@ public Builder merge(
 }
 });
 }
-
 return this;
 }
 
-public OffsetCommitResponse build() {
-return new OffsetCommitResponse(data);
+public final OffsetCommitResponse build() {
+return new OffsetCommitResponse(data, version);
+}
+
+protected abstract void validate(String topicName, Uuid topicId);
+
+protected abstract T classifer(String topicName, Uuid topicId);
+
+private OffsetCommitResponseTopic getOrCreateTopic(String topicName, 
Uuid topicId) {
+T topicKey = classifer(topicName, topicId);
+OffsetCommitResponseTopic topic = topics.get(topicKey);
+if (topic == null) {
+topic = new 
OffsetCommitResponseTopic().setName(topicName).setTopicId(topicId);
+data.topics().add(topic);
+topics.put(topicKey, topic);
+}
+return topic;
+}
+}
+
+public static final class BuilderByTopicId extends Builder<Uuid> {

Review Comment:
   Will add Javadoc.






[GitHub] [kafka] chia7712 commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-01 Thread via GitHub


chia7712 commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450145297

   @divijvaidya thanks for the great feedback!
   
   > we also have to ensure that deadlock doesn't occur when trying to acquire the SocketServer lock and the Processors lock.
   
   That is interesting. The mutation of processors is guarded by the `Acceptor` object's lock, which means taking only the SocketServer lock is not safe when we access `processors`; we would need the `Acceptor` lock as well. In other words, this issue is not only about performance but also a potential concurrency bug.
   
   > we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors.
   
   I will use `CopyOnWriteArrayList`.
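   For intuition, the deadlock risk with two locks acquired in different orders looks like this (a generic sketch, not the actual SocketServer/Acceptor code):
   
   ```java
   public class LockOrderingSketch {
       private final Object socketServerLock = new Object();
       private final Object acceptorLock = new Object();
   
       // Thread A: takes socketServerLock first, then acceptorLock.
       void readProcessors() {
           synchronized (socketServerLock) {
               synchronized (acceptorLock) { /* read processors */ }
           }
       }
   
       // Thread B: takes acceptorLock first, then socketServerLock. If each
       // thread holds its first lock and waits for the other, neither proceeds.
       void mutateProcessors() {
           synchronized (acceptorLock) {
               synchronized (socketServerLock) { /* add/remove processors */ }
           }
       }
   }
   ```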





[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

2023-03-01 Thread via GitHub


divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450105039

   > This is less concerning because the MirrorMaker class isn't part of public API;
   
   It's not technically part of the public API, but it has a `main()` method which is executed by customers. As you pointed out, today they have to rely on logs/metrics to figure out whether MM started correctly or not.
   
   I agree with the creation of `connectorStatus()` and its usage in tests as you suggested, but additionally we should block the completion of `main()` until all connectors are online. The blocking part could be taken up in a different PR, and we can discuss the merits/demerits of doing that over there.
   
   Meanwhile, I will add `connectorStatus()` and the relevant changes to this PR.





[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

2023-03-01 Thread via GitHub


divijvaidya commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450075461

   I am afraid the latest approach will also not work. There is a possibility that the `ArrayBuffer` is internally performing a size expansion while we read `dataPlaneProcessors.size` or iterate over it using `map`. This concurrent access could lead to undefined results (notably, this would not have been a problem if we were using a fixed-size array). Also, note that the current implementation works for `controlPlaneAcceptorOpt`, since it does not access the `processors` ArrayBuffer outside the lock.
   
   When I mentioned option 1, fine-grained locking, I actually meant locking on the processors object instead of on the entire SocketServer object. If we go down this path, we will have to change other places in the file to acquire the processor lock on mutation, and we also have to ensure that deadlock doesn't occur when trying to acquire the SocketServer lock and the Processors lock.
   
   Hence, my suggestion would be to opt for a lock-free, concurrently accessible data structure for storing processors.
   
   Here are our requirements for such a data structure:
   - we don't mutate the data structure frequently, so even if writes are slow, we are OK with that.
   - we require lock-free concurrent reads, since we perform a read on every connection setup and every time we emit a metric.
   - the size of the data structure is going to be small, in the tens to low hundreds of entries.
   - the data structure should be able to expand its size, since we allow dynamic shrinking and expanding.
   
   Based on the above, we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors; see the sketch below.
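   As a small illustration of this trade-off (a generic sketch, not the actual SocketServer code):
   
   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;
   
   public class ProcessorRegistry {
       // Reads (every metric emission, every connection setup) are lock-free and
       // iterate over an immutable snapshot; each write copies the backing array,
       // which is acceptable because processors are added/removed rarely.
       private final List<Runnable> processors = new CopyOnWriteArrayList<>();
   
       public void addProcessor(Runnable processor) {
           processors.add(processor); // O(n) copy, but mutation is infrequent
       }
   
       public int processorCount() {
           return processors.size(); // no lock needed, always a consistent snapshot
       }
   }
   ```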
   
   
   





[jira] [Comment Edited] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-01 Thread Pierangelo Di Pilato (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695040#comment-17695040
 ] 

Pierangelo Di Pilato edited comment on KAFKA-14771 at 3/1/23 12:22 PM:
---

[~showuon] what do you think about this improvement?


was (Author: pierdipi):
[~lukech] what do you think about this improvement?

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.





[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-01 Thread Pierangelo Di Pilato (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695040#comment-17695040
 ] 

Pierangelo Di Pilato commented on KAFKA-14771:
--

[~lukech] what do you think about this improvement?

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.





[GitHub] [kafka] showuon commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest

2023-03-01 Thread via GitHub


showuon commented on PR #13319:
URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450060935

   I can see the test results are better, but there are still failing tests, e.g. in `DefaultStateUpdaterTest`:
   ```
   Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain()
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain()
   ```





[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-01 Thread Pierangelo Di Pilato (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695039#comment-17695039
 ] 

Pierangelo Di Pilato commented on KAFKA-14771:
--

I can provide a patch if this improvement is desired by maintainers

> Include current thread ids in ConcurrentModificationException message
> -
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.2, 3.2.3
>Reporter: Pierangelo Di Pilato
>Priority: Minor
>  Labels: consumer
>
> In the KafkaConsumer.acquire method a ConcurrentModificationException 
> exception is thrown when
>  
> {code:java}
> threadId != currentThread.get() && 
> !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> however, the exception message doesn't include:
>  
>  * Thread.currentThread().getId()
>  * currentThread.get() 
>  
> I think including the aforementioned variables is very useful for debugging 
> the issue.





[jira] [Created] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message

2023-03-01 Thread Pierangelo Di Pilato (Jira)
Pierangelo Di Pilato created KAFKA-14771:


 Summary: Include current thread ids in 
ConcurrentModificationException message
 Key: KAFKA-14771
 URL: https://issues.apache.org/jira/browse/KAFKA-14771
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 3.2.3, 2.8.2
Reporter: Pierangelo Di Pilato


In the KafkaConsumer.acquire method, a ConcurrentModificationException is thrown when
{code:java}
threadId != currentThread.get() && 
!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
however, the exception message doesn't include:
 * Thread.currentThread().getId()
 * currentThread.get()

I think including the aforementioned variables is very useful for debugging the issue.
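For illustration, a sketch of what the enriched check could look like (field names mirror KafkaConsumer.acquire(); the exact message format is an assumption):

{code:java}
// Sketch: include both thread ids in the exception message so the competing
// threads can be identified. currentThread, NO_CURRENT_THREAD and refcount
// are the fields already used by KafkaConsumer.acquire().
private void acquire() {
    long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
        throw new ConcurrentModificationException(
            "KafkaConsumer is not safe for multi-threaded access: current thread id: " + threadId
                + ", thread id holding the consumer: " + currentThread.get());
    }
    refcount.incrementAndGet();
}
{code}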





[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449936916

   Sorry, used the wrong button...





[GitHub] [kafka] dajac closed pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac closed pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit 
API and propagate for request version >= 9
URL: https://github.com/apache/kafka/pull/13240





[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449935049

   > Hello David, thanks for the fast review. Apologies for being slow, I 
hadn't finished the previous revision. Will include your comments. Working on 
it right now. Thanks!
   
   No worries. You are not slow. I just 





[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


Hangleton commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449920535

   Hello David, thanks for the fast review. Apologies for being slow, I hadn't 
finished the previous revision. Will include your comments. Working on it right 
now. Thanks!





[jira] [Created] (KAFKA-14770) Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match

2023-03-01 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-14770:
--

 Summary: Allow dynamic keystore update for brokers if string 
representation of DN matches even if canonical DNs don't match
 Key: KAFKA-14770
 URL: https://issues.apache.org/jira/browse/KAFKA-14770
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 3.5.0


To avoid mistakes during dynamic broker config updates that could potentially affect clients, we restrict the changes that can be performed dynamically without a broker restart. For broker keystore updates, we require the DN to be the same for the old and new certificates, since it could contain host names used for host name verification by clients. DNs are compared using the standard Java implementation of X500Principal.equals(), which compares canonical names. If a field's tag changes from one with a printable string representation to one without, or vice versa, the canonical name check fails even if the actual name is the same, since the canonical representation converts only some tags to hex. We can relax the verification to allow dynamic updates in this case by permitting the update if either the canonical name or the RFC 2253 string representation of the DN matches.
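For reference, the relaxed check could compare both representations along these lines (a sketch using the standard javax.security.auth.x500.X500Principal API, not the actual patch):

{code:java}
import javax.security.auth.x500.X500Principal;

// Sketch: accept the new certificate if either the canonical form (what
// X500Principal.equals() compares) or the RFC 2253 string form of the DN matches.
static boolean dnMatches(X500Principal oldDn, X500Principal newDn) {
    return oldDn.equals(newDn)
        || oldDn.getName(X500Principal.RFC2253).equals(newDn.getName(X500Principal.RFC2253));
}
{code}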





[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121510354


##
checkstyle/suppressions.xml:
##
@@ -93,7 +93,7 @@
   
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
 
 
+  
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|OffsetCommitResponseTest).java"/>

Review Comment:
   Is this really needed?






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121506569


##
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java:
##
@@ -17,37 +17,64 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.function.Function.identity;
 import static 
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class OffsetCommitResponseTest {
 
 protected final int throttleTimeMs = 10;
 
-protected final String topicOne = "topic1";
+protected final String topic1 = "topic1";
+protected final Uuid topic1Id = Uuid.randomUuid();
 protected final int partitionOne = 1;
+protected final int partitionTwo = 2;
 protected final Errors errorOne = Errors.COORDINATOR_NOT_AVAILABLE;
 protected final Errors errorTwo = Errors.NOT_COORDINATOR;
-protected final String topicTwo = "topic2";
-protected final int partitionTwo = 2;
-
-protected TopicPartition tp1 = new TopicPartition(topicOne, partitionOne);
-protected TopicPartition tp2 = new TopicPartition(topicTwo, partitionTwo);
+protected final String topic2 = "topic2";
+protected final int p3 = 3;
+protected final int p4 = 4;
+protected final String topic3 = "topic3";
+protected final int p5 = 5;
+protected final int p6 = 6;
+protected final String topic4 = "topic4";
+protected final Uuid topic4Id = Uuid.randomUuid();
+protected final int p7 = 7;
+protected final int p8 = 8;
+protected final Uuid topic5Id = Uuid.randomUuid();
+protected final int p9 = 9;
+protected final int p10 = 10;
+protected final String topic6 = "topic6";
+protected final Uuid topic6Id = Uuid.randomUuid();
+protected final int p11 = 11;
+protected final int p12 = 12;

Review Comment:
   I am not a fan of all those attributes in tests. One or two are fine if they are really re-used across all the tests; otherwise, it may be better to define what you need within each test. I would also use `TopicIdPartition` where relevant, so you can group the name, id, and partition together.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121501980


##
clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java:
##
@@ -80,10 +81,10 @@ public void setUp() {
 );
 }
 
-@Test
 @Override
-public void testConstructor() {
-
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+public void testConstructor(short version) {

Review Comment:
   Is this change related to the PR? If not, I would rather do it in a separate 
PR.



##
clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java:
##
@@ -38,9 +40,9 @@ public void testConstructorWithErrorResponse() {
 assertEquals(throttleTimeMs, response.throttleTimeMs());
 }
 
-@Test
-@Override
-public void testParse() {
+@ParameterizedTest
+@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)

Review Comment:
   Same question here.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121499577


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##
@@ -188,12 +188,73 @@ public Builder merge(
 }
 });
 }
-
 return this;
 }
 
-public OffsetCommitResponse build() {
+public final OffsetCommitResponse build() {
 return new OffsetCommitResponse(data);
 }
+
+protected abstract void validate(String topicName, Uuid topicId);
+
+protected abstract T classifer(String topicName, Uuid topicId);
+
+protected abstract OffsetCommitResponseTopic newTopic(String 
topicName, Uuid topicId);
+private OffsetCommitResponseTopic getOrCreateTopic(String topicName, 
Uuid topicId) {
+T topicKey = classifer(topicName, topicId);
+OffsetCommitResponseTopic topic = topics.get(topicKey);
+if (topic == null) {
+topic = newTopic(topicName, topicId);
+data.topics().add(topic);
+topics.put(topicKey, topic);
+}
+return topic;
+}
+}
+
+public static final class BuilderByTopicId extends Builder<Uuid> {
+protected BuilderByTopicId(short version) {
+super(version);
+}
+
+@Override
+protected void validate(String topicName, Uuid topicId) {
+if (topicId == null || Uuid.ZERO_UUID.equals(topicId))
+throw new UnsupportedVersionException("OffsetCommitResponse 
version " + version +
+" does not support zero topic IDs.");
+}
+
+@Override
+protected Uuid classifer(String topicName, Uuid topicId) {
+return topicId;
+}
+
+@Override
+protected OffsetCommitResponseTopic newTopic(String topicName, Uuid 
topicId) {
+return new 
OffsetCommitResponseTopic().setName(null).setTopicId(topicId);

Review Comment:
   We could get this in the base class and always set both of them. The 
serialization framework knows what to do.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121496085


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##
@@ -101,18 +103,18 @@ public static List 
getErrorResponseTopics(

.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
 }
-responseTopicData.add(new OffsetCommitResponseTopic()
-.setName(entry.name())
-.setPartitions(responsePartitions)
-);
+OffsetCommitResponseTopic responseTopic = new 
OffsetCommitResponseTopic()
+.setTopicId(version >= 9 ? entry.topicId() : Uuid.ZERO_UUID)
+.setName(version < 9 ? entry.name() : null);

Review Comment:
   I think that we could just set both the name and the id all the time as the 
fields are ignorable. The serialization framework will do the right thing based 
on the version. We could also remove `version` from the arguments.
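   In other words, a sketch of the suggestion, reusing the names from the hunk above:
   
   ```java
   // Set both fields unconditionally; since name and topicId are ignorable in
   // the message spec, serialization keeps whichever one the version carries.
   OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic()
           .setTopicId(entry.topicId())
           .setName(entry.name())
           .setPartitions(responsePartitions);
   ```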






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121494379


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##
@@ -57,8 +55,22 @@ public Builder(OffsetCommitRequestData data) {
 public OffsetCommitRequest build(short version) {
 if (data.groupInstanceId() != null && version < 7) {
 throw new UnsupportedVersionException("The broker offset 
commit protocol version " +
-version + " does not support usage of config 
group.instance.id.");
+version + " does not support usage of config 
group.instance.id.");
 }
+
+data.topics().forEach(topic -> {

Review Comment:
   I think that we could remove those checks now.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121489116


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel,
   requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
+  val topicNames =
+if (offsetCommitRequest.version() >= 9)
+  metadataCache.topicIdsToNames()
+else
+  Collections.emptyMap[Uuid, String]()
+
+  val resolvedTopics = new 
ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+var topicName = topic.name()
+if (Utils.isBlank(topicName)) {
+  // Expected for requests version >= 9 which rely on topic IDs 
exclusively.
+  topicName = topicNames.get(topic.topicId())
+}
+if (topicName != null) {
+  topic.setName(topicName)
+  resolvedTopics += topic
+}
+  }
+
   val authorizedTopics = authHelper.filterByAuthorized(
 request.context,
 READ,
 TOPIC,
-offsetCommitRequest.data.topics.asScala
+resolvedTopics
   )(_.name)
 
-  val responseBuilder = new OffsetCommitResponse.Builder()
+  val responseBuilder = 
OffsetCommitResponse.newBuilder(offsetCommitRequest.version())
   val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
   offsetCommitRequest.data.topics.forEach { topic =>

Review Comment:
   You could use `resolvedTopics` instead of `offsetCommitRequest.data.topics` 
here.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121488495


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel,
   requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
+  val topicNames =
+if (offsetCommitRequest.version() >= 9)
+  metadataCache.topicIdsToNames()
+else
+  Collections.emptyMap[Uuid, String]()
+
+  val resolvedTopics = new 
ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+var topicName = topic.name()
+if (Utils.isBlank(topicName)) {
+  // Expected for requests version >= 9 which rely on topic IDs 
exclusively.
+  topicName = topicNames.get(topic.topicId())
+}
+if (topicName != null) {
+  topic.setName(topicName)
+  resolvedTopics += topic
+}
+  }
+
   val authorizedTopics = authHelper.filterByAuthorized(
 request.context,
 READ,
 TOPIC,
-offsetCommitRequest.data.topics.asScala
+resolvedTopics
   )(_.name)
 
-  val responseBuilder = new OffsetCommitResponse.Builder()
+  val responseBuilder = 
OffsetCommitResponse.newBuilder(offsetCommitRequest.version())
   val authorizedTopicsRequest = new 
mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
   offsetCommitRequest.data.topics.forEach { topic =>
-if (!authorizedTopics.contains(topic.name)) {
+if (Utils.isBlank(topic.name)) {

Review Comment:
   I would move this up and do it in the first iteration.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121487731


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel,
   requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
   CompletableFuture.completedFuture[Unit](())
 } else {
+  val topicNames =
+if (offsetCommitRequest.version() >= 9)
+  metadataCache.topicIdsToNames()
+else
+  Collections.emptyMap[Uuid, String]()
+
+  val resolvedTopics = new 
ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+  offsetCommitRequest.data.topics.forEach { topic =>
+var topicName = topic.name()
+if (Utils.isBlank(topicName)) {

Review Comment:
   It would be better to rely on the version of the request instead of the 
topic name here.






[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-03-01 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1121486904


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1352,8 +1361,42 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
 Set<String> unauthorizedTopics = new HashSet<>();
 
 for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+String topicName = topic.name();

Review Comment:
   Now that we can rely on the version, we should use it here and simplify all 
this logic.






[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)

2023-03-01 Thread via GitHub


dajac commented on code in PR #12990:
URL: https://github.com/apache/kafka/pull/12990#discussion_r1121387449


##
clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java:
##
@@ -74,45 +105,194 @@ public String name() {
 
     private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
         Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
-        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
-            String consumerId = subscriptionEntry.getKey();
-            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
-            for (String topic : subscriptionEntry.getValue().topics()) {
-                put(topicToConsumers, topic, memberInfo);
-            }
-        }
+        consumerMetadata.forEach((consumerId, subscription) -> {
+            MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId());
+            subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo));
+        });
         return topicToConsumers;
     }
 
+    /**
+     * Performs range assignment of the specified partitions for the consumers with the provided subscriptions.
+     * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign
+     * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and
+     * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned
+     * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks.
+     */
     @Override
-    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
-                                                    Map<String, Subscription> subscriptions) {
+    public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,
+                                                              Map<String, Subscription> subscriptions) {
         Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);
+        Map<String, String> consumerRacks = consumerRacks(subscriptions);
+        List<TopicAssignmentState> topicAssignmentStates = partitionsPerTopic.entrySet().stream()
+                .filter(e -> !e.getValue().isEmpty())
+                .map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks))
+                .collect(Collectors.toList());
 
         Map<String, List<TopicPartition>> assignment = new HashMap<>();
-        for (String memberId : subscriptions.keySet())
-            assignment.put(memberId, new ArrayList<>());
+        subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>()));
+
+        boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment);
+        if (useRackAware)
+            assignWithRackMatching(topicAssignmentStates, assignment);
+
+        topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment));
+
+        if (useRackAware)
+            assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR));
+        return assignment;
+    }
+
+    // This method is not used, but retained for compatibility with any custom assignors that extend this class.
+    @Override
+    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                    Map<String, Subscription> subscriptions) {
+        return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions);
+    }
 
-        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            List<MemberInfo> consumersForTopic = topicEntry.getValue();
+    private void assignRanges(TopicAssignmentState assignmentState,
+                              BiFunction<String, TopicPartition, Boolean> mayAssign,
+                              Map<String, List<TopicPartition>> assignment) {
+        for (String consumer : assignmentState.consumers.keySet()) {
+            if (assignmentState.unassignedPartitions.isEmpty())
+                break;
+            List<TopicPartition> assignablePartitions = assignmentState.unassignedPartitions.stream()
+                    .filter(tp -> mayAssign.apply(consumer, tp))
+                    .collect(Collectors.toList());
 
-            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
-            if (numPartitionsForTopic == null)
+            int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size());
+            if (maxAssignable <= 0)
                 continue;
 
-            Collections.sort(consumersForTopic);
+            assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment);
+        }
+    }
 
-            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
-            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
+private void assignWithRackMatching(Collection 
assignmentStates,
+Map> 
