[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595613347 > Sorry I'm late to the party but I had a question on this: > > > It seems like @clolov is right, I tested it both in quorum and zk mode, Kafka successfully reconciles the questionable case (when X-1 on B comes back after A has compacted the logs), so I think it's fine to merge in this PR. > > I was also thinking of creating some integration test for this but it's hard to simulate disk errors in Java and we can't have any assumptions about where the tests run, so I think that should be a separate task as it's out of scope for this one. If you folks know a good fault injection framework, I'm all ears. > > Did we confirm that if B comes back it is cleaned and or resumes correctly? Yes. I first hit this issue in our production cluster, applied this PR to our production code, and found that it solved the problem very well. However, our production case used the delete cleanup policy; I'll test the compact policy next week. @jolshan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595608359 > @viktorsomogyi Trogdor has some fault injection capabilities. I'm not sure if disk errors are among them, but it could probably be added. See https://github.com/apache/kafka/tree/trunk/trogdor/src/main/java/org/apache/kafka/trogdor/fault for the types of faults we currently support. Thanks, @mumrah! I'm going to read up on Trogdor to see how to use it. Also, if we add integration tests, should they go in this PR, or should we open a separate one? @viktorsomogyi
[GitHub] [kafka] iblislin opened a new pull request, #13871: fix doc typo of StreamsBuilder.addGlobalStore
iblislin opened a new pull request, #13871: URL: https://github.com/apache/kafka/pull/13871 (no comment)
[jira] [Created] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout
José Armando García Sancio created KAFKA-15100:
  Summary: Unsafe to call tryCompleteFetchResponse on request timeout
  Key: KAFKA-15100
  URL: https://issues.apache.org/jira/browse/KAFKA-15100
  Project: Kafka
  Issue Type: Bug
  Components: kraft
  Reporter: José Armando García Sancio
  Assignee: José Armando García Sancio

When the fetch request times out the future is completed from the "raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that tryCompleteFetchResponse is always called from the same thread. This invariant is violated in this case.

{code:java}
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() :
            exception;

        // If the fetch timed out in purgatory, it means no new data is available,
        // and we will complete the fetch successfully. Otherwise, if there was
        // any other error, we need to return it.
        Errors error = Errors.forException(cause);
        if (error != Errors.REQUEST_TIMED_OUT) {
            logger.info("Failed to handle fetch from {} at {} due to {}",
                replicaId, fetchPartition.fetchOffset(), error);
            return buildEmptyFetchResponse(error, Optional.empty());
        }
    }

    // FIXME: `completionTimeMs`, which can be null
    logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
        replicaId, fetchPartition.fetchOffset(), completionTimeMs);

    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
{code}

One solution is to always build an empty response if the future was completed exceptionally. This works because the ExpirationService completes the future with a `TimeoutException`.

A longer-term solution is to use a more flexible event executor service. This would be a service that allows more kinds of event to get scheduled/submitted to the KRaft thread.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
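The first solution proposed in KAFKA-15100 (always build an empty response when the future completes exceptionally) might look roughly like the sketch below. It is a self-contained illustration, not the KafkaRaftClient code: `FetchTimeoutSketch`, `respond` and the string "responses" are hypothetical stand-ins, and treating a `TimeoutException` as an error-free empty response is an assumption drawn from the comment in the issue's snippet.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

final class FetchTimeoutSketch {
    // Hypothetical handler: strings stand in for fetch responses.
    static CompletableFuture<String> respond(CompletableFuture<Long> purgatoryFuture) {
        return purgatoryFuture.handle((completionTimeMs, exception) -> {
            if (exception != null) {
                // Unwrap the wrappers that CompletableFuture/Future may add.
                boolean wrapped = exception instanceof CompletionException
                    || exception instanceof ExecutionException;
                Throwable cause = wrapped && exception.getCause() != null
                    ? exception.getCause()
                    : exception;

                // Assumption: a timeout means "no new data", so no error is reported.
                String error = cause instanceof TimeoutException
                    ? "NONE"
                    : "UNKNOWN_SERVER_ERROR";

                // Exceptional completion may run on the timer thread, so this
                // branch only builds an empty response and reads no shared state.
                return "EMPTY error=" + error;
            }

            // Normal completion: completionTimeMs is non-null on this path.
            return "DATA completedAt=" + completionTimeMs;
        });
    }
}
```

The design point is that the exceptional branch never falls through to the code that reads the log, so nothing that assumes the single KRaft thread runs on the expiration thread.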
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)
wcarlson5 commented on code in PR #13855: URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ## @@ -56,10 +75,63 @@ public void init(final ProcessorContext context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); +internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); +if (buffer.isPresent()) { +if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { Review Comment: If you find it easier to understand sure. that is fine ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -1256,10 +1261,25 @@ private KStream doStreamTableJoin(final KTable table, final NamedInternal renamed = new NamedInternal(joinedInternal.name()); final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME); + +Optional> buffer = Optional.empty(); + +if (joined.gracePeriod() != null) { Review Comment: I see what you mean but we actually were going to create the store and leave it empty for zero duration. The point was to make it easier to change the grace period if desired so the store isn't getting created and destroyed. Something about making it more backward compatible. ## streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java: ## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.test.TestRecord; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; Review Comment: yep sure. 
(Edit: The class it extends also uses JUnit 4, and I can't change this one without also changing that and all other join integration tests) ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ## @@ -56,10 +75,63 @@ public void init(final ProcessorContext context) { final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); +internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); +if (buffer.isPresent()) { +if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { +throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); +} + +buffer.get().setSerdesIfNull(new SerdeGetter(context)); +//cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context) + buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null); +} } @Override public void process(final Record record) { +internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); +updateObservedStreamTime(record.timesta
[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
jolshan commented on code in PR #13421: URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859633 ## core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala: ## @@ -196,6 +201,110 @@ class ReplicaAlterLogDirsThreadTest { assertEquals(0, thread.partitionCount) } + @Test + def shouldResumeCleanLogDirAfterMarkPartitionFailed(): Unit = { +val brokerId = 1 +val partitionId = 0 +val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "localhost:1234")) + +val partition = Mockito.mock(classOf[Partition]) +val replicaManager = Mockito.mock(classOf[ReplicaManager]) +val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager]) +val futureLog = Mockito.mock(classOf[UnifiedLog]) +val logManager = Mockito.mock(classOf[LogManager]) + +val logs = new Pool[TopicPartition, UnifiedLog]() +logs.put(t1p0, futureLog) +val logCleaner = new LogCleaner(new CleanerConfig(true), + logDirs = Array(TestUtils.tempDir()), + logs = logs, + logDirFailureChannel = new LogDirFailureChannel(1), + time = new MockTime) + +val leaderEpoch = 5 +val logEndOffset = 0 + +when(partition.partitionId).thenReturn(partitionId) +when(replicaManager.metadataCache).thenReturn(metadataCache) +when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog) +when(replicaManager.futureLogExists(t1p0)).thenReturn(true) +when(replicaManager.onlinePartition(t1p0)).thenReturn(Some(partition)) +when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition) +when(replicaManager.logManager).thenReturn(logManager) +doAnswer(_ => { + logCleaner.abortAndPauseCleaning(t1p0) +}).when(logManager).abortAndPauseCleaning(t1p0) +doAnswer(_ => { + logCleaner.resumeCleaning(Seq(t1p0)) +}).when(logManager).resumeCleaning(t1p0) + +when(quotaManager.isQuotaExceeded).thenReturn(false) + +when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false)) + .thenReturn(new EpochEndOffset() +.setPartition(partitionId) 
+.setErrorCode(Errors.NONE.code) +.setLeaderEpoch(leaderEpoch) +.setEndOffset(logEndOffset)) +when(partition.futureLocalLogOrException).thenReturn(futureLog) +doNothing().when(partition).truncateTo(offset = 0, isFuture = true) +when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(false) + +when(futureLog.logStartOffset).thenReturn(0L) +when(futureLog.logEndOffset).thenReturn(0L) +when(futureLog.latestEpoch).thenReturn(None) + +val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L, + config.replicaFetchMaxBytes, Optional.of(leaderEpoch)) +val responseData = new FetchPartitionData( + Errors.NONE, + 0L, + 0L, + MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, "foo".getBytes(StandardCharsets.UTF_8))), + Optional.empty(), + OptionalLong.empty(), + Optional.empty(), + OptionalInt.empty(), + false) +mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, responseData) + +val endPoint = new BrokerEndPoint(0, "localhost", 1000) +val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, quotaManager) +val thread = new ReplicaAlterLogDirsThread( + "alter-logs-dirs-thread", + leader, + failedPartitions, + replicaManager, + quotaManager, + new BrokerTopicStats, + config.replicaFetchBackoffMs) + +// before starting the fetch, pause the clean of the partition. +logManager.abortAndPauseCleaning(t1p0) +thread.addPartitions(Map(t1p0 -> initialFetchState(fetchOffset = 0L, leaderEpoch))) +assertTrue(thread.fetchState(t1p0).isDefined) +assertEquals(1, thread.partitionCount) + +// first test: get handle without exception +when(partition.appendRecordsToFollowerOrFutureReplica(any(), ArgumentMatchers.eq(true))).thenReturn(None) + +thread.doWork() + +assertTrue(thread.fetchState(t1p0).isDefined) +assertEquals(1, thread.partitionCount) +assertTrue(logCleaner.isCleaningInStatePaused(t1p0)) + +// second test: process partition data with throwing a KafkaStorageException. 
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), ArgumentMatchers.eq(true))).thenThrow(new KafkaStorageException("disk error")) Review Comment: Do we have a way to verify the log is cleaned up after it is in failed state? Or in a state to resume correctly?
[GitHub] [kafka] jolshan commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
jolshan commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595410641 Sorry I'm late to the party but I had a question on this: >It seems like @clolov is right, I tested it both in quorum and zk mode, Kafka successfully reconciles the questionable case (when X-1 on B comes back after A has compacted the logs), so I think it's fine to merge in this PR. I was also thinking of creating some integration test for this but it's hard to simulate disk errors in Java and we can't have any assumptions about where the tests run, so I think that should be a separate task as it's out of scope for this one. If you folks know a good fault injection framework, I'm all ears. Did we confirm that if B comes back it is cleaned and/or resumes correctly?
[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
jolshan commented on code in PR #13421: URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859045 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -706,6 +706,8 @@ abstract class AbstractFetcherThread(name: String, *- the request succeeded or *- it was fenced and this thread hasn't received new epoch, which means we need not backoff and retry as the *partition is moved to failed state. + *- for ReplicaAlterLogDirsThread, "OutOfRange" error is generated when starting fetch to leader (source dir, Review Comment: Just for my understanding, the new replica starts with trying to fetch offset 0, and in the case 0 is not the first offset, throws OffsetOutOfRange?
[GitHub] [kafka] jolshan commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…
jolshan commented on PR #13721: URL: https://github.com/apache/kafka/pull/13721#issuecomment-1595392012 Yup, looks like this is what the KIP mentioned. It's very possible this was just missed in the initial implementation. I don't think this should cause major issues since the defaults are still valid (retry backoff is only 100 ms). Though I do wonder if it is really necessary to include. In the case where the delivery timeout is just slightly larger than the request timeout, we wouldn't have the full amount of time to retry anyway. In that KIP, we don't account for the time to await sending either. On the other hand, I don't think it hurts a lot to have unless someone set their retry backoff super high. This could potentially cause more failures than before.
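The trade-off in this comment can be made concrete with a little arithmetic. The sketch below assumes the check under discussion is `delivery.timeout.ms >= linger.ms + request.timeout.ms + retry.backoff.ms`, compared with an older check that omits the backoff term. The class and method names are hypothetical, and apart from the 100 ms retry backoff mentioned in the comment, the numbers are illustrative assumptions.

```java
final class DeliveryTimeoutSketch {
    // Assumed older check: the delivery timeout must cover linger plus one request.
    static boolean validWithoutBackoff(long deliveryTimeoutMs, long lingerMs,
                                       long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    // Assumed stricter check: the delivery timeout must also cover one retry backoff.
    static boolean validWithBackoff(long deliveryTimeoutMs, long lingerMs,
                                    long requestTimeoutMs, long retryBackoffMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs + retryBackoffMs;
    }

    // Time left for retries after the linger period and one full request timeout.
    static long retryBudgetMs(long deliveryTimeoutMs, long lingerMs,
                              long requestTimeoutMs) {
        return deliveryTimeoutMs - lingerMs - requestTimeoutMs;
    }
}
```

With a 120000 ms delivery timeout, 0 ms linger, 30000 ms request timeout and 100 ms backoff, both checks pass and 90000 ms remain for retries. A delivery timeout of 30050 ms passes the two-term check but fails once the backoff is included, which is the kind of configuration that could start failing after the change, more so with a very large backoff.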
[GitHub] [kafka] mumrah commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
mumrah commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595373852 @viktorsomogyi Trogdor has some fault injection capabilities. I'm not sure if disk errors are among them, but it could probably be added. See https://github.com/apache/kafka/tree/trunk/trogdor/src/main/java/org/apache/kafka/trogdor/fault for the types of faults we currently support.
[GitHub] [kafka] jeffkbkim opened a new pull request, #13870: KAFKA-14500; [5/N]
jeffkbkim opened a new pull request, #13870: URL: https://github.com/apache/kafka/pull/13870
[GitHub] [kafka] jolshan commented on pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)
jolshan commented on PR #13848: URL: https://github.com/apache/kafka/pull/13848#issuecomment-1595350950 Btw, @dajac do we typically autocreate this topic? Just curious because I've seen some issues with __transaction_state not having enough replicas in tests (taking 10 or so seconds to get a valid ISR). Just wondering if pre-creating the topic is a valid approach for tests.
[jira] [Created] (KAFKA-15099) Flaky Test kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
Justine Olshan created KAFKA-15099: -- Summary: Flaky Test kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft Key: KAFKA-15099 URL: https://issues.apache.org/jira/browse/KAFKA-15099 Project: Kafka Issue Type: Bug Reporter: Justine Olshan This one often fails with: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 6ms while awaiting InitProducerId Seems like a KRaft-only issue.
[GitHub] [kafka] jolshan commented on pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest
jolshan commented on PR #13868: URL: https://github.com/apache/kafka/pull/13868#issuecomment-1595345954 Thanks for the fix. The thread leaks have been bugging me. Did we also want to set the hw argument to false here too? https://github.com/apache/kafka/blob/48a3aab85469836f107f4fd2af03d9fa3cb4e670/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala#L4033 I also noticed some lines didn't explicitly name the argument, but not sure if that's as important: https://github.com/apache/kafka/blob/48a3aab85469836f107f4fd2af03d9fa3cb4e670/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala#L142
[GitHub] [kafka] jolshan commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest
jolshan commented on code in PR #13868: URL: https://github.com/apache/kafka/pull/13868#discussion_r1232797807 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -342,9 +347,7 @@ class ReplicaManagerTest { rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) assertTrue(appendResult.hasFired) -} finally { - rm.shutdown(checkpointHW = false) -} +} finally rm.shutdown(checkpointHW = false) Review Comment: should we be consistent with the new line vs no new line formatting?
[GitHub] [kafka] jolshan commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest
jolshan commented on code in PR #13868: URL: https://github.com/apache/kafka/pull/13868#discussion_r1232796642 ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -84,7 +84,7 @@ import scala.jdk.CollectionConverters._ class ReplicaManagerTest { val topic = "test-topic" - val topicId = Uuid.randomUuid() + val topicId: Uuid = Uuid.randomUuid() Review Comment: did we need this?
[jira] [Updated] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set
[ https://issues.apache.org/jira/browse/KAFKA-15098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-15098: -- Description: [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to fatal exception java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration. was: java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration. > KRaft migration does not proceed and broker dies if authorizer.class.name is > set > > > Key: KAFKA-15098 > URL: https://issues.apache.org/jira/browse/KAFKA-15098 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.0 >Reporter: Ron Dagostino >Assignee: David Arthur >Priority: Blocker > > [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to > fatal exception > java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration > does not yet support authorizers. Remove authorizer.class.name before > performing a migration.
[jira] [Created] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set
Ron Dagostino created KAFKA-15098: - Summary: KRaft migration does not proceed and broker dies if authorizer.class.name is set Key: KAFKA-15098 URL: https://issues.apache.org/jira/browse/KAFKA-15098 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.5.0 Reporter: Ron Dagostino Assignee: David Arthur java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration does not yet support authorizers. Remove authorizer.class.name before performing a migration.
[GitHub] [kafka] vp224 closed pull request #13869: Ktls test
vp224 closed pull request #13869: Ktls test URL: https://github.com/apache/kafka/pull/13869
[GitHub] [kafka] vp224 opened a new pull request, #13869: Ktls test
vp224 opened a new pull request, #13869: URL: https://github.com/apache/kafka/pull/13869 Description: Rebased the repo with ktls-jni changes and modified the ktls-jni library version JIRA Ticket: https://jira01.corp.linkedin.com:8443/browse/LIKAFKA-53140 Changes: - Added log in SslTransportLayer.java to log when the write is performed with kernel tls enabled. - Changed the version of ktls-jni to 0.0.2
[GitHub] [kafka] divijvaidya commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights
divijvaidya commented on PR #13676: URL: https://github.com/apache/kafka/pull/13676#issuecomment-1595092684 @ijuma do you have any other questions wrt this PR? This is a useful feature to debug our flaky tests and I would like to ensure that your questions are addressed since you were involved in this review earlier.
[GitHub] [kafka] machi1990 commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
machi1990 commented on PR #13865: URL: https://github.com/apache/kafka/pull/13865#issuecomment-1595032501 Thanks for the review @divijvaidya
[GitHub] [kafka] divijvaidya opened a new pull request, #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest
divijvaidya opened a new pull request, #13868: URL: https://github.com/apache/kafka/pull/13868 Fixes thread leaks by closing the ReplicaManager properly at the end of each test. The leaks were leading to flaky test failures in ReplicaManagerTest with errors such as: ``` org.opentest4j.AssertionFailedError: Found unexpected 1 NonDaemon threads=kafka.server.ReplicaManagerTest:ReplicaFetcherThread-0-1 ==> expected: <0> but was: <1> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:560) at app//kafka.utils.TestUtils$.assertNoNonDaemonThreads(TestUtils.scala:1349) at app//kafka.server.ReplicaManagerTest.testBecomeFollowerWhileOldClientFetchInPurgatory(ReplicaManagerTest.scala:1911) ``` **Note to reviewers:** Since I have added try/finally to many tests, I recommend using GitHub's "Hide whitespace" option when reviewing the diff.
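The try/finally approach described in this PR can be illustrated with a minimal Java sketch. The real tests are Scala and use Kafka's own `ReplicaManager`; the `FetcherOwner` and `LeakFreeTestSketch` classes below are hypothetical stand-ins, assumed only for illustration, for a component that owns a non-daemon worker thread.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a component that owns a non-daemon worker thread.
final class FetcherOwner implements AutoCloseable {
    private final CountDownLatch stop = new CountDownLatch(1);
    private final Thread worker;

    FetcherOwner(String threadName) {
        worker = new Thread(() -> {
            try {
                stop.await(); // park until close() signals shutdown
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, threadName);
        worker.setDaemon(false);
        worker.start();
    }

    boolean workerAlive() {
        return worker.isAlive();
    }

    @Override
    public void close() {
        stop.countDown();
        try {
            worker.join(); // wait for the thread to exit so no leak is observable
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class LeakFreeTestSketch {
    // Runs a test body and always closes the owner, even if the body throws.
    static FetcherOwner runWithCleanup(Runnable testBody) {
        FetcherOwner owner = new FetcherOwner("sketch-fetcher-0");
        try {
            testBody.run();
        } finally {
            owner.close();
        }
        return owner;
    }

    public static void main(String[] args) {
        FetcherOwner owner = runWithCleanup(() -> { /* test assertions would go here */ });
        System.out.println("worker alive after test: " + owner.workerAlive());
    }
}
```

Because the cleanup sits in `finally`, a failing assertion in the test body still stops the worker thread, which is the property a check like `assertNoNonDaemonThreads` depends on.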
[GitHub] [kafka] tkuramoto33 opened a new pull request, #13867: MINOR: Fix help message for kafka-metadata-shell.
tkuramoto33 opened a new pull request, #13867: URL: https://github.com/apache/kafka/pull/13867 The current help message for kafka-metadata-shell.sh is as follows. ``` $ ~/kafka_2.13-3.5.0/bin/kafka-metadata-shell.sh -h usage: metadata-tool [-h] [--snapshot SNAPSHOT] [command [command ...]] The Apache Kafka metadata tool (omitted.) ``` However, it should be as follows. ``` $ ~/kafka_2.13-3.5.0/bin/kafka-metadata-shell.sh -h usage: kafka-metadata-shell [-h] [--snapshot SNAPSHOT] [command [command ...]] The Apache Kafka metadata shell (omitted.) ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac merged pull request #13854: MINOR: rat should depend on processMessages task
dajac merged PR #13854: URL: https://github.com/apache/kafka/pull/13854
[jira] [Updated] (KAFKA-14112) Expose replication-offset-lag Mirror metric
[ https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elkhan Eminov updated KAFKA-14112: -- Description: The offset lag is the difference of the last replicated record's (LRO) source offset and the end offset of the source (LEO). The offset lag is a difference (LRO-LEO), but its constituents calculated at different points of time and place * LEO shall be calculated during source task's poll loop (ready to get it from the consumer) * LRO shall be kept in an in-memory "cache", that is updated during the task's producer callback LRO is initialized when task is started, from the offset store. The difference shall be calculated when the freshest LEO acquired in the poll loop. The calculated amount shall be defined as a MirrorMaker metric. This would describe to amount of "to be replicated" number of records for a certain topic-partition. was: The offset lag is the difference of the last replicated record's source offset and the end offset of the source. The offset lag is a difference (LRO-LEO), but its constituents calculated at different points of time and place * LEO shall be calculated during source task's poll loop (ready to get it from the consumer) * LRO shall be kept in an in-memory "cache", that is updated during the task's producer callback LRO is initialized when task is started, from the offset store. The difference shall be calculated when the freshest LEO acquired in the poll loop. The calculated amount shall be defined as a MirrorMaker metric. This would describe to amount of "to be replicated" number of records for a certain topic-partition. 
> Expose replication-offset-lag Mirror metric > --- > > Key: KAFKA-14112 > URL: https://issues.apache.org/jira/browse/KAFKA-14112 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Elkhan Eminov >Assignee: Elkhan Eminov >Priority: Minor > > The offset lag is the difference of the last replicated record's (LRO) source > offset and the end offset of the source (LEO). > The offset lag is a difference (LRO-LEO), but its constituents calculated at > different points of time and place > * LEO shall be calculated during source task's poll loop (ready to get it > from the consumer) > * LRO shall be kept in an in-memory "cache", that is updated during the > task's producer callback > LRO is initialized when task is started, from the offset store. The > difference shall be calculated when the freshest LEO acquired > in the poll loop. The calculated amount shall be defined as a MirrorMaker > metric. > This would describe to amount of "to be replicated" number of records for a > certain topic-partition. -- This message was sent by Atlassian Jira (v8.20.10#820010)
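The bookkeeping described in this issue (LRO cached in memory and updated from the producer callback, LEO sampled in the poll loop) can be sketched in plain Java. This is a sketch under assumptions, not MirrorMaker's implementation: `ReplicationLagTracker` and its method names are hypothetical, the topic-partition is a plain string key, and LEO is assumed to be the offset of the next record to be written, so the number of records still to replicate is taken as LEO - (LRO + 1), clamped at zero.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; names are illustrative and not part of MirrorMaker's API.
final class ReplicationLagTracker {
    // Last replicated source offset (LRO) per topic-partition key.
    private final Map<String, Long> lastReplicated = new ConcurrentHashMap<>();

    // Called when the task starts, with the value restored from the offset store.
    void initialize(String topicPartition, long lro) {
        lastReplicated.put(topicPartition, lro);
    }

    // Called from the producer callback once a record has been acknowledged.
    void onRecordReplicated(String topicPartition, long sourceOffset) {
        lastReplicated.merge(topicPartition, sourceOffset, Long::max);
    }

    // Called from the poll loop with the freshest log end offset (LEO).
    // Assumption: LEO is the next offset to be written on the source.
    long lag(String topicPartition, long logEndOffset) {
        Long lro = lastReplicated.get(topicPartition);
        // Assumption: with no LRO recorded, replication starts at offset 0.
        long nextToReplicate = (lro == null) ? 0L : lro + 1;
        return Math.max(0L, logEndOffset - nextToReplicate);
    }

    public static void main(String[] args) {
        ReplicationLagTracker tracker = new ReplicationLagTracker();
        tracker.initialize("orders-0", 41L);
        System.out.println(tracker.lag("orders-0", 50L)); // offsets 42..49 pending: prints 8
        tracker.onRecordReplicated("orders-0", 49L);
        System.out.println(tracker.lag("orders-0", 50L)); // fully caught up: prints 0
    }
}
```

The concurrent map is used because the producer callback and the poll loop may run on different threads; the value returned by `lag` is what would be reported as the metric.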
[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task
dajac commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594960723 The number of failed tests is scary but none of them are related to this change.
[GitHub] [kafka] machi1990 opened a new pull request, #13866: MINOR: update LogCleaner.scala javadoc with a link to OffsetMap to help with code navigation in IDE
machi1990 opened a new pull request, #13866: URL: https://github.com/apache/kafka/pull/13866 I was going through the LogCleaner.scala file to familiarise myself with that part of the codebase and I noticed this minor improvement to help with code navigation. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hudeqi commented on pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods
hudeqi commented on PR #13839: URL: https://github.com/apache/kafka/pull/13839#issuecomment-1594953593 > Thanks for the PR @hudeqi. I think it's always good to improve JavaDocs or ScalaDocs. > > I was looking at the `LogCleaner.scala` file and I saw there are plenty of methods that are public which have only a very general documentation and they don't have any parameter annotation with documentation. For example `abortCleaning` in line 220 and some of the following methods. > > For the sake of completion, would you be up to adding the missing annotations to the methods that are public? Extra mile for all the ones that are package-log-protected (`private[log]`) Hi, thanks for the comments. I have added documentation for the methods you mentioned; please help review. Thank you! @jlprat
[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.
[ https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukesh Mishra updated KAFKA-15097: -- Description: Currently we are facing NoSuchFileException in LogCleaner, which is critical error as result platform get shutown. We are running Kafka in Kraft Mode. The cluster is having 3 Node. Kafka version - 3.3.1 We are facing issue systematically, which occurs after retention reached. Here's the logs of Node 1 for consumer offset 4146 for which we have faced NoSuchFileException on 24 may 2023 Config : [^config.txt] ^Logs :^ ^Server Node 1 [^server.log]^ Log Cleaner : [^log-cleaner.log] ^__consumer_offset_49 directory logs :^ ^!__consumer_offset_49.png!^ we suspect that swap operation is culprit as we can see swap file for consumer offset 4146 for __consumer_offset_49 was: Currently we are facing NoSuchFileException in LogCleaner, which is critical error as result platform get shutown. We are running Kafka in Kraft Mode. The cluster is having 3 Node. Kafka version - 3.3.1 We are facing issue systematically, which occurs after retention reached. Here's the logs of Node 1 for consumer offset 4146 for which we have faced NoSuchFileException on 24 may 2023 Config : [^config.txt] ^Logs :^ ^Server Node 1 [^server.log]^ ^Log Cleaner : [^log-cleaner.log] ^ ^__consumer_offset_49 directory logs :^ ^!__consumer_offset_49.png!^ we suspect that swap operation is culprit as we can see swap file for consumer offset 4146 for __consumer_offset_49 > NoSuchFileException in LogCleaner Operation. > > > Key: KAFKA-15097 > URL: https://issues.apache.org/jira/browse/KAFKA-15097 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 3.3.1 >Reporter: Mukesh Mishra >Priority: Blocker > Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, > server.log > > > Currently we are facing NoSuchFileException in LogCleaner, which is critical > error as result platform get shutown. 
> We are running Kafka in Kraft Mode. > The cluster is having 3 Node. > Kafka version - 3.3.1 > We are facing issue systematically, which occurs after retention reached. > Here's the logs of Node 1 for consumer offset 4146 for which we have faced > NoSuchFileException on 24 may 2023 > Config : > [^config.txt] > ^Logs :^ > ^Server Node 1 [^server.log]^ > Log Cleaner : > [^log-cleaner.log] > ^__consumer_offset_49 directory logs :^ > ^!__consumer_offset_49.png!^ > we suspect that swap operation is culprit as we can see swap file for > consumer offset 4146 for __consumer_offset_49 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.
[ https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukesh Mishra updated KAFKA-15097: -- Affects Version/s: 3.3.1 > NoSuchFileException in LogCleaner Operation. > > > Key: KAFKA-15097 > URL: https://issues.apache.org/jira/browse/KAFKA-15097 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 3.3.1 >Reporter: Mukesh Mishra >Priority: Blocker > Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, > server.log > > > Currently we are facing NoSuchFileException in LogCleaner, which is critical > error as result platform get shutown. > We are running Kafka in Kraft Mode. > The cluster is having 3 Node. > Kafka version - 3.3.1 > We are facing issue systematically, which occurs after retention reached. > Here's the logs of Node 1 for consumer offset 4146 for which we have faced > NoSuchFileException on 24 may 2023 > Config : > [^config.txt] > ^Logs :^ > ^Server Node 1 [^server.log]^ > ^Log Cleaner : > [^log-cleaner.log] > ^ > ^__consumer_offset_49 directory logs :^ > ^!__consumer_offset_49.png!^ > we suspect that swap operation is culprit as we can see swap file for > consumer offset 4146 for __consumer_offset_49 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #13864: Revert "MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits"
C0urante commented on PR #13864: URL: https://github.com/apache/kafka/pull/13864#issuecomment-1594925376 Thanks Josep! Will merge without awaiting CI results as this is a clean revert and I've verified locally that the `MirrorConnectorsIntegrationTransactionsTest::testReplication` test case passes.
[jira] [Comment Edited] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka
[ https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733576#comment-17733576 ] Manyanda Chitimbo edited comment on KAFKA-15096 at 6/16/23 4:09 PM: Thank you for reporting the issue [~Sasikumarms]. A PR has been opened at [https://github.com/apache/kafka/pull/13865] to bump the version. Once merged, I'll let the release managers determine how far the fix can be backported. was (Author: JIRAUSER299903): https://github.com/apache/kafka/pull/13865 > CVE 2023-34455 - Vulnerability identified with Apache kafka > --- > > Key: KAFKA-15096 > URL: https://issues.apache.org/jira/browse/KAFKA-15096 > Project: Kafka > Issue Type: Bug >Reporter: Sasikumar Muthukrishnan Sampath >Assignee: Manyanda Chitimbo >Priority: Major > > A new vulnerability CVE-2023-34455 is identified with apache kafka > dependency. The vulnerability is coming from snappy-java:1.1.8.4 > Version 1.1.10.1 contains a patch for this issue. Please upgrade the > snappy-java version to fix this issue > > snappy-java is a fast compressor/decompressor for Java. Due to use of an > unchecked chunk length, an unrecoverable fatal error can occur in versions > prior to 1.1.10.1. > The code in the function hasNextChunk in the fileSnappyInputStream.java > checks if a given stream has more chunks to read. It does that by attempting > to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function > returns false. Otherwise, if 4 bytes were available, the code treats them as > the length of the next chunk. > In the case that the `compressed` variable is null, a byte array is allocated > with the size given by the input data. Since the code doesn’t test the > legality of the `chunkSize` variable, it is possible to pass a negative > number (such as 0x which is -1), which will cause the code to raise a > `java.lang.NegativeArraySizeException` exception. 
A worse case would happen > when passing a huge positive value (such as 0x7FFF), which would raise > the fatal `java.lang.OutOfMemoryError` error. -- This message was sent by Atlassian Jira (v8.20.10#820010)
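The class of bug described in this issue, a length prefix used for allocation without validation, can be illustrated with a short Java sketch. This is not the snappy-java code or its actual patch: `ChunkLengthCheck`, `readChunk`, and the `MAX_CHUNK_BYTES` limit are hypothetical names and values assumed for illustration. It relies only on the fact that four `0xFF` bytes read as a signed 32-bit integer yield -1.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of validating a 4-byte chunk length before allocating.
final class ChunkLengthCheck {
    // Illustrative upper bound; an assumption, not a value taken from snappy-java.
    static final int MAX_CHUNK_BYTES = 64 * 1024 * 1024;

    // Reads a 4-byte big-endian length followed by that many bytes.
    // Throws IllegalArgumentException instead of allocating from an untrusted size.
    static byte[] readChunk(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES) {
            throw new IllegalArgumentException("missing 4-byte chunk length");
        }
        int chunkSize = in.getInt();
        if (chunkSize < 0 || chunkSize > MAX_CHUNK_BYTES || chunkSize > in.remaining()) {
            throw new IllegalArgumentException("invalid chunk size: " + chunkSize);
        }
        byte[] chunk = new byte[chunkSize]; // safe: size was validated above
        in.get(chunk);
        return chunk;
    }

    public static void main(String[] args) {
        byte[] valid = {0, 0, 0, 2, 10, 20};
        System.out.println("valid chunk length: " + readChunk(ByteBuffer.wrap(valid)).length);

        byte[] negative = {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF}; // reads as -1
        try {
            readChunk(ByteBuffer.wrap(negative));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking the length against both a fixed ceiling and the bytes actually available means neither a negative value nor a huge positive one reaches the `new byte[...]` allocation.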
[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.
[ https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mukesh Mishra updated KAFKA-15097: -- Priority: Blocker (was: Critical) > NoSuchFileException in LogCleaner Operation. > > > Key: KAFKA-15097 > URL: https://issues.apache.org/jira/browse/KAFKA-15097 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Reporter: Mukesh Mishra >Priority: Blocker > Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, > server.log > > > Currently we are facing NoSuchFileException in LogCleaner, which is critical > error as result platform get shutown. > We are running Kafka in Kraft Mode. > The cluster is having 3 Node. > Kafka version - 3.3.1 > We are facing issue systematically, which occurs after retention reached. > Here's the logs of Node 1 for consumer offset 4146 for which we have faced > NoSuchFileException on 24 may 2023 > Config : > [^config.txt] > ^Logs :^ > ^Server Node 1 [^server.log]^ > ^Log Cleaner : > [^log-cleaner.log] > ^ > ^__consumer_offset_49 directory logs :^ > ^!__consumer_offset_49.png!^ > we suspect that swap operation is culprit as we can see swap file for > consumer offset 4146 for __consumer_offset_49 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] machi1990 commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
machi1990 commented on PR #13865: URL: https://github.com/apache/kafka/pull/13865#issuecomment-1594915227 @showuon @mimaison @divijvaidya can one of you have a look? Thanks
[GitHub] [kafka] machi1990 opened a new pull request, #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
machi1990 opened a new pull request, #13865: URL: https://github.com/apache/kafka/pull/13865 Upgrade to 1.1.10.1 to fix CVE 2023-34455 The release notes are available at https://github.com/xerial/snappy-java/releases/tag/v1.1.10.1 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.
Mukesh Mishra created KAFKA-15097: - Summary: NoSuchFileException in LogCleaner Operation. Key: KAFKA-15097 URL: https://issues.apache.org/jira/browse/KAFKA-15097 Project: Kafka Issue Type: Bug Components: log cleaner Reporter: Mukesh Mishra Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, server.log We are currently hitting a NoSuchFileException in the LogCleaner. This is a critical error because it causes the platform to shut down. We are running Kafka in KRaft mode. The cluster has 3 nodes. Kafka version - 3.3.1 The issue occurs systematically after retention is reached. Here are the logs of Node 1 for consumer offset 4146, for which we hit the NoSuchFileException on 24 May 2023. Config: [^config.txt] Logs: Server Node 1 [^server.log] Log Cleaner: [^log-cleaner.log] __consumer_offset_49 directory logs: !__consumer_offset_49.png! We suspect that the swap operation is the culprit, as we can see a swap file for consumer offset 4146 for __consumer_offset_49
[jira] [Assigned] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka
[ https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manyanda Chitimbo reassigned KAFKA-15096: - Assignee: Manyanda Chitimbo > CVE 2023-34455 - Vulnerability identified with Apache kafka > --- > > Key: KAFKA-15096 > URL: https://issues.apache.org/jira/browse/KAFKA-15096 > Project: Kafka > Issue Type: Bug >Reporter: Sasikumar Muthukrishnan Sampath >Assignee: Manyanda Chitimbo >Priority: Major > > A new vulnerability CVE-2023-34455 is identified with apache kafka > dependency. The vulnerability is coming from snappy-java:1.1.8.4 > Version 1.1.10.1 contains a patch for this issue. Please upgrade the > snappy-java version to fix this issue > > snappy-java is a fast compressor/decompressor for Java. Due to use of an > unchecked chunk length, an unrecoverable fatal error can occur in versions > prior to 1.1.10.1. > The code in the function hasNextChunk in the fileSnappyInputStream.java > checks if a given stream has more chunks to read. It does that by attempting > to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function > returns false. Otherwise, if 4 bytes were available, the code treats them as > the length of the next chunk. > In the case that the `compressed` variable is null, a byte array is allocated > with the size given by the input data. Since the code doesn’t test the > legality of the `chunkSize` variable, it is possible to pass a negative > number (such as 0x which is -1), which will cause the code to raise a > `java.lang.NegativeArraySizeException` exception. A worse case would happen > when passing a huge positive value (such as 0x7FFF), which would raise > the fatal `java.lang.OutOfMemoryError` error. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW
maniekes commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594900986 Some extra research. I rolled back the changes for cygpath --windows. In MSYS2 MINGW64 I got the following error: ```bash *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka $ ./bin/kafka-server-start.sh config/server.properties [0.003s][error][logging] Error opening log file 'C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M': Invalid argument [0.003s][error][logging] Initialization of output 'file=C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M' using options '(null)' failed. Invalid -Xlog option '-Xlog:gc*:file=C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M', see error log for details. Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit. ``` After I add this to the script it works fine (and logs are now properly displayed): ```bash export MSYS2_ARG_CONV_EXCL="*" # this will do the same, but only for 2 faulty arguments: export MSYS2_ARG_CONV_EXCL="-Xlog:gc*:file=;-Dlog4j.configuration=" ``` (Since I prefer lean solutions, I added this version to the script.) In raw MSYS2 I got the following error: ```bash *@@rafal-laptok MSYS /c/rafal/git/maniekes-kafka $ ./bin/kafka-server-start.sh config/server.properties /c/rafal/git/maniekes-kafka/bin/kafka-run-class.sh: linia 352: C:\Program Files\java\jdk-19.0.1/bin/java: Bad address /c/rafal/git/maniekes-kafka/bin/kafka-run-class.sh: linia 352: C:\Program Files\java\jdk-19.0.1/bin/java: No error ``` It was caused by this line: https://github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh#L243 For some strange reason MSYS2 does not allow setting `KAFKA_LOG4J_OPTS` in this way. I was getting "Bad address" even if I removed the usages of `KAFKA_LOG4J_OPTS` from the very bottom of the script. 
I tried to work around this and finally decided just to change the name: ```bash KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS" ``` which worked perfectly for all 4 shells. Here are the test results after my commit: ## git bash ```bash *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk) $ uname -a MINGW64_NT-10.0-22621 rafal-laptok 3.3.6-341.x86_64 2022-09-05 20:28 UTC x86_64 Msys *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk) $ ./bin/kafka-server-start.sh config/server.properties SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] [2023-06-16 17:43:05,794] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2023-06-16 17:43:05,950] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) [2023-06-16 17:43:06,014] INFO starting (kafka.server.KafkaServer) [2023-06-16 17:43:06,015] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2023-06-16 17:43:06,023] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. 
(kafka.zookeeper.ZooKeeperClient) [2023-06-16 17:43:06,042] INFO Client environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT (org.apache.zookeeper.ZooKeeper) [2023-06-16 17:43:06,042] INFO Client environment:host.name=192.168.1.137 (org.apache.zookeeper.ZooKeeper) [2023-06-16 17:43:06,044] INFO Client environment:java.version=19.0.1 (org.apache.zookeeper.ZooKeeper) [2023-06-16 17:43:06,044] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper) [2023-06-16 17:43:06,044] INFO Client environment:java.home=C:\Program Files\java\jdk-19.0.1 (org.apache.zookeeper.ZooKeeper) (... rest of logs skipped, all works fine) ``` ## MSYS2 MINGW ```bash *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka $ uname -a MINGW64_NT-10.0-22621 r
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1594893813 Thanks for your comment, I have merged the latest trunk to see if it can pass the CI check. @viktorsomogyi
[jira] [Updated] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka
[ https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sasikumar Muthukrishnan Sampath updated KAFKA-15096: Description: A new vulnerability, CVE-2023-34455, has been identified in an Apache Kafka dependency. The vulnerability comes from snappy-java:1.1.8.4. Version 1.1.10.1 contains a patch for this issue. Please upgrade the snappy-java version to fix it. snappy-java is a fast compressor/decompressor for Java. Due to use of an unchecked chunk length, an unrecoverable fatal error can occur in versions prior to 1.1.10.1. The code in the function hasNextChunk in the file SnappyInputStream.java checks if a given stream has more chunks to read. It does that by attempting to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function returns false. Otherwise, if 4 bytes were available, the code treats them as the length of the next chunk. In the case that the `compressed` variable is null, a byte array is allocated with the size given by the input data. Since the code doesn’t test the legality of the `chunkSize` variable, it is possible to pass a negative number (such as 0xFFFFFFFF, which is -1), which will cause the code to raise a `java.lang.NegativeArraySizeException` exception. A worse case would happen when passing a huge positive value (such as 0x7FFFFFFF), which would raise the fatal `java.lang.OutOfMemoryError` error. was: A new vulnerability, CVE-2023-34455, has been identified in camel-kafka dependencies. The vulnerability comes from snappy-java:1.1.8.4. Version 1.1.10.1 contains a patch for this issue. Please upgrade the snappy-java version to fix it. snappy-java is a fast compressor/decompressor for Java. Due to use of an unchecked chunk length, an unrecoverable fatal error can occur in versions prior to 1.1.10.1. The code in the function hasNextChunk in the file SnappyInputStream.java checks if a given stream has more chunks to read. It does that by attempting to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function returns false. Otherwise, if 4 bytes were available, the code treats them as the length of the next chunk. In the case that the `compressed` variable is null, a byte array is allocated with the size given by the input data. Since the code doesn’t test the legality of the `chunkSize` variable, it is possible to pass a negative number (such as 0xFFFFFFFF, which is -1), which will cause the code to raise a `java.lang.NegativeArraySizeException` exception. A worse case would happen when passing a huge positive value (such as 0x7FFFFFFF), which would raise the fatal `java.lang.OutOfMemoryError` error. > CVE 2023-34455 - Vulnerability identified with Apache kafka > --- > > Key: KAFKA-15096 > URL: https://issues.apache.org/jira/browse/KAFKA-15096 > Project: Kafka > Issue Type: Bug > Reporter: Sasikumar Muthukrishnan Sampath > Priority: Major > > A new vulnerability, CVE-2023-34455, has been identified in an Apache Kafka dependency. The vulnerability comes from snappy-java:1.1.8.4. Version 1.1.10.1 contains a patch for this issue. Please upgrade the snappy-java version to fix it. > > snappy-java is a fast compressor/decompressor for Java. Due to use of an unchecked chunk length, an unrecoverable fatal error can occur in versions prior to 1.1.10.1. > The code in the function hasNextChunk in the file SnappyInputStream.java checks if a given stream has more chunks to read. It does that by attempting to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function returns false. Otherwise, if 4 bytes were available, the code treats them as the length of the next chunk. > In the case that the `compressed` variable is null, a byte array is allocated with the size given by the input data. Since the code doesn’t test the legality of the `chunkSize` variable, it is possible to pass a negative number (such as 0xFFFFFFFF, which is -1), which will cause the code to raise a `java.lang.NegativeArraySizeException` exception. A worse case would happen when passing a huge positive value (such as 0x7FFFFFFF), which would raise the fatal `java.lang.OutOfMemoryError` error. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka
Sasikumar Muthukrishnan Sampath created KAFKA-15096: --- Summary: CVE 2023-34455 - Vulnerability identified with Apache kafka Key: KAFKA-15096 URL: https://issues.apache.org/jira/browse/KAFKA-15096 Project: Kafka Issue Type: Bug Reporter: Sasikumar Muthukrishnan Sampath A new vulnerability, CVE-2023-34455, has been identified in camel-kafka dependencies. The vulnerability comes from snappy-java:1.1.8.4. Version 1.1.10.1 contains a patch for this issue. Please upgrade the snappy-java version to fix it. snappy-java is a fast compressor/decompressor for Java. Due to use of an unchecked chunk length, an unrecoverable fatal error can occur in versions prior to 1.1.10.1. The code in the function hasNextChunk in the file SnappyInputStream.java checks if a given stream has more chunks to read. It does that by attempting to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function returns false. Otherwise, if 4 bytes were available, the code treats them as the length of the next chunk. In the case that the `compressed` variable is null, a byte array is allocated with the size given by the input data. Since the code doesn’t test the legality of the `chunkSize` variable, it is possible to pass a negative number (such as 0xFFFFFFFF, which is -1), which will cause the code to raise a `java.lang.NegativeArraySizeException` exception. A worse case would happen when passing a huge positive value (such as 0x7FFFFFFF), which would raise the fatal `java.lang.OutOfMemoryError` error.
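The unchecked-length problem described in the report can be illustrated with a minimal Java sketch. This is not the snappy-java source or its actual patch: the class `ChunkLengthCheck`, the method `parseChunkSize`, the 64 MiB `MAX_CHUNK_SIZE` limit, and the big-endian header layout are all assumptions made for illustration. It only shows the general idea of validating a length read from untrusted input before allocating a buffer of that size.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration; not the snappy-java implementation or its fix.
final class ChunkLengthCheck {
    // Assumed upper bound for one chunk; a real limit is a design decision.
    static final int MAX_CHUNK_SIZE = 64 * 1024 * 1024;

    /** Interprets the first 4 bytes as a big-endian length and rejects unsafe values. */
    static int parseChunkSize(byte[] header) {
        if (header.length < 4) {
            throw new IllegalArgumentException("Need 4 header bytes, got " + header.length);
        }
        int chunkSize = ByteBuffer.wrap(header, 0, 4).getInt(); // ByteBuffer is big-endian by default
        // Without this check, a negative size would fail in `new byte[chunkSize]`
        // and a huge positive size could exhaust the heap.
        if (chunkSize < 0 || chunkSize > MAX_CHUNK_SIZE) {
            throw new IllegalArgumentException("Invalid chunk size: " + chunkSize);
        }
        return chunkSize;
    }

    public static void main(String[] args) {
        System.out.println(parseChunkSize(new byte[] {0, 0, 0, 16})); // prints 16
        try {
            // 0xFFFFFFFF is -1 when read as a signed 32-bit integer.
            parseChunkSize(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Invalid chunk size: -1"
        }
    }
}
```

Rejecting the value before allocation turns both failure modes from the report into an ordinary, recoverable exception.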
[GitHub] [kafka] dajac opened a new pull request, #13863: MINOR: Reduce flakiness of testRackAwareRangeAssignor, second try
dajac opened a new pull request, #13863: URL: https://github.com/apache/kafka/pull/13863 This test still fails regularly with the following error: ``` Error java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Timed out while awaiting expected assignment Set(topicWithAllPartitionsOnAllRacks-0, topicWithSingleRackPartitions-0). The current assignment is [] Stacktrace java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: Timed out while awaiting expected assignment Set(topicWithAllPartitionsOnAllRacks-0, topicWithSingleRackPartitions-0). The current assignment is [] at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(FetchFromFollowerIntegrationTest.scala:211) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9$adapted(FetchFromFollowerIntegrationTest.scala:211) at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) ``` I propose to increase the timeouts to 30 secs to mitigate it. The test already uses 30 secs timeouts in many places. This patch uses 30 secs everywhere. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
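For readers unfamiliar with this kind of timeout, the following is a minimal Java sketch of a poll-until-true wait such as the one the failing assertion implies. It is not Kafka's test utility: the class `WaitUtil`, the method `waitUntil`, and the polling interval are hypothetical names and values chosen for illustration. The point is that a longer timeout only raises the upper bound on waiting; a condition that is met early still returns early.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not Kafka's TestUtils.
final class WaitUtil {
    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true after roughly 200 ms, well inside a 30 s budget.
        boolean assigned = waitUntil(() -> System.currentTimeMillis() - start >= 200, 30_000, 50);
        System.out.println(assigned);
    }
}
```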
[GitHub] [kafka] mumrah merged pull request #13257: MINOR: Add ZK migration docs to the packaged docs
mumrah merged PR #13257: URL: https://github.com/apache/kafka/pull/13257
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12976: Improve TestUtils for temporary properties file
divijvaidya commented on code in PR #12976: URL: https://github.com/apache/kafka/pull/12976#discussion_r1232345504 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -156,6 +156,21 @@ object TestUtils extends Logging { */ def tempFile(prefix: String, suffix: String): File = JTestUtils.tempFile(prefix, suffix) + /** + * Create a file with the given contents in the default temporary-file directory, + * using `kafka` as the prefix and `tmp` as the suffix to generate its name. + */ + def tempFile(contents: String): File = JTestUtils.tempFile(contents) + + def tempPropertiesFile(properties: Properties): File = { +return tempPropertiesFile(properties.asScala) + } + + def tempPropertiesFile(properties: Map[String, String]): File = { +val content = properties.map{case (k, v) => k + "=" + v}.mkString("\n") Review Comment: Please use `System.lineSeparator()` instead of `\n`. It is a safer way to write newline characters and keeps the code platform-independent.
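The review suggestion can be sketched in Java as follows. The PR under review is Scala, so this is only an illustration of the same idea, and the class `PropertiesContent` and method `render` are hypothetical names that do not appear in the PR. The sketch joins `key=value` pairs with `System.lineSeparator()` rather than a hard-coded `\n`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of the reviewer's suggestion; not code from the PR.
final class PropertiesContent {
    /** Renders each entry as key=value, separated by the platform line separator. */
    static String render(Map<String, String> properties) {
        return properties.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(System.lineSeparator()));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "demo");
        System.out.println(render(props));
    }
}
```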
[GitHub] [kafka] tombentley commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart
tombentley commented on code in PR #13862: URL: https://github.com/apache/kafka/pull/13862#discussion_r1232343003 ## docs/streams/quickstart.html: ## @@ -91,8 +91,8 @@ Step https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz"; title="Kafka downloads">Download the {{fullDotVersion}} release and un-tar it. Note that there are multiple downloadable Scala versions and we choose to use the recommended version ({{scalaVersion}}) here: -> tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz -> cd kafka_{{scalaVersion}}-{{fullDotVersion}} + $ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz Review Comment: Do we need the initial two spaces? ## docs/quickstart.html: ## @@ -215,19 +215,17 @@ Edit the config/connect-standalone.properties file, add or change the plugin.path configuration property match the following, and save the file: - -> echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" +$ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" Review Comment: ```suggestion $ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" >> config/connect-standalone.properties ``` ## docs/quickstart.html: ## @@ -215,19 +215,17 @@ Edit the config/connect-standalone.properties file, add or change the plugin.path configuration property match the following, and save the file: Review Comment: We don't actually mean it "should match the following" if _the following_ is some bash commands. ```suggestion Edit the config/connect-standalone.properties file, adding or changing the plugin.path configuration property to include libs/connect-file-{{fullDotVersion}}.jar and save the file. Alternatively, in your terminal session you can use the following command: ```
[GitHub] [kafka] zigarn commented on pull request #12976: Improve TestUtils for temporary properties file
zigarn commented on PR #12976: URL: https://github.com/apache/kafka/pull/12976#issuecomment-1594798209 @divijvaidya: rebased!
[GitHub] [kafka] justinrlee commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file
justinrlee commented on PR #12797: URL: https://github.com/apache/kafka/pull/12797#issuecomment-1594792962 Sorry for the delay here! I recently went on paternity leave but will pick this up (and add tests) when I get back in a month or two.
[GitHub] [kafka] divijvaidya commented on pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation
divijvaidya commented on PR #13099: URL: https://github.com/apache/kafka/pull/13099#issuecomment-1594751739 @showuon rebase from trunk please?
[GitHub] [kafka] divijvaidya commented on pull request #12976: Improve TestUtils for temporary properties file
divijvaidya commented on PR #12976: URL: https://github.com/apache/kafka/pull/12976#issuecomment-1594744324 @zigarn can you please rebase this PR with trunk?
[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
C0urante commented on PR #13819: URL: https://github.com/apache/kafka/pull/13819#issuecomment-1594743764 Thanks guys!
[GitHub] [kafka] divijvaidya commented on pull request #12852: Enhance the robustness of the program
divijvaidya commented on PR #12852: URL: https://github.com/apache/kafka/pull/12852#issuecomment-1594740256 Hey @likeyoukang Thank you for your first contribution. Could you please rebase this PR from trunk? And also change the title of the PR to "MINOR: Use "constant equals object" instead of "object equals Constant""
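The "constant equals object" idiom named in the proposed title can be shown with a short Java sketch. The class `ConstantEquals` and its methods are hypothetical and are not taken from the PR. Calling `equals` on the constant is null-safe, whereas calling it on a possibly-null variable throws `NullPointerException`.

```java
// Hypothetical illustration of the idiom; not code from the PR.
final class ConstantEquals {
    static final String EXPECTED = "kafka";

    /** Null-safe: the receiver is a constant, so a null argument simply yields false. */
    static boolean safeMatches(String value) {
        return EXPECTED.equals(value);
    }

    /** Not null-safe: throws NullPointerException when value is null. */
    static boolean unsafeMatches(String value) {
        return value.equals(EXPECTED);
    }

    public static void main(String[] args) {
        System.out.println(safeMatches(null)); // prints false
        try {
            unsafeMatches(null);
        } catch (NullPointerException e) {
            System.out.println("unsafeMatches(null) threw NullPointerException");
        }
    }
}
```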
[GitHub] [kafka] divijvaidya closed pull request #12905: fix check for MinGW64 compatibility
divijvaidya closed pull request #12905: fix check for MinGW64 compatibility URL: https://github.com/apache/kafka/pull/12905
[GitHub] [kafka] divijvaidya commented on pull request #12905: fix check for MinGW64 compatibility
divijvaidya commented on PR #12905: URL: https://github.com/apache/kafka/pull/12905#issuecomment-1594742425 We have another PR https://github.com/apache/kafka/pull/13321 opened for this. Closing this in favour of the other one.
[GitHub] [kafka] divijvaidya commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file
divijvaidya commented on PR #12797: URL: https://github.com/apache/kafka/pull/12797#issuecomment-1594727580 ping @justinrlee
[GitHub] [kafka] divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write URL: https://github.com/apache/kafka/pull/12636
[GitHub] [kafka] divijvaidya commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
divijvaidya commented on PR #12636: URL: https://github.com/apache/kafka/pull/12636#issuecomment-1594725046 > I uploaded a new version at https://github.com/apache/kafka/pull/12662 In that case, closing this one.
[GitHub] [kafka] mumrah commented on pull request #13257: MINOR: Add ZK migration docs to the packaged docs
mumrah commented on PR #13257: URL: https://github.com/apache/kafka/pull/13257#issuecomment-1594723400 Thanks @mimaison, updated with your suggestions.
[GitHub] [kafka] divijvaidya commented on pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write
divijvaidya commented on PR #12662: URL: https://github.com/apache/kafka/pull/12662#issuecomment-1594720549 @cmccabe I guess this can be closed now since it is superseded by https://github.com/apache/kafka/pull/13437 ?
[GitHub] [kafka] divijvaidya commented on pull request #12667: KAFKA-10635: Improve logging with throwing OOOSE
divijvaidya commented on PR #12667: URL: https://github.com/apache/kafka/pull/12667#issuecomment-1594713861 @nicktelford ping
[GitHub] [kafka] divijvaidya commented on pull request #13201: KAFKA-14596: Move TopicCommand to tools
divijvaidya commented on PR #13201: URL: https://github.com/apache/kafka/pull/13201#issuecomment-1594708038 @OmniaGM can you please rebase this with trunk and I would be happy to begin a review for this.
[GitHub] [kafka] divijvaidya commented on pull request #13321: MINOR: added compatibility for MinGW
divijvaidya commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594696603 > hope you test is also in pipeline for cygwin to be double sure No, we don't have any tests based on Windows (or Cygwin) in our pipeline. We don't officially promise to support Kafka on the Windows platform. No pressure, but we are relying on your testing here :) > what i noticed that msys2 was not reading paths properly because path translator for cygwin was adding backslashes instead of slashes msys2 uses the same `cygpath` as cygwin, so why isn't it working for msys2? Do you have any idea? My concern with using `--windows` instead of `--mixed` is that it would be a breaking change for people who have written scripts around this bug/feature of using the `/` slash. Hence, I think it would be preferable to keep the existing behaviour the same.
[GitHub] [kafka] divijvaidya merged pull request #13858: MINOR: Add more information in assertion failure for non daemon threads
divijvaidya merged PR #13858: URL: https://github.com/apache/kafka/pull/13858
[GitHub] [kafka] divijvaidya commented on pull request #13858: MINOR: Add more information in assertion failure for non daemon threads
divijvaidya commented on PR #13858: URL: https://github.com/apache/kafka/pull/13858#issuecomment-1594662486 Unrelated test failures: ``` [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplicateTargetDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_8_and_Scala_2_12___testReplicateTargetDefault__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders___2/) [Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance__/) [Build / JDK 11 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance___2/) [Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testSendOffsetsToTransactionTimeout_String__quorum_kraft/) [Build / JDK 
11 and Scala 2.13 / org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeFromEosAlphaToEosV2_true_/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_17_and_Scala_2_13___testReplication__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_17_and_Scala_2_13___testReplication___2/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_8_and_Scala_2_12___testReplication__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_8_and_Scala_2_12___testReplication___2/) [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplication__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplication___2/) ``` The MirrorMaker failures are newly introduced in trunk. Existing ticket https://issues.apache.org/jira/browse/KAFKA-15052 for testBalancePartitionLeaders() Existing ticket https://issues.apache.org/jira/browse/KAFKA-8529 for testCloseDuringRebalance() Added new ticket https://issues.apache.org/jira/browse/KAFKA-15095 for shouldUpgradeFromEosAlphaToEosV2()
[jira] [Created] (KAFKA-15095) Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]
Divij Vaidya created KAFKA-15095: Summary: Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true] Key: KAFKA-15095 URL: https://issues.apache.org/jira/browse/KAFKA-15095 Project: Kafka Issue Type: Test Components: unit tests Reporter: Divij Vaidya {code:java} java.lang.AssertionError: Did not receive all 178 records from topic multiPartitionOutputTopic within 6 ms, currently accumulated data is [KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105), KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), KeyValue(1, 210), KeyValue(1, 231), KeyValue(1, 253), KeyValue(1, 276), KeyValue(1, 300), KeyValue(1, 325), KeyValue(1, 351), KeyValue(1, 378), KeyValue(1, 406), KeyValue(1, 435), KeyValue(2, 0), KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 435), KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105), KeyValue(0, 120), 
KeyValue(0, 136), KeyValue(0, 153), KeyValue(0, 171), KeyValue(0, 190), KeyValue(0, 210), KeyValue(0, 231), KeyValue(0, 253), KeyValue(0, 276), KeyValue(0, 300), KeyValue(0, 325), KeyValue(0, 351), KeyValue(0, 378), KeyValue(0, 406), KeyValue(0, 435), KeyValue(0, 325), KeyValue(0, 351), KeyValue(0, 378), KeyValue(0, 406), KeyValue(0, 435), KeyValue(3, 0), KeyValue(3, 1), KeyValue(3, 3), KeyValue(3, 6), KeyValue(3, 10), KeyValue(3, 15), KeyValue(3, 21), KeyValue(3, 28), KeyValue(3, 36), KeyValue(3, 45), KeyValue(3, 55), KeyValue(3, 66), KeyValue(3, 78), KeyValue(3, 91), KeyValue(3, 105), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 210), KeyValue(3, 231), KeyValue(3, 253), KeyValue(3, 276), KeyValue(3, 300), KeyValue(3, 325), KeyValue(3, 351), KeyValue(3, 378), KeyValue(3, 406), KeyValue(3, 435), KeyValue(1, 465), KeyValue(1, 496), KeyValue(0, 465), KeyValue(0, 496), KeyValue(3, 465), KeyValue(3, 496), KeyValue(0, 528), KeyValue(1, 528), KeyValue(0, 561), KeyValue(1, 561), KeyValue(0, 595), KeyValue(3, 528), KeyValue(2, 465), KeyValue(3, 561), KeyValue(2, 496), KeyValue(3, 595), KeyValue(2, 528), KeyValue(2, 561), KeyValue(2, 595)]Expected: is a value equal to or greater than <178> but: <164> was less than <178> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:347) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699) at org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.readResult(EosV2UpgradeIntegrationTest.java:1080) at org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.verifyUncommitted(EosV2UpgradeIntegrationTest.java:1055) at org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2(EosV2UpgradeIntegrationTest.java:705) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW
maniekes commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594651760 okay, i think i found root cause for it: https://github.com/apache/kafka/blob/trunk/bin/kafka-server-start.sh#L25 https://github.com/apache/kafka/blob/trunk/bin/zookeeper-server-start.sh#L25 cygpath is not changing it in `kafka-run-class.sh` because it's already set in main script. the same logic is also in a few other files. do you think we should modify it?
[jira] [Resolved] (KAFKA-15066) passing listener name config into TopicBasedRemoteLogMetadataManagerConfig
[ https://issues.apache.org/jira/browse/KAFKA-15066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-15066. --- Fix Version/s: 3.6.0 Resolution: Fixed > passing listener name config into TopicBasedRemoteLogMetadataManagerConfig > -- > > Key: KAFKA-15066 > URL: https://issues.apache.org/jira/browse/KAFKA-15066 > Project: Kafka > Issue Type: Sub-task > Reporter: Luke Chen > Assignee: Luke Chen > Priority: Major > Fix For: 3.6.0 > > > The `remote.log.metadata.manager.listener.name` config isn't passed to TopicBasedRemoteLogMetadataManagerConfig correctly.
[GitHub] [kafka] showuon merged pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon merged PR #13828: URL: https://github.com/apache/kafka/pull/13828
[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task
dajac commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594627878 ``` % ./gradlew rat --dry-run > Configure project : Starting build with version 3.6.0-SNAPSHOT (commit id 55373f1f) using Gradle 8.1.1, Java 17 and Scala 2.13.10 Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0 :generator:compileJava SKIPPED :generator:processResources SKIPPED :generator:classes SKIPPED :generator:jar SKIPPED :clients:processMessages SKIPPED :clients:processTestMessages SKIPPED :core:processMessages SKIPPED :group-coordinator:processMessages SKIPPED :metadata:processMessages SKIPPED :raft:processMessages SKIPPED :storage:processMessages SKIPPED :streams:processMessages SKIPPED :rat SKIPPED ```
[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW
maniekes commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594625896

OK, good catch with MSYS2, @divijvaidya. I have installed Git Bash, MSYS2 and Cygwin and tested on all of them (on MSYS2 I tested both msys2-mingw and pure MSYS2). What I noticed is that MSYS2 was not reading paths properly because the path translator for Cygwin was adding backslashes instead of slashes (option --mixed vs --windows in cygpath, see my last commit). With option --mixed it works on all those environments (I hope your pipeline also tests this on Cygwin, to be doubly sure).

## Git bash

```bash
vakom@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
$ uname -a
MINGW64_NT-10.0-22621 rafal-laptok 3.3.6-341.x86_64 2022-09-05 20:28 UTC x86_64 Msys

vakom@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
$ ./bin/kafka-server-start.sh config/server.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] [2023-06-16 14:35:28,564] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2023-06-16 14:35:28,713] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) [2023-06-16 14:35:28,774] INFO starting (kafka.server.KafkaServer) [2023-06-16 14:35:28,775] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2023-06-16 14:35:28,783] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient) [2023-06-16 14:35:28,806] INFO Client environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT (org.apache.zookeeper.ZooKeeper) [2023-06-16 14:35:28,806] INFO Client environment:host.name=192.168.1.137 (org.apache.zookeeper.ZooKeeper) [2023-06-16 14:35:28,806] INFO Client environment:java.version=19.0.1 (org.apache.zookeeper.ZooKeeper) ... (skipped rest of logs because all looks fine) ``` ## MSYS2 ```bash vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka $ uname -a MSYS_NT-10.0-22621 rafal-laptok 3.4.6.x86_64 2023-04-01 11:43 UTC x86_64 Msys vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka $ ./bin/kafka-server-start.sh config/server.properties SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory] log4j:WARN No appenders could be found for logger (kafka.utils.Log4jControllerRegistration$). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. (this is all) ``` **one remark**: in MSYS2 SLF4J is not printing logs, only warning that log appender cannot be found. however directories for kafka are being created properly and cluster works fine: ```bash vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka $ ls -lart /c/tmp/kafka-logs/ razem 7 drwxr-xr-x 1 vakom vakom 0 06-16 14:41 .. -rw-r--r-- 1 vakom vakom 0 06-16 14:41 .lock -rw-r--r-- 1 vakom vakom 0 06-16 14:41 cleaner-offset-checkpoint -rw-r--r-- 1 vakom vakom 0 06-16 14:41 replication-offset-checkpoint -rw-r--r-- 1 vakom vakom 94 06-16 14:41 meta.properties -rw-r--r-- 1 va
[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task
divijvaidya commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594625865 This is perfect. I was trying a similar approach but somehow my `it.tasks` wasn't returning the `processMessages`?! Can you please verify that it is working fine by running `./gradlew rat --dry-run`? It should print the processMessages tasks.
[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
showuon commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1594624985

The failed tests are unrelated, and the failing `testReplication` also fails in the trunk build. I've identified that it's caused by this change: https://github.com/apache/kafka/pull/13838 . I'll go ahead and merge it.

```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 17 and Scala 2.13 / kafka.admin.AddPartitionsTest.testIncrementPartitions(String).quorum=zk
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
```
[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task
dajac commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594618113 @divijvaidya I just pushed another approach. Let me know what you think.
[GitHub] [kafka] viktorsomogyi commented on pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
viktorsomogyi commented on PR #13696: URL: https://github.com/apache/kafka/pull/13696#issuecomment-1594603126 @hudeqi I can get to this early next week.
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
viktorsomogyi commented on code in PR #13421: URL: https://github.com/apache/kafka/pull/13421#discussion_r1232168660

## core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala: ##
@@ -97,6 +97,16 @@ class ReplicaAlterLogDirsThread(name: String,
     }
   }
+  // For ReplicaAlterLogDirsThread, if the partition is marked as failed due to an unknown exception and the partition fetch
+  // is suspended, the paused cleanup logic of the partition needs to be canceled, otherwise it will lead to serious unexpected
+  // disk usage growth.

Review Comment:
```suggestion
  // For ReplicaAlterLogDirsThread, if the future partition is marked as failed due to an unknown exception and the partition fetch
  // is suspended, the paused cleanup logic of the current partition needs to be resumed, otherwise it can lead to unexpected
  // disk usage growth.
```
[GitHub] [kafka] viktorsomogyi merged pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
viktorsomogyi merged PR #13719: URL: https://github.com/apache/kafka/pull/13719
[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task
divijvaidya commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594554215

I just thought of another idea. Just put `rat.dependsOn compileJava` at https://github.com/apache/kafka/blob/e1d59920f4fed5c4bc890e53e249d3439e148bab/build.gradle#L211

You can verify the dependency using:
```
$ ./gradlew rat --dry-run

> Configure project :
Starting build with version 3.6.0-SNAPSHOT (commit id cabd6d2e) using Gradle 8.1.1, Java 17 and Scala 2.13.10
Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
:compileJava SKIPPED
:rat SKIPPED
```
and that compileJava depends on processMessages using:
```
$ ./gradlew compileJava --dry-run | grep processMessages
:clients:processMessages SKIPPED
:core:processMessages SKIPPED
:group-coordinator:processMessages SKIPPED
:metadata:processMessages SKIPPED
:raft:processMessages SKIPPED
:storage:processMessages SKIPPED
:streams:processMessages SKIPPED
```
[GitHub] [kafka] lvillaca commented on pull request #12358: KAFKA-13988:Fix mm2 auto.offset.reset=latest not working
lvillaca commented on PR #12358: URL: https://github.com/apache/kafka/pull/12358#issuecomment-1594475316

> @yuz10 are you able to ping someone to get this moving forward? This must be blocking a lot of people from upgrading existing installs.
>
> Thanks.

Our project is also interested in this fix. It's a major issue for us because with the legacy version (for which this feature worked fine) we could not propagate offsets. Thanks!
[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task
divijvaidya commented on PR #13854: URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594472639

This problem is annoying and real. Thank you for starting a PR to fix this. Another example build that fails with similar problems: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13676/5/pipeline

However, I am wondering if there is an alternative way to do it. Could we create a new root level task called `processMessages` which will depend on the following tasks:
```
:clients:processMessages
:core:processMessages
:group-coordinator:processMessages
:metadata:processMessages
:raft:processMessages
:storage:processMessages
:streams:processMessages
```
and then, `check` depends on `processMessages`, and `rat` already has a dependency on `check`. This will ensure that rat will depend on any future new subtasks for `processMessages`.

Note that the problem being solved in this PR is slightly different from what was solved in https://github.com/apache/kafka/pull/13316 because over there we wanted each subtask.srcJar to depend on `processMessages`, but now rat is a root level task, hence it can have a root level dependency on each subproject's `:processMessages` separately.

Thoughts?
[jira] [Commented] (KAFKA-14112) Expose replication-offset-lag Mirror metric
[ https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733444#comment-17733444 ]

Elkhan Eminov commented on KAFKA-14112:
---
[~viktorsomogyi] is this still relevant? This fell off my radar for a while, but if the change has not yet been contributed back by cldr (as far as I can see, it's not in trunk), I'd like to resume working on it.

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Reporter: Elkhan Eminov
> Assignee: Elkhan Eminov
> Priority: Minor
>
> The offset lag is the difference of the last replicated record's source
> offset and the end offset of the source.
> The offset lag is a difference (LRO-LEO), but its constituents calculated at
> different points of time and place
> * LEO shall be calculated during source task's poll loop (ready to get it
> from the consumer)
> * LRO shall be kept in an in-memory "cache", that is updated during the
> task's producer callback
> LRO is initialized when task is started, from the offset store. The
> difference shall be calculated when the freshest LEO acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker
> metric.
> This would describe to amount of "to be replicated" number of records for a
> certain topic-partition.
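The lag calculation described in the issue can be illustrated with a minimal Scala sketch. Every name here (`SourcePartition`, `OffsetLagTracker`, `onReplicated`, `lag`) is hypothetical and not part of MirrorMaker. The sketch also assumes the metric should be a non-negative count, so it computes LEO minus LRO and clamps at zero, whereas the issue text writes the difference as LRO-LEO.

```scala
import scala.collection.mutable

// Hypothetical names throughout; a sketch of the idea, not MirrorMaker code.
final case class SourcePartition(topic: String, partition: Int)

class OffsetLagTracker(initialLro: Map[SourcePartition, Long]) {
  // In-memory cache of the last replicated offset (LRO) per source partition,
  // seeded from the offset store when the task starts.
  private val lro = mutable.Map(initialLro.toSeq: _*)

  // Intended to be called from the producer callback once a record is acknowledged.
  // Older offsets arriving late are ignored so the LRO never moves backwards.
  def onReplicated(tp: SourcePartition, sourceOffset: Long): Unit = synchronized {
    if (lro.get(tp).forall(_ < sourceOffset)) lro.update(tp, sourceOffset)
  }

  // Intended to be called from the poll loop with the freshest log end offset (LEO).
  // Returns None while no LRO is known for the partition.
  // Assumption: lag is reported as LEO - LRO, clamped at zero.
  def lag(tp: SourcePartition, logEndOffset: Long): Option[Long] = synchronized {
    lro.get(tp).map(last => math.max(0L, logEndOffset - last))
  }
}
```

Both methods synchronize on the tracker because, per the description, the LRO is written from the producer callback while the LEO is read in the poll loop, which may run on different threads.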
[jira] (KAFKA-14112) Expose replication-offset-lag Mirror metric
[ https://issues.apache.org/jira/browse/KAFKA-14112 ]

Elkhan Eminov deleted comment on KAFKA-14112:
---
was (Author: JIRAUSER285952): Hey [~viktorsomogyi], this is WIP, I'll submit a PR soon

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Reporter: Elkhan Eminov
> Assignee: Elkhan Eminov
> Priority: Minor
>
> The offset lag is the difference of the last replicated record's source
> offset and the end offset of the source.
> The offset lag is a difference (LRO-LEO), but its constituents calculated at
> different points of time and place
> * LEO shall be calculated during source task's poll loop (ready to get it
> from the consumer)
> * LRO shall be kept in an in-memory "cache", that is updated during the
> task's producer callback
> LRO is initialized when task is started, from the offset store. The
> difference shall be calculated when the freshest LEO acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker
> metric.
> This would describe to amount of "to be replicated" number of records for a
> certain topic-partition.
[GitHub] [kafka] viktorsomogyi merged pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks
viktorsomogyi merged PR #13819: URL: https://github.com/apache/kafka/pull/13819
[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts
[ https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733434#comment-17733434 ] Kumar Shivendu commented on KAFKA-15050: Nice! I'll pick up something else then. > Prompts in the quickstarts > -- > > Key: KAFKA-15050 > URL: https://issues.apache.org/jira/browse/KAFKA-15050 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Priority: Trivial > Labels: newbie > > In the quickstarts [Steps > 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate > the command prompt. When we start to use Kafka Connect in [Step > 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch > to {{{}>{}}}. The [Kafka Streams > quickstart|https://kafka.apache.org/documentation/streams/quickstart] also > uses {{{}>{}}}. I don't think there's a reason for this, but if there is one > (root vs user account?) it should be explained. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
dajac commented on PR #13719: URL: https://github.com/apache/kafka/pull/13719#issuecomment-1594414022 @viktorsomogyi Nope. LGTM. Feel free to merge it. Thanks!
[GitHub] [kafka] viktorsomogyi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method
viktorsomogyi commented on PR #13719: URL: https://github.com/apache/kafka/pull/13719#issuecomment-1594411657 @dajac do you have other comments or can I merge this?
[GitHub] [kafka] divijvaidya commented on pull request #13321: MINOR: added compatibility for MinGW
divijvaidya commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594389639 Thank you @maniekes. MINGW64 looks good. Can you also please provide test details for MSYS?
[GitHub] [kafka] divijvaidya commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on PR #13850: URL: https://github.com/apache/kafka/pull/13850#issuecomment-1594369804

Thank you for looking into this @satishd

> But we observed an impact when there is a degradation in our remote storage(HDFS) clusters.

Yes, you are right, the impact of this lock contention is felt in cases when fetching from RSM is slow. We observed something similar.

> Another way is to change the current LinkedHashMap to a synchronized map and use a slot based lock instead of a global lock. This workaround is less performant than Caffeine but it does not introduce a new dependency.

I understand what you are saying. I did consider that approach. So that we are on the same page, the approach looks like:

```
// Thread safe data structure that holds the locks per entry
// Keys from this data structure are removed when a
val entryLockMap = ConcurrentHashMap[Key, Lock]()

// Internal LRU cache based on LinkedHashMap. LinkedHashMap is made thread safe by synchronising
// all its operations by using SynchronizedCache (from org.apache.kafka.common.cache)
val internalCache = SynchronizedCache(new LinkedHashMap())

getEntry(key) {
  // return entry if key is available in cache
  if (internalCache.contains(key)) {
    return internalCache.get(key)
  }

  // we have to update the entry in the cache. we will acquire a lock on entry, fetch it and then update in cache.
  createEntryLock in entryLockMap if not present

  // acquire exclusive lock on entry
  entryLockMap.get(key).lock()
  try {
    // after acquiring lock, check again for presence of key in cache, another thread may have updated it while
    // this thread was waiting for lock
    if (internalCache.contains(key)) {
      return internalCache.get(key)
    }
    val entry = fetchFromRSM(key)
    // no need to acquire a lock in internal cache since get/put methods on this cache are synchronized
    internalCache.put(key, entry)
    return entry
  } finally {
    release entry lock
  }
}
```

The advantages of this approach are:
1. No external dependency.
2. Same pattern of cache used in other places in the Kafka code (see org.apache.kafka.common.cache.SynchronizedCache). But that code was written in 2015, when better alternatives perhaps didn't exist.

The disadvantages of this approach are:
1. Cache metrics will have to be computed manually.
2. Cache eviction is done synchronously and hence, it holds the global cache lock.
3. In the absence of a lock that ensures fairness, when we synchronise the global cache, a fetch thread may get starved, leading to high tail latencies. Such a scenario will occur when we have a high get() operation load and other threads for get() are getting prioritized by the lock ahead of the put() thread.

That is why I am not a big fan of using data structures with coarse grained global locks. Caffeine, on the other hand, doesn't have any of the disadvantages mentioned above, is used in high throughput, low latency systems like Cassandra, and has a constant release cadence with great support. Hence, I believe that using Caffeine is the right choice here. (I know that you already agreed to this but I am adding more context here for other readers of the PR.)
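For readers who want to experiment with the per-entry locking alternative discussed in this comment, here is a minimal, self-contained Scala sketch. It is only an illustration under stated assumptions: `PerKeyLockCache`, `load` and `cachedCount` are hypothetical names, a plain monitor around an access-ordered `LinkedHashMap` stands in for `SynchronizedCache`, and the removal of unused locks is left out. It is not the code in the PR, which uses Caffeine instead.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical illustration only; not Kafka's RemoteIndexCache.
// `load` stands in for the slow fetch from remote storage.
class PerKeyLockCache[K, V](maxSize: Int, load: K => V) {
  // One lock per key, so a slow load for one key does not block loads of other keys.
  // Assumption: locks are never removed here; a real implementation would clean them up.
  private val entryLocks = new ConcurrentHashMap[K, ReentrantLock]()

  // Access-ordered LinkedHashMap gives LRU eviction; every access is guarded by `cacheLock`.
  private val cacheLock = new Object
  private val lru: java.util.LinkedHashMap[K, V] =
    new java.util.LinkedHashMap[K, V](16, 0.75f, true) {
      override def removeEldestEntry(eldest: java.util.Map.Entry[K, V]): Boolean =
        this.size() > maxSize
    }

  private def lookup(key: K): Option[V] = cacheLock.synchronized(Option(lru.get(key)))

  def get(key: K): V = lookup(key).getOrElse {
    val lock = entryLocks.computeIfAbsent(key, _ => new ReentrantLock())
    lock.lock()
    try {
      // Check again: another thread may have loaded the entry while this one waited.
      lookup(key).getOrElse {
        val value = load(key) // the slow call runs outside the global cache lock
        cacheLock.synchronized { lru.put(key, value); () }
        value
      }
    } finally lock.unlock()
  }

  def cachedCount: Int = cacheLock.synchronized(lru.size())
}
```

Note that eviction still happens inside `put` while the global monitor is held, which is the second disadvantage listed in the comment, and the monitor makes no fairness guarantee, which is the third.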
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1231951562 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -37,88 +40,125 @@ object RemoteIndexCache { val TmpFileSuffix = ".tmp" } -class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) extends AutoCloseable { private var markedForCleanup: Boolean = false - private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock() + private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock() def lookupOffset(targetOffset: Long): OffsetPosition = { -CoreUtils.inLock(lock.readLock()) { +inReadLock(entryLock) { if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup") else offsetIndex.lookup(targetOffset) } } def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { -CoreUtils.inLock(lock.readLock()) { +inReadLock(entryLock) { if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup") - val timestampOffset = timeIndex.lookup(timestamp) offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) } } def markForCleanup(): Unit = { -CoreUtils.inLock(lock.writeLock()) { +inWriteLock(entryLock) { if (!markedForCleanup) { markedForCleanup = true Array(offsetIndex, timeIndex).foreach(index => index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX +// txn index needs to be renamed separately since it's not of type AbstractIndex txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX))) } } } + /** + * Deletes the index files from the disk. Invoking #close is not required prior to this function. 
+ */ def cleanup(): Unit = { markForCleanup() CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) } + /** + * Calls the underlying close method for each index which may lead to releasing resources such as mmap. + * This function does not delete the index files. + */ def close(): Unit = { -Array(offsetIndex, timeIndex).foreach(index => try { - index.close() -} catch { - case _: Exception => // ignore error. -}) +Utils.closeQuietly(offsetIndex, "Closing the offset index.") +Utils.closeQuietly(timeIndex, "Closing the time index.") Utils.closeQuietly(txnIndex, "Closing the transaction index.") } } /** * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid - * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. The cache is + * re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that this cache is not strictly based on a LRU policy. It is based on the default implementation of Caffeine i.e. + * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. * * @param maxSize maximum number of segment index entries to be cached. * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. 
* @param logDir log directory */ +@threadsafe class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String) - extends Logging with Closeable { - - val cacheDir = new File(logDir, DirName) - @volatile var closed = false - - val expiredIndexes = new LinkedBlockingQueue[Entry]() - val lock = new Object() - - val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, Entry](maxSize / 2, -0.75f, true) { -override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): Boolean = { - if (this.size() > maxSize) { -val entry = eldest.getValue -// Mark the entries for cleanup, background thread will clean them later. -entry.markForCleanup() -expiredIndexes.add(entry) -true - } else { -false - } -} - } + extends Logging with AutoCloseable { + /** + * Directory where the index files will be stored on disk. + */ + private val cache
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1231937722 ## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ## @@ -37,88 +40,125 @@ object RemoteIndexCache { val TmpFileSuffix = ".tmp" } -class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) { +class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) extends AutoCloseable { private var markedForCleanup: Boolean = false - private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock() + private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock() def lookupOffset(targetOffset: Long): OffsetPosition = { -CoreUtils.inLock(lock.readLock()) { +inReadLock(entryLock) { if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup") else offsetIndex.lookup(targetOffset) } } def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = { -CoreUtils.inLock(lock.readLock()) { +inReadLock(entryLock) { if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup") - val timestampOffset = timeIndex.lookup(timestamp) offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset)) } } def markForCleanup(): Unit = { -CoreUtils.inLock(lock.writeLock()) { +inWriteLock(entryLock) { if (!markedForCleanup) { markedForCleanup = true Array(offsetIndex, timeIndex).foreach(index => index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX +// txn index needs to be renamed separately since it's not of type AbstractIndex txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX))) } } } + /** + * Deletes the index files from the disk. Invoking #close is not required prior to this function. 
+ */ def cleanup(): Unit = { markForCleanup() CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists())) } + /** + * Calls the underlying close method for each index which may lead to releasing resources such as mmap. + * This function does not delete the index files. + */ def close(): Unit = { -Array(offsetIndex, timeIndex).foreach(index => try { - index.close() -} catch { - case _: Exception => // ignore error. -}) +Utils.closeQuietly(offsetIndex, "Closing the offset index.") +Utils.closeQuietly(timeIndex, "Closing the time index.") Utils.closeQuietly(txnIndex, "Closing the transaction index.") } } /** * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid - * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. + * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. The cache is + * re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that this cache is not strictly based on a LRU policy. It is based on the default implementation of Caffeine i.e. + * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. * * @param maxSize maximum number of segment index entries to be cached. * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes. 
* @param logDir log directory */ +@threadsafe class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String) - extends Logging with Closeable { - - val cacheDir = new File(logDir, DirName) - @volatile var closed = false - - val expiredIndexes = new LinkedBlockingQueue[Entry]() - val lock = new Object() - - val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, Entry](maxSize / 2, -0.75f, true) { -override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): Boolean = { - if (this.size() > maxSize) { -val entry = eldest.getValue -// Mark the entries for cleanup, background thread will clean them later. -entry.markForCleanup() -expiredIndexes.add(entry) -true - } else { -false - } -} - } + extends Logging with AutoCloseable { + /** + * Directory where the index files will be stored on disk. + */ + private val cache
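The per-entry locking in the hunk above (shared read lock for lookups, exclusive write lock for `markForCleanup`) can be illustrated with a small self-contained sketch. Everything here is an assumption made for illustration: `GuardedEntry`, its `Map`-backed lookup, and the local `inReadLock`/`inWriteLock` helpers are hypothetical stand-ins, not the real `kafka.log.remote.Entry` or the Kafka `CoreUtils` helpers.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for an index entry; NOT the real kafka.log.remote.Entry.
final class GuardedEntry(offsets: Map[Long, Int]) {
  private val entryLock = new ReentrantReadWriteLock()
  // Written only under the write lock, read only under the read lock.
  private var markedForCleanup = false

  // Local helpers modelled on the inReadLock/inWriteLock calls in the diff (assumed shape).
  private def inReadLock[T](body: => T): T = {
    val l = entryLock.readLock()
    l.lock()
    try body finally l.unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    val l = entryLock.writeLock()
    l.lock()
    try body finally l.unlock()
  }

  // Many readers may hold the read lock at once; each fails fast once the entry is marked.
  def lookupOffset(target: Long): Int = inReadLock {
    if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
    offsets.getOrElse(target, -1)
  }

  // Idempotent: returns true only for the call that actually flipped the flag.
  def markForCleanup(): Boolean = inWriteLock {
    if (markedForCleanup) false
    else { markedForCleanup = true; true }
  }
}
```

The point of the split is that concurrent lookups on one entry do not serialize against each other; only marking the entry for cleanup needs exclusivity.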
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231938032

##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -170,44 +190,173 @@ class RemoteIndexCacheTest {
     assertThrows(classOf[IllegalStateException], () => cache.getIndexEntry(metadataList.head))
   }
+  @Test
+  def testCloseIsIdempotent(): Unit = {
+    val spyCleanerThread = spy(cache.cleanerThread)
+    cache.cleanerThread = spyCleanerThread
+    cache.close()
+    cache.close()
+    // verify that cleanup is only called once
+    verify(spyCleanerThread).initiateShutdown()
+  }
+
+  @Test
+  def testClose(): Unit = {
+    val spyInternalCache = spy(cache.internalCache)
+    val spyCleanerThread = spy(cache.cleanerThread)
+
+    // replace with new spy cache
+    cache.internalCache = spyInternalCache
+    cache.cleanerThread = spyCleanerThread
+
+    // use the cache
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+    val entry = cache.getIndexEntry(metadataList.head)
+
+    val spyTxnIndex = spy(entry.txnIndex)
+    val spyOffsetIndex = spy(entry.offsetIndex)
+    val spyTimeIndex = spy(entry.timeIndex)
+    // remove this entry and replace with spied entry
+    cache.internalCache.invalidateAll()
+    cache.internalCache.put(metadataList.head.remoteLogSegmentId().id(), new Entry(spyOffsetIndex, spyTimeIndex, spyTxnIndex))
+
+    // close the cache
+    cache.close()
+
+    // cleaner thread should be closed properly
+    verify(spyCleanerThread).initiateShutdown()
+    verify(spyCleanerThread).awaitShutdown()
+
+    // close for all index entries must be invoked
+    verify(spyTxnIndex).close()
+    verify(spyOffsetIndex).close()
+    verify(spyTimeIndex).close()
+
+    // index files must not be deleted
+    verify(spyTxnIndex, times(0)).deleteIfExists()
+    verify(spyOffsetIndex, times(0)).deleteIfExists()
+    verify(spyTimeIndex, times(0)).deleteIfExists()
+  }
+
+  @Test
+  def testConcurrentReadWriteAccessForCache(): Unit = {
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    assertCacheSize(0)
+    // getIndex for first time will call rsm#fetchIndex
+    cache.getIndexEntry(metadataList.head)
+    assertCacheSize(1)
+    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    reset(rsm)
+
+    // Simulate a concurrency situation where one thread is reading the entry already present in the cache (cache hit)
+    // and the other thread is reading an entry which is not available in the cache (cache miss). The expected behaviour
+    // is for the former thread to succeed while the latter is fetching from rsm.
+    // In this test we simulate the situation using latches. We perform the following operations:
+    // 1. Start the CacheMiss thread and wait until it starts executing the rsm.fetchIndex
+    // 2. Block the CacheMiss thread inside the call to rsm.fetchIndex.
+    // 3. Start the CacheHit thread. Assert that it performs a successful read.
+    // 4. On completion of successful read by CacheHit thread, signal the CacheMiss thread to release its block.
+    // 5. Validate that the test passes. If the CacheMiss thread was blocking the CacheHit thread, the test will fail.
+    //
+    val latchForCacheHit = new CountDownLatch(1)
+    val latchForCacheMiss = new CountDownLatch(1)
+
+    val readerCacheHit = (() => {
+      // Wait for signal to start executing the read
+      logger.debug(s"Waiting for signal to begin read from ${Thread.currentThread()}")
+      latchForCacheHit.await()
+      val entry = cache.getIndexEntry(metadataList.head)
+      assertNotNull(entry)
+      // Signal the CacheMiss to unblock itself
+      logger.debug(s"Signaling CacheMiss to unblock from ${Thread.currentThread()}")
+      latchForCacheMiss.countDown()
+    }): Runnable
+
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(_ => {
+        logger.debug(s"Signaling CacheHit to begin read from ${Thread.currentThread()}")
+        latchForCacheHit.countDown()
+        logger.debug("Waiting for signal to complete rsm fetch from " + Thread.currentThread())
+        latchForCacheMiss.await()
+      })
+
+    val readerCacheMiss = (() => {
+      val entry = cache.getIndexEntry(metadataList.last)
+      assertNotNull(entry)
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(readerCacheMiss: Runnable)
+      executor.submit(readerCacheHit: Runnable)
+      assertTrue(latchForCacheMiss.await(30, TimeUnit.SECONDS))
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
   @Test
   def testReloadCacheAfterClose(): Unit = {
-    val cache = new RemoteIndexCache(maxSi
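The latch choreography described in the test comments (block the cache-miss thread inside its loader, then prove that a cache-hit read still completes) can be reproduced outside Kafka. The sketch below is a minimal illustration under stated assumptions: it uses a plain `ConcurrentHashMap` as a stand-in for the cache rather than `RemoteIndexCache` or Caffeine, and the names `LatchSketch`, `hitNotBlockedByMiss`, and the `"hot"`/`"cold"` keys are hypothetical.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

object LatchSketch {
  // Returns true when the cache-hit read completed while the cache-miss load was still blocked.
  def hitNotBlockedByMiss(timeoutSeconds: Long): Boolean = {
    // Stand-in cache (assumption): ConcurrentHashMap, not the cache used in the PR.
    val cache = new ConcurrentHashMap[String, String]()
    cache.put("hot", "cached-index")

    val missInsideLoader = new CountDownLatch(1) // opened once the loader has started
    val hitCompleted = new CountDownLatch(1)     // opened once the cache-hit read has finished

    val cacheMiss: Runnable = () => {
      cache.computeIfAbsent("cold", _ => {
        missInsideLoader.countDown()
        hitCompleted.await() // stay inside the slow "remote fetch" until the hit is done
        "fetched-index"
      })
      ()
    }

    val cacheHit: Runnable = () => {
      missInsideLoader.await() // start reading only once the miss is inside its loader
      if (cache.get("hot") != null) hitCompleted.countDown()
    }

    val executor = Executors.newFixedThreadPool(2)
    try {
      executor.submit(cacheMiss)
      executor.submit(cacheHit)
      // If the miss blocked the hit, this latch would never open and we would time out.
      hitCompleted.await(timeoutSeconds, TimeUnit.SECONDS)
    } finally {
      executor.shutdownNow()
    }
  }
}
```

Calling `LatchSketch.hitNotBlockedByMiss(10)` exercises the same ordering as steps 1 to 5 in the comment block: a timeout on the final latch, rather than an explicit assertion inside a worker thread, is what signals that the reader was blocked.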
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231931509

##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -167,14 +210,14 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
   init()

   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {

Review Comment:
We need it as a var for unit tests, where we want to override it with a spy implementation so that we can verify the invocations. Alternatively, I could add a setter for this thread for use in unit tests, but I preferred a var because it is simpler.
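To make the trade-off in this comment concrete, here is a minimal sketch of a reassignable collaborator being swapped for a test double. It is an illustration only: `Worker`, `RecordingWorker`, and `SketchCache` are hypothetical names, the hand-written double stands in for a Mockito spy, and guarding `close()` with an `AtomicBoolean` is an assumption about one way to get idempotence, not necessarily what the PR does.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal worker interface; the real code uses ShutdownableThread.
trait Worker {
  def initiateShutdown(): Unit
}

// Hand-written recording double standing in for a Mockito spy.
final class RecordingWorker extends Worker {
  var shutdownCalls = 0
  override def initiateShutdown(): Unit = shutdownCalls += 1
}

final class SketchCache extends AutoCloseable {
  // A var (rather than a val) so that a test can swap in a double.
  var cleaner: Worker = new Worker {
    override def initiateShutdown(): Unit = ()
  }

  private val closed = new AtomicBoolean(false)

  // Assumed approach: compareAndSet lets only the first close() reach the worker.
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) cleaner.initiateShutdown()
}
```

A test then assigns `cache.cleaner = new RecordingWorker`, calls `close()` twice, and checks that `shutdownCalls` is 1. A dedicated setter would allow the same substitution while keeping the field itself immutable to other callers, at the cost of one more method.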
[GitHub] [kafka] satishd commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache
satishd commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231867641

##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -37,88 +40,125 @@ object RemoteIndexCache {
   val TmpFileSuffix = ".tmp"
 }

-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) extends AutoCloseable {
   private var markedForCleanup: Boolean = false
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
+  private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()

   def lookupOffset(targetOffset: Long): OffsetPosition = {
-    CoreUtils.inLock(lock.readLock()) {
+    inReadLock(entryLock) {
       if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
       else offsetIndex.lookup(targetOffset)
     }
   }

   def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
-    CoreUtils.inLock(lock.readLock()) {
+    inReadLock(entryLock) {
       if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
-
       val timestampOffset = timeIndex.lookup(timestamp)
       offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
     }
   }

   def markForCleanup(): Unit = {
-    CoreUtils.inLock(lock.writeLock()) {
+    inWriteLock(entryLock) {
       if (!markedForCleanup) {
         markedForCleanup = true
         Array(offsetIndex, timeIndex).foreach(index =>
           index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX))))
+        // txn index needs to be renamed separately since it's not of type AbstractIndex
         txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX)))
       }
     }
   }

+  /**
+   * Deletes the index files from the disk. Invoking #close is not required prior to this function.
+   */
   def cleanup(): Unit = {
     markForCleanup()
     CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
   }

+  /**
+   * Calls the underlying close method for each index which may lead to releasing resources such as mmap.
+   * This function does not delete the index files.
+   */
   def close(): Unit = {
-    Array(offsetIndex, timeIndex).foreach(index => try {
-      index.close()
-    } catch {
-      case _: Exception => // ignore error.
-    })
+    Utils.closeQuietly(offsetIndex, "Closing the offset index.")
+    Utils.closeQuietly(timeIndex, "Closing the time index.")
     Utils.closeQuietly(txnIndex, "Closing the transaction index.")
   }
 }

 /**
  * This is a LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid
- * re-fetching the index files like offset, time indexes from the remote storage for every fetch call.
+ * re-fetching the index files like offset, time indexes from the remote storage for every fetch call. The cache is
+ * re-initialized from the index files on disk on startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that this cache is not strictly based on a LRU policy. It is based on the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
  *
  * @param maxSize maximum number of segment index entries to be cached.
  * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
  * @param logDir log directory
  */
+@threadsafe
 class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String)
-  extends Logging with Closeable {
-
-  val cacheDir = new File(logDir, DirName)
-  @volatile var closed = false
-
-  val expiredIndexes = new LinkedBlockingQueue[Entry]()
-  val lock = new Object()
-
-  val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, Entry](maxSize / 2,
-    0.75f, true) {
-    override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): Boolean = {
-      if (this.size() > maxSize) {
-        val entry = eldest.getValue
-        // Mark the entries for cleanup, background thread will clean them later.
-        entry.markForCleanup()
-        expiredIndexes.add(entry)
-        true
-      } else {
-        false
-      }
-    }
-  }
+  extends Logging with AutoCloseable {
+  /**
+   * Directory where the index files will be stored on disk.
+   */
+  private val cacheDir
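For context on why the removed `entries` map is a likely source of lock contention: an access-ordered `java.util.LinkedHashMap` treats even `get` as a structural modification, so reads cannot safely run in parallel and must take the same exclusive lock as writes. The sketch below shows that pattern in isolation. `LruSketch` and its methods are hypothetical names, and how the old `RemoteIndexCache` actually used its `lock` object is not visible in the quoted hunk, so treat this as an assumption-laden illustration rather than the original code.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical bounded LRU map modelled on the removed `entries` field.
final class LruSketch[K, V](maxSize: Int) {
  private val lock = new Object()

  // Third constructor argument `true` selects access order instead of insertion order.
  private val entries = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxSize
  }

  // A read reorders the internal linked list, so it needs the same exclusive lock as a write.
  def get(key: K): Option[V] = lock.synchronized(Option(entries.get(key)))

  def put(key: K, value: V): Unit = lock.synchronized { entries.put(key, value); () }

  // Keys from least to most recently used.
  def keys: List[K] = lock.synchronized {
    val it = entries.keySet().iterator()
    val buf = List.newBuilder[K]
    while (it.hasNext) buf += it.next()
    buf.result()
  }
}
```

With a capacity of 2, inserting `a` and `b`, reading `a`, and then inserting `c` evicts `b`, because the read moved `a` to the most-recently-used position. Every one of those calls, including the read, serializes on the single `lock`.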
[GitHub] [kafka] jlprat commented on pull request #13824: MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde
jlprat commented on PR #13824: URL: https://github.com/apache/kafka/pull/13824#issuecomment-1594214535 Hi @satishd or @showuon any opinions on this patch?