[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on PR #13046: URL: https://github.com/apache/kafka/pull/13046#issuecomment-1420265571 These test failures seem to be unrelated. I rebased the PR against trunk. I will wait for the test results. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
rittikaadhikari commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1097985124 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import kafka.server.checkpoints.LeaderEpochCheckpointFile; +import kafka.server.epoch.EpochEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.immutable.Seq; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution and only the start is needed. Review Comment: nit: Currently, the tier state machine follows a synchronous execution, and we only need to start the machine. 
## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.OFFSET_OUT_OF_RANGE => if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch)) partitionsWithError += topicPartition -case Errors.OFFSET_MOVED_TO_TIERED_STORAGE => - debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " + -s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition") - if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState, -fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset())) -partitionsWithError += topicPartition + Review Comment: nit: extra line ## core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.FetchRequest; + +import java.util.Optional; + +public class ReplicaAlterLogDirsTierStateMachine implements
[GitHub] [kafka] yashmayya commented on pull request #13120: MINOR: Connect Javadocs improvements
yashmayya commented on PR #13120: URL: https://github.com/apache/kafka/pull/13120#issuecomment-1420163844 Thanks Mickael!
[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on PR #13191: URL: https://github.com/apache/kafka/pull/13191#issuecomment-1420146327 > Thanks @hgeraldino! > > I'm a little confused by the amount of churn here--it seems like a lot of utility methods related to topic creation, record transformation, etc. have been removed and their contents inlined directly into test cases. If this isn't necessary for the migration, can we try to retain that approach in order to reduce duplication? > > It's also worth noting that there are unused stubbings in some of these tests, which should be failing the build but are not at the moment due to [KAFKA-14682](https://issues.apache.org/jira/browse/KAFKA-14682). You can find these unused stubbings by running `./gradlew :connect:runtime:test --tests AbstractWorkerSourceTaskTest` in your command line, or possibly by running the entire `AbstractWorkerSourceTaskTest` test suite in IntelliJ (which is how I discovered them). These unused stubbings should be removed before we merge the PR. That's fair. Personally I prefer self-contained test methods that can be read top to bottom (without having to jump around), even if that violates the DRY principle. But I'm OK with keeping the existing structure and style, so I put back the (revised) helper methods.
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1098136639 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -639,93 +814,25 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); - -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - -expectSendRecord(); -expectSendRecord(); - -PowerMock.replayAll(); +when(transformationChain.apply(any(SourceRecord.class))) +.thenAnswer((Answer) invocation -> invocation.getArgument(0)); +when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA), +anyString())) +.thenAnswer((Answer) invocation -> { +String headerValue = invocation.getArgument(3, String.class); +return headerValue.getBytes(StandardCharsets.UTF_8); +}); +when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(KEY_SCHEMA), eq(KEY))) +.thenReturn(SERIALIZED_KEY); +when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(RECORD_SCHEMA), +eq(RECORD))) +.thenReturn(SERIALIZED_RECORD); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); -} - -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -Headers headers -) { -if (headers != null) -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); - -expectApplyTransformationChain(anyTimes); - -Capture> sent = 
EasyMock.newCapture(); - -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); - -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (Callback cb : producerCallbacks.getValues()) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); -} -producerCallbacks.reset(); -} -return null; -}; - -if (anyTimes) -expect.andStubAnswer(expectResponse); -else -expect.andAnswer(expectResponse); - -expectTaskGetTopic(anyTimes); - -return sent; -} - -private Capture> expectSendRecordAnyTimes() { -return expectSendRecord(TOPIC, true, emptyHeaders()); -} - -private Capture> expectSendRecord() { -return expectSendRecord(TOPIC, false, emptyHeaders()); -} - -private void expectTaskGetTopic(boolean anyTimes) { -final Capture connectorCapture = EasyMock.newCapture(); -final Capture topicCapture = EasyMock.newCapture(); -IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( -EasyMock.capture(connectorCapture), -EasyMock.capture(topicCapture))); -if (anyTimes) { -expect.andStubAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.milliseconds())); -} else { -expect.andAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.milliseconds())); -} -if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { -assertEquals("job", connectorCapture.getValue()); -assertEquals(TOPIC, topicCapture.getValue()); -} -} - -private void expectTopicCreation(String topic) { -if (config.topicCreationEnable()) { - EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap()); -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic)); -} Review Comment: Fair enough, I restored some of these methods -- 
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1098135416 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest { @Mock private ConnectorOffsetBackingStore offsetStore; @Mock private StatusBackingStore statusBackingStore; @Mock private WorkerSourceTaskContext sourceTaskContext; -@MockStrict private TaskStatus.Listener statusListener; Review Comment: Added
[GitHub] [kafka] mjsax opened a new pull request, #13209: MINOR: remove unnecessary helper method
mjsax opened a new pull request, #13209: URL: https://github.com/apache/kafka/pull/13209 The helper method in question was updated via KIP-622 instead of being removed. Cf. https://github.com/apache/kafka/commit/029f5a136ae2c74f7f93e716bcc30ce90d8241ad
[GitHub] [kafka] gharris1727 commented on pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
gharris1727 commented on PR #13208: URL: https://github.com/apache/kafka/pull/13208#issuecomment-1420026975 I verified that this fix corrects the flaky failures I was seeing in ConnectDistributedTest.test_bounce by manually running the tests with a flush interval of 1ms.
[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
jolshan commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1098052600 ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED Review Comment: +1 to this idea
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684977#comment-17684977 ] Greg Harris commented on KAFKA-5756: [~mimaison] I have opened a PR that I think may alleviate this failure mode. > Synchronization issue on flush > -- > > Key: KAFKA-5756 > URL: https://issues.apache.org/jira/browse/KAFKA-5756 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Oleg Kuznetsov >Priority: Major > Fix For: 0.11.0.1, 1.0.0 > > > Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* > method, whereas this collection can be accessed from 2 different threads: > - *WorkerSourceTask.execute()*, finally block > - *SourceTaskOffsetCommitter*, from periodic flush task -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-5756: -- Assignee: Greg Harris > Synchronization issue on flush > -- > > Key: KAFKA-5756 > URL: https://issues.apache.org/jira/browse/KAFKA-5756 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Oleg Kuznetsov >Assignee: Greg Harris >Priority: Major > Fix For: 0.11.0.1, 1.0.0 > > > Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* > method, whereas this collection can be accessed from 2 different threads: > - *WorkerSourceTask.execute()*, finally block > - *SourceTaskOffsetCommitter*, from periodic flush task
[GitHub] [kafka] gharris1727 opened a new pull request, #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
gharris1727 opened a new pull request, #13208: URL: https://github.com/apache/kafka/pull/13208 Both the SourceTaskOffsetCommitter and WorkerSourceTask trigger offset commits. Currently, when both threads attempt to start concurrent flushes, the second to call beginFlush receives an exception. The SourceTaskOffsetCommitter swallows this exception, while the WorkerSourceTask propagates it, which prevents the final offset commit from completing cleanly and drops the final offsets. This change allows the second caller of waitForBeginFlush to wait for the first flush operation to complete, avoiding exceptions if offset flushes are prompt. Signed-off-by: Greg Harris ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
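The waiting behaviour described in this PR can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: the class name `FlushGateSketch`, the boolean return value, and the `completeFlush` method are assumptions made for the example. Only the idea that a second caller of `beginFlush` waits for the first flush to finish, instead of failing immediately, comes from the description above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch only (hypothetical class, not Kafka's OffsetStorageWriter).
 * A single permit represents "a flush is in progress"; a second caller waits
 * for the permit instead of receiving an exception right away.
 */
final class FlushGateSketch {
    private final Semaphore flushInProgress = new Semaphore(1);

    /**
     * Try to start a flush, waiting up to the given timeout for a concurrent
     * flush to complete. Returns true if this caller now owns the flush.
     */
    boolean beginFlush(long timeout, TimeUnit unit) {
        try {
            return flushInProgress.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and report that no flush was started.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Must be called exactly once by the caller that successfully began the flush. */
    void completeFlush() {
        flushInProgress.release();
    }
}
```

With a generous timeout, the periodic committer thread and the task thread simply take turns; a caller only gives up when the other flush is not prompt.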
[jira] [Created] (KAFKA-14686) MockAdminClient.createTopics does not provide TopicMetadataAndConfig
Kirk True created KAFKA-14686: - Summary: MockAdminClient.createTopics does not provide TopicMetadataAndConfig Key: KAFKA-14686 URL: https://issues.apache.org/jira/browse/KAFKA-14686 Project: Kafka Issue Type: Bug Components: admin, clients, unit tests Reporter: Kirk True Assignee: Kirk True [Line 386 of {{MockAdminClient}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L386] does this: {quote}{{future.complete(null);}} {quote} It seems like we should be creating a {{TopicMetadataAndConfig}} instance and passing that in instead so that it's available to the unit test caller.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
hachikuji commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1098020205 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3509,6 +3510,13 @@ class KafkaApis(val requestChannel: RequestChannel, ) } + def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { +val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest] +// KIP-848 is not implemented yet so return UNSUPPORTED_VERSION. +requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) Review Comment: Would it be overkill to have a unit test for this in `KafkaApisTest`?
[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
hachikuji commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1098014628 ## clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java: ## @@ -212,6 +215,36 @@ public boolean isVersionSupported(short apiVersion) { return apiVersion >= oldestVersion() && apiVersion <= latestVersion(); } +public boolean isVersionEnabled(short apiVersion, boolean enableUnstableLastVersion) { +// ApiVersions API is a particular case that we always accept any, even Review Comment: It would be helpful to clarify the reason for this. ## core/src/main/scala/kafka/server/ApiVersionManager.scala: ## @@ -45,55 +45,70 @@ object ApiVersionManager { listenerType, forwardingManager, supportedFeatures, - metadataCache + metadataCache, + config.unstableApiVersionsEnabled ) } } class SimpleApiVersionManager( val listenerType: ListenerType, val enabledApis: collection.Set[ApiKeys], - brokerFeatures: Features[SupportedVersionRange] + brokerFeatures: Features[SupportedVersionRange], + enableUnstableLastVersion: Boolean ) extends ApiVersionManager { - def this(listenerType: ListenerType) = { -this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures()) + def this( +listenerType: ListenerType, +enableUnstableLastVersion: Boolean + ) = { +this( + listenerType, + ApiKeys.apisForListener(listenerType).asScala, + BrokerFeatures.defaultSupportedFeatures(), + enableUnstableLastVersion +) } - private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava) + private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = { ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures) } + + override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { +apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, 
enableUnstableLastVersion) + } } class DefaultApiVersionManager( val listenerType: ListenerType, forwardingManager: Option[ForwardingManager], features: BrokerFeatures, - metadataCache: MetadataCache + metadataCache: MetadataCache, + enableUnstableLastVersion: Boolean ) extends ApiVersionManager { + val enabledApis = ApiKeys.apisForListener(listenerType).asScala + override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { val supportedFeatures = features.supportedFeatures val finalizedFeatures = metadataCache.features() val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) ApiVersionsResponse.createApiVersionsResponse( -throttleTimeMs, -metadataCache.metadataVersion().highestSupportedRecordVersion, -supportedFeatures, -finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, -finalizedFeatures.epoch, -controllerApiVersions.orNull, -listenerType) - } - - override def enabledApis: collection.Set[ApiKeys] = { -ApiKeys.apisForListener(listenerType).asScala + throttleTimeMs, + metadataCache.metadataVersion().highestSupportedRecordVersion, + supportedFeatures, + finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, + finalizedFeatures.epoch, + controllerApiVersions.orNull, + listenerType, + enableUnstableLastVersion +) } - override def isApiEnabled(apiKey: ApiKeys): Boolean = { -apiKey.inScope(listenerType) + override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { +apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion) Review Comment: Could we pull this implementation up to the `trait`? The implementation looks the same for `SimpleApiVersionManager`. ## generator/src/main/java/org/apache/kafka/message/RequestApiStabilityType.java: ## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific
[jira] [Commented] (KAFKA-14685) TierStateMachine interface for building remote aux log
[ https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684968#comment-17684968 ] Matthew Wong commented on KAFKA-14685: -- https://github.com/apache/kafka/pull/13206 > TierStateMachine interface for building remote aux log > -- > > Key: KAFKA-14685 > URL: https://issues.apache.org/jira/browse/KAFKA-14685 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Matthew Wong >Priority: Major > > To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can > introduce an interface to manage state transitions of building the remote aux > log asynchronously
[jira] [Created] (KAFKA-14685) TierStateMachine interface for building remote aux log
Matthew Wong created KAFKA-14685: Summary: TierStateMachine interface for building remote aux log Key: KAFKA-14685 URL: https://issues.apache.org/jira/browse/KAFKA-14685 Project: Kafka Issue Type: Sub-task Components: core Reporter: Matthew Wong To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can introduce an interface to manage state transitions of building the remote aux log asynchronously
[GitHub] [kafka] hachikuji opened a new pull request, #13207: KAFKA-14664; Fix inaccurate raft idle ratio metric
hachikuji opened a new pull request, #13207: URL: https://github.com/apache/kafka/pull/13207 The raft idle ratio is currently computed as the average of all recorded poll durations. This tends to underestimate the actual idle ratio since it treats all measurements equally regardless of how much time was spent. For example, say we poll twice with the following durations: Poll 1: 2s Poll 2: 0s Assume that the busy time is negligible, so 2s passes overall. In the first measurement, 2s is spent waiting, so we compute and record a ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The idle ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 0.5), which suggests that the process was busy for 1s, which overestimates the true busy time. In this patch, I've created a new `TimeRatio` class which tracks the total duration of a periodic event over a full interval of time measurement. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
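To make the difference between the two calculations concrete, here is a minimal sketch of a time-weighted ratio next to the plain average of per-poll ratios. It is not the `TimeRatio` class from the PR: the class name `TimeRatioSketch`, its method names, and the choice to report 0.0 when no time has elapsed are assumptions made for illustration.

```java
/**
 * Illustrative sketch only (not the TimeRatio class from the PR).
 * Tracks the total duration of a periodic event over a measurement
 * interval and reports it as a fraction of the elapsed time.
 */
final class TimeRatioSketch {
    private long intervalStartMs;  // when the current measurement interval began
    private long lastRecordMs;     // timestamp of the most recent recording
    private double totalEventMs;   // event time accumulated in this interval

    TimeRatioSketch(long nowMs) {
        this.intervalStartMs = nowMs;
        this.lastRecordMs = nowMs;
    }

    /** Record that the event (for example, idle polling) took durationMs, ending at nowMs. */
    void record(double durationMs, long nowMs) {
        totalEventMs += durationMs;
        lastRecordMs = Math.max(lastRecordMs, nowMs);
    }

    /** Ratio of event time to elapsed time since the last measurement; resets the interval. */
    double measure() {
        long elapsedMs = lastRecordMs - intervalStartMs;
        // Assumption: report 0.0 when no time has elapsed in the interval.
        double ratio = elapsedMs <= 0 ? 0.0 : Math.min(1.0, totalEventMs / elapsedMs);
        intervalStartMs = lastRecordMs;
        totalEventMs = 0.0;
        return ratio;
    }

    /** The previous approach, for comparison: a plain average of per-poll ratios. */
    static double averageOfRatios(double... ratios) {
        double sum = 0.0;
        for (double r : ratios) {
            sum += r;
        }
        return ratios.length == 0 ? 0.0 : sum / ratios.length;
    }
}
```

For the two polls in the example (2s idle, then 0s), recording `record(2000.0, 2000L)` and `record(0.0, 2000L)` on an instance started at time 0 gives a measured ratio of 1.0, whereas `averageOfRatios(1.0, 0.0)` gives 0.5.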
[GitHub] [kafka] junrao commented on pull request #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
junrao commented on PR #13206: URL: https://github.com/apache/kafka/pull/13206#issuecomment-1419883513 cc @satishd
[GitHub] [kafka] mattwong949 opened a new pull request, #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 opened a new pull request, #13206: URL: https://github.com/apache/kafka/pull/13206 This PR adds the TierStateMachine interface to handle all state transitions related to tiered storage and building the remote log aux state. The new interface supports two methods, `start` and `maybeAdvanceState`. In the `ReplicaFetcherTierStateMachine`, `maybeAdvanceState` is unused since the implementation is synchronous; only `start` is needed. This PR still adds `maybeAdvanceState` because there is an existing task for building the remote log aux state asynchronously, which will be able to use the full interface: https://issues.apache.org/jira/browse/KAFKA-13560
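The shape of such an interface might look roughly like the sketch below. Only the method names `start` and `maybeAdvanceState` come from the PR description; the parameter lists, return types, and the stand-in classes `FetchStateSketch` and `SynchronousTierStateMachineSketch` are hypothetical and do not reflect the actual signatures in the PR.

```java
import java.util.Optional;

/** Hypothetical stand-in for the per-partition fetch state held by the fetcher thread. */
final class FetchStateSketch {
    final long fetchOffset;

    FetchStateSketch(long fetchOffset) {
        this.fetchOffset = fetchOffset;
    }
}

/** Sketch of a tier state machine; signatures are assumptions, not the PR's API. */
interface TierStateMachineSketch {
    /** Begin handling an "offset moved to tiered storage" condition and return the next fetch state. */
    FetchStateSketch start(String topicPartition, FetchStateSketch current);

    /** Optionally move to the next state; empty when there is nothing to advance. */
    Optional<FetchStateSketch> maybeAdvanceState(String topicPartition, FetchStateSketch current);
}

/** A synchronous implementation does all of its work in start(). */
final class SynchronousTierStateMachineSketch implements TierStateMachineSketch {
    private final long resumeOffset; // hypothetical: offset to fetch from once aux state is built

    SynchronousTierStateMachineSketch(long resumeOffset) {
        this.resumeOffset = resumeOffset;
    }

    @Override
    public FetchStateSketch start(String topicPartition, FetchStateSketch current) {
        // A real implementation would build the remote log aux state here before returning.
        return new FetchStateSketch(resumeOffset);
    }

    @Override
    public Optional<FetchStateSketch> maybeAdvanceState(String topicPartition, FetchStateSketch current) {
        // Unused in the synchronous case: there are no intermediate states to advance through.
        return Optional.empty();
    }
}
```

An asynchronous implementation could instead return quickly from `start` and report progress through `maybeAdvanceState`, which is presumably why the method is kept on the interface.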
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684958#comment-17684958 ] Matthias J. Sax commented on KAFKA-14660: - Correct. 3.4.0 is already voted and should be released soon. I plan to cherry-pick for 3.4.1 after 3.4.0 is out. > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 3.5.0 > > > Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: > sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)] > * [Original PR]([https://github.com/apache/kafka/pull/7414]) > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (Again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge (and then fix forward by switching it to > throw an exception).
[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
ijuma commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1419776869 @dejan2609 We'll wait for the final release, but it's fine to start working through the issues. Looks like the build failed.
[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
mjsax commented on code in PR #13179: URL: https://github.com/apache/kafka/pull/13179#discussion_r1097933056 ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -37,6 +40,11 @@ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * + * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep Review Comment: > since a standby task keeps getting data from the changelogs written by the active stream tasks and hence would not complete restoration at all. -> `since a standby task does not actually restore state (and updating a standby task does never finish).`
[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
mjsax commented on code in PR #13179: URL: https://github.com/apache/kafka/pull/13179#discussion_r1097931763 ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -37,6 +40,11 @@ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * + * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep Review Comment: `restoration` -> `update` ?
[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
philipnee commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1097801059 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupStateManager.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.OffsetCommitRequest; + +import java.util.Objects; +import java.util.Optional; + +public class GroupStateManager { Review Comment: thanks for suggestions, changed. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; +private final Logger log; +private final Optional autoCommitState; +private final CoordinatorRequestManager coordinatorRequestManager; +private final GroupStateManager groupState; Review Comment: Agreed. makes sense. thanks. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig;
[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684947#comment-17684947 ] Hector Geraldino commented on KAFKA-14132: -- Perfect. Entered https://issues.apache.org/jira/browse/KAFKA-14683 and https://issues.apache.org/jira/browse/KAFKA-14684 to track progress > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) > # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) > ([https://github.com/apache/kafka/pull/12728]) > # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) > # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) > ([https://github.com/apache/kafka/pull/12418]) > # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) > # RetryUtilTest 
(owner: [~mdedetrich-aiven] ) > # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) > # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hector Geraldino updated KAFKA-14132: - Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. 
For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # WorkerSinkTaskTest (owner: ??) # WorkerSinkTaskThreadedTest (owner: ??) # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. 
For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner:
[jira] [Created] (KAFKA-14684) Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskThreadedTest
Hector Geraldino created KAFKA-14684: Summary: Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskThreadedTest Key: KAFKA-14684 URL: https://issues.apache.org/jira/browse/KAFKA-14684 Project: Kafka Issue Type: Sub-task Components: KafkaConnect Reporter: Hector Geraldino Assignee: Hector Geraldino
[GitHub] [kafka] rondagostino commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
rondagostino commented on code in PR #13116: URL: https://github.com/apache/kafka/pull/13116#discussion_r1097911296 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -322,15 +325,37 @@ class ControllerApis(val requestChannel: RequestChannel, } } // Finally, the idToName map contains all the topics that we are authorized to delete. -// Perform the deletion and create responses for each one. -controller.deleteTopics(context, idToName.keySet).thenApply { idToError => - idToError.forEach { (id, error) => -appendResponse(idToName.get(id), id, error) +// First check the controller mutation quota if necessary, and then Review Comment: I believe these locks are scoped by the controller mutation quota manager such that nobody else would contest for them aside from the single-threaded controller. @dajac might you be able to confirm this?
[jira] [Created] (KAFKA-14683) Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskTest
Hector Geraldino created KAFKA-14683: Summary: Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskTest Key: KAFKA-14683 URL: https://issues.apache.org/jira/browse/KAFKA-14683 Project: Kafka Issue Type: Sub-task Components: KafkaConnect Reporter: Hector Geraldino Assignee: Hector Geraldino
[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
C0urante commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1097852548 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest { @Mock private ConnectorOffsetBackingStore offsetStore; @Mock private StatusBackingStore statusBackingStore; @Mock private WorkerSourceTaskContext sourceTaskContext; -@MockStrict private TaskStatus.Listener statusListener; Review Comment: In order to retain the same guarantees we have currently w/r/t interactions with this class, can we add a call to `verifyNoMoreInteractions(statusListener);` in the `tearDown` method? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -639,93 +814,25 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); - -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - -expectSendRecord(); -expectSendRecord(); - -PowerMock.replayAll(); +when(transformationChain.apply(any(SourceRecord.class))) +.thenAnswer((Answer) invocation -> invocation.getArgument(0)); +when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA), +anyString())) +.thenAnswer((Answer) invocation -> { +String headerValue = invocation.getArgument(3, String.class); +return headerValue.getBytes(StandardCharsets.UTF_8); +}); +when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(KEY_SCHEMA), eq(KEY))) +.thenReturn(SERIALIZED_KEY); 
+when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(RECORD_SCHEMA), +eq(RECORD))) +.thenReturn(SERIALIZED_RECORD); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); -} - -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -Headers headers -) { -if (headers != null) -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); - -expectApplyTransformationChain(anyTimes); - -Capture> sent = EasyMock.newCapture(); - -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); - -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (Callback cb : producerCallbacks.getValues()) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); -} -producerCallbacks.reset(); -} -return null; -}; - -if (anyTimes) -expect.andStubAnswer(expectResponse); -else -expect.andAnswer(expectResponse); - -expectTaskGetTopic(anyTimes); - -return sent; -} - -private Capture> expectSendRecordAnyTimes() { -return expectSendRecord(TOPIC, true, emptyHeaders()); -} - -private Capture> expectSendRecord() { -return expectSendRecord(TOPIC, false, emptyHeaders()); -} - -private void expectTaskGetTopic(boolean anyTimes) { -final Capture connectorCapture = EasyMock.newCapture(); -final Capture topicCapture = EasyMock.newCapture(); -IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( -EasyMock.capture(connectorCapture), -EasyMock.capture(topicCapture))); -if (anyTimes) { -expect.andStubAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.milliseconds())); -} else { -expect.andAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new 
ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.milliseconds())); -} -if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { -
[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14682: -- Description: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter instead of the former, which can cause tests to fail due to unnecessary stubbings when being run in that IDE but not when being built on Jenkins. It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. was: We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. 
In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they > are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins > build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] > uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the > project's [Gradle build > file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], > instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the > latter instead of the former, which can cause tests to fail due to > unnecessary stubbings when being run in that IDE but not when being built on > Jenkins. 
> It's possible that, because the custom test tasks filter out some tests from > running, Mockito does not check for unnecessary stubbings in order to avoid > incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} > method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
[ https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684909#comment-17684909 ] Chris Egerton commented on KAFKA-14682: --- [~divijvaidya] FYI, this may interest you. > Unused stubbings are not reported by Mockito during CI builds > - > > Key: KAFKA-14682 > URL: https://issues.apache.org/jira/browse/KAFKA-14682 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Chris Egerton >Priority: Major > > We've started using [strict > stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] > for unit tests written with Mockito, which is supposed to automatically fail > tests when they set up mock expectations that go unused. > However, these failures are not reported during Jenkins builds, even if they > are reported when building/testing locally. > In at least one case, this difference appears to be because our [Jenkins > build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] > uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the > project's [Gradle build > file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], > instead of the {{test}} task. > It's possible that, because the custom test tasks filter out some tests from > running, Mockito does not check for unnecessary stubbings in order to avoid > incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} > method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds
Chris Egerton created KAFKA-14682: - Summary: Unused stubbings are not reported by Mockito during CI builds Key: KAFKA-14682 URL: https://issues.apache.org/jira/browse/KAFKA-14682 Project: Kafka Issue Type: Test Components: unit tests Reporter: Chris Egerton We've started using [strict stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html] for unit tests written with Mockito, which is supposed to automatically fail tests when they set up mock expectations that go unused. However, these failures are not reported during Jenkins builds, even if they are reported when building/testing locally. In at least one case, this difference appears to be because our [Jenkins build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35] uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the project's [Gradle build file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543], instead of the {{test}} task. It's possible that, because the custom test tasks filter out some tests from running, Mockito does not check for unnecessary stubbings in order to avoid incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vladimirdyuzhev commented on pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used
vladimirdyuzhev commented on PR #13081: URL: https://github.com/apache/kafka/pull/13081#issuecomment-1419634909 Created JIRA [KAFKA-14681](https://issues.apache.org/jira/browse/KAFKA-14681)
[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)
[ https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vlad D. updated KAFKA-14681: Description: The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided callback. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. was: The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided ticket. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. 
> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to > refresh) > - > > Key: KAFKA-14681 > URL: https://issues.apache.org/jira/browse/KAFKA-14681 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Vlad D. >Priority: Major > Labels: kerberos, security > > The JAAS + Kerberos authentication in KerberosLogin.java class, when > obtaining Kerberos TGT, makes use of the client-provided callback handler. > This is a must-have when the security configuration is not default. > However, the same code, when it is time to renew the Kerberos TGT ticket, > ignores the provided callback. That works OK for default configuration (JAAS > configuration, Kerberos config and keytab are available). > But when the security configuration sources are custom, and the default > Kerberos code is not supporting them, the callback is to be used to obtain > the configuration properties. > A fix is done to pass the same callback handler in KerberosLogin::reLogin and > store the callback handler in the super class AbstractLogin, similar to > contextName and configuration. > The fix is in PR [https://github.com/apache/kafka/pull/13081] > It is tested in our SFT environments and works fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)
[ https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vlad D. updated KAFKA-14681: Description: The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided callback. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used even for TGT refresh to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. was: The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided callback. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. 
> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to > refresh) > - > > Key: KAFKA-14681 > URL: https://issues.apache.org/jira/browse/KAFKA-14681 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Vlad D. >Priority: Major > Labels: kerberos, security > > The JAAS + Kerberos authentication in KerberosLogin.java class, when > obtaining Kerberos TGT, makes use of the client-provided callback handler. > This is a must-have when the security configuration is not default. > However, the same code, when it is time to renew the Kerberos TGT ticket, > ignores the provided callback. That works OK for default configuration (JAAS > configuration, Kerberos config and keytab are available). > But when the security configuration sources are custom, and the default > Kerberos code is not supporting them, the callback is to be used even for TGT > refresh to obtain the configuration properties. > A fix is done to pass the same callback handler in KerberosLogin::reLogin and > store the callback handler in the super class AbstractLogin, similar to > contextName and configuration. > The fix is in PR [https://github.com/apache/kafka/pull/13081] > It is tested in our SFT environments and works fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)
[ https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vlad D. updated KAFKA-14681: Description: The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided ticket. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. was: The SASL + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided ticket. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. 
> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to > refresh) > - > > Key: KAFKA-14681 > URL: https://issues.apache.org/jira/browse/KAFKA-14681 > Project: Kafka > Issue Type: Bug > Components: security >Reporter: Vlad D. >Priority: Major > Labels: kerberos, security > > The JAAS + Kerberos authentication in KerberosLogin.java class, when > obtaining Kerberos TGT, makes use of the client-provided callback handler. > This is a must-have when the security configuration is not default. > However, the same code, when it is time to renew the Kerberos TGT ticket, > ignores the provided ticket. That works OK for default configuration (JAAS > configuration, Kerberos config and keytab are available). > But when the security configuration sources are custom, and the default > Kerberos code is not supporting them, the callback is to be used to obtain > the configuration properties. > A fix is done to pass the same callback handler in KerberosLogin::reLogin and > store the callback handler in the super class AbstractLogin, similar to > contextName and configuration. > The fix is in PR [https://github.com/apache/kafka/pull/13081] > It is tested in our SFT environments and works fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)
Vlad D. created KAFKA-14681: --- Summary: Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh) Key: KAFKA-14681 URL: https://issues.apache.org/jira/browse/KAFKA-14681 Project: Kafka Issue Type: Bug Components: security Reporter: Vlad D. The SASL + Kerberos authentication in KerberosLogin.java class, when obtaining Kerberos TGT, makes use of the client-provided callback handler. This is a must-have when the security configuration is not default. However, the same code, when it is time to renew the Kerberos TGT ticket, ignores the provided ticket. That works OK for default configuration (JAAS configuration, Kerberos config and keytab are available). But when the security configuration sources are custom, and the default Kerberos code is not supporting them, the callback is to be used to obtain the configuration properties. A fix is done to pass the same callback handler in KerberosLogin::reLogin and store the callback handler in the super class AbstractLogin, similar to contextName and configuration. The fix is in PR [https://github.com/apache/kafka/pull/13081] It is tested in our SFT environments and works fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1419585603 Related to #13199. Thanks for the heads-up, @ijuma.
[jira] [Resolved] (KAFKA-13972) Reassignment cancellation causes stray replicas
[ https://issues.apache.org/jira/browse/KAFKA-13972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13972. - Resolution: Fixed > Reassignment cancellation causes stray replicas > --- > > Key: KAFKA-13972 > URL: https://issues.apache.org/jira/browse/KAFKA-13972 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.4.1 > > > A stray replica is one that is left behind on a broker after the partition > has been reassigned to other brokers or the partition has been deleted. We > found that one case where this can happen is after a cancelled reassignment. When > a reassignment is cancelled, the controller sends `StopReplica` requests to > any of the adding replicas, but it does not necessarily bump the leader > epoch. Following > [KIP-570|https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest], > brokers will ignore `StopReplica` requests if the leader epoch matches the > current partition leader epoch. So we need to bump the epoch whenever we need > to ensure that `StopReplica` will be received.
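The epoch check described in KAFKA-13972 can be illustrated with a small model. This is only a sketch under stated assumptions: `StopReplicaEpochSketch` and `brokerAcceptsStopReplica` are hypothetical names, not Kafka's actual broker code, and the treatment of epochs older than the current one is an assumption that the issue text does not spell out.

```java
public class StopReplicaEpochSketch {

    // Hypothetical stand-in for the broker-side check; not Kafka's real implementation.
    // Per the issue text, a request whose leader epoch matches the current
    // partition leader epoch is ignored.
    // Assumption: requests carrying an older epoch are ignored as well.
    public static boolean brokerAcceptsStopReplica(int requestLeaderEpoch, int currentLeaderEpoch) {
        return requestLeaderEpoch > currentLeaderEpoch;
    }

    public static void main(String[] args) {
        int currentEpoch = 5;

        // Cancelled reassignment without an epoch bump: the request is ignored,
        // so the adding replica may be left behind as a stray.
        System.out.println("same epoch accepted: "
                + brokerAcceptsStopReplica(currentEpoch, currentEpoch));

        // With a bumped epoch, the request is honoured.
        System.out.println("bumped epoch accepted: "
                + brokerAcceptsStopReplica(currentEpoch + 1, currentEpoch));
    }
}
```

In this model, bumping the epoch before sending the request is what makes the stop take effect, which is the change the issue asks for.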
[GitHub] [kafka] jsancio commented on a diff in pull request #13197: Minor: Decode the envelope requests for the request log
jsancio commented on code in PR #13197: URL: https://github.com/apache/kafka/pull/13197#discussion_r1097782211 ## core/src/main/scala/kafka/network/RequestConvertToJson.scala: ## @@ -60,7 +61,16 @@ object RequestConvertToJson { case req: ElectLeadersRequest => ElectLeadersRequestDataJsonConverter.write(req.data, request.version) case req: EndTxnRequest => EndTxnRequestDataJsonConverter.write(req.data, request.version) case req: EndQuorumEpochRequest => EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version) - case req: EnvelopeRequest => EnvelopeRequestDataJsonConverter.write(req.data, request.version) + case req: EnvelopeRequest => { +val envelopeRequestData = req.data() +val envelopeData = envelopeRequestData.requestData().duplicate() +val envelopeHeader = EnvelopeUtils.parseForwardedRequestHeader(envelopeData) +val requestJson = RequestConvertToJson.request(AbstractRequest.parseRequest(envelopeHeader.apiKey(), envelopeHeader.apiVersion(), envelopeData).request) Review Comment: Hey @mumrah, does this parse the enveloped request twice? If so, is it possible to implement this feature without parsing the enveloped request twice? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 opened a new pull request, #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 opened a new pull request, #13205: URL: https://github.com/apache/kafka/pull/13205 details:
* gradle upgrade: 7.6 -->> 8.0-rc-3
* spotbugs plugin upgrade: 5.0.9 -->> 5.0.13
* declaration test.dependsOn(':spotlessScalaCheck') removed for project(':streams:streams-scala') in order to comply with Gradle 8 defaults
[GitHub] [kafka] cmccabe merged pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0
cmccabe merged PR #13198: URL: https://github.com/apache/kafka/pull/13198
[jira] [Comment Edited] (KAFKA-14680) Gradle version upgrade 7 -->> 8
[ https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684864#comment-17684864 ] Ismael Juma edited comment on KAFKA-14680 at 2/6/23 6:31 PM: - [https://github.com/apache/kafka/pull/13199] helps with some gradle plugin version bumps. was (Author: ijuma): [https://github.com/apache/kafka/pull/13199] helps. > Gradle version upgrade 7 -->> 8 > --- > > Key: KAFKA-14680 > URL: https://issues.apache.org/jira/browse/KAFKA-14680 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Major > > *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released > soon): > * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3] > * [https://docs.gradle.org/8.0-rc-3/release-notes.html] > *Upgrade notes:* > [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14680) Gradle version upgrade 7 -->> 8
[ https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684864#comment-17684864 ] Ismael Juma commented on KAFKA-14680: - [https://github.com/apache/kafka/pull/13199] helps. > Gradle version upgrade 7 -->> 8 > --- > > Key: KAFKA-14680 > URL: https://issues.apache.org/jira/browse/KAFKA-14680 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Dejan Stojadinović >Assignee: Dejan Stojadinović >Priority: Major > > *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released > soon): > * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3] > * [https://docs.gradle.org/8.0-rc-3/release-notes.html] > *Upgrade notes:* > [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14680) Gradle version upgrade 7 -->> 8
Dejan Stojadinović created KAFKA-14680: -- Summary: Gradle version upgrade 7 -->> 8 Key: KAFKA-14680 URL: https://issues.apache.org/jira/browse/KAFKA-14680 Project: Kafka Issue Type: Improvement Components: build Reporter: Dejan Stojadinović Assignee: Dejan Stojadinović *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released soon): * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3] * [https://docs.gradle.org/8.0-rc-3/release-notes.html] *Upgrade notes:* [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
mumrah commented on PR #13183: URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419529065 Gotcha, yeah, the docs are still in progress. We'll have something published soon (hopefully before the release announcement).
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
guozhangwang commented on code in PR #13179: URL: https://github.com/apache/kafka/pull/13179#discussion_r1097749095 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -986,8 +986,23 @@ public void unregister(final Collection revokedChangelogs) { for (final TopicPartition partition : revokedChangelogs) { final ChangelogMetadata changelogMetadata = changelogs.remove(partition); if (changelogMetadata != null) { +// if the changelog is still in REGISTERED, it means it has not initialized and started +// restoring yet, and hence we should not try to remove the changelog partition if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) { revokedInitializedChangelogs.add(partition); + +// if the changelog is not in RESTORING, it means +// the corresponding onRestoreStart was not called; in this case +// we should not call onRestoreSuspended either +if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE && + changelogMetadata.state().equals(ChangelogState.RESTORING)) { +try { +final String storeName = changelogMetadata.storeMetadata.store().name(); +stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored); +} catch (final Exception e) { +throw new StreamsException("State restore listener failed on restore paused", e); Review Comment: The exception returned from user instantiated functions is arbitrary while the exception channels we are sending from restore thread to the main thread currently is only expecting internal ones. So I think we either need to allow arbitrary exceptions to be sent to the main thread via the queue as well, or do that. Personally I'm in favor of this approach to avoid a `catch all` clause that may hide any lurking exception handling bugs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Resolved] (KAFKA-14663) High throughput topics can starve low-throughput MM2 offset syncs
[ https://issues.apache.org/jira/browse/KAFKA-14663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14663. --- Resolution: Duplicate > High throughput topics can starve low-throughput MM2 offset syncs > - > > Key: KAFKA-14663 > URL: https://issues.apache.org/jira/browse/KAFKA-14663 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.1.0, 3.0.0, 3.3.0, 3.4.0, 3.5.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > > In MM2, a semaphore is used to throttle the number of offset syncs written to > the offset-syncs topic. If too many offset writes are requested (for example, > from high-throughput topics) then some are silently dropped and never > retried. This is acceptable for a single topic-partition, where a later > record may re-trigger the offset-sync and write the sync successfully. > However, if there is a large variance between throughput in the topics > emitted by an MM2 instance, it is possible for high-throughput topics to > trigger many offset syncs, and cause the offset-syncs for a co-located > low-throughput topic to be unfairly dropped. > This can cause the downstream offsets for the starved topic to lag behind > significantly, or be prevented completely. > Instead, we should have some sort of fairness mechanism where low-throughput > topics are given similar priority to high-throughput topics in producing > offset syncs, and excess sync messages from high-throughput topics are > dropped instead.
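The starvation described in KAFKA-14663 can be reproduced with a toy model of semaphore-based throttling. This is a sketch, not MirrorMaker 2 code: the class `OffsetSyncThrottleSketch`, its methods, and the topic-partition names are hypothetical, and the producer send is simulated by a counter.

```java
import java.util.concurrent.Semaphore;

public class OffsetSyncThrottleSketch {
    private final Semaphore outstanding; // bounds the number of in-flight syncs
    private int sent = 0;
    private int dropped = 0;

    public OffsetSyncThrottleSketch(int maxOutstanding) {
        this.outstanding = new Semaphore(maxOutstanding);
    }

    // Try to emit a sync for the given topic-partition. If no permit is
    // available, the sync is dropped silently and never retried.
    public boolean maybeSendSync(String topicPartition) {
        if (!outstanding.tryAcquire()) {
            dropped++;
            return false;
        }
        sent++; // simulated send; a real sender would release the permit in a callback
        return true;
    }

    // Simulates completion of one in-flight send.
    public void onSendComplete() {
        outstanding.release();
    }

    public int sent() { return sent; }

    public int dropped() { return dropped; }

    public static void main(String[] args) {
        OffsetSyncThrottleSketch throttle = new OffsetSyncThrottleSketch(2);

        // A busy partition takes every available permit.
        throttle.maybeSendSync("busy-0");
        throttle.maybeSendSync("busy-0");

        // The quiet partition's only sync arrives while no permit is free,
        // so it is dropped even though it rarely asks for one.
        boolean quietSent = throttle.maybeSendSync("quiet-0");

        System.out.println("quiet-0 sync sent: " + quietSent);
        System.out.println("sent=" + throttle.sent() + " dropped=" + throttle.dropped());
    }
}
```

Because the permit pool is shared and first come, first served, nothing in this model distinguishes a partition that syncs constantly from one that syncs once, which is the fairness gap the issue points out.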
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener
guozhangwang commented on code in PR #13179: URL: https://github.com/apache/kafka/pull/13179#discussion_r1097746204 ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -37,6 +40,11 @@ * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single * class during state store registration. * + * + * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep Review Comment: ack ## streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java: ## @@ -85,4 +93,17 @@ void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored); +/** + * Method called when restoring the {@link StateStore} is suspended due to the task being migrated out of the host. + * If the migrated task is recycled or re-assigned back to the current host, another + * {@link #onRestoreStart(TopicPartition, String, long, long)} would be called. + * + * @param topicPartition the TopicPartition containing the values to restore Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
FireBurn commented on PR #13183: URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419452307 Fab, I've just built 3.4.0 from the tag but couldn't find the migration docs, so I was worried it had been delayed to 3.4.1.
[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
mumrah commented on PR #13183: URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419445476 @FireBurn, no, it hasn't been delayed. We have an early access version coming out in 3.4.0, which will be announced this week. We plan on back-porting the remaining ZK migration work to the 3.4.x line so we can continue releasing without waiting for the next major release.
[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration
FireBurn commented on PR #13183: URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419426113 Has ZK migration to KRaft been delayed?
[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
clolov commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r1097662809 ## clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java: ## @@ -870,7 +870,37 @@ synchronized public AlterReplicaLogDirsResult alterReplicaLogDirs( @Override synchronized public DescribeLogDirsResult describeLogDirs(Collection brokers, Review Comment: This implementation is provided because I wanted to use MockAdminClient rather than use Mockito and mock the Admin interface. I am open to suggestions for improving the current logic. ## tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java: ## @@ -0,0 +1,68 @@ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.Node; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogDirsCommandTest { + +@Test +public void checkLogDirsCommandOutput() throws UnsupportedEncodingException, TerseException, ExecutionException, JsonProcessingException, InterruptedException { +ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); +PrintStream printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name()); + +PrintStream originalStandardOut = System.out; +System.setOut(printStream); + +Node broker = new Node(1, "hostname", 9092); + +try (MockAdminClient adminClient = new MockAdminClient(Collections.singletonList(broker), broker)) { Review Comment: I opted to move the unit test as is, rather than improve it. 
If there is a preference to improve it, I would either use Mockito to mock the Admin interface for describeLogDirs or contribute a more accurate implementation to MockAdminClient.
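The test quoted above redirects `System.out` into a buffer so that the command's output can be asserted on. A minimal, self-contained sketch of that pattern follows; the helper name `captureStdout` is hypothetical and is not part of the PR. It restores the original stream in a `finally` block so that a failing action cannot leave standard output redirected for later tests.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class StdoutCaptureSketch {

    // Runs the action with System.out redirected to an in-memory buffer
    // and returns everything the action printed, decoded as UTF-8.
    public static String captureStdout(Runnable action) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture;
        try {
            capture = new PrintStream(buffer, false, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every JVM, so this should not happen.
            throw new IllegalStateException(e);
        }

        PrintStream original = System.out;
        System.setOut(capture);
        try {
            action.run();
        } finally {
            // Flush pending output and always restore the real stream.
            capture.flush();
            System.setOut(original);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String output = captureStdout(() -> System.out.print("hello"));
        System.out.println("captured: " + output);
    }
}
```

Swapping a global stream like this is not safe when tests run in parallel within one JVM, so it may be worth checking how the test task is configured before relying on it.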
[GitHub] [kafka] ijuma commented on pull request #13199: MINOR: Update build and test dependencies for 3.5
ijuma commented on PR #13199: URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419378585 @showuon This is now ready for review.
[GitHub] [kafka] ijuma commented on pull request #13199: MINOR: Update build and test dependencies for 3.5
ijuma commented on PR #13199: URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419378251 JDK 11 build passed; the other two had unrelated failures: > Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true > Build / JDK 8 and Scala 2.12 / kafka.admin.ReassignPartitionsIntegrationTest.testProduceAndConsumeWithReassignmentInProgress(String).quorum=kraft
[GitHub] [kafka] OmniaGM opened a new pull request, #13204: KAFKA-14593: Move LeaderElectionCommand to tools
OmniaGM opened a new pull request, #13204: URL: https://github.com/apache/kafka/pull/13204
[jira] [Created] (KAFKA-14679) Add new __consumer_offsets records
David Jacot created KAFKA-14679: --- Summary: Add new __consumer_offsets records Key: KAFKA-14679 URL: https://issues.apache.org/jira/browse/KAFKA-14679 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #13200: KAFKA-14678: Move `__consumer_offsets` records from `core` to `group-coordinator`
dajac commented on PR #13200: URL: https://github.com/apache/kafka/pull/13200#issuecomment-1419247177 @mimaison Thanks! Created KAFKA-14678.
[jira] [Created] (KAFKA-14678) Move __consumer_offsets records from core to group-coordinator
David Jacot created KAFKA-14678: --- Summary: Move __consumer_offsets records from core to group-coordinator Key: KAFKA-14678 URL: https://issues.apache.org/jira/browse/KAFKA-14678 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot
[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097469398 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
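For readers following the quoted diff, the check-then-lock append and the monotonic truncation it describes can be sketched roughly as follows. This is a simplified illustration, not the code under review: `EpochCacheSketch`, `size()`, and the use of a `TreeMap<Integer, Long>` in place of `EpochEntry` are assumptions made for the example, the read lock inside `isUpdateNeeded` is assumed rather than taken from the truncated diff, and checkpoint flushing and logging are left out.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for the cache in the quoted diff.
public class EpochCacheSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // epoch -> start offset of the first message in that epoch
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // An update is needed when the cache is empty, the epoch differs from the
    // latest one, or the new start offset is smaller than the latest one.
    private boolean isUpdateNeeded(int epoch, long startOffset) {
        lock.readLock().lock();
        try {
            Map.Entry<Integer, Long> latest = epochs.lastEntry();
            return latest == null || epoch != latest.getKey() || startOffset < latest.getValue();
        } finally {
            lock.readLock().unlock();
        }
    }

    public boolean assign(int epoch, long startOffset) {
        if (epoch < 0 || startOffset < 0) {
            throw new IllegalArgumentException("Invalid epoch entry " + epoch + ":" + startOffset);
        }
        // Cheap check first, so the common no-op case never takes the write lock.
        if (!isUpdateNeeded(epoch, startOffset)) return false;

        lock.writeLock().lock();
        try {
            // Re-check under the write lock: another writer may have won the race.
            if (!isUpdateNeeded(epoch, startOffset)) return false;
            // Drop trailing entries that would violate monotonicity.
            while (!epochs.isEmpty()) {
                Map.Entry<Integer, Long> last = epochs.lastEntry();
                if (last.getKey() >= epoch || last.getValue() >= startOffset) {
                    epochs.pollLastEntry();
                } else {
                    break;
                }
            }
            epochs.put(epoch, startOffset);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int size() {
        lock.readLock().lock();
        try {
            return epochs.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

In this sketch, assigning epochs 0, 1, 2 at offsets 0, 10, 20 and then epoch 3 at offset 10 removes epochs 1 and 2, because their start offsets are not below the new entry's.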
[GitHub] [kafka] clolov commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers
clolov commented on code in PR #13196: URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, FetchDataInfo, Fetch import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ +/** + * Listener receives notification from an Online Partition. + * + * A listener can be (re-)registered to an Online partition only. The listener + * is notified as long as the partition remains Online. When the partition fails + * or is deleted, respectively `onFailed` or `onDeleted` are called once. No further + * notifications are sent after this point on. + * + * Note that the callbacks are executed in the thread that triggers the change + * AND that locks may be hold during their execution. They are meant to be used Review Comment: ```suggestion * AND that locks may be held during their execution. They are meant to be used ``` ## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ## @@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(replicas, partition.assignmentState.replicas) } + @Test + def testAddAndRemoveListeners(): Unit = { +partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None) + +partition.makeLeader( + new LeaderAndIsrPartitionState() +.setControllerEpoch(0) +.setLeader(brokerId) +.setLeaderEpoch(0) +.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setPartitionEpoch(1) +.setIsNew(true), + offsetCheckpoints, + topicId = None) + +val listener1 = new MockPartitionListener() +val listener2 = new MockPartitionListener() + +assertTrue(partition.maybeAddListener(listener1)) +listener1.verify(expectedHighWatermark = 0L) + +partition.appendRecordsToLeader( + records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))), + origin = 
AppendOrigin.CLIENT, + requiredAcks = 0, + requestLocal = RequestLocal.NoCaching +) + +listener1.verify() +listener2.verify() + +assertTrue(partition.maybeAddListener(listener2)) +listener2.verify(expectedHighWatermark = 0L) + +partition.appendRecordsToLeader( + records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))), + origin = AppendOrigin.CLIENT, + requiredAcks = 0, + requestLocal = RequestLocal.NoCaching +) + +fetchFollower( + partition = partition, + replicaId = brokerId + 1, + fetchOffset = partition.localLogOrException.logEndOffset +) + +listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset) +listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset) + +partition.removeListener(listener1) + +partition.appendRecordsToLeader( + records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))), + origin = AppendOrigin.CLIENT, + requiredAcks = 0, + requestLocal = RequestLocal.NoCaching +) + +fetchFollower( + partition = partition, + replicaId = brokerId + 1, + fetchOffset = partition.localLogOrException.logEndOffset +) + +listener1.verify() +listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset) + } + + @Test + def testAddListenerFailsWhenPartitionIsDeleted(): Unit = { +partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None) + +partition.makeLeader( + new LeaderAndIsrPartitionState() +.setControllerEpoch(0) +.setLeader(brokerId) +.setLeaderEpoch(0) +.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setPartitionEpoch(1) +.setIsNew(true), + offsetCheckpoints, + topicId = None) + +partition.delete() + +assertFalse(partition.maybeAddListener(new MockPartitionListener())) + } + + @Test + def testPartitionListenerWhenLogOffsetsChanged(): Unit = { +partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None) + +partition.makeLeader( + new LeaderAndIsrPartitionState() +.setControllerEpoch(0) +.setLeader(brokerId) +.setLeaderEpoch(0) +.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava) +.setPartitionEpoch(1) +.setIsNew(true), + offsetCheckpoints, + topicId = None) + +val listener = new MockPartitionListener() +partition.maybeAddListener(listener) Review Comment: Nit:
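The listener contract described in the quoted Scaladoc (register only while the partition is online, notify while it stays online, call `onDeleted` once, then stay silent) can be sketched in Java as below. This is an illustration under assumptions: `PartitionListenerSketch`, `updateHighWatermark`, and `onHighWatermarkUpdated` are hypothetical names, and only `maybeAddListener`, `removeListener`, `delete`, and `onDeleted` correspond to names visible in the quoted diff and tests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the listener contract; not the Partition class under review.
public class PartitionListenerSketch {
    public interface Listener {
        void onHighWatermarkUpdated(long offset); // assumed callback name
        void onDeleted();
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean online = true;

    // Registration succeeds only while the partition is online.
    public synchronized boolean maybeAddListener(Listener listener) {
        if (!online) return false;
        listeners.add(listener);
        return true;
    }

    public synchronized void removeListener(Listener listener) {
        listeners.remove(listener);
    }

    // Callbacks run in the calling thread while the lock is held,
    // so listeners should do only cheap, non-blocking work.
    public synchronized void updateHighWatermark(long offset) {
        if (!online) return;
        for (Listener listener : listeners) {
            listener.onHighWatermarkUpdated(offset);
        }
    }

    // Notifies each listener exactly once; nothing is sent afterwards.
    public synchronized void delete() {
        if (!online) return;
        online = false;
        for (Listener listener : listeners) {
            listener.onDeleted();
        }
        listeners.clear();
    }
}
```

Running callbacks under the lock keeps the notification order consistent with the state changes, at the cost of requiring listeners to return quickly, which matches the warning in the Scaladoc.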
[GitHub] [kafka] dajac opened a new pull request, #13203: MINOR: Add KIP-848 new `__consumer_offsets` records
dajac opened a new pull request, #13203: URL: https://github.com/apache/kafka/pull/13203 WIP. https://github.com/apache/kafka/pull/13200 must go first. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] fvaleri commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes
fvaleri commented on code in PR #13195: URL: https://github.com/apache/kafka/pull/13195#discussion_r1097421951 ## docs/upgrade.html: ## @@ -19,6 +19,16 @@
[GitHub] [kafka] dajac opened a new pull request, #13202: KAFKA-14513; Add broker side PartitionAssignor interface
dajac opened a new pull request, #13202: URL: https://github.com/apache/kafka/pull/13202 This patch adds the broker side `PartitionAssignor` interface as detailed in KIP-848. The interface differs a bit from the KIP in the following ways: * The POJOs are not defined within the interface because the interface is too heavy like this. * The interface is kept in the `group-coordinator` module for now. We don't want to have it out there until KIP-848 is ready to be released. We will move it to its final destination later. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0
clolov commented on code in PR #13198: URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323 ## core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala: ## @@ -174,7 +174,7 @@ class FeatureCommandUnitTest { @Test def testMetadataVersionsToString(): Unit = { assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1", Review Comment: Already pointed out by @satishd, but I believe changing 3.4-IV1 to 3.5-IV0 here will make the test pass.
[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0
clolov commented on code in PR #13198: URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323 ## core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala: ## @@ -174,7 +174,7 @@ class FeatureCommandUnitTest { @Test def testMetadataVersionsToString(): Unit = { assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1", Review Comment: Already pointed out by @fvaleri, but I believe changing 3.4-IV1 to 3.5-IV0 here will make the test pass.
[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0
clolov commented on code in PR #13198: URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323 ## core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala: ## @@ -174,7 +174,7 @@ class FeatureCommandUnitTest { @Test def testMetadataVersionsToString(): Unit = { assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1", Review Comment: Already pointed out by @fvaleri, but I believe changing 3.4-IV1 here will make the test pass.
[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097364723 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
[GitHub] [kafka] clolov commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes
clolov commented on code in PR #13195: URL: https://github.com/apache/kafka/pull/13195#discussion_r1097398711 ## docs/upgrade.html: ## @@ -19,6 +19,16 @@
[GitHub] [kafka] dajac commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`
dajac commented on code in PR #13200: URL: https://github.com/apache/kafka/pull/13200#discussion_r1097390286 ## build.gradle: ## @@ -1266,6 +1272,23 @@ project(':group-coordinator') { javadoc { enabled = false } + + task processMessages(type:JavaExec) { +mainClass = "org.apache.kafka.message.MessageGenerator" +classpath = configurations.generator +args = [ "-p", "org.apache.kafka.coordinator.group.generated", Review Comment: Yeah, I have noticed this as well. Personally, I like having `generated` in the package name because it makes the intent clear in the code. I am also fine if folks prefer to remove it.
[GitHub] [kafka] dajac commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`
dajac commented on code in PR #13200: URL: https://github.com/apache/kafka/pull/13200#discussion_r1097389168 ## checkstyle/import-control.xml: ## @@ -341,7 +341,9 @@ + Review Comment: Yes. The generated code needs this.
[GitHub] [kafka] C0urante commented on pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method
C0urante commented on PR #13181: URL: https://github.com/apache/kafka/pull/13181#issuecomment-1419098772 Thanks Mickael!
[GitHub] [kafka] clolov commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`
clolov commented on code in PR #13200: URL: https://github.com/apache/kafka/pull/13200#discussion_r1097379412 ## checkstyle/import-control.xml: ## @@ -341,7 +341,9 @@ + Review Comment: Out of curiosity, are these lines needed in the context of this pull request, or are they a remnant from a previous change? ## build.gradle: ## @@ -1266,6 +1272,23 @@ project(':group-coordinator') { javadoc { enabled = false } + + task processMessages(type:JavaExec) { +mainClass = "org.apache.kafka.message.MessageGenerator" +classpath = configurations.generator +args = [ "-p", "org.apache.kafka.coordinator.group.generated", Review Comment: I noticed that sometimes we append `.generated` in similar closures and sometimes we do not. Are we appending `.generated` going forward?
[GitHub] [kafka] fqaiser94 commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
fqaiser94 commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1097375552

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java: ##
@@ -45,34 +47,85 @@ public void setIfUnset(final SerdeGetter getter) {
         }
     }
 
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    private boolean isUpgrade(final Map configs) {
+        final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (upgradeFrom == null) {
+            return false;
+        }
+
+        switch ((String) upgradeFrom) {
+            case StreamsConfig.UPGRADE_FROM_0100:
+            case StreamsConfig.UPGRADE_FROM_0101:
+            case StreamsConfig.UPGRADE_FROM_0102:
+            case StreamsConfig.UPGRADE_FROM_0110:
+            case StreamsConfig.UPGRADE_FROM_10:
+            case StreamsConfig.UPGRADE_FROM_11:
+            case StreamsConfig.UPGRADE_FROM_20:
+            case StreamsConfig.UPGRADE_FROM_21:
+            case StreamsConfig.UPGRADE_FROM_22:
+            case StreamsConfig.UPGRADE_FROM_23:
+            case StreamsConfig.UPGRADE_FROM_24:
+            case StreamsConfig.UPGRADE_FROM_25:
+            case StreamsConfig.UPGRADE_FROM_26:
+            case StreamsConfig.UPGRADE_FROM_27:
+            case StreamsConfig.UPGRADE_FROM_28:
+            case StreamsConfig.UPGRADE_FROM_30:
+            case StreamsConfig.UPGRADE_FROM_31:
+            case StreamsConfig.UPGRADE_FROM_32:
+            case StreamsConfig.UPGRADE_FROM_33:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public void configure(final Map configs, final boolean isKey) {
+        this.isUpgrade = isUpgrade(configs);
+    }
+
     /**
      * @throws StreamsException if both old and new values of data are null, or if
-     * both values are not null
+     * both values are not null and is upgrading from a version less than 3.4
      */
     @Override
     public byte[] serialize(final String topic, final Headers headers, final Change data) {
-        final byte[] serializedKey;
-
-        // only one of the old / new values would be not null
-        if (data.newValue != null) {
-            if (data.oldValue != null) {
+        final boolean oldValueIsNull = data.oldValue == null;
+        final boolean newValueIsNull = data.newValue == null;
+
+        final byte[] newData = inner.serialize(topic, headers, data.newValue);
+        final byte[] oldData = inner.serialize(topic, headers, data.oldValue);
+
+        final int newDataLength = newValueIsNull ? 0 : newData.length;
+        final int oldDataLength = oldValueIsNull ? 0 : oldData.length;
+
+        // The serialization format is:
+        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
+        // {INT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
+        final ByteBuffer buf;
+        if (!newValueIsNull && !oldValueIsNull) {
+            if (isUpgrade) {
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+                    + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
+            } else {
+                final int capacity = Integer.BYTES + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
+                buf = ByteBuffer.allocate(capacity);
+                buf.putInt(newDataLength).put(newData).put(oldData).put((byte) 2);
             }
-
-            serializedKey = inner.serialize(topic, headers, data.newValue);
+        } else if (!newValueIsNull) {
+            final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(newData).put((byte) 1);
+        } else if (!oldValueIsNull) {
+            final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
+            buf = ByteBuffer.allocate(capacity);
+            buf.put(oldData).put((byte) 0);
         } else {
-            if (data.oldValue == null) {
-                throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-            }
-
-            serializedKey = inner.serialize(topic, headers, data.oldValue);
+            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
-        buf.put(serializedKey);
-        buf.put((byte) (data.newValue != null ? 1 : 0));
-
         return buf.array();
     }

Review Comment:
Thanks for your feedback!
Just FYI, I've created the KIP [here](https://cwiki.apache.org/confluence/x/P5VbDg). And started a discussion thread
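
For readers following the format comment in the diff above, here is a minimal sketch of how the three layouts could be written and read back. It is not the PR's code: the class `ChangeFormatSketch`, the holder `RawChange`, and the `decode` method are hypothetical names introduced for illustration, and the sketch works on already-serialized byte arrays rather than going through an inner serializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of the layout described in the diff comment; not Kafka code.
final class ChangeFormatSketch {
    static final int NEW_OLD_FLAG_SIZE = 1;

    /** Decoded payloads; either field may be null, but not both. */
    static final class RawChange {
        final byte[] newValue;
        final byte[] oldValue;

        RawChange(byte[] newValue, byte[] oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Writes one of the three layouts, chosen by which payloads are present. */
    static byte[] encode(byte[] newData, byte[] oldData) {
        if (newData != null && oldData != null) {
            // {INT newDataLength}{newValue}{oldValue}{flag=2}
            ByteBuffer buf = ByteBuffer.allocate(
                Integer.BYTES + newData.length + oldData.length + NEW_OLD_FLAG_SIZE);
            buf.putInt(newData.length).put(newData).put(oldData).put((byte) 2);
            return buf.array();
        } else if (newData != null) {
            // {newValue}{flag=1}
            ByteBuffer buf = ByteBuffer.allocate(newData.length + NEW_OLD_FLAG_SIZE);
            buf.put(newData).put((byte) 1);
            return buf.array();
        } else if (oldData != null) {
            // {oldValue}{flag=0}
            ByteBuffer buf = ByteBuffer.allocate(oldData.length + NEW_OLD_FLAG_SIZE);
            buf.put(oldData).put((byte) 0);
            return buf.array();
        }
        throw new IllegalArgumentException("Both old and new values are null");
    }

    /** Reads the trailing flag byte first, then splits the payload accordingly. */
    static RawChange decode(byte[] data) {
        if (data == null || data.length < NEW_OLD_FLAG_SIZE) {
            throw new IllegalArgumentException("Missing flag byte");
        }
        int payloadEnd = data.length - NEW_OLD_FLAG_SIZE;
        byte flag = data[payloadEnd];
        switch (flag) {
            case 0:
                return new RawChange(null, Arrays.copyOfRange(data, 0, payloadEnd));
            case 1:
                return new RawChange(Arrays.copyOfRange(data, 0, payloadEnd), null);
            case 2: {
                ByteBuffer buf = ByteBuffer.wrap(data, 0, payloadEnd);
                if (buf.remaining() < Integer.BYTES) {
                    throw new IllegalArgumentException("Missing length prefix");
                }
                int newLength = buf.getInt();
                if (newLength < 0 || newLength > buf.remaining()) {
                    throw new IllegalArgumentException("Invalid newDataLength " + newLength);
                }
                byte[] newValue = new byte[newLength];
                buf.get(newValue);
                byte[] oldValue = new byte[buf.remaining()];
                buf.get(oldValue);
                return new RawChange(newValue, oldValue);
            }
            default:
                throw new IllegalArgumentException("Unknown flag " + flag);
        }
    }
}
```

Putting the flag in the last byte keeps the two single-value layouts byte-compatible with the pre-change format, which is presumably why only the combined layout needs the length prefix.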
[GitHub] [kafka] clolov commented on pull request #13199: MINOR: Update build and test dependencies for 3.5
clolov commented on PR #13199:
URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419072645

Do we use some tool which suggests these newer versions, or do we do the checks manually for each release?
[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097364723 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
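
The quoted `assign` path combines two ideas: a cheap check before taking the write lock, repeated once the lock is held, and removal of trailing entries that would break monotonicity. A simplified sketch of just those two ideas follows. It is an assumption-laden illustration, not the class under review: `EpochCacheSketch` and `Epoch` are hypothetical names, there is no checkpoint file or logging, and the removal loop stands in for the `removeFromEnd` helper whose body is not shown in the quote.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for the cache in the quoted diff.
final class EpochCacheSketch {
    static final class Epoch {
        final int epoch;
        final long startOffset;

        Epoch(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeMap<Integer, Epoch> epochs = new TreeMap<>();

    Optional<Epoch> latestEntry() {
        lock.readLock().lock();
        try {
            Map.Entry<Integer, Epoch> last = epochs.lastEntry();
            return last == null ? Optional.empty() : Optional.of(last.getValue());
        } finally {
            lock.readLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return epochs.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    // An update is needed for a different epoch, or for an earlier start offset.
    private boolean isUpdateNeeded(Epoch entry) {
        return latestEntry()
            .map(latest -> entry.epoch != latest.epoch || entry.startOffset < latest.startOffset)
            .orElse(true);
    }

    boolean assign(int epoch, long startOffset) {
        if (epoch < 0 || startOffset < 0) {
            throw new IllegalArgumentException("Invalid epoch entry " + epoch + "@" + startOffset);
        }
        Epoch entry = new Epoch(epoch, startOffset);

        // First check without the write lock, so the common no-op case stays cheap.
        if (!isUpdateNeeded(entry)) return false;

        lock.writeLock().lock();
        try {
            // Second check: another writer may have appended in the meantime.
            if (!isUpdateNeeded(entry)) return false;

            // Drop trailing entries that would violate monotonicity of epoch or offset.
            while (!epochs.isEmpty()) {
                Epoch last = epochs.lastEntry().getValue();
                if (last.epoch >= entry.epoch || last.startOffset >= entry.startOffset) {
                    epochs.pollLastEntry();
                } else {
                    break;
                }
            }
            epochs.put(entry.epoch, entry);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The second check is safe to run under the write lock because `ReentrantReadWriteLock` lets the thread holding the write lock also acquire the read lock.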
[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
Schm1tz1 commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1097355787

## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+    private final Map envVarMap;
+
+    public EnvVarConfigProvider() {
+        envVarMap = getEnvVars();
+    }
+
+    public EnvVarConfigProvider(Map envVarsAsArgument) {
+        envVarMap = envVarsAsArgument;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+    @Override
+    public void configure(Map configs) {
+    }

Review Comment:
@OneCricketeer Good idea! Will have a look.
[GitHub] [kafka] dajac commented on pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`
dajac commented on PR #13200:
URL: https://github.com/apache/kafka/pull/13200#issuecomment-1419045542

@mimaison Could you review this one?
[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097338495 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097335270 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097332224 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684639#comment-17684639 ]

Nikolay Izhikov commented on KAFKA-14595:
-----------------------------------------

[~omnia_h_ibrahim] Thanks for letting me know!

> Move ReassignPartitionsCommand to tools
> ---------------------------------------
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Nikolay Izhikov
> Priority: Major

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14595) Move ReassignPartitionsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684635#comment-17684635 ]

Omnia Ibrahim edited comment on KAFKA-14595 at 2/6/23 12:41 PM:
----------------------------------------------------------------

Hi [~nizhikov], just a note: I moved the methods `TestUtils.setReplicationThrottleForPartitions` and `TestUtils.removeReplicationThrottleForPartitions` from `TestUtils` to `ToolsTestUtils`, as they are used only by `TopicCommand` and `ReassignPartitionsCommand`. This avoids converting between Scala and Java collections. The changes are here: [https://github.com/apache/kafka/pull/13201|https://github.com/apache/kafka/pull/13201]

> Move ReassignPartitionsCommand to tools
> ---------------------------------------
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Nikolay Izhikov
> Priority: Major
[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684635#comment-17684635 ]

Omnia Ibrahim commented on KAFKA-14595:
---------------------------------------

Hi [~nizhikov], just a note: I moved the methods `TestUtils.setReplicationThrottleForPartitions` and `TestUtils.removeReplicationThrottleForPartitions` from `TestUtils` to `ToolsTestUtils`, as they are used only by `TopicCommand` and `ReassignPartitionsCommand`. This avoids converting between Scala and Java collections. The changes are here: https://github.com/apache/kafka/pull/13201

> Move ReassignPartitionsCommand to tools
> ---------------------------------------
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Nikolay Izhikov
> Priority: Major
[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
ijuma commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097325246 ## storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochFileCache.java: ## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. + * + * Leader Epoch = epoch assigned to each leader by the controller. 
+ * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); Review Comment: My bad, we are calling the private `assign` overload, not the public one.
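To illustrate why `forEach(this::assign)` binds to the private overload, here is a minimal sketch. The class `AssignOverloadSketch` and its nested `Entry` type are hypothetical stand-ins, not the Kafka classes; only the shape of the two `assign` overloads is taken from the diff above. `Iterable.forEach` expects a `Consumer` of one element, so the two-argument public overload is not applicable and the compiler selects the one-argument private method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AssignOverloadSketch {
    // Hypothetical stand-in for Kafka's EpochEntry.
    static final class Entry {
        final int epoch;
        final long startOffset;

        Entry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    // Records which overload ran, for demonstration only.
    final List<String> calls = new ArrayList<>();

    AssignOverloadSketch(List<Entry> checkpointed) {
        // forEach takes a Consumer<Entry>; only assign(Entry) matches that shape.
        checkpointed.forEach(this::assign);
    }

    // Public two-argument overload: cannot be adapted to Consumer<Entry>.
    public void assign(int epoch, long startOffset) {
        calls.add("public");
        assign(new Entry(epoch, startOffset));
    }

    // Private one-argument overload: the boolean result is discarded by forEach.
    private boolean assign(Entry entry) {
        calls.add("private");
        return true;
    }

    public static void main(String[] args) {
        AssignOverloadSketch sketch =
            new AssignOverloadSketch(Arrays.asList(new Entry(0, 0L), new Entry(1, 10L)));
        System.out.println(sketch.calls); // prints [private, private]
    }
}
```

Because the selected method is private, a subclass cannot override it, which appears to be the point both reviewers converge on.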
[GitHub] [kafka] OmniaGM opened a new pull request, #13201: KAFKA-14596: Move TopicCommand to tools
OmniaGM opened a new pull request, #13201: URL: https://github.com/apache/kafka/pull/13201 This PR includes: - Switching the Scala code to Java - Moving TopicCommand and all its test cases to tools - The PR depends on #13171 to replace the usage of `CoreUtils.duplicate` with `ToolsUtils.findDuplicates` Some implementation notes: - I added `ToolsTestUtils.createBrokerProperties` as a wrapper for `TestUtils.createBrokerConfig` to hide the conversion between Scala and Java types. - Replicated `TestUtils.setReplicationThrottleForPartitions` and `TestUtils.removeReplicationThrottleForPartitions` to `ToolsTestUtils`, as the methods are used only in `TopicCommandIntegrationTest` and `ReassignPartitionsIntegrationTest`. We need to remove them from `TestUtils` once we migrate `ReassignPartitions`. - Replicated `TestInfoUtils.TestWithParameterizedQuorumName` to `ToolsTestUtils`, as Java converts this into a getter function which cannot be used with `ParameterizedTest`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097238384 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.epoch; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.server.log.internals.EpochEntry; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. 
+ * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); +} + +/** + * Assigns the supplied Leader Epoch to the supplied Offset + * Once the epoch is assigned it cannot be reassigned + */ +public void assign(int epoch, long startOffset) { +EpochEntry entry = new EpochEntry(epoch, startOffset); +if (assign(entry)) { +log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size()); +flush(); +} +} + +public void assign(List entries) { +entries.forEach(entry -> { +if (assign(entry)) { +log.debug("Appended new epoch entry {}. 
Cache now contains {} entries.", entry, epochs.size()); +} +}); +if (entries.size() > 0) flush(); +} + +private boolean isUpdateNeeded(EpochEntry entry) { +return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true); +} + +private boolean assign(EpochEntry entry) { +if (entry.epoch < 0 || entry.startOffset < 0) { +throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry); +} + +// Check whether the append is needed before acquiring the write lock +// in order to avoid contention with readers in the common case +if (!isUpdateNeeded(entry)) return false; + +lock.writeLock().lock(); +try { +if (isUpdateNeeded(entry)) { +maybeTruncateNonMonotonicEntries(entry); +epochs.put(entry.epoch, entry); +return true; +} else { +return false; +} +} finally { +lock.writeLock().unlock(); +} +} + +/** + * Remove any entries which violate monotonicity prior to appending a new entry + */ +private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) { +List removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset); + + +if (removedEpochs.size() > 1 ||
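The quoted diff is cut off partway through `maybeTruncateNonMonotonicEntries`, so the following is only a simplified sketch of the monotonicity rule it describes, not the Kafka implementation. It assumes that removal walks from the newest entry backwards and stops at the first entry that has both a smaller epoch and a smaller start offset. The class `MonotonicEpochSketch` is hypothetical, stores plain `epoch -> startOffset` pairs instead of `EpochEntry` objects, and omits the read/write locking and checkpoint flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MonotonicEpochSketch {
    // epoch -> start offset; simplified stand-in for the cache's TreeMap.
    final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // Mirrors isUpdateNeeded: append on a new epoch, or on an earlier start offset.
    boolean isUpdateNeeded(int epoch, long startOffset) {
        Map.Entry<Integer, Long> latest = epochs.lastEntry();
        return latest == null || epoch != latest.getKey() || startOffset < latest.getValue();
    }

    // Returns the epochs removed to keep both epochs and start offsets increasing.
    List<Integer> assign(int epoch, long startOffset) {
        List<Integer> removed = new ArrayList<>();
        if (!isUpdateNeeded(epoch, startOffset)) {
            return removed;
        }
        // Drop trailing entries that would violate monotonicity with the new entry.
        while (!epochs.isEmpty()) {
            Map.Entry<Integer, Long> last = epochs.lastEntry();
            if (last.getKey() >= epoch || last.getValue() >= startOffset) {
                removed.add(last.getKey());
                epochs.pollLastEntry();
            } else {
                break;
            }
        }
        epochs.put(epoch, startOffset);
        return removed;
    }

    public static void main(String[] args) {
        MonotonicEpochSketch cache = new MonotonicEpochSketch();
        cache.assign(1, 0L);
        cache.assign(2, 100L);
        cache.assign(3, 200L);
        // Epoch 4 starts at offset 150, so the entry for epoch 3 (offset 200) is dropped.
        System.out.println(cache.assign(4, 150L)); // prints [3]
        System.out.println(cache.epochs);          // prints {1=0, 2=100, 4=150}
    }
}
```

In the real class the cheap `isUpdateNeeded` check runs once before taking the write lock and again after acquiring it, as the quoted code comment explains, so that the common no-op case does not contend with readers.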
[jira] [Assigned] (KAFKA-14578) Move ConsumerPerformance to tools
[ https://issues.apache.org/jira/browse/KAFKA-14578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Federico Valeri reassigned KAFKA-14578:
---
Assignee: Federico Valeri

> Move ConsumerPerformance to tools
> -
>
> Key: KAFKA-14578
> URL: https://issues.apache.org/jira/browse/KAFKA-14578
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Federico Valeri
> Priority: Major
>

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison merged pull request #13128: MINOR: Define a root project name in the Gradle settings file
mimaison merged PR #13128: URL: https://github.com/apache/kafka/pull/13128
[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1097245192 ## storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochFileCache.java: ## @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.slf4j.Logger; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH; +import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET; + +/** + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. + * + * Leader Epoch = epoch assigned to each leader by the controller. 
+ * Offset = offset of the first message in each epoch. + */ +public class LeaderEpochFileCache { +private final LeaderEpochCheckpoint checkpoint; +private final Logger log; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final TreeMap epochs = new TreeMap<>(); + +/** + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + */ +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +this.checkpoint = checkpoint; +LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); +log = logContext.logger(LeaderEpochFileCache.class); +checkpoint.read().forEach(this::assign); Review Comment: `assign(EpochEntry entry)` is already a private method that does not pass this instance outside this class. Am I missing anything here?
[GitHub] [kafka] mimaison merged pull request #13120: MINOR: Connect Javadocs improvements
mimaison merged PR #13120: URL: https://github.com/apache/kafka/pull/13120