[GitHub] [kafka] vcrfxia commented on pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
vcrfxia commented on PR #13264: URL: https://github.com/apache/kafka/pull/13264#issuecomment-1451404037 > There are test failures. Could they be related to this PR? Good catch. These failures only appeared when I switched from the original try-catch approach for casting stores to either TimestampedKeyValueStore or VersionedKeyValueStore to fetching the store as a generic StateStore and then performing `instanceof` checks. The failures happened because the unit tests use a mock context that returns null for the state store, and null fails every `instanceof` check. I'm pretty sure that in the actual code (non-mocked contexts) it's not possible for `context.getStateStore()` to return null, so this "issue" is limited to unit tests only, but I've updated the PR to return to the original try-catch approach for casting in order to be safe (and in light of your other comment above regarding type casts on every `put/get` call). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
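Why a null-returning mock breaks only the `instanceof` variant comes down to how Java treats `null`: assigning the result of a generic method such as `getStateStore` compiles to a cast at the call site, and casting `null` never throws, whereas `null instanceof X` is always `false`. The following is a minimal sketch under stated assumptions: `AnyStore`, `TimestampedStore`, `VersionedStore`, and `FakeContext` are hypothetical stand-ins, not the real Kafka Streams types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker types standing in for the real Kafka Streams store interfaces.
interface AnyStore { }
interface TimestampedStore extends AnyStore { }
interface VersionedStore extends AnyStore { }

// Hypothetical stand-in for a processor context. Like a bare mock, it returns
// null for any store name that was never registered.
class FakeContext {
    private final Map<String, AnyStore> stores = new HashMap<>();

    void register(String name, AnyStore store) {
        stores.put(name, store);
    }

    @SuppressWarnings("unchecked")
    <S extends AnyStore> S getStateStore(String name) {
        // Unchecked cast: the real type check happens at the caller's assignment.
        return (S) stores.get(name);
    }
}

class StoreResolution {
    // try-catch approach: the assignment compiles to a checked cast, which throws
    // ClassCastException for the wrong store type but lets null straight through.
    static String resolveWithCast(FakeContext context, String name) {
        try {
            TimestampedStore timestamped = context.getStateStore(name);
            return "timestamped";
        } catch (ClassCastException e) {
            // not a timestamped store; try the versioned type next
        }
        try {
            VersionedStore versioned = context.getStateStore(name);
            return "versioned";
        } catch (ClassCastException e) {
            return "invalid";
        }
    }

    // instanceof approach: null fails every check, so a null store is rejected.
    static String resolveWithInstanceof(FakeContext context, String name) {
        AnyStore store = context.getStateStore(name);
        if (store instanceof TimestampedStore) {
            return "timestamped";
        }
        if (store instanceof VersionedStore) {
            return "versioned";
        }
        return "invalid";
    }
}
```

With an unregistered name, `resolveWithCast` quietly treats the `null` as a timestamped store, while `resolveWithInstanceof` rejects it, which is the difference the comment describes.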
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
vcrfxia commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1122676573
## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ##
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private VersionedKeyValueStore<K, V> versionedStore = null;
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
+        try {
+            // first try timestamped store
+            timestampedStore = context.getStateStore(storeName);
+            return;
+        } catch (final ClassCastException e) {
+            // ignore since could be versioned store instead
+        }
+
+        try {
+            // next try versioned store
+            versionedStore = context.getStateStore(storeName);
+        } catch (final ClassCastException e) {
+            throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore.");
+        }
+    }
+
+    public ValueAndTimestamp<V> get(final K key) {
+        if (timestampedStore != null) {
+            return timestampedStore.get(key);
+        }
+        if (versionedStore != null) {
+            final VersionedRecord<V> versionedRecord = versionedStore.get(key);
+            return versionedRecord == null
+                ? null
+                : ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public void put(final K key, final V value, final long timestamp) {
+        if (timestampedStore != null) {
+            timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp));
+            return;
+        }
+        if (versionedStore != null) {
+            versionedStore.put(key, value, timestamp);
+            return;
+        }
+        throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store");
+    }
+
+    public StateStore getStore() {
Review Comment: Yeah, your point about casting overhead is valid.
I had hoped it'd be fairly minimal, but that's moot now since I went ahead and updated the PR to use your proposal (three store pointers) in response to your comment about test failures (see below).
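The three-pointer design mentioned here (typed pointers for the timestamped and versioned cases plus an untyped `store`, as mjsax describes further down in the thread) can be sketched as follows. Everything is hypothetical and simplified: `Store`, `TimestampedStore`, `VersionedStore`, and the in-memory classes are toy stand-ins rather than Kafka Streams APIs, and the store type is resolved with `instanceof` purely for brevity (the PR itself resolves it with try-catch casts, per the comment above).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store interfaces (not the Kafka Streams API).
interface Store {
    String name();
}

interface TimestampedStore<K, V> extends Store {
    void put(K key, V value, long timestamp);
    V get(K key);
}

interface VersionedStore<K, V> extends Store {
    void put(K key, V value, long timestamp);
    V get(K key);
}

// Toy in-memory backing shared by both store kinds; it ignores timestamps.
abstract class MapBackedStore<K, V> implements Store {
    private final String name;
    private final Map<K, V> values = new HashMap<>();

    MapBackedStore(String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }

    public void put(K key, V value, long timestamp) {
        values.put(key, value);
    }

    public V get(K key) {
        return values.get(key);
    }
}

class InMemoryTimestampedStore<K, V> extends MapBackedStore<K, V> implements TimestampedStore<K, V> {
    InMemoryTimestampedStore(String name) {
        super(name);
    }
}

class InMemoryVersionedStore<K, V> extends MapBackedStore<K, V> implements VersionedStore<K, V> {
    InMemoryVersionedStore(String name) {
        super(name);
    }
}

// Three pointers: the store type is resolved once in the constructor, so get/put
// only need a null check instead of a cast on every call.
class StoreWrapper<K, V> {
    private final TimestampedStore<K, V> timestampedStore; // set only for timestamped stores
    private final VersionedStore<K, V> versionedStore;     // set only for versioned stores
    private final Store store;                             // always set; for type-agnostic calls

    @SuppressWarnings("unchecked")
    StoreWrapper(Store resolved) {
        if (resolved instanceof TimestampedStore) {
            timestampedStore = (TimestampedStore<K, V>) resolved;
            versionedStore = null;
        } else if (resolved instanceof VersionedStore) {
            timestampedStore = null;
            versionedStore = (VersionedStore<K, V>) resolved;
        } else {
            // Also covers null, which fails both instanceof checks.
            throw new IllegalArgumentException("store must be timestamped or versioned");
        }
        store = resolved;
    }

    void put(K key, V value, long timestamp) {
        if (timestampedStore != null) {
            timestampedStore.put(key, value, timestamp);
        } else {
            versionedStore.put(key, value, timestamp);
        }
    }

    V get(K key) {
        return timestampedStore != null ? timestampedStore.get(key) : versionedStore.get(key);
    }

    String name() {
        return store.name(); // no type check needed when the store type doesn't matter
    }
}
```

The trade-off is one extra field in exchange for keeping per-call work on `get`/`put` down to a null check.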
[GitHub] [kafka] showuon merged pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools
showuon merged PR #13172: URL: https://github.com/apache/kafka/pull/13172
[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools
showuon commented on PR #13172: URL: https://github.com/apache/kafka/pull/13172#issuecomment-1451363118
Failed tests are unrelated:
```
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
Build / JDK 8 and Scala 2.12 / kafka.server.KafkaServerKRaftRegistrationTest.[1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT
Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest.testPrepareJoinAndRejoinAfterFailedRebalance()
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorManyStandbys
```
[jira] [Commented] (KAFKA-14768) proposal to reduce the first message's send time cost and max block time for safety
[ https://issues.apache.org/jira/browse/KAFKA-14768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695451#comment-17695451 ]
Luke Chen commented on KAFKA-14768:
---
For (1), could you explain what you mean by "warmup" messages? How could you send messages before getting the broker's metadata (e.g., the broker address)?
(2) and (3) make sense to me. There was a KIP ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update]) that tried to fix a similar issue, but it wasn't accepted. Maybe you can check the discussion thread of that KIP for reference.

> proposal to reduce the first message's send time cost and max block time for safety
>
> Key: KAFKA-14768
> URL: https://issues.apache.org/jira/browse/KAFKA-14768
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 3.3.1, 3.3.2
> Reporter: fujian
> Assignee: hzh0425
> Priority: Major
> Labels: needs-kip, performance
>
> Hi, Team:
> Nice to meet you!
>
> In our business, we found two types of issues that need improvement:
>
> *(1) Sending the first message takes a long time*
> Sometimes we found that users' functional interactions took a long time. Eventually we figured out the root cause: after we deploy or restart the servers, delivering the first message from each application server through the Kafka client takes a long time. So we tried to find a solution to improve it.
>
> After analyzing the source code for the first send, we found that the time is spent getting metadata before sending. Later sends don't take as long because the metadata is cached. This logic is correct and necessary, but we still want to improve the experience of the first message send (the user's first interaction).
>
> *(2) The send block time can't be reduced to the desired value*
> Sometimes our application's threads block for max.block.ms when sending a message. When we try to lower max.block.ms to reduce the blocking time, it sometimes isn't long enough to get the metadata. The root cause is that the configured max.block.ms is shared by the "get metadata" operation and the "send message" operation. See the following table:
> |*where to block*|*when it is blocked*|*how long it will be blocked?*|
> |org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata|the first request, which needs to load the metadata from Kafka| |
> |org.apache.kafka.clients.producer.internals.RecordAccumulator#append|at peak business time, if the network can't send messages in a short time| |
>
> What's the solution for the above two issues?
> I thought about the current logic and came up with the following possible solutions:
> (1) Send one "warmup" message; however, we can't send any fake messages.
> (2) Provide an extra configuration dedicated to getting metadata; however, this would break the definition of max.block.ms.
> (3) Add a method that calls waitOnMetadata with its own timeout instead of max.block.ms (PR: [KAFKA-14768: provide new method to warmup first record's sending and reduce the max.block.ms safely by jiafu1115 · Pull Request #13320 · apache/kafka (github.com)|https://github.com/apache/kafka/pull/13320])
>
> _note: org.apache.kafka.clients.producer.KafkaProducer#waitOnMetadata_
> ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs)
>
> After the change, we can call it before the service is marked as ready. Once the service is ready, sends won't block to get metadata because it is cached, so we can safely reduce max.block.ms to a lower value to reduce thread blocking time.
>
> After adopting solution 3, we solved the above issues. For example, we reduced the first message's send time by about 4 seconds. See the following log:
> _warmup test_topic at phase phase 2: get metadata from mq start_
> _warmup test_topic at phase phase 2: get metadata from mq end consume *4669ms*_
> And after the change, we reduced max.block.ms from 10s to 2s without worrying about failing to get metadata.
>
> {*}So what are your thoughts on these two issues and the solution I proposed?{*} I hope to get your feedback.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
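The warmup idea in solution (3) can be illustrated without the Kafka client. This is a minimal sketch, assuming a hypothetical `MetadataWarmup` class and a caller-supplied `fetcher` that stands in for the blocking metadata request: the slow fetch runs once, under its own timeout, before the service is marked ready, and later lookups only hit the cache. It is neither `KafkaProducer`'s API nor the code in PR #13320.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical model of "warm up metadata before the service is ready".
// `fetcher` stands in for a slow, blocking metadata fetch (topic -> partition count).
class MetadataWarmup {
    private final Map<String, Integer> cache = new ConcurrentHashMap<>();
    private final Function<String, Integer> fetcher;

    MetadataWarmup(Function<String, Integer> fetcher) {
        this.fetcher = fetcher;
    }

    // Runs the fetch once, bounded by its own timeout (independent of any
    // send-path timeout such as max.block.ms), then caches the result.
    // On timeout the background task keeps running; a real client would cancel it.
    void warmUp(String topic, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        Integer partitions = CompletableFuture
                .supplyAsync(() -> fetcher.apply(topic))
                .get(timeoutMs, TimeUnit.MILLISECONDS);
        cache.put(topic, partitions);
    }

    // After warmup the send path only needs a cache lookup, so a short send
    // timeout no longer has to cover the metadata round trip.
    Integer cachedPartitionCount(String topic) {
        return cache.get(topic);
    }
}
```

Separating the two budgets this way is what lets the issue's author lower `max.block.ms` without risking metadata timeouts; whether to expose such a hook in the producer is the open question in the thread.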
[GitHub] [kafka] satishd commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
satishd commented on code in PR #13304: URL: https://github.com/apache/kafka/pull/13304#discussion_r1121995162
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java: ##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {
Review Comment: Right. This is the reason I did not add it earlier, but I am not strongly opinionated about it. I am fine with adding them in the latest commit.
[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message
[ https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695434#comment-17695434 ]
Luke Chen commented on KAFKA-14771:
---
[~pierDipi], the suggestion makes sense to me. You're welcome to submit a PR for it. Thanks.

> Include current thread ids in ConcurrentModificationException message
>
> Key: KAFKA-14771
> URL: https://issues.apache.org/jira/browse/KAFKA-14771
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 2.8.2, 3.2.3
> Reporter: Pierangelo Di Pilato
> Priority: Minor
> Labels: consumer
>
> In the KafkaConsumer.acquire method, a ConcurrentModificationException is thrown when
> {code:java}
> threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code}
> However, the exception message doesn't include:
> * Thread.currentThread().getId()
> * currentThread.get()
>
> I think including the aforementioned values would be very useful for debugging the issue.
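A minimal sketch of what the suggested message could look like. `SingleThreadGuard` is a hypothetical, simplified model of the guard in `KafkaConsumer.acquire()`, built around the condition quoted in the issue; the extra thread details in the message are illustrative only and are not the wording Kafka adopted.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of KafkaConsumer's single-thread guard.
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); // owner thread id
    private final AtomicInteger refcount = new AtomicInteger(0);                 // re-entrant depth

    void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId(); // deprecated since Java 19, but still available
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            // Best-effort snapshot of the owner; it may change between the failed CAS and this read.
            final long ownerId = currentThread.get();
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. "
                    + "currentThread(name: " + thread.getName() + ", id: " + threadId + ")"
                    + " otherThread(id: " + ownerId + ")");
        }
        refcount.incrementAndGet();
    }

    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
```

Including both IDs lets a user match the exception against a thread dump to find which thread still owns the consumer.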
[jira] [Resolved] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen resolved KAFKA-14729.
---
Fix Version/s: 3.5.0
Resolution: Fixed

> The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
>
> Key: KAFKA-14729
> URL: https://issues.apache.org/jira/browse/KAFKA-14729
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.3.0
> Reporter: RivenSun
> Assignee: RivenSun
> Priority: Major
> Fix For: 3.5.0
> Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
> h2. Case situation:
> 1. The business program occupies a large amount of memory, causing the `run()` method of the kafkaConsumer's HeartbeatThread to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=*_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space {code}
> 2. The `finally` block of the heartbeat thread's `run()` method only logs the error; it does not update the value of `AbstractCoordinator.state`.
> 3. For a kafkaConsumer with the group rebalance mechanism enabled, pollTimeout in the `kafkaConsumer#pollForFetches(timer)` method may eventually take the value `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will never be updated again, and the `AbstractCoordinator.state` field will always be {*}STABLE{*}, so the `timeToNextHeartbeat(long now)` method will return 0. This 0 is then passed to the underlying `networkClient#poll` method.
>
> In the end, the user calls the `poll(duration)` method in an endless loop, and the `kafkaConsumer#pollForFetches(timer)` method always returns very quickly, consuming a lot of CPU.
>
> h2. Solution:
> 1. Refer to the note on `MemberState.STABLE`:
> {code:java}
> the client has joined and is sending heartbeats.{code}
> When the heartbeat thread exits, we should add the following code in its `finally` block:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new condition: `heartbeatThread.hasFailed()`
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
>     return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
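The two proposed changes can be modeled in isolation to see why they stop the busy loop. This is a minimal sketch, assuming a hypothetical `CoordinatorSketch` class: `MemberState` here has only two values, and the heartbeat timer is reduced to a fixed last-heartbeat timestamp plus an interval. It is not the real `AbstractCoordinator`, and it follows the fix as proposed in the issue rather than necessarily the merged code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical two-value member state (the real enum has more states).
enum MemberState {
    UNJOINED, STABLE;

    boolean hasNotJoinedGroup() {
        return this == UNJOINED;
    }
}

class CoordinatorSketch {
    volatile MemberState state = MemberState.STABLE;
    volatile boolean closed = false;
    private final AtomicReference<Throwable> heartbeatFailure = new AtomicReference<>();
    private final long heartbeatIntervalMs;
    private final long lastHeartbeatMs; // never updated once the heartbeat thread is gone

    CoordinatorSketch(long heartbeatIntervalMs, long lastHeartbeatMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatMs = lastHeartbeatMs;
    }

    boolean heartbeatThreadHasFailed() {
        return heartbeatFailure.get() != null;
    }

    // Models the heartbeat thread's run(): errors such as OutOfMemoryError are
    // recorded, and (proposed fix 1) the finally block resets the member state.
    void runHeartbeatThread(Runnable loopBody) {
        try {
            loopBody.run();
        } catch (Throwable t) {
            heartbeatFailure.set(t);
        } finally {
            state = MemberState.UNJOINED;
            closed = true;
        }
    }

    // Proposed fix 2: once the thread has failed, never report a 0 ms wait,
    // which would make every poll() return immediately and spin.
    long timeToNextHeartbeat(long nowMs) {
        if (state.hasNotJoinedGroup() || heartbeatThreadHasFailed()) {
            return Long.MAX_VALUE;
        }
        return Math.max(0L, lastHeartbeatMs + heartbeatIntervalMs - nowMs);
    }
}
```

Before the failure, an overdue heartbeat yields `0`; without either fix that value would be returned forever, which is the high-CPU loop the issue describes.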
[GitHub] [kafka] showuon merged pull request #13270: KAFKA-14729: The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
showuon merged PR #13270: URL: https://github.com/apache/kafka/pull/13270
[GitHub] [kafka] CalvinConfluent opened a new pull request, #13323: Add ReplicaState to FetchRequest.
CalvinConfluent opened a new pull request, #13323: URL: https://github.com/apache/kafka/pull/13323
As the first part of [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR), this PR updates the FetchRequest:
- Deprecate the ReplicaId field
- Create a new tagged field ReplicaState with ReplicaId and ReplicaEpoch
- Bump the FetchRequest version to 15
- Bump the metadata version to 3.5-IV1

https://issues.apache.org/jira/browse/KAFKA-14617
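The version-dependent handling of the old and new fields can be sketched as below. All names (`FetchRequestSketch`, `ReplicaState`, `forReplica`) are hypothetical, the `-1` placeholders are an assumption of this sketch, and this is not Kafka's generated `FetchRequestData` class. It only illustrates the rule from the PR description that, from FetchRequest version 15, the replica ID and epoch travel in the new `ReplicaState` field instead of the deprecated `ReplicaId` field.

```java
// Hypothetical model of the ReplicaState tagged field (not Kafka's generated message classes).
final class ReplicaState {
    final int replicaId;
    final long replicaEpoch;

    ReplicaState(int replicaId, long replicaEpoch) {
        this.replicaId = replicaId;
        this.replicaEpoch = replicaEpoch;
    }
}

final class FetchRequestSketch {
    // Per the PR description, version 15 introduces ReplicaState.
    static final short REPLICA_STATE_MIN_VERSION = 15;

    final short version;
    final int legacyReplicaId;       // the deprecated ReplicaId field
    final ReplicaState replicaState; // the new tagged field

    private FetchRequestSketch(short version, int legacyReplicaId, ReplicaState replicaState) {
        this.version = version;
        this.legacyReplicaId = legacyReplicaId;
        this.replicaState = replicaState;
    }

    // Builds a follower fetch; only version 15+ has a place to carry the epoch.
    static FetchRequestSketch forReplica(short version, int replicaId, long replicaEpoch) {
        if (version >= REPLICA_STATE_MIN_VERSION) {
            return new FetchRequestSketch(version, -1, new ReplicaState(replicaId, replicaEpoch));
        }
        return new FetchRequestSketch(version, replicaId, new ReplicaState(-1, -1L));
    }

    // Readers pick whichever field matches the request version.
    int replicaId() {
        return version >= REPLICA_STATE_MIN_VERSION ? replicaState.replicaId : legacyReplicaId;
    }
}
```

Keeping the lookup behind a single accessor means callers do not need to know which wire version produced the request.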
[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
kirktrue commented on code in PR #13301: URL: https://github.com/apache/kafka/pull/13301#discussion_r1122535670
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java: ##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
Review Comment: These should come from here: https://kafka.apache.org/documentation/#consumer_fetch_monitoring
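The assignment-ID mechanism described in the class Javadoc can be sketched as follows. `AssignmentTrackingMetrics`, `maybeUpdateAssignment`, and the string partition keys are hypothetical simplifications; the real class works with Kafka `Metrics`/`Sensor` objects and `TopicPartition`s, and where the assignment ID comes from (presumably the subscription state) is an assumption here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification: metrics are plain map entries keyed by "topic-partition" strings.
class AssignmentTrackingMetrics {
    private int assignmentId = 0;                            // ID of the assignment the metrics reflect
    private Set<String> assignedPartitions = new HashSet<>();
    private final Map<String, Double> lagByPartition = new HashMap<>();

    // Cheap no-op while the assignment ID is unchanged; otherwise drops metrics
    // for partitions that are no longer assigned.
    void maybeUpdateAssignment(int currentAssignmentId, Set<String> currentPartitions) {
        if (currentAssignmentId == assignmentId) {
            return;
        }
        for (String partition : assignedPartitions) {
            if (!currentPartitions.contains(partition)) {
                lagByPartition.remove(partition);
            }
        }
        assignedPartitions = new HashSet<>(currentPartitions);
        assignmentId = currentAssignmentId;
    }

    void recordPartitionLag(String partition, double lag) {
        lagByPartition.put(partition, lag);
    }

    Set<String> partitionsWithMetrics() {
        return lagByPartition.keySet();
    }
}
```

Comparing a single integer on every call keeps the common case (no rebalance) cheap, and the set difference only runs when the assignment actually changes.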
[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
kirktrue commented on code in PR #13301: URL: https://github.com/apache/kafka/pull/13301#discussion_r1122517751
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ##
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
Review Comment: All fields are either package `protected` or `private` and are grouped into `mutable` and `final` sections, for a total of four sections.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java: ##
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import
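The Javadoc's point that `CompletedFetch` maintains state between calls to `fetchRecords(int)` can be reduced to a cursor over an already-fetched batch. This is a minimal sketch with a hypothetical `CompletedFetchSketch` class; the fields and imports shown in the diff (for example `checkCrcs`, `isolationLevel`, and the deserializers) suggest the real class also performs CRC checks, transactional filtering, and deserialization, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a completed fetch that is drained over several calls.
class CompletedFetchSketch<T> {
    private final List<T> records; // the batch returned by one fetch response
    private int nextIndex = 0;     // mutable cursor kept between fetchRecords() calls

    CompletedFetchSketch(List<T> records) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
    }

    // Returns up to maxRecords records, resuming where the previous call stopped.
    List<T> fetchRecords(int maxRecords) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive");
        }
        // Computed as a count first so that a huge maxRecords cannot overflow the index.
        int count = Math.min(records.size() - nextIndex, maxRecords);
        List<T> chunk = new ArrayList<>(records.subList(nextIndex, nextIndex + count));
        nextIndex += count;
        return chunk;
    }

    boolean isConsumed() {
        return nextIndex >= records.size();
    }
}
```

This is why a single fetch response can satisfy several `poll()` calls when it holds more records than one call is allowed to return.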
[GitHub] [kafka] mjsax commented on pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on PR #13264: URL: https://github.com/apache/kafka/pull/13264#issuecomment-1451176639
There are test failures. Could they be related to this PR?
```
org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoinTest.shouldLogAndMeterSkippedRecordsDueToNullLeftKey
java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "this.store" is null
```
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525554 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. 
All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param <K> The key type + * @param <V> The value type + */ +public class KeyValueStoreWrapper<K, V> implements StateStore { + +private TimestampedKeyValueStore<K, V> timestampedStore = null; +private VersionedKeyValueStore<K, V> versionedStore = null; + +public KeyValueStoreWrapper(final ProcessorContext context, final String storeName) { +try { +// first try timestamped store +timestampedStore = context.getStateStore(storeName); +return; +} catch (final ClassCastException e) { +// ignore since could be versioned store instead +} + +try { +// next try versioned store +versionedStore = context.getStateStore(storeName); +} catch (final ClassCastException e) { +throw new InvalidStateStoreException("KTable source state store must implement either TimestampedKeyValueStore or VersionedKeyValueStore."); +} +} + +public ValueAndTimestamp<V> get(final K key) { +if (timestampedStore != null) { +return timestampedStore.get(key); +} +if (versionedStore != null) { +final VersionedRecord<V> versionedRecord = versionedStore.get(key); +return versionedRecord == null +? null +: ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp()); +} +throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store"); +} + +public void put(final K key, final V value, final long timestamp) { +if (timestampedStore != null) { +timestampedStore.put(key, ValueAndTimestamp.make(value, timestamp)); +return; +} +if (versionedStore != null) { +versionedStore.put(key, value, timestamp); +return; +} +throw new IllegalStateException("KeyValueStoreWrapper must be initialized with either timestamped or versioned store"); +} + +public StateStore getStore() { Review Comment: My proposal was actually to have three members: the previously existing `KeyValueStore keyValueStore`, `VersionedStore versionStore`, and a new `Store store`.
So we would just simplify the code for the cases where we don't care about the store type, and where we do care, we check whether `keyValueStore != null` or `versionStore != null` and do the right thing. Sorry for not explaining it well enough -- using an enum works too, I guess, but I also tend to agree it's a little overkill. But you made the change already... so maybe it's not worth reverting it again to use `KeyValueStore keyValueStore` and `VersionedStore versionStore` as you did originally. The only question (but it might be premature optimization): we now need to cast on every single get/put call using the enum. That sounds like more overhead than using `KeyValueStore keyValueStore` /
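A minimal sketch of the three-member layout described above, assuming simplified, hypothetical stand-ins (`Store`, `TimestampedStore`, `VersionedStore`, `ValueAndTs`, and the in-memory stores) in place of Kafka's real `StateStore`, `TimestampedKeyValueStore`, `VersionedKeyValueStore` and `ValueAndTimestamp`. The generic `store` field serves type-agnostic calls, while the two typed fields are cast once in the constructor, so `get`/`put` only need a null check.

```java
import java.util.HashMap;
import java.util.Map;

public class ThreeFieldWrapperSketch {

    /** Hypothetical stand-in for ValueAndTimestamp / VersionedRecord. */
    static final class ValueAndTs<V> {
        final V value;
        final long timestamp;

        ValueAndTs(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical, reduced stand-ins for the Kafka store interfaces.
    interface Store { String name(); }

    interface TimestampedStore<K, V> extends Store {
        ValueAndTs<V> get(K key);
        void put(K key, ValueAndTs<V> valueAndTs);
    }

    interface VersionedStore<K, V> extends Store {
        ValueAndTs<V> get(K key);
        void put(K key, V value, long timestamp);
    }

    static final class Wrapper<K, V> {
        private final Store store;                             // always set; used when the type is irrelevant
        private final TimestampedStore<K, V> timestampedStore; // non-null only for timestamped stores
        private final VersionedStore<K, V> versionedStore;     // non-null only for versioned stores

        @SuppressWarnings("unchecked")
        Wrapper(final Store store) {
            this.store = store;
            // The type check and cast happen once here, not on every get/put.
            if (store instanceof TimestampedStore) {
                timestampedStore = (TimestampedStore<K, V>) store;
                versionedStore = null;
            } else if (store instanceof VersionedStore) {
                timestampedStore = null;
                versionedStore = (VersionedStore<K, V>) store;
            } else {
                throw new IllegalArgumentException("Store must be timestamped or versioned: " + store.name());
            }
        }

        String name() {
            return store.name(); // no need to know the concrete store type
        }

        ValueAndTs<V> get(final K key) {
            return timestampedStore != null ? timestampedStore.get(key) : versionedStore.get(key);
        }

        void put(final K key, final V value, final long timestamp) {
            if (timestampedStore != null) {
                timestampedStore.put(key, new ValueAndTs<>(value, timestamp));
            } else {
                versionedStore.put(key, value, timestamp);
            }
        }
    }

    static final class InMemoryTimestampedStore<K, V> implements TimestampedStore<K, V> {
        private final Map<K, ValueAndTs<V>> data = new HashMap<>();
        private final String name;

        InMemoryTimestampedStore(final String name) { this.name = name; }
        public String name() { return name; }
        public ValueAndTs<V> get(final K key) { return data.get(key); }
        public void put(final K key, final ValueAndTs<V> valueAndTs) { data.put(key, valueAndTs); }
    }

    /** Keeps only the latest version per key for brevity; a real versioned store keeps history. */
    static final class InMemoryVersionedStore<K, V> implements VersionedStore<K, V> {
        private final Map<K, ValueAndTs<V>> latest = new HashMap<>();
        private final String name;

        InMemoryVersionedStore(final String name) { this.name = name; }
        public String name() { return name; }
        public ValueAndTs<V> get(final K key) { return latest.get(key); }

        public void put(final K key, final V value, final long timestamp) {
            final ValueAndTs<V> current = latest.get(key);
            // An out-of-order (older) write does not replace the latest version.
            if (current == null || timestamp >= current.timestamp) {
                latest.put(key, new ValueAndTs<>(value, timestamp));
            }
        }
    }
}
```

Compared with an enum plus a cast on each call, this layout trades one extra field for a single cast at construction time; whether the per-call cast is measurable in practice is not established here.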
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1122527645 ## streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A wrapper class for non-windowed key-value stores used within the DSL. 
All such stores are + * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. + * + * @param <K> The key type + * @param <V> The value type + */ +public class KeyValueStoreWrapper<K, V> implements StateStore { + +private final StateStore store; +private final StoreType storeType; + +private enum StoreType { +TIMESTAMPED, +VERSIONED; +} + +public KeyValueStoreWrapper(final ProcessorContext context, final String storeName) { +store = context.getStateStore(storeName); + +if (store instanceof TimestampedKeyValueStore) { Review Comment: I think this is fine. -- Even if we kept the cast as in the original code, I don't think it would be safer. The compiler cannot verify the generic types anyway, because `getStateStore` does not carry any information about them -- and at runtime the generic types are erased, so the actual cast from `Object` to `K` (or `V`), if necessary, would happen somewhere else in the code.
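The erasure argument above can be reproduced in a small, self-contained Java program. `lookup` is a hypothetical helper with a shape similar to a generic getter such as `<S extends StateStore> S getStateStore(String name)`: the cast to the type parameter is unchecked, the call site only checks the erased type (`List`), and a wrong type argument is first detected when an element is used.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical helper: the cast to T is unchecked. Only the erasure of T
    // (here Object) could be checked inside this method, so nothing fails here.
    @SuppressWarnings("unchecked")
    static <T> T lookup(final Object stored) {
        return (T) stored;
    }

    /** Reports where a wrong type argument is first detected at runtime. */
    static String whereDoesItFail() {
        final List<Integer> ints = new ArrayList<>();
        ints.add(42);

        List<String> strings;
        try {
            // The compiler inserts a check against the erased type List only, which passes.
            strings = lookup(ints);
        } catch (final ClassCastException e) {
            return "assignment";
        }

        try {
            // The implicit cast of the element to String happens here and fails.
            final String first = strings.get(0);
            return "no failure: " + first;
        } catch (final ClassCastException e) {
            return "element access";
        }
    }

    public static void main(final String[] args) {
        System.out.println(whereDoesItFail()); // prints "element access"
    }
}
```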
[GitHub] [kafka] mjsax commented on a diff in pull request #13264: KAFKA-14491: [12/N] Relax requirement that KTable stores must be TimestampedKVStores
mjsax commented on code in PR #13264: URL: https://github.com/apache/kafka/pull/13264#discussion_r1122525901 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedKeyValueStoreMaterializer.java: ## @@ -33,7 +32,7 @@ public TimestampedKeyValueStoreMaterializer(final MaterializedInternal> materialize() { +public StoreBuilder materialize() { Review Comment: If we need it later, it's fine to do it right away. Maybe it's a little cleaner from a "commit history" POV to only do it later.
[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1451145318 Merged to trunk.
[GitHub] [kafka] guozhangwang merged pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang merged PR #13319: URL: https://github.com/apache/kafka/pull/13319
[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
junrao commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1119167627 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + Review Comment: Could we remove the extra new line? ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. 
+ */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +private final File cacheDir; +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); +private final Object lock = new Object(); +private final RemoteStorageManager remoteStorageManager; +private final Map entries; +private final ShutdownableThread cleanerThread; + +private volatile boolean closed = false; + +public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +this(1024, remoteStorageManager, logDir); +} + +/** + * Creates RemoteIndexCache with the given configs. + * + * @param maxSize maximum number of segment index entries to be cached. + * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. + * @param logDir log directory + */ +public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException { +
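The class comment above describes an LRU cache, and the imports include `LinkedHashMap` and `LinkedBlockingQueue`. One common way to combine them is sketched below: a `LinkedHashMap` in access order evicts the least recently used entry, and the evicted value is queued for a background cleaner instead of being deleted inline. `LruWithExpiryQueue` is a hypothetical class, not the code under review.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: an LRU map whose evicted values are queued for a background cleaner. */
public class LruWithExpiryQueue<K, V> {
    // A cleaner thread would take() from this queue and delete the files behind each value.
    private final LinkedBlockingQueue<V> expired = new LinkedBlockingQueue<>();
    private final Map<K, V> entries;

    public LruWithExpiryQueue(final int maxSize) {
        // accessOrder = true: get() and put() move an entry to the tail,
        // so the head is always the least recently used entry.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                if (size() > maxSize) {
                    expired.add(eldest.getValue()); // defer cleanup instead of deleting inline
                    return true;
                }
                return false;
            }
        };
    }

    public synchronized V get(final K key) {
        return entries.get(key);
    }

    public synchronized void put(final K key, final V value) {
        entries.put(key, value);
    }

    /** containsKey does not count as an access, so it does not change the LRU order. */
    public synchronized boolean contains(final K key) {
        return entries.containsKey(key);
    }

    /** Non-blocking poll for demonstration; a real cleaner would block on take(). */
    public V pollExpired() {
        return expired.poll();
    }
}
```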
[GitHub] [kafka] guozhangwang merged pull request #13288: MINOR: fix rerun-tests for unit test
guozhangwang merged PR #13288: URL: https://github.com/apache/kafka/pull/13288
[GitHub] [kafka] guozhangwang commented on pull request #13288: MINOR: fix rerun-tests for unit test
guozhangwang commented on PR #13288: URL: https://github.com/apache/kafka/pull/13288#issuecomment-1451098190 Ah, thanks @chia7712, my bad for missing that change in the PR. As for the solution, I do not have a preference for either side, and I agree that neither should involve recompilation. Let's just follow up with `rerun-tests` to avoid another change.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1122473180 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: Perhaps we can also consider this a failure of the signature of Herder::requestTaskReconfiguration. The DistributedHerder makes this asynchronous, but provides no future or callback to confirm the progress of the request. Arguably StandaloneHerder is implementing the function signature correctly as a request that either succeeds or fails. It also makes me think that a connector which repeatedly calls requestTaskReconfiguration (and then always fails in generateTaskConfigs) could spam the herder with retried restart requests. This is such a messy situation that the old function signatures hid from us :)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects
gharris1727 commented on code in PR #13185: URL: https://github.com/apache/kafka/pull/13185#discussion_r1122467555 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ## @@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) { log.error("Task that requested reconfiguration does not exist: {}", connName); return; } -updateConnectorTasks(connName); +try { +updateConnectorTasks(connName); +} catch (Exception e) { +log.error("Unable to generate task configs for {}", connName, e); +} Review Comment: Yes, this is a change in behavior. There is precedent for throwing ConnectException from ConnectorContext::requestTaskReconfiguration, so perhaps wrapping this in a ConnectException and propagating it would be better behavior. I can move this to HerderConnectorContext, except that it would only be effective for the standalone herder. We can also see this as an opportunity to improve the StandaloneHerder by handling reconfigurations asynchronously and retrying them in the background, rather than 500'ing the REST API or silently dropping the failure.
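The two comments above suggest handling reconfiguration asynchronously with background retries, and worry that a connector could flood the herder with repeated requests. A minimal sketch of both ideas, under stated assumptions: `ReconfigurationRetrier` is a hypothetical class, task configs are simplified to `List<String>`, and the `Supplier` stands in for whatever generates task configs; none of this is the `Herder` API. Repeated requests for the same connector share one in-flight future, and runtime exceptions are retried with a fixed backoff up to a limit.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: background retries plus coalescing of repeated requests per connector. */
public class ReconfigurationRetrier implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // At most one in-flight reconfiguration per connector; repeated requests share its future.
    private final Map<String, CompletableFuture<List<String>>> inFlight = new ConcurrentHashMap<>();
    private final long backoffMs;
    private final int maxAttempts;

    public ReconfigurationRetrier(final long backoffMs, final int maxAttempts) {
        this.backoffMs = backoffMs;
        this.maxAttempts = maxAttempts;
    }

    /** Returns a future the caller can observe, instead of a fire-and-forget request. */
    public CompletableFuture<List<String>> request(final String connName,
                                                   final Supplier<List<String>> generateTaskConfigs) {
        final CompletableFuture<List<String>> fresh = new CompletableFuture<>();
        final CompletableFuture<List<String>> existing = inFlight.putIfAbsent(connName, fresh);
        if (existing != null) {
            return existing; // coalesce: do not start a second attempt chain
        }
        executor.execute(() -> attempt(connName, generateTaskConfigs, fresh, 1));
        return fresh;
    }

    private void attempt(final String connName, final Supplier<List<String>> generateTaskConfigs,
                         final CompletableFuture<List<String>> future, final int attemptNo) {
        try {
            final List<String> taskConfigs = generateTaskConfigs.get();
            inFlight.remove(connName, future); // later requests start a fresh chain
            future.complete(taskConfigs);
        } catch (final RuntimeException e) {
            if (attemptNo >= maxAttempts) {
                inFlight.remove(connName, future);
                future.completeExceptionally(e); // surface the failure instead of dropping it
            } else {
                executor.schedule(() -> attempt(connName, generateTaskConfigs, future, attemptNo + 1),
                        backoffMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A single-threaded scheduler keeps all attempts for all connectors serialized, which is simple but means one slow connector delays the others; a real design would need to weigh that.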
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1122411269 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val addPartitionsToTxnRequest = + if 
(request.context.apiVersion() < 4) +request.body[AddPartitionsToTxnRequest].normalizeRequest() + else +request.body[AddPartitionsToTxnRequest] +val version = addPartitionsToTxnRequest.version +val responses = new AddPartitionsToTxnResultCollection() +val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + +// Newer versions of the request should only come from other brokers. +if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + +// V4 requests introduced batches of transactions. We need all transactions to be handled before sending the +// response so there are a few differences in handling errors and sending responses. +def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { +// There will only be one response in data. Add it to the response data object. +val data = new AddPartitionsToTxnResponseData() +responses.forEach(result => { + data.setResultsByTopicV3AndBelow(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) +}) +new AddPartitionsToTxnResponse(data) + } else { +new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } +} - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { -// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the -// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded -// the authorization check to indicate that they were not added to the transaction. 
-val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) -requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) +val txns = addPartitionsToTxnRequest.data.transactions +def maybeSendResponse(): Unit = { + var canSend = false + responses.synchronized { +if (responses.size() == txns.size()) { + canSend = true +} + } + if (canSend) { +requestHelper.sendResponseMaybeThrottle(request, createResponse) + } +} + +txns.forEach( transaction => { + val transactionalId = transaction.transactionalId + val partitionsToAdd = partitionsByTransaction.get(transactionalId).asScala + + // Versions < 4 come from clients and must be authorized to write for the given transaction and for the given topics. + if (version < 4 &&
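The V4 handling above must send one combined response only after every transaction in the batch has a result. A minimal Java sketch of that "send once when all N results are in" pattern follows; `BatchResponseCollector` and its callback are hypothetical names, not part of `KafkaApis`. It decrements an atomic counter per result, so exactly one caller observes zero and sends. By contrast, if appending a result and checking `responses.size()` were done under separate locks, two callbacks finishing together could in principle both observe a complete collection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: gathers one result per transaction and sends the combined response exactly once. */
public class BatchResponseCollector<R> {
    private final List<R> results = Collections.synchronizedList(new ArrayList<>());
    private final AtomicInteger remaining;
    private final Consumer<List<R>> sendResponse;

    public BatchResponseCollector(final int expected, final Consumer<List<R>> sendResponse) {
        this.remaining = new AtomicInteger(expected);
        this.sendResponse = sendResponse;
        if (expected == 0) {
            // An empty batch would otherwise never trigger a send.
            sendResponse.accept(Collections.emptyList());
        }
    }

    /** Called from each per-transaction callback, possibly on different threads. */
    public void add(final R result) {
        results.add(result);
        // decrementAndGet is atomic, so exactly one caller observes zero and sends;
        // extra calls afterwards drive the counter negative and never send again.
        if (remaining.decrementAndGet() == 0) {
            final List<R> snapshot;
            synchronized (results) {
                snapshot = new ArrayList<>(results);
            }
            sendResponse.accept(Collections.unmodifiableList(snapshot));
        }
    }
}
```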
[GitHub] [kafka] OneCricketeer commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
OneCricketeer commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1122385195 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map<String, String> envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map<String, ?> configs) { +} Review Comment: > a protected REST API does prevent untrusted users Sure, but authorized/trusted does not imply _non-malicious_.
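One way to act on the point that trusted callers can still be malicious is to limit which environment variables the provider can expose at all. A minimal sketch, assuming a hypothetical `AllowlistedEnvLookup` class and a caller-supplied allowlist regex; it is not the `ConfigProvider` API from the diff:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical sketch: only environment variables whose names match an allowlist are exposed. */
public class AllowlistedEnvLookup {
    private final Map<String, String> envVars;
    private final Pattern allowlist;

    public AllowlistedEnvLookup(final String allowlistRegex) {
        this(System.getenv(), allowlistRegex);
    }

    // Injectable map (similar in spirit to the envVarsAsArgument constructor above), useful for tests.
    public AllowlistedEnvLookup(final Map<String, String> envVars, final String allowlistRegex) {
        this.envVars = envVars;
        this.allowlist = Pattern.compile(allowlistRegex);
    }

    /** Returns only the requested keys that both exist and fully match the allowlist. */
    public Map<String, String> get(final Set<String> keys) {
        final Map<String, String> result = new HashMap<>();
        for (final String key : keys) {
            final String value = envVars.get(key);
            if (value != null && allowlist.matcher(key).matches()) {
                result.put(key, value);
            }
        }
        return result;
    }
}
```

With this design, a caller who can submit arbitrary config references still cannot read variables outside the allowlist, whatever their level of trust.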
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jeffkbkim commented on code in PR #13322: URL: https://github.com/apache/kafka/pull/13322#discussion_r1122374811 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -163,6 +163,20 @@ object Defaults { val GroupInitialRebalanceDelayMs = 3000 val GroupMaxSize: Int = Int.MaxValue + /** New group coordinator configs */ + val NewGroupCoordinatorEnable = false + val GroupCoordinatorNumThreads = 1 + + /** Consumer group configs */ + val ConsumerGroupSessionTimeoutMs = 45000 + val ConsumerGroupMinSessionTimeoutMs = 45000 + val ConsumerGroupMaxSessionTimeoutMs = 6 + val ConsumerGroupHeartbeatIntervalMs = 5000 + val ConsumerGroupMinHeartbeatInternalMs = 5000 + val ConsumerGroupMaxHeartbeatInternalMs = 15000 + val ConsumerGroupMaxSize = Int.MaxValue + val ConsumerGroupAssignors = "" Review Comment: KIP 848 lists ``` org.apache.kafka.server.group.consumer.UniformAssignor, org.apache.kafka.server.group.consumer.RangeAssignor ``` as the default. are we waiting until these two are implemented? ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -163,6 +163,20 @@ object Defaults { val GroupInitialRebalanceDelayMs = 3000 val GroupMaxSize: Int = Int.MaxValue + /** New group coordinator configs */ + val NewGroupCoordinatorEnable = false + val GroupCoordinatorNumThreads = 1 + + /** Consumer group configs */ + val ConsumerGroupSessionTimeoutMs = 45000 + val ConsumerGroupMinSessionTimeoutMs = 45000 + val ConsumerGroupMaxSessionTimeoutMs = 6 + val ConsumerGroupHeartbeatIntervalMs = 5000 + val ConsumerGroupMinHeartbeatInternalMs = 5000 + val ConsumerGroupMaxHeartbeatInternalMs = 15000 Review Comment: should these be "Interval"? 
## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -483,11 +497,27 @@ object KafkaConfig { val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" val ControlledShutdownEnableProp = "controlled.shutdown.enable" + /** * Group coordinator configuration ***/ val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms" val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms" val GroupInitialRebalanceDelayMsProp = "group.initial.rebalance.delay.ms" val GroupMaxSizeProp = "group.max.size" + + /** New group coordinator configs */ + val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable" + val GroupCoordinatorNumThreadsProp = "group.coordinator.threads" + + /** Consumer group configs */ + val ConsumerGroupSessionTimeoutMsProp = "group.consumer.session.timeout.ms" + val ConsumerGroupMinSessionTimeoutMsProp = "group.consumer.min.session.timeout.ms" + val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms" + val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms" + val ConsumerGroupMinHeartbeatInternalMsProp = "group.consumer.min.heartbeat.interval.ms" + val ConsumerGroupMaxHeartbeatInternalMsProp ="group.consumer.max.heartbeat.interval.ms" Review Comment: Should these also be "Interval"? ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1267,6 +1314,24 @@ object KafkaConfig { .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc) .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), MEDIUM, GroupMaxSizeDoc) + /** New group coordinator configs */ + // All properties are kept internal until KIP-848 is released. + // This property meant to be here only during the development of KIP-848. It will + // be replaced by a metadata version before releasing it.
+ .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc) + .defineInternal(GroupCoordinatorNumThreadsProp, INT, Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, GroupCoordinatorNumThreadsDoc) + + /** Consumer groups configs */ + // All properties are kept internal until KIP-848 is released. + .defineInternal(ConsumerGroupSessionTimeoutMsProp, INT, Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupSessionTimeoutMsDoc) Review Comment: How does the operator use the ConsumerGroupSessionTimeoutMs and ConsumerGroupSessionMin/MaxTimeoutMs configs? Is the idea to set min/max once and then configure the base timeout, which the min/max would enforce? ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1267,6 +1314,24 @@ object KafkaConfig { .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc) .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), MEDIUM, GroupMaxSizeDoc)
[GitHub] [kafka] jolshan merged pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan merged PR #13078: URL: https://github.com/apache/kafka/pull/13078
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1450918075 Tests look unrelated. Going to merge.
[jira] [Resolved] (KAFKA-14451) Make range assignor rack-aware if consumer racks are configured
[ https://issues.apache.org/jira/browse/KAFKA-14451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-14451. Fix Version/s: 3.5.0 Reviewer: David Jacot Resolution: Fixed > Make range assignor rack-aware if consumer racks are configured > --- > > Key: KAFKA-14451 > URL: https://issues.apache.org/jira/browse/KAFKA-14451 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.5.0 > > > See KIP-881 for details -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
cmccabe commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1450843896 > I'm confused by this comment. There is a ./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java in the review. It is the last file in the review. Sorry, my mistake. I see it now. I don't know how I missed it. We still need the MetadataVersion fixes and a test for that in the image test.
[GitHub] [kafka] rajinisivaram merged pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram merged PR #12990: URL: https://github.com/apache/kafka/pull/12990
[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on PR #13114: URL: https://github.com/apache/kafka/pull/13114#issuecomment-1450839352 > We need a test like `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java`, similar to the tests for the other Image classes. I'm confused by this comment. There is a `./metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java` in the review. It is the last file in the review.
[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on PR #12990: URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450838774 @dajac Thanks for the reviews. Test failures not related (have verified them locally), merging to trunk.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
Hangleton commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1122285463 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: Apologies for the delay. Would you have a JMH benchmark for this change? E.g., something like the one in #13312.
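One detail worth checking alongside a benchmark: the removed loop wrote exactly `length` bytes starting at `buffer.position()` and left the position untouched, whereas `Channels.newChannel(out).write(buffer)` writes all `remaining()` bytes and advances the buffer's position. A minimal sketch of one way to keep the original contract on the direct-buffer path, assuming a standalone helper (the class name `BufferWriter` is hypothetical and is not Kafka's `Utils`): write through a `duplicate()` whose limit is capped at `position + length`.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper, for illustration only.
final class BufferWriter {
    private BufferWriter() { }

    // Writes `length` bytes starting at buffer.position() without moving the
    // caller's position or limit, matching the behavior of the removed loop.
    static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
        if (buffer.hasArray()) {
            out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
            return;
        }
        // A duplicate shares the bytes but has its own position and limit.
        ByteBuffer view = buffer.duplicate();
        view.limit(view.position() + length);
        // Deliberately not closed: closing the channel would also close `out`.
        WritableByteChannel channel = Channels.newChannel(out);
        while (view.hasRemaining()) {
            channel.write(view);
        }
    }
}
```

Heap buffers still take the `array()` fast path, so a benchmark would mainly need to cover direct buffers.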
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122271424 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,43 +128,53 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } -public static class Builder { -OffsetCommitResponseData data = new OffsetCommitResponseData(); -HashMap byTopicName = new HashMap<>(); +public short version() { +return version; +} -private OffsetCommitResponseTopic getOrCreateTopic( -String topicName -) { -OffsetCommitResponseTopic topic = byTopicName.get(topicName); -if (topic == null) { -topic = new OffsetCommitResponseTopic().setName(topicName); -data.topics().add(topic); -byTopicName.put(topicName, topic); -} -return topic; +public static Builder newBuilder(TopicResolver topicResolver, short version) { +if (version >= 9) { +return new Builder<>(topicResolver, new ByTopicId(), version); Review Comment: Note: this duplicated invocation of the `Builder` constructor allows the type parameter to be resolved as either `Uuid` or `String`. Not graceful, but...
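To make the note about the duplicated constructor call concrete, here is a minimal stdlib-only sketch of the pattern, assuming simplified stand-ins: `TopicKey` plays the role of the PR's `TopicClassifier` (whose real API is not shown here), `java.util.UUID` replaces Kafka's `Uuid`, and `ResponseBuilder` is a hypothetical reduction of `OffsetCommitResponse.Builder`. Each branch calls the constructor separately, so the diamond can infer the key type per branch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the PR's TopicClassifier: chooses the map key for a topic.
interface TopicKey<K> {
    K keyOf(String name, UUID id);
}

final class ByTopicId implements TopicKey<UUID> {
    @Override
    public UUID keyOf(String name, UUID id) {
        return id;
    }
}

final class ByTopicName implements TopicKey<String> {
    @Override
    public String keyOf(String name, UUID id) {
        return name;
    }
}

// Hypothetical simplification of the response builder.
final class ResponseBuilder<K> {
    private final TopicKey<K> classifier;
    private final Map<K, String> topics = new LinkedHashMap<>();

    private ResponseBuilder(TopicKey<K> classifier) {
        this.classifier = classifier;
    }

    // One constructor call per branch lets the diamond infer K as UUID or String;
    // callers only see the wildcard type ResponseBuilder<?>.
    static ResponseBuilder<?> newBuilder(short version) {
        if (version >= 9) {
            return new ResponseBuilder<>(new ByTopicId());
        } else {
            return new ResponseBuilder<>(new ByTopicName());
        }
    }

    // Adds a topic once per key: by id for version >= 9, by name otherwise.
    ResponseBuilder<K> addTopic(String name, UUID id) {
        topics.putIfAbsent(classifier.keyOf(name, id), name);
        return this;
    }

    int topicCount() {
        return topics.size();
    }
}
```

Because `addTopic` takes concrete `String` and `UUID` arguments rather than `K`, callers can use the wildcard-typed builder without knowing which key type was chosen.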
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122265994 ## core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala: ## @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import kafka.server.{BaseRequestTest, KafkaConfig} +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.OffsetCommitRequestData +import org.apache.kafka.common.message.OffsetCommitRequestData.{OffsetCommitRequestPartition, OffsetCommitRequestTopic} +import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetCommitResponse} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +import java.util.Collections.singletonList +import java.util.Properties + +class OffsetCommitRequestTest extends BaseRequestTest { Review Comment: Adding more tests to this class.
[GitHub] [kafka] jsancio commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
jsancio commented on PR #13102: URL: https://github.com/apache/kafka/pull/13102#issuecomment-1450774546 > @jsancio @hachikuji Was there a reason to have this field or was it added accidentally? @ijuma I think this is just an artifact of `quorum-state` having been implemented before `KIP-631` added the cluster id to `meta.properties`. In that change, we missed removing the cluster id from `quorum-state`.
[GitHub] [kafka] jsancio commented on pull request #13102: KAFKA-14371: Remove unused clusterId field from quorum-state file
jsancio commented on PR #13102: URL: https://github.com/apache/kafka/pull/13102#issuecomment-1450767005 Thanks for the change and review. I think it is safe to remove the cluster id from `quorum-state`. The cluster id is stored in and read from the file `meta.properties` in `KafkaConfig.metadataLogDir`. See https://github.com/apache/kafka/blob/510e99e1a2636f9a7035020f682ab7df8530986b/core/src/main/scala/kafka/raft/RaftManager.scala#L244
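To illustrate why nothing is lost by dropping the field, here is a minimal sketch that reads the cluster id from `meta.properties` content using `java.util.Properties`. The `cluster.id` key name and the sample file contents are assumptions for illustration, and `MetaPropertiesReader` is a hypothetical class; the authoritative reader is the `RaftManager` code linked in the comment.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Optional;
import java.util.Properties;

// Hypothetical reader, for illustration only.
final class MetaPropertiesReader {
    private MetaPropertiesReader() { }

    // Loads meta.properties content and returns its cluster id, if present.
    // The "cluster.id" key name is an assumption of this sketch; quorum-state is never consulted.
    static Optional<String> clusterId(Reader metaProperties) throws IOException {
        Properties props = new Properties();
        props.load(metaProperties);
        return Optional.ofNullable(props.getProperty("cluster.id"));
    }
}
```

On a real node the `Reader` would come from the `meta.properties` file in the metadata log directory, for example via `java.nio.file.Files.newBufferedReader`.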
[GitHub] [kafka] guozhangwang commented on pull request #13318: [DO NOT MERGE] KAFKA-14533: Re-enable state-updater in SmokeTestDriverIntegrationTest
guozhangwang commented on PR #13318: URL: https://github.com/apache/kafka/pull/13318#issuecomment-1450748832 Re-starting new checks as `SmokeTestDriverIntegrationTest` did not fail again.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122188628 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -166,20 +187,27 @@ public Builder addPartitions( return this; } -public Builder merge( -OffsetCommitResponseData newData -) { +public Builder merge(OffsetCommitResponseData newData) { +if (version >= 9) { +// This method is called after the group coordinator committed the offsets. The group coordinator +// provides the OffsetCommitResponseData it built in the process. As of now, this data does +// not contain topic ids, so we resolve them here. +newData.topics().forEach( +topic -> topic.setTopicId(topicResolver.getTopicId(topic.name()).orElse(Uuid.ZERO_UUID))); Review Comment: At this point, topic ids should always be resolvable. However, if some aren't, we should fall back to adding the topic "as is" to the response, to avoid caching `ZERO_UUID`, which risks overwrites.
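The overwrite risk comes from indexing topics by id: if every unresolved topic were keyed under `ZERO_UUID`, the second one would replace the first. A minimal stdlib-only sketch of the suggested fallback, assuming a hypothetical `TopicIdMerger` that tracks topic names only, with an all-zero `java.util.UUID` standing in for Kafka's `Uuid.ZERO_UUID`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical merger, for illustration only.
final class TopicIdMerger {
    // Plays the role of Kafka's Uuid.ZERO_UUID in this stdlib-only sketch.
    static final UUID ZERO = new UUID(0L, 0L);

    private final Function<String, Optional<UUID>> resolver;
    private final Map<UUID, String> byId = new HashMap<>();
    private final List<String> responseTopics = new ArrayList<>();

    TopicIdMerger(Function<String, Optional<UUID>> resolver) {
        this.resolver = resolver;
    }

    void merge(List<String> topicNames) {
        for (String name : topicNames) {
            UUID id = resolver.apply(name).orElse(ZERO);
            if (ZERO.equals(id)) {
                // Unresolved: add the topic as is, but never index it under ZERO,
                // where a second unresolved topic would overwrite the first.
                responseTopics.add(name);
            } else if (byId.putIfAbsent(id, name) == null) {
                // First time this id is seen: index it and add it to the response.
                responseTopics.add(name);
            }
        }
    }

    List<String> topics() {
        return responseTopics;
    }

    int indexedCount() {
        return byId.size();
    }
}
```

Both unresolved topics survive in the response while only resolved ids enter the index.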
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122175488 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -119,43 +130,62 @@ public boolean shouldClientThrottle(short version) { return version >= 4; } -public static class Builder { -OffsetCommitResponseData data = new OffsetCommitResponseData(); -HashMap byTopicName = new HashMap<>(); +public short version() { +return version; +} -private OffsetCommitResponseTopic getOrCreateTopic( -String topicName -) { -OffsetCommitResponseTopic topic = byTopicName.get(topicName); -if (topic == null) { -topic = new OffsetCommitResponseTopic().setName(topicName); -data.topics().add(topic); -byTopicName.put(topicName, topic); -} -return topic; +public static Builder newBuilder(TopicResolver topicResolver, short version) { +if (version >= 9) { +return new Builder<>(topicResolver, new ByTopicId(), version); +} else { +return new Builder<>(topicResolver, new ByTopicName(), version); } +} -public Builder addPartition( -String topicName, -int partitionIndex, -Errors error -) { -final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); +public static final class Builder { +private final TopicResolver topicResolver; +private final TopicClassifier topicClassifier; +private final short version; -topicResponse.partitions().add(new OffsetCommitResponsePartition() -.setPartitionIndex(partitionIndex) -.setErrorCode(error.code())); +private OffsetCommitResponseData data = new OffsetCommitResponseData(); +private final Map topics = new HashMap<>(); + +protected Builder(TopicResolver topicResolver, TopicClassifier topicClassifier, short version) { +this.topicResolver = topicResolver; +this.topicClassifier = topicClassifier; +this.version = version; +} + +public Builder addPartition(String topicName, Uuid topicId, int partitionIndex, Errors error) { +Uuid resolvedTopicId = maybeResolveTopicId(topicName, topicId); + +if (version >= 9 
&& Uuid.ZERO_UUID.equals(resolvedTopicId)) { +Errors reported = error != Errors.NONE ? error : Errors.UNKNOWN_TOPIC_ID; Review Comment: This case shouldn't be reachable because, by the time we construct the response via `addPartition`, all topic ids are supposed to have been resolved successfully. If it is reached, we add the topic to the response with the error code `UNKNOWN_TOPIC_ID` if no error is already set; any existing error is not overwritten.
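The error-precedence rule in the snippet can be isolated and checked on its own. A minimal sketch, assuming a hypothetical three-value subset of Kafka's `Errors` enum and an all-zero `java.util.UUID` in place of `Uuid.ZERO_UUID`; `PartitionErrorChooser` is an illustrative name, not part of the PR.

```java
import java.util.UUID;

// Hypothetical subset of Kafka's Errors enum, for illustration only.
enum Errors { NONE, UNKNOWN_TOPIC_OR_PARTITION, UNKNOWN_TOPIC_ID }

final class PartitionErrorChooser {
    // Stand-in for Uuid.ZERO_UUID, meaning "topic id not resolved".
    static final UUID ZERO = new UUID(0L, 0L);

    private PartitionErrorChooser() { }

    // Mirrors the ternary in the diff: for version >= 9 an unresolved id is reported
    // as UNKNOWN_TOPIC_ID, unless an error was already set, which is kept as is.
    static Errors reportedError(short version, UUID resolvedTopicId, Errors error) {
        if (version >= 9 && ZERO.equals(resolvedTopicId)) {
            return error != Errors.NONE ? error : Errors.UNKNOWN_TOPIC_ID;
        }
        return error;
    }
}
```

Older versions and resolved ids pass the original error through unchanged.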
[GitHub] [kafka] C0urante commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
C0urante commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1122150546 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} Review Comment: > This isn't stopped by a protected REST API. I don't really agree with this assessment, since a protected REST API does prevent untrusted users from submitting connector configurations. 
Generally speaking, we've only tried to take unfettered access to Connect clusters into account for features that are enabled by default. Given that this feature is disabled by default, it's not necessary to take steps to guard it against malicious connector configurations. It's also worth noting that, if you're running a Connect cluster where untrusted users are able to submit connector configurations, a REST extension or a fork of this config provider are both reasonable alternatives.
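As an illustration of what a fork might restrict, here is a minimal sketch that exposes only environment variables whose names match a regex allowlist. `AllowlistedEnvVars` and its allowlist are hypothetical and not part of the PR. It is a plain class rather than an implementation of Kafka's `ConfigProvider` interface, and it mirrors the PR's map-injecting constructor so it can be exercised without real environment variables.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical hardened variant, for illustration only.
final class AllowlistedEnvVars {
    private final Map<String, String> envVars;
    private final Pattern allowlist;

    // Injected map, like the PR's second constructor, so tests need no real env vars.
    AllowlistedEnvVars(Map<String, String> envVars, Pattern allowlist) {
        this.envVars = envVars;
        this.allowlist = allowlist;
    }

    AllowlistedEnvVars(Pattern allowlist) {
        this(System.getenv(), allowlist);
    }

    // Returns only requested keys that exist and match the allowlist;
    // anything else is silently left out of the result.
    Map<String, String> get(Set<String> keys) {
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            String value = envVars.get(key);
            if (value != null && allowlist.matcher(key).matches()) {
                result.put(key, value);
            }
        }
        return result;
    }
}
```

With an allowlist such as `CONNECT_.*`, a connector configuration that asks for an unrelated secret like `DB_PASSWORD` gets nothing back.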
[GitHub] [kafka] jolshan commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on PR #13322: URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450638812 Got it. So the distinction is "publishing" and that is done when the config is not internal. Thanks for clarifying.
[GitHub] [kafka] dajac commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
dajac commented on PR #13322: URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450613278 > Just a general question about configs -- do we have any guidelines for adding and removing configs between releases? Just curious about compatibility and typical guidelines around that. Configs are considered public APIs, so we treat them as such. This is why I keep all of them internal for now; they won't be published until we remove this.
[GitHub] [kafka] jolshan commented on pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on PR #13322: URL: https://github.com/apache/kafka/pull/13322#issuecomment-1450610551 Just a general question about configs -- do we have any guidelines for adding and removing configs between releases? Just curious about compatibility and typical guidelines around that.
[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on code in PR #13322: URL: https://github.com/apache/kafka/pull/13322#discussion_r1122123411 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1267,6 +1314,24 @@ object KafkaConfig { .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc) .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), MEDIUM, GroupMaxSizeDoc) + /** New group coordinator configs */ + // All properties are kept internal until KIP-848 is released. + // This property meant to be here only during the development of KIP-848. It will + // be replaced by a metadata version before releasing it. + .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc) Review Comment: nit: you can leave "null" out (same with the other prop below)
[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on code in PR #13322: URL: https://github.com/apache/kafka/pull/13322#discussion_r1122121405 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -976,6 +976,20 @@ class KafkaConfigTest { case RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) +/** New group coordinator configs */ +case KafkaConfig.NewGroupCoordinatorEnableProp => // ignore string Review Comment: nit: this one isn't a string -- some of the other configs just said "ignore"
[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on code in PR #13322: URL: https://github.com/apache/kafka/pull/13322#discussion_r1122120725 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -976,6 +976,20 @@ class KafkaConfigTest { case RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) +/** New group coordinator configs */ +case KafkaConfig.NewGroupCoordinatorEnableProp => // ignore string +case KafkaConfig.GroupCoordinatorNumThreadsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + +/** Consumer groups configs */ +case KafkaConfig.ConsumerGroupSessionTimeoutMsProp => // ignore string Review Comment: Should this one assert the same as the ones below?
[GitHub] [kafka] jolshan commented on a diff in pull request #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
jolshan commented on code in PR #13322: URL: https://github.com/apache/kafka/pull/13322#discussion_r1122119145 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1267,6 +1314,24 @@ object KafkaConfig { .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc) .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), MEDIUM, GroupMaxSizeDoc) + /** New group coordinator configs */ + // All properties are kept internal until KIP-848 is released. + // This property meant to be here only during the development of KIP-848. It will + // be replaced by a metadata version before releasing it. + .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc) + .defineInternal(GroupCoordinatorNumThreadsProp, INT, Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, GroupCoordinatorNumThreadsDoc) + + /** Consumer groups configs */ + // All properties are kept internal until KIP-848 is released. + .defineInternal(ConsumerGroupSessionTimeoutMsProp, INT, Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupSessionTimeoutMsDoc) + .defineInternal(ConsumerGroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerGroupMinSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupMinSessionTimeoutMsDoc) + .defineInternal(ConsumerGroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerGroupMaxSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupMaxSessionTimeoutMsDoc) Review Comment: Do we want to enforce that the min value is less than or equal to the max value?
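A per-property validator such as `atLeast(1)` checks one value at a time, so a min <= max rule needs a cross-field check that runs after all properties have been parsed. A minimal sketch in Java, assuming (as the earlier question about operator usage suggests, but the PR does not confirm) that the base session timeout should also fall within [min, max]. `SessionTimeoutBounds` is a hypothetical name, and the values in the checks below are illustrative only.

```java
// Hypothetical cross-field validation, for illustration only.
final class SessionTimeoutBounds {
    private SessionTimeoutBounds() { }

    // Checks that atLeast(1)-style single-property validators cannot express:
    // min <= max, and (assumed here) min <= base timeout <= max.
    static void validate(int minMs, int sessionTimeoutMs, int maxMs) {
        if (minMs > maxMs) {
            throw new IllegalArgumentException(
                "min session timeout (" + minMs + " ms) must not exceed max (" + maxMs + " ms)");
        }
        if (sessionTimeoutMs < minMs || sessionTimeoutMs > maxMs) {
            throw new IllegalArgumentException(
                "session timeout (" + sessionTimeoutMs + " ms) must be within ["
                    + minMs + ", " + maxMs + "] ms");
        }
    }
}
```

The same shape would apply to the heartbeat interval and its min/max pair.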
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1450588812 Seems like there was a build failure related to `rat`. I restarted the build.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes
gharris1727 commented on code in PR #13182: URL: https://github.com/apache/kafka/pull/13182#discussion_r1122112042 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -419,7 +423,14 @@ private Collection> getServiceLoaderPluginDesc(Class klass, Collection> result = new ArrayList<>(); try { ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); -for (T pluginImpl : serviceLoader) { +for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { +T pluginImpl; +try { +pluginImpl = iterator.next(); +} catch (ServiceConfigurationError t) { +log.error("Unable to instantiate plugin{}", reflectiveErrorDescription(t.getCause()), t); Review Comment: I changed this and the previous log message so that it wasn't confusing that we were instantiating a SinkConnector. It is unfortunate that on this branch we don't get to see the `Class` after a failure occurs, and have to rely on the ServiceConfigurationError message to report what specific plugin had an issue.
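The change above swaps the enhanced `for` loop for an explicit iterator so that a failure in one provider does not abort the whole scan. A minimal stdlib-only sketch of the same isolation pattern, with a hypothetical `IsolatedServiceLoading` helper that accepts any `ServiceLoader`-style iterator so it can be exercised with a fake:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.function.Consumer;

// Hypothetical helper, for illustration only.
final class IsolatedServiceLoading {
    private IsolatedServiceLoading() { }

    // Drains a ServiceLoader-style iterator, reporting and skipping providers whose
    // next() call fails instead of aborting the whole scan. Like the diff, only
    // next() is guarded; an error thrown by hasNext() still propagates.
    static <T> List<T> loadAll(Iterator<T> iterator, Consumer<ServiceConfigurationError> onError) {
        List<T> loaded = new ArrayList<>();
        while (iterator.hasNext()) {
            try {
                loaded.add(iterator.next());
            } catch (ServiceConfigurationError e) {
                onError.accept(e);
            }
        }
        return loaded;
    }
}
```

With a real loader the call would be `loadAll(ServiceLoader.load(klass, loader).iterator(), err -> ...)`. Per the `ServiceLoader` Javadoc, after an error the iterator makes a best effort to continue with the next provider, but recovery is not guaranteed.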
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes
gharris1727 commented on code in PR #13182: URL: https://github.com/apache/kafka/pull/13182#discussion_r1122110705 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -439,6 +457,22 @@ public static String versionFor(Class pluginKlass) throws Refle versionFor(pluginKlass.getDeclaredConstructor().newInstance()) : UNDEFINED_VERSION; } +private static String reflectiveErrorDescription(Throwable t) { +if (t instanceof NoSuchMethodException) { +return ": Plugin class must have a default constructor, and cannot be a non-static inner class"; +} else if (t instanceof SecurityException) { +return ": Security settings must allow reflective instantiation of plugin classes"; +} else if (t instanceof IllegalAccessException) { +return ": Plugin class default constructor must be public"; +} else if (t instanceof ExceptionInInitializerError) { +return ": Plugin class should not throw exception during static initialization"; +} else if (t instanceof InvocationTargetException) { +return ": Constructor must complete without throwing an exception"; +} else { +return ""; Review Comment: I think that actionable error messages are better than non-actionable ones, especially when they can save a novice user a trip to the internet or the risk of chasing the wrong problem. They also shouldn't be misleading enough to get in the way of an advanced user. I do agree that those particular messages were a bit weak, though, and those inner messages did not really do a good job of explaining what went wrong. I now know what an ExceptionInInitializerError means, but I do remember being quite confused by it two or three years ago when I first encountered it.
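A small self-contained sketch showing which reflective calls produce which of the exceptions handled by `reflectiveErrorDescription`. The mapping is copied from the hunk; `tryInstantiate` and the two nested plugin classes are hypothetical. A non-static inner class has no no-argument constructor (its constructor takes the enclosing instance), so `getDeclaredConstructor()` throws `NoSuchMethodException`; a constructor that throws surfaces from `Constructor.newInstance` wrapped in an `InvocationTargetException`.

```java
import java.lang.reflect.InvocationTargetException;

public class ReflectiveErrorDemo {

    // Mapping copied from the hunk: turns a reflective failure into an actionable hint.
    public static String reflectiveErrorDescription(Throwable t) {
        if (t instanceof NoSuchMethodException) {
            return ": Plugin class must have a default constructor, and cannot be a non-static inner class";
        } else if (t instanceof SecurityException) {
            return ": Security settings must allow reflective instantiation of plugin classes";
        } else if (t instanceof IllegalAccessException) {
            return ": Plugin class default constructor must be public";
        } else if (t instanceof ExceptionInInitializerError) {
            return ": Plugin class should not throw exception during static initialization";
        } else if (t instanceof InvocationTargetException) {
            return ": Constructor must complete without throwing an exception";
        } else {
            return "";
        }
    }

    // Hypothetical plugin whose constructor throws; newInstance() wraps the
    // failure in an InvocationTargetException.
    public static class ThrowingConstructorPlugin {
        public ThrowingConstructorPlugin() {
            throw new IllegalStateException("boom");
        }
    }

    // Hypothetical non-static inner class: its only constructor takes the
    // enclosing instance, so there is no no-argument constructor to look up.
    public class InnerPlugin {
    }

    // Hypothetical helper that instantiates a class the same way versionFor(...) does.
    public static String tryInstantiate(Class<?> klass) {
        try {
            klass.getDeclaredConstructor().newInstance();
            return "instantiated " + klass.getSimpleName();
        } catch (ReflectiveOperationException | RuntimeException | LinkageError e) {
            return "Unable to instantiate plugin" + reflectiveErrorDescription(e);
        }
    }

    public static void main(String[] args) {
        // Unable to instantiate plugin: Constructor must complete without throwing an exception
        System.out.println(tryInstantiate(ThrowingConstructorPlugin.class));
        // Unable to instantiate plugin: Plugin class must have a default constructor, and cannot be a non-static inner class
        System.out.println(tryInstantiate(InnerPlugin.class));
    }
}
```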
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes
gharris1727 commented on code in PR #13182: URL: https://github.com/apache/kafka/pull/13182#discussion_r1122107145 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java: ## @@ -111,20 +115,62 @@ public enum TestPlugin { /** * A plugin which shares a jar file with {@link TestPlugin#MULTIPLE_PLUGINS_IN_JAR_THING_ONE} */ -MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", "test.plugins.ThingTwo"); +MULTIPLE_PLUGINS_IN_JAR_THING_TWO("multiple-plugins-in-jar", "test.plugins.ThingTwo"), +/** + * A plugin which is incorrectly packaged, and is missing a superclass definition. + */ +FAIL_TO_INITIALIZE_MISSING_SUPERCLASS("fail-to-initialize", "test.plugins.MissingSuperclass", false, REMOVE_CLASS_FILTER), +/** + * A plugin which is packaged with other incorrectly packaged plugins, but itself has no issues loading. + */ +FAIL_TO_INITIALIZE_CO_LOCATED("fail-to-initialize", "test.plugins.CoLocatedPlugin", true, REMOVE_CLASS_FILTER), +/** + * A connector which is incorrectly packaged, and throws during static initialization. + */ + FAIL_TO_INITIALIZE_STATIC_INITIALIZER_THROWS_CONNECTOR("fail-to-initialize", "test.plugins.StaticInitializerThrowsConnector", false, REMOVE_CLASS_FILTER), +/** + * A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method. + */ + FAIL_TO_INITIALIZE_VERSION_METHOD_THROWS_CONNECTOR("fail-to-initialize", "test.plugins.VersionMethodThrowsConnector", false, REMOVE_CLASS_FILTER), Review Comment: This is not a description of the plugin's behavior, but of how it is packaged. It is packaged with the other connectors that fail during plugin discovery, because before this patch, throwing from the version method caused other plugins (such as CoLocatedPlugin) to be shadowed. I think the name of this group of plugins could change, though.
Are any of `FAIL_DURING_DISCOVERY`, `PACKAGED_WITH_FAILING_PLUGINS`, or `BAD_PACKAGING` better than `FAIL_TO_INITIALIZE`? Alternatively, I could remove the prefix from these constants and keep or change the `fail-to-initialize` directory name.
[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450565316 I checked the latest failure on `shouldPauseStandbyTaskAndNotTransit..` and found that it happens because the test condition itself is timing-dependent. What it wants to verify is the following sequence: `task1 added, task2 added, and then task1 paused`. In this case `transitToUpdateStandby` should be called once. But the following sequence can also happen: `task1 added, task1 paused, task2 added` (because the updater thread can interleave its runOnce with the `addTask` call), in which case `transitToUpdateStandby` could be called twice. I've relaxed that condition and I think this test should not fail anymore.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1122096890 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -149,13 +161,15 @@ public Builder addPartition( return this; } -public Builder addPartitions( -String topicName, -List partitions, -Function partitionIndex, -Errors error +public final Builder addPartitions( Review Comment: There is still a problem here if `topicName` and `topicId` are both undefined, in which case we should do what was done before and add to the response without caching. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -425,35 +426,59 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicResolver = TopicResolver.fromTopicIds(metadataCache.topicNamesToIds()) Review Comment: Move the `TopicResolver` into the `MetadataCache`, or create it without copying the map of topic IDs, as this is costly.
[GitHub] [kafka] guozhangwang commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
guozhangwang commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450512377 Honestly I do not quite understanding how mockito is messing here.. take `shouldPauseActiveTaskAndTransitToUpdateStandby` for example, after task1 (named topologyA) and task2 (named topologyB) are updating, then there's the line `when(topologyMetadata.isPaused("A")).thenReturn(true);`. In succeeding cases, task1 is indeed paused, and everything works just fine; however in those flaky failing cases, task2 is paused instead, indicating `topologyMetadata.isPaused("B")` returns true but `topologyMetadata.isPaused("A")` returns false... I honestly have no idea why that could happen, but after changing that to `when(topologyMetadata.isPaused(task1.id().topologyName())).thenReturn(false).thenReturn(true);` (i.e. let it to return false on the first call when adding task, and then return true on the second call to pause) I did not see that test failing any more. I checked mockito's docs but still do not see how `when` could mess up on the return values, so I kinda fixed it but do not know exactly why I fixed it.. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13315: KAFKA-14767: Fix missing commitId build error after git gc
gharris1727 commented on PR #13315: URL: https://github.com/apache/kafka/pull/13315#issuecomment-1450445379 It appears that the mustRunAfter directive has fixed the build, and now it's just the usual flaky tests. I won't be reverting the fixups, and the PR should be ready for review.
[GitHub] [kafka] satishd commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
satishd commented on code in PR #13304: URL: https://github.com/apache/kafka/pull/13304#discussion_r1121995162 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.message.FetchResponseData; + +import java.util.Optional; + +/** + * Structure used for lower level reads using {@link kafka.cluster.Partition#fetchRecords()}. + */ +public class LogReadInfo { Review Comment: This is the reason I did not add it earlier. But I am not strongly opinionated about that. I added them with the latest commit.
[GitHub] [kafka] satishd commented on pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module
satishd commented on PR #13304: URL: https://github.com/apache/kafka/pull/13304#issuecomment-1450425363 Thanks @junrao and @ijuma for your reviews. I addressed the review comments in the latest commit.
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1450362232 Please note that, as David mentioned above, in the current state topic IDs are not provided by the group coordinator when it constructs the response, and hence are not returned with the `OffsetCommitResponse`. A change to the PR will be made to address this shortly.
[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on PR #13284: URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450247692 @C0urante ready for your review. I have increased the timeout to 2 minutes and also added a wait for MirrorMaker to become ready. In a separate JIRA we should discuss how we can make the startup synchronous. I have created one such JIRA at https://issues.apache.org/jira/browse/KAFKA-14773
[jira] [Created] (KAFKA-14773) Make MirrorMaker startup synchronous
Divij Vaidya created KAFKA-14773: Summary: Make MirrorMaker startup synchronous Key: KAFKA-14773 URL: https://issues.apache.org/jira/browse/KAFKA-14773 Project: Kafka Issue Type: Improvement Reporter: Divij Vaidya Currently, MirrorMaker is started using the `./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if the shell command has exited and a log line with `Kafka MirrorMaker started` has been printed, it is likely that the underlying connectors and tasks have not yet been configured. This task aims to make the MirrorMaker startup synchronous, either by waiting for connectors and tasks to move to the running state before returning from the `MirrorMaker#start()` function or by blocking completion of `main()`. This was discussed in [https://github.com/apache/kafka/pull/13284] -- This message was sent by Atlassian Jira (v8.20.10#820010)
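One way to implement the "wait until connectors and tasks are running" option is a bounded poll of a readiness check, similar in spirit to the two-minute wait added to the test in #13284. This is a minimal sketch: `StartupWaiter`, `waitUntilReady`, and its parameters are hypothetical, and in MirrorMaker the `BooleanSupplier` would wrap a check such as the `isConfiguredAndRunning` method mentioned in the PR, whose actual signature is not shown here.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class StartupWaiter {

    // Polls `ready` until it returns true, or throws once `timeout` has elapsed.
    public static void waitUntilReady(BooleanSupplier ready, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!ready.getAsBoolean()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("Not ready after " + timeout);
            }
            // Never sleep past the deadline, but always sleep at least 1 ms.
            Thread.sleep(Math.min(pollInterval.toMillis(), Math.max(1, remainingNanos / 1_000_000)));
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        // Hypothetical readiness check: pretend everything is RUNNING after about 200 ms.
        BooleanSupplier connectorsAndTasksRunning =
                () -> System.nanoTime() - start > Duration.ofMillis(200).toNanos();
        waitUntilReady(connectorsAndTasksRunning, Duration.ofMinutes(2), Duration.ofMillis(50));
        System.out.println("Kafka MirrorMaker started");
    }
}
```

Calling such a wait at the end of `start()` would make the "started" log line mean that the connectors and tasks are actually running, at the cost of a slower return from `start()`.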
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1121820892 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java: ## @@ -14,14 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.connect.mirror.integration; Review Comment: Note for reviewers: this was required so that we can access isConfiguredAndRunning from the test without having to make the method public.
[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals
dpcollins-google commented on code in PR #13162: URL: https://github.com/apache/kafka/pull/13162#discussion_r1121810375 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel destChannel, * @param length The number of bytes to write * @throws IOException For any errors writing to the output */ -public static void writeTo(DataOutput out, ByteBuffer buffer, int length) throws IOException { +public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException { if (buffer.hasArray()) { out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length); } else { -int pos = buffer.position(); -for (int i = pos; i < length + pos; i++) -out.writeByte(buffer.get(i)); +Channels.newChannel(out).write(buffer); Review Comment: ping @ijuma / @Hangleton, are there any blockers to getting this merged?
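One detail worth checking in this change: the removed loop used absolute `buffer.get(i)` calls, so it wrote exactly `length` bytes starting at `buffer.position()` and left the buffer's position unchanged, whereas `write(buffer)` on the channel writes the buffer's `remaining()` bytes (not just `length`) and advances its position. A minimal sketch that keeps the loop's semantics while still writing through a channel is to write a bounded `duplicate()` of the buffer. `BufferWriteSketch.writeTo` is a standalone stand-in for `Utils.writeTo`, not the code under review.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;

public class BufferWriteSketch {

    // Writes exactly `length` bytes starting at buffer.position(), without moving the
    // caller's position, matching the semantics of the removed byte-by-byte loop.
    public static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
        if (buffer.hasArray()) {
            out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
        } else {
            // duplicate() shares the content but has its own position and limit.
            ByteBuffer view = buffer.duplicate();
            view.limit(view.position() + length);
            // The channel is deliberately not closed: closing it would close `out`.
            Channels.newChannel(out).write(view);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer direct = ByteBuffer.allocateDirect(8); // direct buffers have no array
        direct.put(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
        direct.flip();
        direct.position(2);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeTo(new DataOutputStream(bytes), direct, 3);

        System.out.println(java.util.Arrays.toString(bytes.toByteArray())); // [3, 4, 5]
        System.out.println("position=" + direct.position()); // position=2
    }
}
```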
[jira] [Created] (KAFKA-14772) Add ConsumerGroupHeartbeat API to AuthorizerIntegrationTest
David Jacot created KAFKA-14772: Summary: Add ConsumerGroupHeartbeat API to AuthorizerIntegrationTest Key: KAFKA-14772 URL: https://issues.apache.org/jira/browse/KAFKA-14772 Project: Kafka Issue Type: Sub-task Reporter: David Jacot When the new group coordinator can be enabled, we need to update AuthorizerIntegrationTest to include new APIs.
[GitHub] [kafka] satishd commented on pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java
satishd commented on PR #13067: URL: https://github.com/apache/kafka/pull/13067#issuecomment-1450196726 > @satishd can you help review this? @ijuma Sure, I will have cycles after this week. I will review it by next Monday.
[GitHub] [kafka] dajac opened a new pull request, #13322: KAFKA-14462; [1/N] Add new server configurations (KIP-848)
dajac opened a new pull request, #13322: URL: https://github.com/apache/kafka/pull/13322 This patch adds the new server configurations introduced in KIP-848. All of them are kept as internal configurations for now. We will make them public when the KIP is ready. It also adds an internal configuration named `group.coordinator.new.enable` that we will use during the development of the KIP. This one will be removed later on and replaced by an IBP/feature flag. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on PR #12990: URL: https://github.com/apache/kafka/pull/12990#issuecomment-1450159413 @dajac Thanks for the reviews, I have addressed your comments.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121733442 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @Test - def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = { -consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a") - consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName) -val consumer = createConsumer() -consumer.subscribe(Set(topic).asJava) -awaitAssignment(consumer, Set(tp, tp2)) - } -} + def testRackAwareRangeAssignor(): Unit = { Review Comment: Ah, I missed that one; I think it is one of the new tests that was added when fixing FFF issues. I moved this test to that class. It uses 2 brokers instead of 3, but that seems OK for this test. Thanks.
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121729089 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -74,45 +105,194 @@ public String name() { private Map> consumersPerTopic(Map consumerMetadata) { Map> topicToConsumers = new HashMap<>(); -for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { -String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { -put(topicToConsumers, topic, memberInfo); -} -} +consumerMetadata.forEach((consumerId, subscription) -> { +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo)); +}); return topicToConsumers; } +/** + * Performs range assignment of the specified partitions for the consumers with the provided subscriptions. + * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign + * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and + * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned + * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks. 
+ */ @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +Map consumerRacks = consumerRacks(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks)) +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); -for (String memberId : subscriptions.keySet()) -assignment.put(memberId, new ArrayList<>()); +subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); + +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. +@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers.keySet()) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); Review Comment: Good idea, updated. 
Good idea, updated.
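The Javadoc in this hunk describes a two-pass scheme: assign the rack-aligned partitions first while keeping range assignment's per-consumer balance, then assign the rest with plain range logic, which may leave racks mis-aligned. A simplified, single-topic sketch of that idea follows. It is not the PR's algorithm: it ignores co-partitioning across topics and the dynamic handling of "extra" partitions, and `RackAwareRangeSketch`, its map/list inputs, and the rack names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class RackAwareRangeSketch {

    // consumerRacks: consumer id -> rack id (null when the consumer has no rack).
    // partitionRacks: element p holds the racks that host replicas of partition p.
    public static Map<String, List<Integer>> assign(Map<String, String> consumerRacks,
                                                    List<Set<String>> partitionRacks) {
        if (consumerRacks.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        Collections.sort(consumers);
        int numPartitions = partitionRacks.size();
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();

        // Same per-consumer quotas as plain range assignment, so balance is preserved.
        Map<String, Integer> quota = new LinkedHashMap<>();
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < consumers.size(); i++) {
            quota.put(consumers.get(i), perConsumer + (i < withExtra ? 1 : 0));
            assignment.put(consumers.get(i), new ArrayList<>());
        }
        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) {
            unassigned.add(p);
        }

        // Pass 1: assign only rack-aligned partitions, never exceeding a quota.
        for (String consumer : consumers) {
            String rack = consumerRacks.get(consumer);
            List<Integer> owned = assignment.get(consumer);
            if (rack == null) {
                continue;
            }
            for (Integer p : new ArrayList<>(unassigned)) {
                if (owned.size() >= quota.get(consumer)) {
                    break;
                }
                if (partitionRacks.get(p).contains(rack)) {
                    owned.add(p);
                    unassigned.remove(p);
                }
            }
        }
        // Pass 2: fill remaining quota in partition order; racks may not match here.
        for (String consumer : consumers) {
            List<Integer> owned = assignment.get(consumer);
            while (owned.size() < quota.get(consumer) && !unassigned.isEmpty()) {
                owned.add(unassigned.pollFirst());
            }
        }
        assignment.values().forEach(owned -> Collections.sort(owned));
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, String> consumerRacks = Map.of("c1", "rack-a", "c2", "rack-b");
        List<Set<String>> partitionRacks = List.of(
                Set.of("rack-a"), Set.of("rack-b"), Set.of("rack-a"), Set.of("rack-b"));
        // Plain range would give c1=[0, 1], c2=[2, 3]; rack matching gives:
        System.out.println(assign(consumerRacks, partitionRacks)); // {c1=[0, 2], c2=[1, 3]}
    }
}
```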
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121728688 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -74,45 +105,194 @@ public String name() { private Map> consumersPerTopic(Map consumerMetadata) { Map> topicToConsumers = new HashMap<>(); -for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { -String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { -put(topicToConsumers, topic, memberInfo); -} -} +consumerMetadata.forEach((consumerId, subscription) -> { +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo)); +}); return topicToConsumers; } +/** + * Performs range assignment of the specified partitions for the consumers with the provided subscriptions. + * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign + * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and + * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned + * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks. 
+ */ @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +Map consumerRacks = consumerRacks(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks)) +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); -for (String memberId : subscriptions.keySet()) -assignment.put(memberId, new ArrayList<>()); +subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); + +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers.keySet()) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +private void assignWithRackMatching(Collection assignmentStates, +
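For comparison, the non-rack-aware baseline that the removed lines implement per topic can be sketched as follows: sort the consumers, give each `numPartitionsPerConsumer = numPartitions / numConsumers` partitions, and give the first `consumersWithExtraPartition = numPartitions % numConsumers` consumers one extra, in contiguous ranges. This is a minimal single-topic sketch; it uses plain consumer IDs and partition indexes instead of `MemberInfo` and `TopicPartition`, and `RangeSketch.assignRange` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeSketch {

    // Plain (non-rack-aware) range assignment of partitions 0..numPartitions-1 of one topic.
    public static Map<String, List<Integer>> assignRange(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        int numPartitionsPerConsumer = numPartitions / sorted.size();
        int consumersWithExtraPartition = numPartitions % sorted.size();

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Consumer i starts after the ranges of consumers 0..i-1, which together
            // hold min(i, consumersWithExtraPartition) extra partitions.
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i < consumersWithExtraPartition ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                range.add(p);
            }
            assignment.put(sorted.get(i), range);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}
        System.out.println(assignRange(List.of("c2", "c1", "c3"), 7));
    }
}
```

Because every consumer subscribed to a topic gets a contiguous range in sorted-consumer order, two topics with the same partition count end up co-partitioned, which is the guarantee the rack-aware pass has to preserve.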
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1450155072 Thanks David, I have addressed all your comments in the PR. I am now working on: - how to limit the highest version of the OffsetCommit API to 8 in the admin client; - the integration test mentioned in a prior review.
[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121724366 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -50,14 +52,21 @@ public class OffsetCommitResponse extends AbstractResponse { private final OffsetCommitResponseData data; +private final short version; Review Comment: Adding the version to the response seems to be an anti-pattern as I haven't seen any other similar use in other responses. Semantically it should be OK because the response instance is supposed to be built against a given version. If another approach is advisable, I will remove it. ## clients/src/test/java/org/apache/kafka/common/message/MessageTest.java: ## @@ -435,17 +435,32 @@ public void testOffsetCommitRequestVersions() throws Exception { int partition = 2; int offset = 100; -testAllMessageRoundTrips(new OffsetCommitRequestData() - .setGroupId(groupId) - .setTopics(Collections.singletonList( - new OffsetCommitRequestTopic() - .setName(topicName) - .setPartitions(Collections.singletonList( - new OffsetCommitRequestPartition() - .setPartitionIndex(partition) - .setCommittedMetadata(metadata) - .setCommittedOffset(offset) - ); +OffsetCommitRequestData byTopicName = new OffsetCommitRequestData() +.setGroupId(groupId) +.setTopics(Collections.singletonList( +new OffsetCommitRequestTopic() +.setName(topicName) +.setPartitions(Collections.singletonList( +new OffsetCommitRequestPartition() +.setPartitionIndex(partition) +.setCommittedMetadata(metadata) +.setCommittedOffset(offset) +; + +OffsetCommitRequestData byTopicId = new OffsetCommitRequestData() +.setGroupId(groupId) +.setTopics(Collections.singletonList( +new OffsetCommitRequestTopic() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Collections.singletonList( +new OffsetCommitRequestPartition() +.setPartitionIndex(partition) +.setCommittedMetadata(metadata) +.setCommittedOffset(offset) +; + +testAllMessageRoundTripsBeforeVersion((short) 9, 
byTopicName, byTopicName); Review Comment: Note: is this OK to break message round trip between < 9 and >= 9? ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -188,12 +199,62 @@ public Builder merge( } }); } - return this; } -public OffsetCommitResponse build() { -return new OffsetCommitResponse(data); +public final OffsetCommitResponse build() { +return new OffsetCommitResponse(data, version); +} + +protected abstract void validate(String topicName, Uuid topicId); + +protected abstract T classifer(String topicName, Uuid topicId); + +private OffsetCommitResponseTopic getOrCreateTopic(String topicName, Uuid topicId) { +T topicKey = classifer(topicName, topicId); +OffsetCommitResponseTopic topic = topics.get(topicKey); +if (topic == null) { +topic = new OffsetCommitResponseTopic().setName(topicName).setTopicId(topicId); +data.topics().add(topic); +topics.put(topicKey, topic); +} +return topic; +} +} + +public static final class BuilderByTopicId extends Builder { Review Comment: Will add Javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics
chia7712 commented on PR #13285: URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450145297 @divijvaidya thanks for the great feedback! > we will also have to ensure that deadlock doesn't occur when trying to acquire the SocketServer lock and the Processors lock. That is interesting. Mutation of `processors` is guarded by the `Acceptor` object's lock, so holding only the SocketServer lock is not safe when we access `processors`; we would need the `Acceptor` lock as well. In other words, this issue is not only about performance but also a potential concurrency bug. > we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors. I will use `CopyOnWriteArrayList`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start
divijvaidya commented on PR #13284: URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450105039 > This is less concerning because the MirrorMaker class isn't part of public API; It's not technically part of the public API, but it has a `main()` method that users run directly. As you pointed out, today they have to rely on logs/metrics to figure out whether MM started correctly. I agree with creating `connectorStatus()` and using it in tests as you suggested, but additionally we should block the completion of `main()` until all connectors are online. The blocking part could be taken up in a different PR, where we can discuss its merits and demerits. Meanwhile, I will add `connectorStatus()` and the relevant changes to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
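Blocking `main()` until every connector is online could, as one option, be a bounded polling loop. The sketch below is hypothetical: the status supplier stands in for something like the proposed `connectorStatus()`, and both the map shape and the `"RUNNING"` string are assumptions, not MirrorMaker's actual API.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: poll connector states until all report RUNNING or the timeout expires.
// The Supplier is an assumed stand-in for a connectorStatus()-style method.
final class StartupWaiter {
    private StartupWaiter() {}

    static boolean awaitAllRunning(Supplier<Map<String, String>> statuses,
                                   Duration timeout,
                                   Duration pollInterval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Map<String, String> current = statuses.get();
            // Require at least one connector so an empty map does not count as "started".
            if (!current.isEmpty() && current.values().stream().allMatch("RUNNING"::equals)) {
                return true;
            }
            // Overflow-safe deadline check for System.nanoTime().
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

Returning a boolean rather than throwing leaves it to the caller to decide whether a timeout should fail startup or only log a warning.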
[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics
divijvaidya commented on PR #13285: URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450075461 I am afraid the latest approach will not work either. This is because `ArrayBuffer` may be internally expanding its size while we read `dataPlaneProcessors.size` or iterate over it using `map`. This concurrent access could lead to undefined results (notably, this would not have been a problem if we were using a fixed-size array). Also, note that the current implementation works for `controlPlaneAcceptorOpt` since those code paths do not access the `processors` ArrayBuffer outside the lock. When I mentioned option 1, fine-grained locking, I meant locking on the processors object instead of on the entire SocketServer object. If we go down this path, we will have to change other places in the file to acquire this processor lock when mutating, and we will also have to ensure that deadlock doesn't occur when trying to acquire the SocketServer lock and the Processors lock. Hence, my suggestion would be to opt for a lock-free concurrent data structure for storing processors. Here are our requirements for such a data structure: - we don't mutate the data structure frequently, so we are OK even if writes are slow. - we require lock-free concurrent reads since we perform a read on every connection setup and every time we emit a metric. - the data structure is going to be small, in the tens to low hundreds of entries. - the data structure should be able to change its size since we allow dynamic shrinking and expanding. Based on the above, we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
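The requirements above (rare writes, frequent lock-free reads, small size, resizable) are what `CopyOnWriteArrayList` is designed for. The sketch below uses a simplified, hypothetical `Processor` type rather than the real SocketServer class, and shows why computing a metric from a single iteration avoids the size-versus-iteration race described for `ArrayBuffer`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry; Processor is a simplified stand-in, not kafka.network.Processor.
class ProcessorRegistrySketch {
    static final class Processor {
        final int id;
        final double idlePercent;

        Processor(int id, double idlePercent) {
            this.id = id;
            this.idlePercent = idlePercent;
        }
    }

    // Each write copies the backing array; readers iterate an immutable snapshot without locking.
    private final List<Processor> processors = new CopyOnWriteArrayList<>();

    void addProcessor(Processor p) {
        processors.add(p);
    }

    void removeProcessor(int id) {
        processors.removeIf(p -> p.id == id);
    }

    // Sum and count come from the same snapshot, so a concurrent resize cannot
    // make them disagree (unlike reading size() and iterating separately).
    double averageIdlePercent() {
        double sum = 0;
        int count = 0;
        for (Processor p : processors) {
            sum += p.idlePercent;
            count++;
        }
        return count == 0 ? 0.0 : sum / count;
    }

    int size() {
        return processors.size();
    }
}
```

Adding processors with idle ratios 0.5 and 1.0 yields an average of 0.75; the copy-on-write cost is only paid when processors are added or removed, which the comment above notes is rare.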
[jira] [Comment Edited] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message
[ https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695040#comment-17695040 ] Pierangelo Di Pilato edited comment on KAFKA-14771 at 3/1/23 12:22 PM: --- [~showuon] what do you think about this improvement? was (Author: pierdipi): [~lukech] what do you think about this improvement? > Include current thread ids in ConcurrentModificationException message > - > > Key: KAFKA-14771 > URL: https://issues.apache.org/jira/browse/KAFKA-14771 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.2, 3.2.3 >Reporter: Pierangelo Di Pilato >Priority: Minor > Labels: consumer > > In the KafkaConsumer.acquire method a ConcurrentModificationException > exception is thrown when > > {code:java} > threadId != currentThread.get() && > !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code} > however, the exception message doesn't include: > > * Thread.currentThread().getId() > * currentThread.get() > > I think including the aforementioned variables is very useful for debugging > the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message
[ https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695040#comment-17695040 ] Pierangelo Di Pilato commented on KAFKA-14771: -- [~lukech] what do you think about this improvement? > Include current thread ids in ConcurrentModificationException message > - > > Key: KAFKA-14771 > URL: https://issues.apache.org/jira/browse/KAFKA-14771 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.2, 3.2.3 >Reporter: Pierangelo Di Pilato >Priority: Minor > Labels: consumer > > In the KafkaConsumer.acquire method a ConcurrentModificationException > exception is thrown when > > {code:java} > threadId != currentThread.get() && > !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code} > however, the exception message doesn't include: > > * Thread.currentThread().getId() > * currentThread.get() > > I think including the aforementioned variables is very useful for debugging > the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #13319: MINOR: Fix flaky tests in DefaultStateUpdaterTest
showuon commented on PR #13319: URL: https://github.com/apache/kafka/pull/13319#issuecomment-1450060935 I can see the test result is better, but there are still failed tests in `DefaultStateUpdaterTest` ``` Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest.shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message
[ https://issues.apache.org/jira/browse/KAFKA-14771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695039#comment-17695039 ] Pierangelo Di Pilato commented on KAFKA-14771: -- I can provide a patch if this improvement is desired by maintainers > Include current thread ids in ConcurrentModificationException message > - > > Key: KAFKA-14771 > URL: https://issues.apache.org/jira/browse/KAFKA-14771 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.2, 3.2.3 >Reporter: Pierangelo Di Pilato >Priority: Minor > Labels: consumer > > In the KafkaConsumer.acquire method a ConcurrentModificationException > exception is thrown when > > {code:java} > threadId != currentThread.get() && > !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code} > however, the exception message doesn't include: > > * Thread.currentThread().getId() > * currentThread.get() > > I think including the aforementioned variables is very useful for debugging > the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14771) Include current thread ids in ConcurrentModificationException message
Pierangelo Di Pilato created KAFKA-14771: Summary: Include current thread ids in ConcurrentModificationException message Key: KAFKA-14771 URL: https://issues.apache.org/jira/browse/KAFKA-14771 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 3.2.3, 2.8.2 Reporter: Pierangelo Di Pilato In the KafkaConsumer.acquire method a ConcurrentModificationException exception is thrown when {code:java} threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId){code} however, the exception message doesn't include: * Thread.currentThread().getId() * currentThread.get() I think including the aforementioned variables is very useful for debugging the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
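A sketch of the proposed improvement, assuming a simplified guard that mirrors the condition quoted in the issue (`NO_CURRENT_THREAD`, `currentThread`, `compareAndSet`). This is a hypothetical model, not the actual `KafkaConsumer` source, and the exact message wording is illustrative.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-thread guard modeled on the acquire() condition in KAFKA-14771.
class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;

    // Id of the thread that currently owns the guard, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Re-entrancy count so nested acquire/release pairs on the owning thread work.
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            // Include both ids in the message. The owner id is re-read here, so it is
            // best-effort: the owner may have released the guard in the meantime.
            throw new ConcurrentModificationException(
                "Not safe for multi-threaded access: currentThread(name: " + Thread.currentThread().getName()
                    + ", id: " + threadId + "), otherThread(id: " + currentThread.get() + ")");
        }
        refcount.incrementAndGet();
    }

    void release() {
        if (refcount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }
}
```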
[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449936916 Sorry, used the wrong button... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac closed pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac closed pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9 URL: https://github.com/apache/kafka/pull/13240 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449935049 > Hello David, thanks for the fast review. Apologies for being slow, I hadn't finished the previous revision. Will include your comments. Working on it right now. Thanks! No worries. You are not slow. I just -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1449920535 Hello David, thanks for the fast review. Apologies for being slow, I hadn't finished the previous revision. Will include your comments. Working on it right now. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14770) Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match
Rajini Sivaram created KAFKA-14770: -- Summary: Allow dynamic keystore update for brokers if string representation of DN matches even if canonical DNs don't match Key: KAFKA-14770 URL: https://issues.apache.org/jira/browse/KAFKA-14770 Project: Kafka Issue Type: Improvement Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 3.5.0 To avoid mistakes during dynamic broker config updates that could potentially affect clients, we restrict the changes that can be performed dynamically without a broker restart. For broker keystore updates, we require the DN to be the same for the old and new certificates, since it could contain host names that clients use for host name verification. DNs are compared using the standard Java implementation of X500Principal.equals(), which compares canonical names. If the tag of a field changes from one with a printable string representation to one without, or vice versa, the canonical name check fails even though the actual name is the same, because the canonical representation converts values to hex for some tags only. We can relax the verification to allow dynamic updates in this case by permitting the update if either the canonical name or the RFC2253 string representation of the DN matches. -- This message was sent by Atlassian Jira (v8.20.10#820010)
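The relaxed check described in the issue can be sketched with `javax.security.auth.x500.X500Principal`: try `equals()` (canonical comparison) first, then fall back to comparing the RFC 2253 strings. The helper name is hypothetical and this is not the broker's actual implementation.

```java
import javax.security.auth.x500.X500Principal;

// Hypothetical helper sketching the relaxation proposed in KAFKA-14770.
final class DnMatcher {
    private DnMatcher() {}

    static boolean sameDistinguishedName(X500Principal oldDn, X500Principal newDn) {
        // equals() compares canonical names; per the issue, the canonical form
        // hex-encodes values for some string tags only, so it can reject equivalent DNs.
        if (oldDn.equals(newDn)) {
            return true;
        }
        // Fallback: accept the update if the RFC 2253 string forms match.
        return oldDn.getName(X500Principal.RFC2253).equals(newDn.getName(X500Principal.RFC2253));
    }
}
```

The fallback only widens what is accepted: any pair that passed the old canonical check still passes.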
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121510354 ## checkstyle/suppressions.xml: ## @@ -93,7 +93,7 @@ files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/> + files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|OffsetCommitResponseTest).java"/> Review Comment: Is this really needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121506569 ## clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitResponseTest.java: ## @@ -17,37 +17,64 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.function.Function.identity; import static org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class OffsetCommitResponseTest { protected final int throttleTimeMs = 10; -protected final String topicOne = "topic1"; +protected final String topic1 = "topic1"; +protected final Uuid topic1Id = Uuid.randomUuid(); protected final int partitionOne = 1; +protected final int partitionTwo = 2; protected final Errors errorOne = Errors.COORDINATOR_NOT_AVAILABLE; protected final Errors errorTwo = Errors.NOT_COORDINATOR; -protected final 
String topicTwo = "topic2"; -protected final int partitionTwo = 2; - -protected TopicPartition tp1 = new TopicPartition(topicOne, partitionOne); -protected TopicPartition tp2 = new TopicPartition(topicTwo, partitionTwo); +protected final String topic2 = "topic2"; +protected final int p3 = 3; +protected final int p4 = 4; +protected final String topic3 = "topic3"; +protected final int p5 = 5; +protected final int p6 = 6; +protected final String topic4 = "topic4"; +protected final Uuid topic4Id = Uuid.randomUuid(); +protected final int p7 = 7; +protected final int p8 = 8; +protected final Uuid topic5Id = Uuid.randomUuid(); +protected final int p9 = 9; +protected final int p10 = 10; +protected final String topic6 = "topic6"; +protected final Uuid topic6Id = Uuid.randomUuid(); +protected final int p11 = 11; +protected final int p12 = 12; Review Comment: I am not a fan of all those attributes in the test. One or two are fine if they are really reused in all the tests. Otherwise, it may be better to define what you need in each test. I would also use `TopicIdPartition` where relevant so that you can group the name, id, and partition together. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121501980 ## clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitRequestTest.java: ## @@ -80,10 +81,10 @@ public void setUp() { ); } -@Test @Override -public void testConstructor() { - +@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) +public void testConstructor(short version) { Review Comment: Is this change related to the PR? If not, I would rather do it in a separate PR. ## clients/src/test/java/org/apache/kafka/common/requests/TxnOffsetCommitResponseTest.java: ## @@ -38,9 +40,9 @@ public void testConstructorWithErrorResponse() { assertEquals(throttleTimeMs, response.throttleTimeMs()); } -@Test -@Override -public void testParse() { +@ParameterizedTest +@ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) Review Comment: Same question here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121499577 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ## @@ -188,12 +188,73 @@ public Builder merge( } }); } - return this; } -public OffsetCommitResponse build() { +public final OffsetCommitResponse build() { return new OffsetCommitResponse(data); } + +protected abstract void validate(String topicName, Uuid topicId); + +protected abstract T classifer(String topicName, Uuid topicId); + +protected abstract OffsetCommitResponseTopic newTopic(String topicName, Uuid topicId); +private OffsetCommitResponseTopic getOrCreateTopic(String topicName, Uuid topicId) { +T topicKey = classifer(topicName, topicId); +OffsetCommitResponseTopic topic = topics.get(topicKey); +if (topic == null) { +topic = newTopic(topicName, topicId); +data.topics().add(topic); +topics.put(topicKey, topic); +} +return topic; +} +} + +public static final class BuilderByTopicId extends Builder { +protected BuilderByTopicId(short version) { +super(version); +} + +@Override +protected void validate(String topicName, Uuid topicId) { +if (topicId == null || Uuid.ZERO_UUID.equals(topicId)) +throw new UnsupportedVersionException("OffsetCommitResponse version " + version + +" does not support zero topic IDs."); +} + +@Override +protected Uuid classifer(String topicName, Uuid topicId) { +return topicId; +} + +@Override +protected OffsetCommitResponseTopic newTopic(String topicName, Uuid topicId) { +return new OffsetCommitResponseTopic().setName(null).setTopicId(topicId); Review Comment: We could move this into the base class and always set both of them. The serialization framework knows what to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
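The builder in this diff groups response partitions by a key chosen by the subclass (`classifer` returns the topic name or the topic ID). A simplified sketch of that shape follows, with the review's suggestion applied: the base class creates each topic with both name and ID set. All types here are hypothetical stand-ins (using `java.util.UUID` instead of Kafka's `Uuid`), not the PR's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of a response builder keyed by a subclass-chosen classifier.
abstract class TopicGroupingBuilder<K> {
    static final class ResponseTopic {
        final String name;
        final UUID topicId;
        final List<Integer> partitions = new ArrayList<>();

        ResponseTopic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    private final Map<K, ResponseTopic> topics = new HashMap<>();
    private final List<ResponseTopic> ordered = new ArrayList<>();

    // Key used to decide which entries belong to the same topic.
    protected abstract K classifier(String topicName, UUID topicId);

    TopicGroupingBuilder<K> addPartition(String topicName, UUID topicId, int partition) {
        getOrCreateTopic(topicName, topicId).partitions.add(partition);
        return this;
    }

    // Base class always sets both name and id; version-specific field selection is left to serialization.
    private ResponseTopic getOrCreateTopic(String topicName, UUID topicId) {
        return topics.computeIfAbsent(classifier(topicName, topicId), key -> {
            ResponseTopic topic = new ResponseTopic(topicName, topicId);
            ordered.add(topic);
            return topic;
        });
    }

    List<ResponseTopic> build() {
        return ordered;
    }
}

final class ByTopicName extends TopicGroupingBuilder<String> {
    @Override
    protected String classifier(String topicName, UUID topicId) {
        return topicName;
    }
}

final class ByTopicId extends TopicGroupingBuilder<UUID> {
    @Override
    protected UUID classifier(String topicName, UUID topicId) {
        return topicId;
    }
}
```

With this shape the subclasses differ only in the grouping key, which is roughly what removing the per-subclass `newTopic` would achieve.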
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121496085 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ## @@ -101,18 +103,18 @@ public static List getErrorResponseTopics( .setPartitionIndex(requestPartition.partitionIndex()) .setErrorCode(e.code())); } -responseTopicData.add(new OffsetCommitResponseTopic() -.setName(entry.name()) -.setPartitions(responsePartitions) -); +OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic() +.setTopicId(version >= 9 ? entry.topicId() : Uuid.ZERO_UUID) +.setName(version < 9 ? entry.name() : null); Review Comment: I think that we could just set both the name and the id all the time as the fields are ignorable. The serialization framework will do the right thing based on the version. We could also remove `version` from the arguments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
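The reviewer's point is that, since the fields are ignorable, populating both `name` and `topicId` is harmless: only the field that exists in the target version ends up on the wire. The toy encoder below illustrates that idea only. It is hypothetical, its string output is invented, and it is not Kafka's generated message code; the version threshold of 9 comes from this PR's title.

```java
import java.util.UUID;

// Toy illustration of version-gated field selection (not Kafka's generated serializers).
final class VersionGatedTopicEncoder {
    // Per this PR, OffsetCommit versions >= 9 identify topics by ID.
    static final short FIRST_TOPIC_ID_VERSION = 9;

    private VersionGatedTopicEncoder() {}

    // Both fields may be populated; only the one valid for `version` is emitted.
    static String encode(String name, UUID topicId, short version) {
        if (version < FIRST_TOPIC_ID_VERSION) {
            return "name=" + name;
        }
        return "topicId=" + topicId;
    }
}
```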
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121494379 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java: ## @@ -57,8 +55,22 @@ public Builder(OffsetCommitRequestData data) { public OffsetCommitRequest build(short version) { if (data.groupInstanceId() != null && version < 7) { throw new UnsupportedVersionException("The broker offset commit protocol version " + -version + " does not support usage of config group.instance.id."); +version + " does not support usage of config group.instance.id."); } + +data.topics().forEach(topic -> { Review Comment: I think that we could remove those checks now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121489116 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = +if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() +else + Collections.emptyMap[Uuid, String]() + + val resolvedTopics = new ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +var topicName = topic.name() +if (Utils.isBlank(topicName)) { + // Expected for requests version >= 9 which rely on topic IDs exclusively. + topicName = topicNames.get(topic.topicId()) +} +if (topicName != null) { + topic.setName(topicName) + resolvedTopics += topic +} + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, -offsetCommitRequest.data.topics.asScala +resolvedTopics )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(offsetCommitRequest.version()) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => Review Comment: You could use `resolvedTopics` instead of `offsetCommitRequest.data.topics` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121488495 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = +if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() +else + Collections.emptyMap[Uuid, String]() + + val resolvedTopics = new ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +var topicName = topic.name() +if (Utils.isBlank(topicName)) { + // Expected for requests version >= 9 which rely on topic IDs exclusively. + topicName = topicNames.get(topic.topicId()) +} +if (topicName != null) { + topic.setName(topicName) + resolvedTopics += topic +} + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, -offsetCommitRequest.data.topics.asScala +resolvedTopics )(_.name) - val responseBuilder = new OffsetCommitResponse.Builder() + val responseBuilder = OffsetCommitResponse.newBuilder(offsetCommitRequest.version()) val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() offsetCommitRequest.data.topics.forEach { topic => -if (!authorizedTopics.contains(topic.name)) { +if (Utils.isBlank(topic.name)) { Review Comment: I would move this up and do it in the first iteration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121487731 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -425,35 +425,63 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = +if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() +else + Collections.emptyMap[Uuid, String]() + + val resolvedTopics = new ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]() + offsetCommitRequest.data.topics.forEach { topic => +var topicName = topic.name() +if (Utils.isBlank(topicName)) { Review Comment: It would be better to rely on the version of the request instead of the topic name here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
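Relying on the request version rather than on a blank name could look roughly like the Java sketch below (the real handler is Scala in `KafkaApis`). `RequestTopic` and the `idsToNames` map are simplified, hypothetical stand-ins for the request data and the metadata cache lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: choose the resolution strategy from the request version.
final class TopicResolver {
    static final class RequestTopic {
        String name;
        final UUID topicId;

        RequestTopic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    private TopicResolver() {}

    static List<RequestTopic> resolve(short version, List<RequestTopic> topics, Map<UUID, String> idsToNames) {
        List<RequestTopic> resolved = new ArrayList<>();
        for (RequestTopic topic : topics) {
            // Version >= 9 identifies topics by ID only; older versions carry the name.
            String name = version >= 9 ? idsToNames.get(topic.topicId) : topic.name;
            if (name != null) {
                topic.name = name;
                resolved.add(topic);
            }
            // Unresolved topics are simply skipped here; a real handler would need to report an error for them.
        }
        return resolved;
    }
}
```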
[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
dajac commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1121486904 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1352,8 +1361,42 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu Set unauthorizedTopics = new HashSet<>(); for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) { +String topicName = topic.name(); Review Comment: Now that we can rely on the version, we should use it here and simplify all this logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
dajac commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1121387449 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -74,45 +105,194 @@ public String name() { private Map> consumersPerTopic(Map consumerMetadata) { Map> topicToConsumers = new HashMap<>(); -for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { -String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { -put(topicToConsumers, topic, memberInfo); -} -} +consumerMetadata.forEach((consumerId, subscription) -> { +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +subscription.topics().forEach(topic -> put(topicToConsumers, topic, memberInfo)); +}); return topicToConsumers; } +/** + * Performs range assignment of the specified partitions for the consumers with the provided subscriptions. + * If rack-awareness is enabled for one or more consumers, we perform rack-aware assignment first to assign + * the subset of partitions that can be aligned on racks, while retaining the same co-partitioning and + * per-topic balancing guarantees as non-rack-aware range assignment. The remaining partitions are assigned + * using standard non-rack-aware range assignment logic, which may result in mis-aligned racks. 
+ */ @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +Map consumerRacks = consumerRacks(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey()), consumerRacks)) +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); -for (String memberId : subscriptions.keySet()) -assignment.put(memberId, new ArrayList<>()); +subscriptions.keySet().forEach(memberId -> assignment.put(memberId, new ArrayList<>())); + +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers.keySet()) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +private void assignWithRackMatching(Collection assignmentStates, +Map>