[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595613347

   > Sorry I'm late to the party but I had a question on this:
   > 
   > > It seems like @clolov is right, I tested it both in quorum and zk mode, 
Kafka successfully reconciles the questionable case (when X-1 on B comes back 
after A has compacted the logs), so I think it's fine to merge in this PR.
   > > I was also thinking of creating some integration test for this but it's 
hard to simulate disk errors in Java and we can't have any assumptions about 
where the tests run, so I think that should be a separate task as it's out of 
scope for this one. If you folks know a good fault injection framework, I'm all 
ears.
   > 
   > Did we confirm that if B comes back it is cleaned and/or resumes correctly?
   
   Yes, I encountered this issue in production.
   I applied this PR to our production code and found that it solved the problem very well.
   However, the production issue involves the delete policy case; for the compact policy, I'll test it next week. @jolshan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595608359

   > @viktorsomogyi Trogdor has some fault injection capabilities. I'm not sure 
if disk errors are among them, but it could probably be added. See 
https://github.com/apache/kafka/tree/trunk/trogdor/src/main/java/org/apache/kafka/trogdor/fault
 for the types of faults we currently support.
   
   Thanks @mumrah, I'm going to look into Trogdor to see how to use it.
   In addition, if we add integration tests, should they go in this PR, or should we open another one? @viktorsomogyi


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] iblislin opened a new pull request, #13871: fix doc typo of StreamsBuilder.addGlobalStore

2023-06-16 Thread via GitHub


iblislin opened a new pull request, #13871:
URL: https://github.com/apache/kafka/pull/13871

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15100) Unsafe to call tryCompleteFetchResponse on request timeout

2023-06-16 Thread Jira
José Armando García Sancio created KAFKA-15100:
--

 Summary: Unsafe to call tryCompleteFetchResponse on request timeout
 Key: KAFKA-15100
 URL: https://issues.apache.org/jira/browse/KAFKA-15100
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


When the fetch request times out the future is completed from the 
"raft-expiration-executor" SystemTimer thread. KafkaRaftClient assumes that 
tryCompleteFetchResponse is always called from the same thread. This invariant 
is violated in this case.
{code:java}
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() : exception;

        // If the fetch timed out in purgatory, it means no new data is available,
        // and we will complete the fetch successfully. Otherwise, if there was
        // any other error, we need to return it.
        Errors error = Errors.forException(cause);
        if (error != Errors.REQUEST_TIMED_OUT) {
            logger.info("Failed to handle fetch from {} at {} due to {}",
                replicaId, fetchPartition.fetchOffset(), error);
            return buildEmptyFetchResponse(error, Optional.empty());
        }
    }

    // FIXME: `completionTimeMs` can be null
    logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
        replicaId, fetchPartition.fetchOffset(), completionTimeMs);
    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
{code}
One solution is to always build an empty response if the future was completed 
exceptionally. This works because the ExpirationService completes the future 
with a `TimeoutException`.
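
A minimal sketch of that first option, reusing the names from the snippet above (buildEmptyFetchResponse, tryCompleteFetchRequest); this is an illustration of the idea, not the actual patch:
{code:java}
return future.handle((completionTimeMs, exception) -> {
    if (exception != null) {
        // The future completed exceptionally, so we may be running on the
        // "raft-expiration-executor" thread. Do not call tryCompleteFetchRequest
        // here; always answer with an empty response instead.
        Throwable cause = exception instanceof ExecutionException ?
            exception.getCause() : exception;
        Errors error = Errors.forException(cause);
        // A REQUEST_TIMED_OUT from the ExpirationService just means no new data,
        // so it maps to a successful (NONE) empty response rather than an error.
        Errors responseError = error == Errors.REQUEST_TIMED_OUT ? Errors.NONE : error;
        return buildEmptyFetchResponse(responseError, Optional.empty());
    }
    // Completed normally: we are on the KRaft thread and this call remains safe.
    return tryCompleteFetchRequest(replicaId, fetchPartition, time.milliseconds());
});
{code}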

A longer-term solution is to use a more flexible event executor service. This would be a service that allows more kinds of events to get scheduled/submitted to the KRaft thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-16 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +75,63 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (buffer.isPresent()) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   If you find it easier to understand, sure, that is fine.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1256,10 +1261,25 @@ private  KStream doStreamTableJoin(final 
KTable table,
 final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
 final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+Optional> buffer = Optional.empty();
+
+if (joined.gracePeriod() != null) {

Review Comment:
   I see what you mean, but we actually were going to create the store and leave it empty for a zero duration. The point was to make it easier to change the grace period if desired, so the store isn't getting created and destroyed. There was also something about making it more backward compatible.



##
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   yep sure.
   
   (Edit: The class it extends also uses JUnit 4, and I can't change this one without also changing that and all other join integration tests.)



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +75,63 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (buffer.isPresent()) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+//cast doesn't matter, it is just because the processor is 
deprecated. The context gets converted back with 
StoreToProcessorContextAdapter.adapt(context)
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timesta

[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


jolshan commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859633


##
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##
@@ -196,6 +201,110 @@ class ReplicaAlterLogDirsThreadTest {
 assertEquals(0, thread.partitionCount)
   }
 
+  @Test
+  def shouldResumeCleanLogDirAfterMarkPartitionFailed(): Unit = {
+val brokerId = 1
+val partitionId = 0
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, 
"localhost:1234"))
+
+val partition = Mockito.mock(classOf[Partition])
+val replicaManager = Mockito.mock(classOf[ReplicaManager])
+val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+val futureLog = Mockito.mock(classOf[UnifiedLog])
+val logManager = Mockito.mock(classOf[LogManager])
+
+val logs = new Pool[TopicPartition, UnifiedLog]()
+logs.put(t1p0, futureLog)
+val logCleaner = new LogCleaner(new CleanerConfig(true),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = logs,
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = new MockTime)
+
+val leaderEpoch = 5
+val logEndOffset = 0
+
+when(partition.partitionId).thenReturn(partitionId)
+when(replicaManager.metadataCache).thenReturn(metadataCache)
+when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
+when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
+when(replicaManager.onlinePartition(t1p0)).thenReturn(Some(partition))
+when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+when(replicaManager.logManager).thenReturn(logManager)
+doAnswer(_ => {
+  logCleaner.abortAndPauseCleaning(t1p0)
+}).when(logManager).abortAndPauseCleaning(t1p0)
+doAnswer(_ => {
+  logCleaner.resumeCleaning(Seq(t1p0))
+}).when(logManager).resumeCleaning(t1p0)
+
+when(quotaManager.isQuotaExceeded).thenReturn(false)
+
+when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, 
fetchOnlyFromLeader = false))
+  .thenReturn(new EpochEndOffset()
+.setPartition(partitionId)
+.setErrorCode(Errors.NONE.code)
+.setLeaderEpoch(leaderEpoch)
+.setEndOffset(logEndOffset))
+when(partition.futureLocalLogOrException).thenReturn(futureLog)
+doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
+when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(false)
+
+when(futureLog.logStartOffset).thenReturn(0L)
+when(futureLog.logEndOffset).thenReturn(0L)
+when(futureLog.latestEpoch).thenReturn(None)
+
+val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
+  config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
+val responseData = new FetchPartitionData(
+  Errors.NONE,
+  0L,
+  0L,
+  MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, 
"foo".getBytes(StandardCharsets.UTF_8))),
+  Optional.empty(),
+  OptionalLong.empty(),
+  Optional.empty(),
+  OptionalInt.empty(),
+  false)
+mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, 
responseData)
+
+val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, 
quotaManager)
+val thread = new ReplicaAlterLogDirsThread(
+  "alter-logs-dirs-thread",
+  leader,
+  failedPartitions,
+  replicaManager,
+  quotaManager,
+  new BrokerTopicStats,
+  config.replicaFetchBackoffMs)
+
+// before starting the fetch, pause the clean of the partition.
+logManager.abortAndPauseCleaning(t1p0)
+thread.addPartitions(Map(t1p0 -> initialFetchState(fetchOffset = 0L, 
leaderEpoch)))
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+
+// first test: get handle without exception
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenReturn(None)
+
+thread.doWork()
+
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+assertTrue(logCleaner.isCleaningInStatePaused(t1p0))
+
+// second test: process partition data with throwing a 
KafkaStorageException.
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenThrow(new KafkaStorageException("disk error"))

Review Comment:
   Do we have a way to verify the log is cleaned up after it is in a failed state? Or that it is in a state to resume correctly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


jolshan commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595410641

   Sorry I'm late to the party but I had a question on this:
   > It seems like @clolov is right, I tested it both in quorum and zk mode, Kafka successfully reconciles the questionable case (when X-1 on B comes back after A has compacted the logs), so I think it's fine to merge in this PR.
   > I was also thinking of creating some integration test for this but it's hard to simulate disk errors in Java and we can't have any assumptions about where the tests run, so I think that should be a separate task as it's out of scope for this one. If you folks know a good fault injection framework, I'm all ears.
   
   Did we confirm that if B comes back it is cleaned and/or resumes correctly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


jolshan commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859633


##
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##
@@ -196,6 +201,110 @@ class ReplicaAlterLogDirsThreadTest {
 assertEquals(0, thread.partitionCount)
   }
 
+  @Test
+  def shouldResumeCleanLogDirAfterMarkPartitionFailed(): Unit = {
+val brokerId = 1
+val partitionId = 0
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, 
"localhost:1234"))
+
+val partition = Mockito.mock(classOf[Partition])
+val replicaManager = Mockito.mock(classOf[ReplicaManager])
+val quotaManager = Mockito.mock(classOf[ReplicationQuotaManager])
+val futureLog = Mockito.mock(classOf[UnifiedLog])
+val logManager = Mockito.mock(classOf[LogManager])
+
+val logs = new Pool[TopicPartition, UnifiedLog]()
+logs.put(t1p0, futureLog)
+val logCleaner = new LogCleaner(new CleanerConfig(true),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = logs,
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = new MockTime)
+
+val leaderEpoch = 5
+val logEndOffset = 0
+
+when(partition.partitionId).thenReturn(partitionId)
+when(replicaManager.metadataCache).thenReturn(metadataCache)
+when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
+when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
+when(replicaManager.onlinePartition(t1p0)).thenReturn(Some(partition))
+when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+when(replicaManager.logManager).thenReturn(logManager)
+doAnswer(_ => {
+  logCleaner.abortAndPauseCleaning(t1p0)
+}).when(logManager).abortAndPauseCleaning(t1p0)
+doAnswer(_ => {
+  logCleaner.resumeCleaning(Seq(t1p0))
+}).when(logManager).resumeCleaning(t1p0)
+
+when(quotaManager.isQuotaExceeded).thenReturn(false)
+
+when(partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, 
fetchOnlyFromLeader = false))
+  .thenReturn(new EpochEndOffset()
+.setPartition(partitionId)
+.setErrorCode(Errors.NONE.code)
+.setLeaderEpoch(leaderEpoch)
+.setEndOffset(logEndOffset))
+when(partition.futureLocalLogOrException).thenReturn(futureLog)
+doNothing().when(partition).truncateTo(offset = 0, isFuture = true)
+when(partition.maybeReplaceCurrentWithFutureReplica()).thenReturn(false)
+
+when(futureLog.logStartOffset).thenReturn(0L)
+when(futureLog.logEndOffset).thenReturn(0L)
+when(futureLog.latestEpoch).thenReturn(None)
+
+val requestData = new FetchRequest.PartitionData(topicId, 0L, 0L,
+  config.replicaFetchMaxBytes, Optional.of(leaderEpoch))
+val responseData = new FetchPartitionData(
+  Errors.NONE,
+  0L,
+  0L,
+  MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(1000, 
"foo".getBytes(StandardCharsets.UTF_8))),
+  Optional.empty(),
+  OptionalLong.empty(),
+  Optional.empty(),
+  OptionalInt.empty(),
+  false)
+mockFetchFromCurrentLog(tid1p0, requestData, config, replicaManager, 
responseData)
+
+val endPoint = new BrokerEndPoint(0, "localhost", 1000)
+val leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, 
quotaManager)
+val thread = new ReplicaAlterLogDirsThread(
+  "alter-logs-dirs-thread",
+  leader,
+  failedPartitions,
+  replicaManager,
+  quotaManager,
+  new BrokerTopicStats,
+  config.replicaFetchBackoffMs)
+
+// before starting the fetch, pause the clean of the partition.
+logManager.abortAndPauseCleaning(t1p0)
+thread.addPartitions(Map(t1p0 -> initialFetchState(fetchOffset = 0L, 
leaderEpoch)))
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+
+// first test: get handle without exception
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenReturn(None)
+
+thread.doWork()
+
+assertTrue(thread.fetchState(t1p0).isDefined)
+assertEquals(1, thread.partitionCount)
+assertTrue(logCleaner.isCleaningInStatePaused(t1p0))
+
+// second test: process partition data with throwing a 
KafkaStorageException.
+when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
ArgumentMatchers.eq(true))).thenThrow(new KafkaStorageException("disk error"))

Review Comment:
   Do we have a way to verify the log is cleaned up after it is in a failed state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


jolshan commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1232859045


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -706,6 +706,8 @@ abstract class AbstractFetcherThread(name: String,
*- the request succeeded or
*- it was fenced and this thread hasn't received new epoch, which means 
we need not backoff and retry as the
*partition is moved to failed state.
+   *- for ReplicaAlterLogDirsThread, "OutOfRange" error is generated when 
starting fetch to leader (source dir,

Review Comment:
   Just for my understanding, the new replica starts by trying to fetch offset 0, and in the case where 0 is not the first offset, it throws OffsetOutOfRange?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…

2023-06-16 Thread via GitHub


jolshan commented on PR #13721:
URL: https://github.com/apache/kafka/pull/13721#issuecomment-1595392012

   Yup, looks like this is what the KIP mentioned. It's very possible this was just missed in the initial implementation. I don't think this should cause major issues since the defaults are still valid (retry backoff is only 100 ms).
   Though I do wonder if it is super necessary to include. In the case where the delivery timeout is just slightly larger than the request timeout, we wouldn't have the full amount of time to retry anyway. In that KIP, we don't account for the time to await sending either.
   
   On the other hand, I don't think it hurts a lot to have unless someone sets their retry backoff super high. This could potentially cause more failures than before.
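
   As an illustration (hypothetical values, not taken from the PR), the scenario under discussion looks roughly like this in producer configuration:
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.ProducerConfig;

   public class TimeoutBudgetExample {
       public static void main(String[] args) {
           // Hypothetical settings: the delivery timeout is only slightly larger than
           // the request timeout, so after one timed-out request there is little budget
           // left for a retry backoff plus another full request.
           Properties props = new Properties();
           props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
           props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);      // default backoff
           props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30500); // barely above request timeout
           System.out.println(props);
       }
   }
   ```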


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


mumrah commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1595373852

   @viktorsomogyi Trogdor has some fault injection capabilities. I'm not sure 
if disk errors are among them, but it could probably be added.  See 
https://github.com/apache/kafka/tree/trunk/trogdor/src/main/java/org/apache/kafka/trogdor/fault
 for the types of faults we currently support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #13870: KAFKA-14500; [5/N]

2023-06-16 Thread via GitHub


jeffkbkim opened a new pull request, #13870:
URL: https://github.com/apache/kafka/pull/13870

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13848: MINOR: Make offsets topic creation more reliable in tests (zk mode)

2023-06-16 Thread via GitHub


jolshan commented on PR #13848:
URL: https://github.com/apache/kafka/pull/13848#issuecomment-1595350950

   Btw, @dajac do we typically autocreate this topic? Just curious because I've seen some issues with __transaction_state not having enough replicas in tests (taking 10 or so seconds to get a valid ISR). Just wondering if pre-creating the topic is a valid approach for tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15099) Flaky Test kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft

2023-06-16 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15099:
--

 Summary: Flaky Test 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(String).quorum=kraft
 Key: KAFKA-15099
 URL: https://issues.apache.org/jira/browse/KAFKA-15099
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan


This one often fails with: 

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 6ms 
while awaiting InitProducerId

Seems like a KRaft-only issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-16 Thread via GitHub


jolshan commented on PR #13868:
URL: https://github.com/apache/kafka/pull/13868#issuecomment-1595345954

   Thanks for the fix. The thread leaks have been bugging me.
   
   Did we also want to set the hw argument to false here too? 
https://github.com/apache/kafka/blob/48a3aab85469836f107f4fd2af03d9fa3cb4e670/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala#L4033
   
   I also noticed some lines didn't explicitly name the argument, but not sure 
if that's as important: 
https://github.com/apache/kafka/blob/48a3aab85469836f107f4fd2af03d9fa3cb4e670/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala#L142
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-16 Thread via GitHub


jolshan commented on code in PR #13868:
URL: https://github.com/apache/kafka/pull/13868#discussion_r1232797807


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -342,9 +347,7 @@ class ReplicaManagerTest {
   rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
   assertTrue(appendResult.hasFired)
-} finally {
-  rm.shutdown(checkpointHW = false)
-}
+} finally rm.shutdown(checkpointHW = false)

Review Comment:
   should we be consistent with the new line vs no new line formatting?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-16 Thread via GitHub


jolshan commented on code in PR #13868:
URL: https://github.com/apache/kafka/pull/13868#discussion_r1232796642


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -84,7 +84,7 @@ import scala.jdk.CollectionConverters._
 class ReplicaManagerTest {
 
   val topic = "test-topic"
-  val topicId = Uuid.randomUuid()
+  val topicId: Uuid = Uuid.randomUuid()

Review Comment:
   did we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set

2023-06-16 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15098:
--
Description: 
[ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to 
fatal exception
java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
does not yet support authorizers. Remove authorizer.class.name before 
performing a migration.


  was:
java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
does not yet support authorizers. Remove authorizer.class.name before 
performing a migration.



> KRaft migration does not proceed and broker dies if authorizer.class.name is 
> set
> 
>
> Key: KAFKA-15098
> URL: https://issues.apache.org/jira/browse/KAFKA-15098
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.0
>Reporter: Ron Dagostino
>Assignee: David Arthur
>Priority: Blocker
>
> [ERROR] 2023-06-16 20:14:14,298 [main] kafka.Kafka$ - Exiting Kafka due to 
> fatal exception
> java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
> does not yet support authorizers. Remove authorizer.class.name before 
> performing a migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15098) KRaft migration does not proceed and broker dies if authorizer.class.name is set

2023-06-16 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-15098:
-

 Summary: KRaft migration does not proceed and broker dies if 
authorizer.class.name is set
 Key: KAFKA-15098
 URL: https://issues.apache.org/jira/browse/KAFKA-15098
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.5.0
Reporter: Ron Dagostino
Assignee: David Arthur


java.lang.IllegalArgumentException: requirement failed: ZooKeeper migration 
does not yet support authorizers. Remove authorizer.class.name before 
performing a migration.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vp224 closed pull request #13869: Ktls test

2023-06-16 Thread via GitHub


vp224 closed pull request #13869: Ktls test
URL: https://github.com/apache/kafka/pull/13869


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vp224 opened a new pull request, #13869: Ktls test

2023-06-16 Thread via GitHub


vp224 opened a new pull request, #13869:
URL: https://github.com/apache/kafka/pull/13869

   Description: Rebased the repo with ktls-jni changes and modified the 
ktls-jni library version
   
   JIRA Ticket: https://jira01.corp.linkedin.com:8443/browse/LIKAFKA-53140
   
   Changes:
   - Added a log line in SslTransportLayer.java to record when a write is performed with kernel TLS enabled.
   - Changed the version of ktls-jni to 0.0.2
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1595092684

   @ijuma do you have any other questions wrt this PR? This is a useful feature 
to debug our flaky tests and I would like to ensure that your questions are 
addressed since you were involved in this review earlier.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-16 Thread via GitHub


machi1990 commented on PR #13865:
URL: https://github.com/apache/kafka/pull/13865#issuecomment-1595032501

   Thanks for the review @divijvaidya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #13868: MINOR: Close ReplicaManager correctly in ReplicaManagerTest

2023-06-16 Thread via GitHub


divijvaidya opened a new pull request, #13868:
URL: https://github.com/apache/kafka/pull/13868

   Fixes thread leaks by closing the ReplicaManager properly at the end of each 
test. The leaks were leading to flaky test failures in ReplicaManagerTest with 
errors such as:
   ```
   org.opentest4j.AssertionFailedError: Found unexpected 1 NonDaemon 
threads=kafka.server.ReplicaManagerTest:ReplicaFetcherThread-0-1 ==> expected: 
<0> but was: <1>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:560)
at 
app//kafka.utils.TestUtils$.assertNoNonDaemonThreads(TestUtils.scala:1349)
at 
app//kafka.server.ReplicaManagerTest.testBecomeFollowerWhileOldClientFetchInPurgatory(ReplicaManagerTest.scala:1911)
   ```
   
   **Note to reviewers:** Since I have added try/finally to a lot of tests, I would recommend using the "Hide whitespace" option on GitHub while looking at the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tkuramoto33 opened a new pull request, #13867: MINOR: Fix help message for kafka-metadata-shell.

2023-06-16 Thread via GitHub


tkuramoto33 opened a new pull request, #13867:
URL: https://github.com/apache/kafka/pull/13867

   The current help message for kafka-metadata-shell.sh is as follows.
   
   ```
   $ ~/kafka_2.13-3.5.0/bin/kafka-metadata-shell.sh -h
   usage: metadata-tool [-h] [--snapshot SNAPSHOT] [command [command ...]]
   
   The Apache Kafka metadata tool
   
   (omitted.)
   ```
   
   However, it should be as follows.
   
   ```
   $ ~/kafka_2.13-3.5.0/bin/kafka-metadata-shell.sh -h
   usage: kafka-metadata-shell [-h] [--snapshot SNAPSHOT] [command [command 
...]]
   
   The Apache Kafka metadata shell
   
   (omitted.)
   ```
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


dajac merged PR #13854:
URL: https://github.com/apache/kafka/pull/13854


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-16 Thread Elkhan Eminov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elkhan Eminov updated KAFKA-14112:
--
Description: 
The offset lag is the difference between the last replicated record's (LRO) source offset and the end offset of the source (LEO).
The offset lag is a difference (LRO-LEO), but its constituents are calculated at different points in time and place:
 * LEO shall be calculated during the source task's poll loop (ready to get it from the consumer)
 * LRO shall be kept in an in-memory "cache" that is updated during the task's producer callback

LRO is initialized when the task is started, from the offset store. The difference shall be calculated when the freshest LEO is acquired in the poll loop. The calculated amount shall be defined as a MirrorMaker metric.

This would describe the number of "to be replicated" records for a certain topic-partition.

  was:
The offset lag is the difference of the last replicated record's source offset 
and the end offset of the source.
The offset lag is a difference (LRO-LEO), but its constituents calculated at 
different points of time and place
 * LEO shall be calculated during source task's poll loop (ready to get it from 
the consumer)
 * LRO shall be kept in an in-memory "cache", that is updated during the task's 
producer callback

LRO is initialized when task is started, from the offset store. The difference 
shall be calculated when the freshest LEO acquired
in the poll loop. The calculated amount shall be defined as a MirrorMaker 
metric.

This would describe to amount of "to be replicated" number of records for a 
certain topic-partition.


> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's (LRO) source offset and the end offset of the source (LEO).
> The offset lag is a difference (LRO-LEO), but its constituents are calculated at different points in time and place:
>  * LEO shall be calculated during the source task's poll loop (ready to get it from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated during the task's producer callback
> LRO is initialized when the task is started, from the offset store. The difference shall be calculated when the freshest LEO is acquired in the poll loop. The calculated amount shall be defined as a MirrorMaker metric.
> This would describe the number of "to be replicated" records for a certain topic-partition.
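
A minimal sketch of the bookkeeping this describes (hypothetical class and method names, not MirrorMaker code): the LRO cache is updated from the producer callback, the freshest LEO comes from the poll loop, and the metric reports their difference per topic-partition.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks the last replicated offset (LRO) per topic-partition
// and computes the replication lag against a log-end offset (LEO).
public class ReplicationOffsetLag {
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();

    // Called from the producer callback once a source record has been replicated.
    public void recordReplicated(String topicPartition, long sourceOffset) {
        lastReplicatedOffsets.merge(topicPartition, sourceOffset, Math::max);
    }

    // Called from the source task's poll loop with the freshest LEO for the partition.
    public long lag(String topicPartition, long logEndOffset) {
        long lro = lastReplicatedOffsets.getOrDefault(topicPartition, logEndOffset);
        return Math.max(logEndOffset - lro, 0L);
    }
}
{code}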



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


dajac commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594960723

   The number of failed tests is scary, but none of them are related to this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13866: MINOR: update LogCleaner.scala javadoc with a link to OffsetMap to help with code navigation in IDE

2023-06-16 Thread via GitHub


machi1990 opened a new pull request, #13866:
URL: https://github.com/apache/kafka/pull/13866

   I was going through the LogCleaner.scala file to familiarise myself with 
that part of the codebase and I noticed this minor improvement to help with 
code navigation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13839: MINOR:Fill missing parameter annotations for LogCleaner methods

2023-06-16 Thread via GitHub


hudeqi commented on PR #13839:
URL: https://github.com/apache/kafka/pull/13839#issuecomment-1594953593

   
   
   
   > Thanks for the PR @hudeqi. I think it's always good to improve JavaDocs or 
ScalaDocs.
   > 
   > I was looking at the `LogCleaner.scala` file and I saw there are plenty of 
methods that are public which have only a very general documentation and they 
don't have any parameter annotation with documentation. For example 
`abortCleaning` in line 220 and some of the following methods.
   > 
   > For the sake of completion, would you be up to adding the missing 
annotations to the methods that are public? Extra mile for all the ones that 
are package-log-protected (`private[log]`)
   
   Hi, thanks for the comments. I have added documentation for the methods you mentioned; please help review. Thank you! @jlprat


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.

2023-06-16 Thread Mukesh Mishra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukesh Mishra updated KAFKA-15097:
--
Description: 
Currently we are facing a NoSuchFileException in the LogCleaner, which is a critical error because the platform gets shut down as a result.
We are running Kafka in KRaft mode.
The cluster has 3 nodes.
Kafka version - 3.3.1
We are facing the issue systematically; it occurs after retention is reached.

Here are the logs of Node 1 for consumer offset 4146, for which we faced the NoSuchFileException on 24 May 2023.

Config :

[^config.txt]

^Logs :^

^Server Node 1 [^server.log]^

Log Cleaner :
[^log-cleaner.log]



^__consumer_offset_49 directory logs :^

^!__consumer_offset_49.png!^

We suspect that the swap operation is the culprit, as we can see a swap file for consumer offset 4146 for __consumer_offset_49.

  was:
Currently we are facing NoSuchFileException in LogCleaner, which is critical 
error as result platform get shutown.
We are running Kafka in Kraft Mode.
The cluster is having 3 Node.
Kafka version - 3.3.1
We are  facing issue systematically, which occurs after retention reached.

Here's the logs of Node 1 for consumer offset 4146 for which we have faced 
NoSuchFileException on 24 may 2023


Config :

[^config.txt]

^Logs :^

^Server Node 1 [^server.log]^

^Log Cleaner :
[^log-cleaner.log]
^

^__consumer_offset_49 directory logs :^

^!__consumer_offset_49.png!^

we suspect that swap operation is culprit as we can see swap file for consumer 
offset 4146 for __consumer_offset_49


> NoSuchFileException in LogCleaner Operation.
> 
>
> Key: KAFKA-15097
> URL: https://issues.apache.org/jira/browse/KAFKA-15097
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.3.1
>Reporter: Mukesh Mishra
>Priority: Blocker
> Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, 
> server.log
>
>
> Currently we are facing NoSuchFileException in LogCleaner, which is critical 
> error as result platform get shutown.
> We are running Kafka in Kraft Mode.
> The cluster is having 3 Node.
> Kafka version - 3.3.1
> We are  facing issue systematically, which occurs after retention reached.
> Here's the logs of Node 1 for consumer offset 4146 for which we have faced 
> NoSuchFileException on 24 may 2023
> Config :
> [^config.txt]
> ^Logs :^
> ^Server Node 1 [^server.log]^
> Log Cleaner :
> [^log-cleaner.log]
> ^__consumer_offset_49 directory logs :^
> ^!__consumer_offset_49.png!^
> we suspect that swap operation is culprit as we can see swap file for 
> consumer offset 4146 for __consumer_offset_49



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.

2023-06-16 Thread Mukesh Mishra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukesh Mishra updated KAFKA-15097:
--
Affects Version/s: 3.3.1

> NoSuchFileException in LogCleaner Operation.
> 
>
> Key: KAFKA-15097
> URL: https://issues.apache.org/jira/browse/KAFKA-15097
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.3.1
>Reporter: Mukesh Mishra
>Priority: Blocker
> Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, 
> server.log
>
>
> Currently we are facing NoSuchFileException in LogCleaner, which is critical 
> error as result platform get shutown.
> We are running Kafka in Kraft Mode.
> The cluster is having 3 Node.
> Kafka version - 3.3.1
> We are  facing issue systematically, which occurs after retention reached.
> Here's the logs of Node 1 for consumer offset 4146 for which we have faced 
> NoSuchFileException on 24 may 2023
> Config :
> [^config.txt]
> ^Logs :^
> ^Server Node 1 [^server.log]^
> ^Log Cleaner :
> [^log-cleaner.log]
> ^
> ^__consumer_offset_49 directory logs :^
> ^!__consumer_offset_49.png!^
> we suspect that swap operation is culprit as we can see swap file for 
> consumer offset 4146 for __consumer_offset_49



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13864: Revert "MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits"

2023-06-16 Thread via GitHub


C0urante commented on PR #13864:
URL: https://github.com/apache/kafka/pull/13864#issuecomment-1594925376

   Thanks Josep! Will merge without awaiting CI results as this is a clean 
revert and I've verified locally that the 
`MirrorConnectorsIntegrationTransactionsTest::testReplication` test case passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka

2023-06-16 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733576#comment-17733576
 ] 

Manyanda Chitimbo edited comment on KAFKA-15096 at 6/16/23 4:09 PM:


Thank you for reporting the issue [~Sasikumarms]. A PR has been opened in [https://github.com/apache/kafka/pull/13865] to bump the version.
Once merged, I'll let the release managers determine how far the fix can be 
backported. 


was (Author: JIRAUSER299903):
https://github.com/apache/kafka/pull/13865

> CVE 2023-34455 - Vulnerability identified with Apache kafka
> ---
>
> Key: KAFKA-15096
> URL: https://issues.apache.org/jira/browse/KAFKA-15096
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sasikumar Muthukrishnan Sampath
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> A new vulnerability, CVE-2023-34455, has been identified in an Apache Kafka dependency. The vulnerability comes from snappy-java:1.1.8.4.
> Version 1.1.10.1 contains a patch for this issue. Please upgrade the snappy-java version to fix this issue.
>  
> snappy-java is a fast compressor/decompressor for Java. Due to use of an unchecked chunk length, an unrecoverable fatal error can occur in versions prior to 1.1.10.1.
> The code in the function hasNextChunk in the file SnappyInputStream.java checks if a given stream has more chunks to read. It does that by attempting to read 4 bytes. If it wasn't possible to read the 4 bytes, the function returns false. Otherwise, if 4 bytes were available, the code treats them as the length of the next chunk.
> In the case that the `compressed` variable is null, a byte array is allocated with the size given by the input data. Since the code doesn't test the legality of the `chunkSize` variable, it is possible to pass a negative number (such as 0xFFFFFFFF, which is -1), which will cause the code to raise a `java.lang.NegativeArraySizeException` exception. A worse case would happen when passing a huge positive value (such as 0x7FFFFFFF), which would raise the fatal `java.lang.OutOfMemoryError` error.
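
A minimal sketch of the unchecked-length pattern described above (illustrative only, not the snappy-java source):
{code:java}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative only: the next chunk's length is read from the stream and trusted
// without validation. A negative value makes `new byte[chunkSize]` throw
// NegativeArraySizeException; a huge positive value (e.g. 0x7FFFFFFF) can trigger
// OutOfMemoryError before any decompression happens.
public class UncheckedChunkRead {
    public static byte[] readChunk(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int chunkSize = data.readInt();          // attacker-controlled length
        byte[] compressed = new byte[chunkSize]; // no bounds check before allocating
        data.readFully(compressed);
        return compressed;
    }
}
{code}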



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.

2023-06-16 Thread Mukesh Mishra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mukesh Mishra updated KAFKA-15097:
--
Priority: Blocker  (was: Critical)

> NoSuchFileException in LogCleaner Operation.
> 
>
> Key: KAFKA-15097
> URL: https://issues.apache.org/jira/browse/KAFKA-15097
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Mukesh Mishra
>Priority: Blocker
> Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, 
> server.log
>
>
> Currently we are facing NoSuchFileException in LogCleaner, which is critical 
> error as result platform get shutown.
> We are running Kafka in Kraft Mode.
> The cluster is having 3 Node.
> Kafka version - 3.3.1
> We are  facing issue systematically, which occurs after retention reached.
> Here's the logs of Node 1 for consumer offset 4146 for which we have faced 
> NoSuchFileException on 24 may 2023
> Config :
> [^config.txt]
> ^Logs :^
> ^Server Node 1 [^server.log]^
> ^Log Cleaner :
> [^log-cleaner.log]
> ^
> ^__consumer_offset_49 directory logs :^
> ^!__consumer_offset_49.png!^
> we suspect that swap operation is culprit as we can see swap file for 
> consumer offset 4146 for __consumer_offset_49



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-16 Thread via GitHub


machi1990 commented on PR #13865:
URL: https://github.com/apache/kafka/pull/13865#issuecomment-1594915227

   @showuon @mimaison @divijvaidya can one of you have a look? Thanks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13865: KAFKA-15096: Update snappy-java to 1.1.10.1

2023-06-16 Thread via GitHub


machi1990 opened a new pull request, #13865:
URL: https://github.com/apache/kafka/pull/13865

   Upgrade to 1.1.10.1 to fix CVE 2023-34455
   
   The release notes are available at 
https://github.com/xerial/snappy-java/releases/tag/v1.1.10.1
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15097) NoSuchFileException in LogCleaner Operation.

2023-06-16 Thread Mukesh Mishra (Jira)
Mukesh Mishra created KAFKA-15097:
-

 Summary: NoSuchFileException in LogCleaner Operation.
 Key: KAFKA-15097
 URL: https://issues.apache.org/jira/browse/KAFKA-15097
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Reporter: Mukesh Mishra
 Attachments: __consumer_offset_49.png, config.txt, log-cleaner.log, 
server.log

Currently we are facing NoSuchFileException in LogCleaner, which is critical 
error as result platform get shutown.
We are running Kafka in Kraft Mode.
The cluster is having 3 Node.
Kafka version - 3.3.1
We are  facing issue systematically, which occurs after retention reached.

Here's the logs of Node 1 for consumer offset 4146 for which we have faced 
NoSuchFileException on 24 may 2023


Config :

[^config.txt]

^Logs :^

^Server Node 1 [^server.log]^

^Log Cleaner :
[^log-cleaner.log]
^

^__consumer_offset_49 directory logs :^

^!__consumer_offset_49.png!^

we suspect that swap operation is culprit as we can see swap file for consumer 
offset 4146 for __consumer_offset_49



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka

2023-06-16 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo reassigned KAFKA-15096:
-

Assignee: Manyanda Chitimbo

> CVE 2023-34455 - Vulnerability identified with Apache kafka
> ---
>
> Key: KAFKA-15096
> URL: https://issues.apache.org/jira/browse/KAFKA-15096
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sasikumar Muthukrishnan Sampath
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> A new vulnerability CVE-2023-34455 is identified with apache kafka 
> dependency. The vulnerability is coming from snappy-java:1.1.8.4
> Version 1.1.10.1 contains a patch for this issue. Please upgrade the 
> snappy-java version to fix this issue
>  
> snappy-java is a fast compressor/decompressor for Java. Due to use of an 
> unchecked chunk length, an unrecoverable fatal error can occur in versions 
> prior to 1.1.10.1.
> The code in the function hasNextChunk in the file SnappyInputStream.java 
> checks if a given stream has more chunks to read. It does that by attempting 
> to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function 
> returns false. Otherwise, if 4 bytes were available, the code treats them as 
> the length of the next chunk.
> In the case that the `compressed` variable is null, a byte array is allocated 
> with the size given by the input data. Since the code doesn’t test the 
> legality of the `chunkSize` variable, it is possible to pass a negative 
> number (such as 0xFFFFFFFF which is -1), which will cause the code to raise a 
> `java.lang.NegativeArraySizeException` exception. A worse case would happen 
> when passing a huge positive value (such as 0x7FFFFFFF), which would raise 
> the fatal `java.lang.OutOfMemoryError` error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW

2023-06-16 Thread via GitHub


maniekes commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594900986

   Some extra research: I rolled back the changes for `cygpath --windows`.
   
   In MSYS2 MINGW64 I got the following error:
   ```bash
   *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka
   $ ./bin/kafka-server-start.sh config/server.properties
   [0.003s][error][logging] Error opening log file 
'C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M':
 Invalid argument
   [0.003s][error][logging] Initialization of output 
'file=C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M'
 using options '(null)' failed.
   Invalid -Xlog option 
'-Xlog:gc*:file=C;C:\msys64\rafal\git\maniekes-kafka\logs\kafkaServer-gc.log;time,tags;filecount=10,filesize=100M',
 see error log for details.
   Error: Could not create the Java Virtual Machine.
   Error: A fatal exception has occurred. Program will exit.
   ```
   
   After I add this to the script it works fine (and logs are now properly displayed):
   ```bash
   export MSYS2_ARG_CONV_EXCL="*"
   # this will do the same, but only for 2 faulty arguments:
   export MSYS2_ARG_CONV_EXCL="-Xlog:gc*:file=;-Dlog4j.configuration=" (since i 
preffer lean solutions i added this verion to script)
   ```
   
   In raw MSYS2 I got the following error:
   ```bash
   *@@rafal-laptok MSYS /c/rafal/git/maniekes-kafka
   $ ./bin/kafka-server-start.sh config/server.properties
   /c/rafal/git/maniekes-kafka/bin/kafka-run-class.sh: linia 352: C:\Program 
Files\java\jdk-19.0.1/bin/java: Bad address
   /c/rafal/git/maniekes-kafka/bin/kafka-run-class.sh: linia 352: C:\Program 
Files\java\jdk-19.0.1/bin/java: No error
   ```
   
   It was caused by this line:
   https://github.com/apache/kafka/blob/trunk/bin/kafka-run-class.sh#L243
   For some strange reason MSYS2 does not allow setting `KAFKA_LOG4J_OPTS` in 
this way; I was getting "Bad address" even after I removed the usages of 
`KAFKA_LOG4J_OPTS` from the very bottom of the script. I tried to work around 
this and finally decided just to change the name:
   ```bash
   KAFKA_LOG4J_CMD_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
   ```
   which worked perfectly for all 4 shells.
   
   Here are the test results after my commit:
   
   
   ## git bash
   ```bash
   *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
   $ uname -a
   MINGW64_NT-10.0-22621 rafal-laptok 3.3.6-341.x86_64 2022-09-05 20:28 UTC 
x86_64 Msys
   *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
   $ ./bin/kafka-server-start.sh config/server.properties
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
   [2023-06-16 17:43:05,794] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
   [2023-06-16 17:43:05,950] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
   [2023-06-16 17:43:06,014] INFO starting (kafka.server.KafkaServer)
   [2023-06-16 17:43:06,015] INFO Connecting to zookeeper on localhost:2181 
(kafka.server.KafkaServer)
   [2023-06-16 17:43:06,023] INFO [ZooKeeperClient Kafka server] Initializing a 
new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
   [2023-06-16 17:43:06,042] INFO Client 
environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, 
built on 12/18/2022 18:10 GMT (org.apache.zookeeper.ZooKeeper)
   [2023-06-16 17:43:06,042] INFO Client environment:host.name=192.168.1.137 
(org.apache.zookeeper.ZooKeeper)
   [2023-06-16 17:43:06,044] INFO Client environment:java.version=19.0.1 
(org.apache.zookeeper.ZooKeeper)
   [2023-06-16 17:43:06,044] INFO Client environment:java.vendor=Oracle 
Corporation (org.apache.zookeeper.ZooKeeper)
   [2023-06-16 17:43:06,044] INFO Client environment:java.home=C:\Program 
Files\java\jdk-19.0.1 (org.apache.zookeeper.ZooKeeper)
   (... rest of logs skipped, all works fine)
   ```
   
   ## MSYS2 MINGW
   ```bash
   *@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka
   $ uname -a
   MINGW64_NT-10.0-22621 r

[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1594893813

   Thanks for your comment. I have merged the latest trunk to see if it can 
pass the CI check. @viktorsomogyi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka

2023-06-16 Thread Sasikumar Muthukrishnan Sampath (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasikumar Muthukrishnan Sampath updated KAFKA-15096:

Description: 
A new vulnerability CVE-2023-34455 is identified with apache kafka dependency. 
The vulnerability is coming from snappy-java:1.1.8.4

Version 1.1.10.1 contains a patch for this issue. Please upgrade the 
snappy-java version to fix this issue

 
snappy-java is a fast compressor/decompressor for Java. Due to use of an 
unchecked chunk length, an unrecoverable fatal error can occur in versions 
prior to 1.1.10.1.
The code in the function hasNextChunk in the file SnappyInputStream.java checks 
if a given stream has more chunks to read. It does that by attempting to read 4 
bytes. If it wasn’t possible to read the 4 bytes, the function returns false. 
Otherwise, if 4 bytes were available, the code treats them as the length of the 
next chunk.
In the case that the `compressed` variable is null, a byte array is allocated 
with the size given by the input data. Since the code doesn’t test the legality 
of the `chunkSize` variable, it is possible to pass a negative number (such as 
0xFFFFFFFF which is -1), which will cause the code to raise a 
`java.lang.NegativeArraySizeException` exception. A worse case would happen 
when passing a huge positive value (such as 0x7FFFFFFF), which would raise the 
fatal `java.lang.OutOfMemoryError` error.

  was:
A new vulnerability CVE-2023-34455 is identified with camel-kafka dependencies. 
The vulnerability is coming from snappy-java:1.1.8.4

Version 1.1.10.1 contains a patch for this issue. Please upgrade the 
snappy-java version to fix this issue

 
snappy-java is a fast compressor/decompressor for Java. Due to use of an 
unchecked chunk length, an unrecoverable fatal error can occur in versions 
prior to 1.1.10.1.
The code in the function hasNextChunk in the file SnappyInputStream.java checks 
if a given stream has more chunks to read. It does that by attempting to read 4 
bytes. If it wasn’t possible to read the 4 bytes, the function returns false. 
Otherwise, if 4 bytes were available, the code treats them as the length of the 
next chunk.
In the case that the `compressed` variable is null, a byte array is allocated 
with the size given by the input data. Since the code doesn’t test the legality 
of the `chunkSize` variable, it is possible to pass a negative number (such as 
0xFFFFFFFF which is -1), which will cause the code to raise a 
`java.lang.NegativeArraySizeException` exception. A worse case would happen 
when passing a huge positive value (such as 0x7FFFFFFF), which would raise the 
fatal `java.lang.OutOfMemoryError` error.


> CVE 2023-34455 - Vulnerability identified with Apache kafka
> ---
>
> Key: KAFKA-15096
> URL: https://issues.apache.org/jira/browse/KAFKA-15096
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sasikumar Muthukrishnan Sampath
>Priority: Major
>
> A new vulnerability CVE-2023-34455 is identified with apache kafka 
> dependency. The vulnerability is coming from snappy-java:1.1.8.4
> Version 1.1.10.1 contains a patch for this issue. Please upgrade the 
> snappy-java version to fix this issue
>  
> snappy-java is a fast compressor/decompressor for Java. Due to use of an 
> unchecked chunk length, an unrecoverable fatal error can occur in versions 
> prior to 1.1.10.1.
> The code in the function hasNextChunk in the file SnappyInputStream.java 
> checks if a given stream has more chunks to read. It does that by attempting 
> to read 4 bytes. If it wasn’t possible to read the 4 bytes, the function 
> returns false. Otherwise, if 4 bytes were available, the code treats them as 
> the length of the next chunk.
> In the case that the `compressed` variable is null, a byte array is allocated 
> with the size given by the input data. Since the code doesn’t test the 
> legality of the `chunkSize` variable, it is possible to pass a negative 
> number (such as 0xFFFFFFFF which is -1), which will cause the code to raise a 
> `java.lang.NegativeArraySizeException` exception. A worse case would happen 
> when passing a huge positive value (such as 0x7FFFFFFF), which would raise 
> the fatal `java.lang.OutOfMemoryError` error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15096) CVE 2023-34455 - Vulnerability identified with Apache kafka

2023-06-16 Thread Sasikumar Muthukrishnan Sampath (Jira)
Sasikumar Muthukrishnan Sampath created KAFKA-15096:
---

 Summary: CVE 2023-34455 - Vulnerability identified with Apache 
kafka
 Key: KAFKA-15096
 URL: https://issues.apache.org/jira/browse/KAFKA-15096
 Project: Kafka
  Issue Type: Bug
Reporter: Sasikumar Muthukrishnan Sampath


A new vulnerability CVE-2023-34455 is identified with camel-kafka dependencies. 
The vulnerability is coming from snappy-java:1.1.8.4

Version 1.1.10.1 contains a patch for this issue. Please upgrade the 
snappy-java version to fix this issue

 
snappy-java is a fast compressor/decompressor for Java. Due to use of an 
unchecked chunk length, an unrecoverable fatal error can occur in versions 
prior to 1.1.10.1.
The code in the function hasNextChunk in the file SnappyInputStream.java checks 
if a given stream has more chunks to read. It does that by attempting to read 4 
bytes. If it wasn’t possible to read the 4 bytes, the function returns false. 
Otherwise, if 4 bytes were available, the code treats them as the length of the 
next chunk.
In the case that the `compressed` variable is null, a byte array is allocated 
with the size given by the input data. Since the code doesn’t test the legality 
of the `chunkSize` variable, it is possible to pass a negative number (such as 
0xFFFFFFFF which is -1), which will cause the code to raise a 
`java.lang.NegativeArraySizeException` exception. A worse case would happen 
when passing a huge positive value (such as 0x7FFFFFFF), which would raise the 
fatal `java.lang.OutOfMemoryError` error.
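
For illustration only, a minimal Scala sketch of the unchecked chunk-length 
pattern described above (a hypothetical reproduction of the anti-pattern, not 
the actual snappy-java source):

{code:scala}
import java.io.DataInputStream

// Read the next chunk, trusting the 4-byte length prefix from the stream.
// A negative length (e.g. 0xFFFFFFFF) raises NegativeArraySizeException,
// a huge positive one (e.g. 0x7FFFFFFF) can raise OutOfMemoryError.
def readNextChunkUnchecked(in: DataInputStream): Array[Byte] = {
  val chunkSize = in.readInt()                 // attacker-controlled 4 bytes
  val compressed = new Array[Byte](chunkSize)  // no sanity check on chunkSize
  in.readFully(compressed)
  compressed
}
{code}

Upgrading the dependency to snappy-java 1.1.10.1, as noted above, avoids this.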



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac opened a new pull request, #13863: MINOR: Reduce flakiness of testRackAwareRangeAssignor, second try

2023-06-16 Thread via GitHub


dajac opened a new pull request, #13863:
URL: https://github.com/apache/kafka/pull/13863

   This test still fails regularly with the following error:
   
   ```
   Error
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Timed out while awaiting expected 
assignment Set(topicWithAllPartitionsOnAllRacks-0, 
topicWithSingleRackPartitions-0). The current assignment is []
   Stacktrace
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Timed out while awaiting expected 
assignment Set(topicWithAllPartitionsOnAllRacks-0, 
topicWithSingleRackPartitions-0). The current assignment is []
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(FetchFromFollowerIntegrationTest.scala:211)
at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9$adapted(FetchFromFollowerIntegrationTest.scala:211)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
   ```
   
   I propose to increase the timeouts to 30 secs to mitigate it. The test 
already uses 30 secs timeouts in many places. This patch uses 30 secs 
everywhere.
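   
   For illustration, a minimal self-contained Scala sketch (a hypothetical helper, 
not the actual test code) of the kind of bounded wait the test performs, using the 
proposed 30-second timeout:
   
   ```scala
   import scala.concurrent.duration._
   
   // Poll a condition until it holds or the deadline expires.
   def waitUntil(condition: () => Boolean,
                 timeout: FiniteDuration = 30.seconds,
                 pollInterval: FiniteDuration = 100.millis): Boolean = {
     val deadline = timeout.fromNow
     while (deadline.hasTimeLeft()) {
       if (condition()) return true
       Thread.sleep(pollInterval.toMillis)
     }
     condition()
   }
   ```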
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #13257: MINOR: Add ZK migration docs to the packaged docs

2023-06-16 Thread via GitHub


mumrah merged PR #13257:
URL: https://github.com/apache/kafka/pull/13257


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12976: Improve TestUtils for temporary properties file

2023-06-16 Thread via GitHub


divijvaidya commented on code in PR #12976:
URL: https://github.com/apache/kafka/pull/12976#discussion_r1232345504


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -156,6 +156,21 @@ object TestUtils extends Logging {
*/
   def tempFile(prefix: String, suffix: String): File = 
JTestUtils.tempFile(prefix, suffix)
 
+  /**
+   * Create a file with the given contents in the default temporary-file 
directory,
+   * using `kafka` as the prefix and `tmp` as the suffix to generate its name.
+   */
+  def tempFile(contents: String): File = JTestUtils.tempFile(contents)
+
+  def tempPropertiesFile(properties: Properties): File = {
+    return tempPropertiesFile(properties.asScala)
+  }
+
+  def tempPropertiesFile(properties: Map[String, String]): File = {
+    val content = properties.map{case (k, v) => k + "=" + v}.mkString("\n")

Review Comment:
   Please use `System.lineSeparator()` instead of `\n`. It's a much safer way 
to write newline characters and makes the code system independent.
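
   For reference, a minimal Scala sketch of the suggested change (a hypothetical 
standalone helper, not the actual TestUtils code):

   ```scala
   // Join the entries with the platform line separator instead of a
   // hard-coded "\n", so the generated file is system independent.
   def propertiesContent(properties: Map[String, String]): String =
     properties.map { case (k, v) => s"$k=$v" }.mkString(System.lineSeparator())
   ```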



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-16 Thread via GitHub


tombentley commented on code in PR #13862:
URL: https://github.com/apache/kafka/pull/13862#discussion_r1232343003


##
docs/streams/quickstart.html:
##
@@ -91,8 +91,8 @@ Step
 https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz";
 title="Kafka downloads">Download the {{fullDotVersion}} release and un-tar 
it.
 Note that there are multiple downloadable Scala versions and we choose to use 
the recommended version ({{scalaVersion}}) here:
 
-> tar -xzf 
kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
-> cd kafka_{{scalaVersion}}-{{fullDotVersion}}
+  $ tar -xzf 
kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz

Review Comment:
   Do we need the initial two spaces?



##
docs/quickstart.html:
##
@@ -215,19 +215,17 @@ 
 Edit the config/connect-standalone.properties file, add or 
change the plugin.path configuration property match the following, 
and save the file:
 
 
-
-> echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar"
+$ echo 
"plugin.path=libs/connect-file-{{fullDotVersion}}.jar"

Review Comment:
   ```suggestion
   $ echo 
"plugin.path=libs/connect-file-{{fullDotVersion}}.jar" >> 
config/connect-standalone.properties
   ```



##
docs/quickstart.html:
##
@@ -215,19 +215,17 @@ 
 Edit the config/connect-standalone.properties file, add or 
change the plugin.path configuration property match the following, 
and save the file:

Review Comment:
   We don't actually mean it "should match the following" if _the following_ is 
some bash commands. 
   
   ```suggestion
   Edit the config/connect-standalone.properties file, adding 
or changing the plugin.path configuration property to include 
libs/connect-file-{{fullDotVersion}}.jar and save the file. 
Alternatively, in your terminal session you can use the following command:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on pull request #12976: Improve TestUtils for temporary properties file

2023-06-16 Thread via GitHub


zigarn commented on PR #12976:
URL: https://github.com/apache/kafka/pull/12976#issuecomment-1594798209

   @divijvaidya: rebased!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] justinrlee commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file

2023-06-16 Thread via GitHub


justinrlee commented on PR #12797:
URL: https://github.com/apache/kafka/pull/12797#issuecomment-1594792962

   Sorry for the delay here! I recently went on paternity leave but will pick 
this up (and add tests) when I get back in a month or two.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13099:
URL: https://github.com/apache/kafka/pull/13099#issuecomment-1594751739

   @showuon rebase from trunk please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12976: Improve TestUtils for temporary properties file

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12976:
URL: https://github.com/apache/kafka/pull/12976#issuecomment-1594744324

   @zigarn can you please rebase this PR with trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-16 Thread via GitHub


C0urante commented on PR #13819:
URL: https://github.com/apache/kafka/pull/13819#issuecomment-1594743764

   Thanks guys!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12852: Enhance the robustness of the program

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12852:
URL: https://github.com/apache/kafka/pull/12852#issuecomment-1594740256

   Hey @likeyoukang 
   Thank you for your first contribution. Could you please rebase this PR from 
trunk? And also change the title of the PR to "MINOR: Use "constant equals 
object" instead of "object equals Constant""


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya closed pull request #12905: fix check for MinGW64 compatibility

2023-06-16 Thread via GitHub


divijvaidya closed pull request #12905: fix check for MinGW64 compatibility
URL: https://github.com/apache/kafka/pull/12905


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12905: fix check for MinGW64 compatibility

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12905:
URL: https://github.com/apache/kafka/pull/12905#issuecomment-1594742425

   We have another PR https://github.com/apache/kafka/pull/13321 opened for 
this. Closing this in favour of the other one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12797: MINOR: Remove requirement to specify --bootstrap-server in utility scripts if specified in property file

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12797:
URL: https://github.com/apache/kafka/pull/12797#issuecomment-1594727580

   ping @justinrlee 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2023-06-16 Thread via GitHub


divijvaidya closed pull request #12636: KAFKA-14214: Convert StandardAuthorizer 
to copy-on-write
URL: https://github.com/apache/kafka/pull/12636


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1594725046

   > I uploaded a new version at https://github.com/apache/kafka/pull/12662
   
   In that case, closing this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13257: MINOR: Add ZK migration docs to the packaged docs

2023-06-16 Thread via GitHub


mumrah commented on PR #13257:
URL: https://github.com/apache/kafka/pull/13257#issuecomment-1594723400

   Thanks @mimaison, updated with your suggestions. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12662: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12662:
URL: https://github.com/apache/kafka/pull/12662#issuecomment-1594720549

   @cmccabe I guess this can be closed now since it is superseded by 
https://github.com/apache/kafka/pull/13437 ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12667: KAFKA-10635: Improve logging with throwing OOOSE

2023-06-16 Thread via GitHub


divijvaidya commented on PR #12667:
URL: https://github.com/apache/kafka/pull/12667#issuecomment-1594713861

   @nicktelford ping


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13201: KAFKA-14596: Move TopicCommand to tools

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13201:
URL: https://github.com/apache/kafka/pull/13201#issuecomment-1594708038

   @OmniaGM can you please rebase this with trunk? I would be happy to begin 
a review of this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13321: MINOR: added compatibility for MinGW

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594696603

   > hope you test is also in pipeline for cygwin to be double sure
   
   No, we don't have any tests based on Windows (or Cygwin) in our pipeline. We 
don't officially promise to support Kafka on the Windows platform. No pressure, but 
we are relying on your testing here :) 
   
   > what i noticed that msys2 was not reading paths properly because path 
translator for cygwin was adding backslashes instead of slashes
   
   msys2 uses the same `cygpath` as cygwin, so why isn't it working for msys2? 
Do you have any idea? My concern with using `--windows` instead of `--mixed` is 
that this would be a breaking change for people who have written scripts around 
this bug/feature of using the `/` slash. Hence, ideally I think it would be 
preferable to keep the existing behaviour the same.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13858: MINOR: Add more information in assertion failure for non daemon threads

2023-06-16 Thread via GitHub


divijvaidya merged PR #13858:
URL: https://github.com/apache/kafka/pull/13858


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13858: MINOR: Add more information in assertion failure for non daemon threads

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13858:
URL: https://github.com/apache/kafka/pull/13858#issuecomment-1594662486

   Unrelated test failures:
   ```
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplicateTargetDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_8_and_Scala_2_12___testReplicateTargetDefault__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders___2/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance__/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.ConsumerBounceTest.testCloseDuringRebalance()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_11_and_Scala_2_13___testCloseDuringRebalance___2/)
   [Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsTest.testSendOffsetsToTransactionTimeout(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/kafka.api/TransactionsTest/Build___JDK_11_and_Scala_2_13___testSendOffsetsToTransactionTimeout_String__quorum_kraft/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeFromEosAlphaToEosV2_true_/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_17_and_Scala_2_13___testReplication__/)
   [Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_17_and_Scala_2_13___testReplication___2/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_8_and_Scala_2_12___testReplication__/)
   [Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_8_and_Scala_2_12___testReplication___2/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplication__/)
   [Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13858/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplication___2/)
   ```
   
   The MirrorMaker failures are newly introduced in trunk.
   Existing ticket https://issues.apache.org/jira/browse/KAFKA-15052  for 
testBalancePartitionLeaders()
   Existing ticket https://issues.apache.org/jira/browse/KAFKA-8529 for 
testCloseDuringRebalance()
   Added new ticket https://issues.apache.org/jira/browse/KAFKA-15095 for 
shouldUpgradeFromEosAlphaToEosV2()


-- 
This is an aut

[jira] [Created] (KAFKA-15095) Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]

2023-06-16 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15095:


 Summary: Flaky test 
EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]
 Key: KAFKA-15095
 URL: https://issues.apache.org/jira/browse/KAFKA-15095
 Project: Kafka
  Issue Type: Test
  Components: unit tests
Reporter: Divij Vaidya


{code:java}
java.lang.AssertionError: Did not receive all 178 records from topic 
multiPartitionOutputTopic within 6 ms,  currently accumulated data is 
[KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 
10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), 
KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78), KeyValue(1, 
91), KeyValue(1, 105), KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), 
KeyValue(1, 171), KeyValue(1, 190), KeyValue(1, 120), KeyValue(1, 136), 
KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), KeyValue(1, 210), 
KeyValue(1, 231), KeyValue(1, 253), KeyValue(1, 276), KeyValue(1, 300), 
KeyValue(1, 325), KeyValue(1, 351), KeyValue(1, 378), KeyValue(1, 406), 
KeyValue(1, 435), KeyValue(2, 0), KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 
6), KeyValue(2, 10), KeyValue(2, 15), KeyValue(2, 21), KeyValue(2, 28), 
KeyValue(2, 36), KeyValue(2, 45), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 
78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 55), KeyValue(2, 66), 
KeyValue(2, 78), KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), 
KeyValue(2, 136), KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), 
KeyValue(2, 210), KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), 
KeyValue(2, 300), KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), 
KeyValue(2, 406), KeyValue(2, 435), KeyValue(2, 435), KeyValue(0, 0), 
KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 
15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), 
KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 
55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105), 
KeyValue(0, 120), KeyValue(0, 136), KeyValue(0, 153), KeyValue(0, 171), 
KeyValue(0, 190), KeyValue(0, 210), KeyValue(0, 231), KeyValue(0, 253), 
KeyValue(0, 276), KeyValue(0, 300), KeyValue(0, 325), KeyValue(0, 351), 
KeyValue(0, 378), KeyValue(0, 406), KeyValue(0, 435), KeyValue(0, 325), 
KeyValue(0, 351), KeyValue(0, 378), KeyValue(0, 406), KeyValue(0, 435), 
KeyValue(3, 0), KeyValue(3, 1), KeyValue(3, 3), KeyValue(3, 6), KeyValue(3, 
10), KeyValue(3, 15), KeyValue(3, 21), KeyValue(3, 28), KeyValue(3, 36), 
KeyValue(3, 45), KeyValue(3, 55), KeyValue(3, 66), KeyValue(3, 78), KeyValue(3, 
91), KeyValue(3, 105), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), 
KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 120), KeyValue(3, 136), 
KeyValue(3, 153), KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 210), 
KeyValue(3, 231), KeyValue(3, 253), KeyValue(3, 276), KeyValue(3, 300), 
KeyValue(3, 325), KeyValue(3, 351), KeyValue(3, 378), KeyValue(3, 406), 
KeyValue(3, 435), KeyValue(1, 465), KeyValue(1, 496), KeyValue(0, 465), 
KeyValue(0, 496), KeyValue(3, 465), KeyValue(3, 496), KeyValue(0, 528), 
KeyValue(1, 528), KeyValue(0, 561), KeyValue(1, 561), KeyValue(0, 595), 
KeyValue(3, 528), KeyValue(2, 465), KeyValue(3, 561), KeyValue(2, 496), 
KeyValue(3, 595), KeyValue(2, 528), KeyValue(2, 561), KeyValue(2, 
595)]Expected: is a value equal to or greater than <178> but: <164> was 
less than <178>   at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730)
  at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) 
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:347) 
 at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726)
   at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699)
   at 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.readResult(EosV2UpgradeIntegrationTest.java:1080)
   at 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.verifyUncommitted(EosV2UpgradeIntegrationTest.java:1055)
at 
org.apache.kafka.streams.integration.EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2(EosV2UpgradeIntegrationTest.java:705)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)  

[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW

2023-06-16 Thread via GitHub


maniekes commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594651760

   Okay, I think I found the root cause for it:
   
   https://github.com/apache/kafka/blob/trunk/bin/kafka-server-start.sh#L25
   https://github.com/apache/kafka/blob/trunk/bin/zookeeper-server-start.sh#L25
   
   cygpath is not changing it in `kafka-run-class.sh` because it's already set 
in the main script. The same logic is also in a few other files.
   Do you think we should modify it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15066) passing listener name config into TopicBasedRemoteLogMetadataManagerConfig

2023-06-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15066.
---
Fix Version/s: 3.6.0
   Resolution: Fixed

> passing listener name config into TopicBasedRemoteLogMetadataManagerConfig
> --
>
> Key: KAFKA-15066
> URL: https://issues.apache.org/jira/browse/KAFKA-15066
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.6.0
>
>
> The `remote.log.metadata.manager.listener.name` config doesn't pass to 
> TopicBasedRemoteLogMetadataManagerConfig correctly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-16 Thread via GitHub


showuon merged PR #13828:
URL: https://github.com/apache/kafka/pull/13828


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


dajac commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594627878

   ```
   % ./gradlew rat --dry-run
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id 55373f1f) using Gradle 
8.1.1, Java 17 and Scala 2.13.10
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   :generator:compileJava SKIPPED
   :generator:processResources SKIPPED
   :generator:classes SKIPPED
   :generator:jar SKIPPED
   :clients:processMessages SKIPPED
   :clients:processTestMessages SKIPPED
   :core:processMessages SKIPPED
   :group-coordinator:processMessages SKIPPED
   :metadata:processMessages SKIPPED
   :raft:processMessages SKIPPED
   :storage:processMessages SKIPPED
   :streams:processMessages SKIPPED
   :rat SKIPPED
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] maniekes commented on pull request #13321: MINOR: added compatibility for MinGW

2023-06-16 Thread via GitHub


maniekes commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594625896

   OK, good catch with MSYS2 @divijvaidya. I have installed Git Bash, MSYS2 and 
Cygwin and tested on all of them (on MSYS2 I tested on msys2-mingw and on pure 
msys2). 
   
   What I noticed is that MSYS2 was not reading paths properly because the path 
translator for Cygwin was adding backslashes instead of slashes (option --mixed 
vs --windows in cygpath, see my last commit). With option --windows it works on 
all those environments (hope your test is also in the pipeline for Cygwin to be 
double sure).
   
   ## Git bash
   ```bash
   vakom@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
   $ uname -a
   MINGW64_NT-10.0-22621 rafal-laptok 3.3.6-341.x86_64 2022-09-05 20:28 UTC 
x86_64 Msys
   
   vakom@rafal-laptok MINGW64 /c/rafal/git/maniekes-kafka (trunk)
   $ ./bin/kafka-server-start.sh config/server.properties
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
   [2023-06-16 14:35:28,564] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
   [2023-06-16 14:35:28,713] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
   [2023-06-16 14:35:28,774] INFO starting (kafka.server.KafkaServer)
   [2023-06-16 14:35:28,775] INFO Connecting to zookeeper on localhost:2181 
(kafka.server.KafkaServer)
   [2023-06-16 14:35:28,783] INFO [ZooKeeperClient Kafka server] Initializing a 
new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
   [2023-06-16 14:35:28,806] INFO Client 
environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, 
built on 12/18/2022 18:10 GMT (org.apache.zookeeper.ZooKeeper)
   [2023-06-16 14:35:28,806] INFO Client environment:host.name=192.168.1.137 
(org.apache.zookeeper.ZooKeeper)
   [2023-06-16 14:35:28,806] INFO Client environment:java.version=19.0.1 
(org.apache.zookeeper.ZooKeeper)
   ... (skipped rest of logs because all looks fine)
   ```
   
   ## MSYS2
   ```bash
   vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka
   $ uname -a
   MSYS_NT-10.0-22621 rafal-laptok 3.4.6.x86_64 2023-04-01 11:43 UTC x86_64 Msys
   
   vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka
   $ ./bin/kafka-server-start.sh config/server.properties
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/tools/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/trogdor/build/dependant-libs-2.13.10/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/runtime/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in 
[jar:file:/C:/rafal/git/maniekes-kafka/connect/mirror/build/dependant-libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
   log4j:WARN No appenders could be found for logger 
(kafka.utils.Log4jControllerRegistration$).
   log4j:WARN Please initialize the log4j system properly.
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for 
more info.
   (this is all)
   ```
   **One remark**: in MSYS2, SLF4J is not printing logs, only a warning that the 
log appender cannot be found. However, the directories for Kafka are being 
created properly and the cluster works fine:
   
   ```bash
   vakom@rafal-laptok MSYS /c/rafal/git/maniekes-kafka
   $ ls -lart /c/tmp/kafka-logs/
   razem 7
   drwxr-xr-x 1 vakom vakom  0 06-16 14:41 ..
   -rw-r--r-- 1 vakom vakom  0 06-16 14:41 .lock
   -rw-r--r-- 1 vakom vakom  0 06-16 14:41 cleaner-offset-checkpoint
   -rw-r--r-- 1 vakom vakom  0 06-16 14:41 replication-offset-checkpoint
   -rw-r--r-- 1 vakom vakom 94 06-16 14:41 meta.properties
   -rw-r--r-- 1 va

[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594625865

   This is perfect. I was trying a similar approach but somehow my `it.tasks` 
wasn't returning the `processMessages`?! 
   
   Can you please verify that it is working fine by running `./gradlew rat 
--dry-run`? It should print the processMessages tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-16 Thread via GitHub


showuon commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1594624985

   Failed tests are unrelated, and the failed `testReplication` also fails in the 
trunk build. I've identified that it's caused by this change: 
https://github.com/apache/kafka/pull/13838 . Going ahead to merge it.
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.AddPartitionsTest.testIncrementPartitions(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplication()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


dajac commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594618113

   @divijvaidya I just pushed another approach. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-06-16 Thread via GitHub


viktorsomogyi commented on PR #13696:
URL: https://github.com/apache/kafka/pull/13696#issuecomment-1594603126

   @hudeqi I can get to this early next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-06-16 Thread via GitHub


viktorsomogyi commented on code in PR #13421:
URL: https://github.com/apache/kafka/pull/13421#discussion_r1232168660


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -97,6 +97,16 @@ class ReplicaAlterLogDirsThread(name: String,
 }
   }
 
+  // For ReplicaAlterLogDirsThread, if the partition is marked as failed due 
to an unknown exception and the partition fetch
+  // is suspended, the paused cleanup logic of the partition needs to be 
canceled, otherwise it will lead to serious unexpected
+  // disk usage growth.

Review Comment:
   ```suggestion
 // For ReplicaAlterLogDirsThread, if the future partition is marked as 
failed due to an unknown exception and the partition fetch
 // is suspended, the paused cleanup logic of the current partition needs 
to be resumed, otherwise it can lead to unexpected
 // disk usage growth.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi merged pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-16 Thread via GitHub


viktorsomogyi merged PR #13719:
URL: https://github.com/apache/kafka/pull/13719


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594554215

   Another idea just occurred to me.
   
   Just put `rat.dependsOn compileJava` at 
https://github.com/apache/kafka/blob/e1d59920f4fed5c4bc890e53e249d3439e148bab/build.gradle#L211
   
   You can verify the dependency using:
   ```
   $ ./gradlew rat --dry-run 
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id cabd6d2e) using Gradle 
8.1.1, Java 17 and Scala 2.13.10
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   :compileJava SKIPPED
   :rat SKIPPED
   ```
   
   and compileJava depends on processMessages using
   ```
   $ ./gradlew compileJava --dry-run | grep processMessages
   :clients:processMessages SKIPPED
   :core:processMessages SKIPPED
   :group-coordinator:processMessages SKIPPED
   :metadata:processMessages SKIPPED
   :raft:processMessages SKIPPED
   :storage:processMessages SKIPPED
   :streams:processMessages SKIPPED
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lvillaca commented on pull request #12358: KAFKA-13988:Fix mm2 auto.offset.reset=latest not working

2023-06-16 Thread via GitHub


lvillaca commented on PR #12358:
URL: https://github.com/apache/kafka/pull/12358#issuecomment-1594475316

   > @yuz10 are you able to ping someone to get this moving forward? This must 
be blocking a lot of people of upgrading existing installs.
   > 
   > Thanks.
   
   Our project is also interested in this fix. It's a major issue for us because, 
unlike with the legacy version (where this feature worked fine), we cannot 
propagate offsets. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13854: MINOR: rat should depend on processMessages task

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13854:
URL: https://github.com/apache/kafka/pull/13854#issuecomment-1594472639

   This problem is annoying and real. Thank you for starting a PR to fix this. 
Another example build that fails with similar problems: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13676/5/pipeline
 
   
   However, I am wondering if an alternative way to do it is:
   
   Could we create a new root-level task called `processMessages` which will 
depend on the following tasks:
   ```
   :clients:processMessages
   :core:processMessages
   :group-coordinator:processMessages
   :metadata:processMessages
   :raft:processMessages
   :storage:processMessages
   :streams:processMessages
   ```
   and then, `check` depends on `processMessages`
   and then, `rat` already has a dependency on `check`
   
   This will ensure that rat also depends on any `processMessages` subtasks added 
in the future. Note that the problem being solved in this PR is slightly 
different from what was solved in https://github.com/apache/kafka/pull/13316 
because over there we wanted each subproject's `srcJar` to depend on 
`processMessages`, but here `rat` is a root-level task, hence it can have a 
root-level dependency on each subproject's `:processMessages` separately.
   
   Thoughts?
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-16 Thread Elkhan Eminov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733444#comment-17733444
 ] 

Elkhan Eminov commented on KAFKA-14112:
---

[~viktorsomogyi] is this still relevant? It went off my radar for a while, but 
if the change has not yet been contributed back by cldr (as far as I can see, 
it's not in trunk), I'd like to resume working on it

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's source 
> offset and the end offset of the source.
> The offset lag is a difference (LRO-LEO), but its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (it is readily 
> available from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated in the 
> task's producer callback
> LRO is initialized from the offset store when the task is started. The 
> difference shall be calculated when the freshest LEO is acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker 
> metric.
> This would describe the number of "to be replicated" records for a 
> certain topic-partition.
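
For other readers, a minimal Scala sketch of the calculation described above. 
This is not actual MirrorMaker code: `OffsetLagTracker` and its method names are 
hypothetical stand-ins for the real MirrorSourceTask internals, and the lag is 
reported here as LEO minus LRO so the value stays non-negative.

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: keeps the LRO "cache" updated from the producer callback
// and derives the lag whenever a fresh LEO is observed in the poll loop.
class OffsetLagTracker {
  private val lastReplicatedOffsets = new ConcurrentHashMap[TopicPartition, Long]()

  // to be called from the producer callback once a record is acknowledged
  def recordReplicated(tp: TopicPartition, sourceOffset: Long): Unit =
    lastReplicatedOffsets.put(tp, sourceOffset)

  // to be called from the poll loop with the freshest end offset from the consumer
  def lag(tp: TopicPartition, logEndOffset: Long): Long = {
    val lro = lastReplicatedOffsets.getOrDefault(tp, logEndOffset)
    math.max(0L, logEndOffset - lro)
  }
}
```

The resulting value would then be recorded on a MirrorMaker metric for the given 
topic-partition.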



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14112) Expose replication-offset-lag Mirror metric

2023-06-16 Thread Elkhan Eminov (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14112 ]


Elkhan Eminov deleted comment on KAFKA-14112:
---

was (Author: JIRAUSER285952):
Hey [~viktorsomogyi], this is WIP, I'll submit a PR soon

> Expose replication-offset-lag Mirror metric
> ---
>
> Key: KAFKA-14112
> URL: https://issues.apache.org/jira/browse/KAFKA-14112
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Elkhan Eminov
>Assignee: Elkhan Eminov
>Priority: Minor
>
> The offset lag is the difference between the last replicated record's source 
> offset and the end offset of the source.
> The offset lag is a difference (LRO-LEO), but its constituents are calculated 
> at different points in time and in different places:
>  * LEO shall be calculated during the source task's poll loop (it is readily 
> available from the consumer)
>  * LRO shall be kept in an in-memory "cache" that is updated in the 
> task's producer callback
> LRO is initialized from the offset store when the task is started. The 
> difference shall be calculated when the freshest LEO is acquired
> in the poll loop. The calculated amount shall be defined as a MirrorMaker 
> metric.
> This would describe the number of "to be replicated" records for a 
> certain topic-partition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi merged pull request #13819: KAFKA-15059: Remove pending rebalance check when fencing zombie source connector tasks

2023-06-16 Thread via GitHub


viktorsomogyi merged PR #13819:
URL: https://github.com/apache/kafka/pull/13819


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts

2023-06-16 Thread Kumar Shivendu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733434#comment-17733434
 ] 

Kumar Shivendu commented on KAFKA-15050:


Nice! I'll pick up something else then. 

> Prompts in the quickstarts
> --
>
> Key: KAFKA-15050
> URL: https://issues.apache.org/jira/browse/KAFKA-15050
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Priority: Trivial
>  Labels: newbie
>
> In the quickstarts [Steps 
> 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate 
> the command prompt. When we start to use Kafka Connect in [Step 
> 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch 
> to {{{}>{}}}. The [Kafka Streams 
> quickstart|https://kafka.apache.org/documentation/streams/quickstart] also 
> uses {{{}>{}}}. I don't think there's a reason for this, but if there is one 
> (root vs user account?) it should be explained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-16 Thread via GitHub


dajac commented on PR #13719:
URL: https://github.com/apache/kafka/pull/13719#issuecomment-1594414022

   @viktorsomogyi Nope. LGTM. Feel free to merge it. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13719: MINOR:Fix illogical log in fetchOffsetAndTruncate method

2023-06-16 Thread via GitHub


viktorsomogyi commented on PR #13719:
URL: https://github.com/apache/kafka/pull/13719#issuecomment-1594411657

   @dajac do you have other comments or can I merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13321: MINOR: added compatibility for MinGW

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1594389639

   Thank you @maniekes. MINGW64 looks good. Can you also please provide test 
details for MSYS?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


divijvaidya commented on PR #13850:
URL: https://github.com/apache/kafka/pull/13850#issuecomment-1594369804

   Thank you for looking into this @satishd 
   
   > But we observed an impact when there is a degradation in our remote 
storage(HDFS) clusters. 
   
   Yes, you are right, the impact of this lock contention is felt in cases when 
fetching from RSM is slow. We observed something similar.
   
   > Another way is to change the current LinkedHashMap to a synchronized map 
and use a slot based lock instead of a global lock. This workaround is less 
performant than Caffiene but it does not introduce a new dependency.
   
   I understand what you are saying. I did consider that approach. So that we 
are on the same page, the approach looks like:
   ```
   // Thread safe data structure that holds the locks per entry
   // Keys from this data structure are removed when a 
   val entryLockMap = ConcurrentHashMap[Entry, Lock]()
   
   // Internal LRU cache based on LinkedHashMap. LinkedHashMap is made thread 
safe by synchronising all its operations by using SynchronizedCache (from 
org.apache.kafka.common.cache)
   val internalCache = SynchronizedCache(new LinkedHashMap())
   
   getEntry(key) {
   // return the cached entry if the key is already present in the cache
   if (internalCache.contains(key)) {
   return internalCache.get(key)
   }
   
   // we have to update the entry in the cache. we will acquire a lock on 
entry, fetch it and then update in cache.
   createEntryLock in entryLockMap if not present
   
   // acquire exclusive lock on entry
   entryLockMap.get(key).lock()
   
   try {
   // after acquiring lock, check again for presence of key in cache, 
another thread may have updated it while 
   // this thread was waiting for lock
   if (internalCache.contains(key)) {
   return internalCache.get(key)
   } 
   val entry = fetchFromRSM(key)
   // no need to acquire a lock in internal cache since get/put methods 
on this cache are synchronized
   internalCache.put(entry)
   } finally {
   release entry lock
   }
   }
   ```
   The advantages of this approach are:
   1. No external dependency
   2. Same pattern of cache used in other places in the Kafka code (see 
org.apache.kafka.common.cache.SynchronizedCache). But that code was written in 
2015 when perhaps better alternatives didn't exist at that time.
   
   The disadvantages of this approach are:
   1. Cache metrics will have to be implemented manually
   2. Cache eviction is done synchronously and hence, it holds the global cache 
lock. 
   3. In the absence of a lock that ensures fairness, when we synchronise the 
global cache, a fetch thread may get starved leading to high tail latencies. 
Such a scenario will occur when we have a high get() operation load and other 
threads for get() are getting prioritized by the lock ahead of the put() 
thread. That is why I am not a big fan of using data structures with coarse 
grained global locks.
   
   Caffeine, on the other hand, doesn't have any of the disadvantages mentioned 
above, is used in high-throughput, low-latency systems like Cassandra, and has a 
steady release cadence with great support. Hence, I believe that using 
Caffeine is the right choice here. (I know that you already agreed to this, but 
I'm adding more context here for other readers of the PR.)
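   
   For illustration only, a minimal sketch (not the exact code in this PR) of how 
a Caffeine-backed cache with an eviction listener feeding a background cleaner 
could look. `Key`, `Entry.markForCleanup` and `fetchFromRemoteStorage` are 
placeholders for the real `Uuid`/`Entry` types and the RemoteStorageManager fetch:
   ```scala
   import java.util.concurrent.LinkedBlockingQueue
   import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}
   
   object CaffeineCacheSketch {
     // placeholder types standing in for Uuid and RemoteIndexCache.Entry
     final case class Key(id: String)
     final class Entry { def markForCleanup(): Unit = () }
   
     // evicted entries are only marked and queued; a background thread deletes the files later
     private val expiredEntries = new LinkedBlockingQueue[Entry]()
   
     private val onRemoval: RemovalListener[Key, Entry] = (_: Key, entry: Entry, _: RemovalCause) => {
       entry.markForCleanup()
       expiredEntries.add(entry)
     }
   
     private val cache: Cache[Key, Entry] = Caffeine.newBuilder()
       .maximumSize(1024)
       .removalListener(onRemoval)
       .build[Key, Entry]()
   
     private def fetchFromRemoteStorage(key: Key): Entry = new Entry // placeholder for the RSM call
   
     def getEntry(key: Key): Entry =
       // the loading function runs at most once per key; lookups for other keys are not blocked while it runs
       cache.get(key, (k: Key) => fetchFromRemoteStorage(k))
   }
   ```
   A per-entry `ReentrantReadWriteLock` would still be needed to coordinate 
rename/cleanup, as in the PR, but the cache itself no longer takes a global lock 
on reads.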
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231951562


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -37,88 +40,125 @@ object RemoteIndexCache {
   val TmpFileSuffix = ".tmp"
 }
 
-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) extends AutoCloseable {
   private var markedForCleanup: Boolean = false
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
+  private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   def lookupOffset(targetOffset: Long): OffsetPosition = {
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
   else offsetIndex.lookup(targetOffset)
 }
   }
 
   def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = 
{
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
-
   val timestampOffset = timeIndex.lookup(timestamp)
   offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
 }
   }
 
   def markForCleanup(): Unit = {
-CoreUtils.inLock(lock.writeLock()) {
+inWriteLock(entryLock) {
   if (!markedForCleanup) {
 markedForCleanup = true
 Array(offsetIndex, timeIndex).foreach(index =>
   index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", 
LogFileUtils.DELETED_FILE_SUFFIX
+// txn index needs to be renamed separately since it's not of type 
AbstractIndex
 txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, 
"",
   LogFileUtils.DELETED_FILE_SUFFIX)))
   }
 }
   }
 
+  /**
+   * Deletes the index files from the disk. Invoking #close is not required 
prior to this function.
+   */
   def cleanup(): Unit = {
 markForCleanup()
 CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => 
timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
   }
 
+  /**
+   * Calls the underlying close method for each index which may lead to 
releasing resources such as mmap.
+   * This function does not delete the index files.
+   */
   def close(): Unit = {
-Array(offsetIndex, timeIndex).foreach(index => try {
-  index.close()
-} catch {
-  case _: Exception => // ignore error.
-})
+Utils.closeQuietly(offsetIndex, "Closing the offset index.")
+Utils.closeQuietly(timeIndex, "Closing the time index.")
 Utils.closeQuietly(txnIndex, "Closing the transaction index.")
   }
 }
 
 /**
  * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
- * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call. The cache is
+ * re-initialized from the index files on disk on startup, if the index files 
are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that this cache is not strictly based on a LRU policy. It is based on 
the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window 
TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
  *
  * @param maxSize  maximum number of segment index entries to be 
cached.
  * @param remoteStorageManager RemoteStorageManager instance, to be used in 
fetching indexes.
  * @param logDir   log directory
  */
+@threadsafe
 class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: 
RemoteStorageManager, logDir: String)
-  extends Logging with Closeable {
-
-  val cacheDir = new File(logDir, DirName)
-  @volatile var closed = false
-
-  val expiredIndexes = new LinkedBlockingQueue[Entry]()
-  val lock = new Object()
-
-  val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, 
Entry](maxSize / 2,
-0.75f, true) {
-override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): 
Boolean = {
-  if (this.size() > maxSize) {
-val entry = eldest.getValue
-// Mark the entries for cleanup, background thread will clean them 
later.
-entry.markForCleanup()
-expiredIndexes.add(entry)
-true
-  } else {
-false
-  }
-}
-  }
+  extends Logging with AutoCloseable {
+  /**
+   * Directory where the index files will be stored on disk.
+   */
+  private val cache

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231937722


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -37,88 +40,125 @@ object RemoteIndexCache {
   val TmpFileSuffix = ".tmp"
 }
 
-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) extends AutoCloseable {
   private var markedForCleanup: Boolean = false
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
+  private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   def lookupOffset(targetOffset: Long): OffsetPosition = {
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
   else offsetIndex.lookup(targetOffset)
 }
   }
 
   def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = 
{
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
-
   val timestampOffset = timeIndex.lookup(timestamp)
   offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
 }
   }
 
   def markForCleanup(): Unit = {
-CoreUtils.inLock(lock.writeLock()) {
+inWriteLock(entryLock) {
   if (!markedForCleanup) {
 markedForCleanup = true
 Array(offsetIndex, timeIndex).foreach(index =>
   index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", 
LogFileUtils.DELETED_FILE_SUFFIX
+// txn index needs to be renamed separately since it's not of type 
AbstractIndex
 txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, 
"",
   LogFileUtils.DELETED_FILE_SUFFIX)))
   }
 }
   }
 
+  /**
+   * Deletes the index files from the disk. Invoking #close is not required 
prior to this function.
+   */
   def cleanup(): Unit = {
 markForCleanup()
 CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => 
timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
   }
 
+  /**
+   * Calls the underlying close method for each index which may lead to 
releasing resources such as mmap.
+   * This function does not delete the index files.
+   */
   def close(): Unit = {
-Array(offsetIndex, timeIndex).foreach(index => try {
-  index.close()
-} catch {
-  case _: Exception => // ignore error.
-})
+Utils.closeQuietly(offsetIndex, "Closing the offset index.")
+Utils.closeQuietly(timeIndex, "Closing the time index.")
 Utils.closeQuietly(txnIndex, "Closing the transaction index.")
   }
 }
 
 /**
  * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
- * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call. The cache is
+ * re-initialized from the index files on disk on startup, if the index files 
are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that this cache is not strictly based on a LRU policy. It is based on 
the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window 
TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
  *
  * @param maxSize  maximum number of segment index entries to be 
cached.
  * @param remoteStorageManager RemoteStorageManager instance, to be used in 
fetching indexes.
  * @param logDir   log directory
  */
+@threadsafe
 class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: 
RemoteStorageManager, logDir: String)
-  extends Logging with Closeable {
-
-  val cacheDir = new File(logDir, DirName)
-  @volatile var closed = false
-
-  val expiredIndexes = new LinkedBlockingQueue[Entry]()
-  val lock = new Object()
-
-  val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, 
Entry](maxSize / 2,
-0.75f, true) {
-override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): 
Boolean = {
-  if (this.size() > maxSize) {
-val entry = eldest.getValue
-// Mark the entries for cleanup, background thread will clean them 
later.
-entry.markForCleanup()
-expiredIndexes.add(entry)
-true
-  } else {
-false
-  }
-}
-  }
+  extends Logging with AutoCloseable {
+  /**
+   * Directory where the index files will be stored on disk.
+   */
+  private val cache

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231938032


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -170,44 +190,173 @@ class RemoteIndexCacheTest {
 assertThrows(classOf[IllegalStateException], () => 
cache.getIndexEntry(metadataList.head))
   }
 
+  @Test
+  def testCloseIsIdempotent(): Unit = {
+val spyCleanerThread = spy(cache.cleanerThread)
+cache.cleanerThread = spyCleanerThread
+cache.close()
+cache.close()
+// verify that cleanup is only called once
+verify(spyCleanerThread).initiateShutdown()
+  }
+
+  @Test
+  def testClose(): Unit = {
+val spyInternalCache = spy(cache.internalCache)
+val spyCleanerThread = spy(cache.cleanerThread)
+
+// replace with new spy cache
+cache.internalCache = spyInternalCache
+cache.cleanerThread = spyCleanerThread
+
+// use the cache
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+val entry = cache.getIndexEntry(metadataList.head)
+
+val spyTxnIndex = spy(entry.txnIndex)
+val spyOffsetIndex = spy(entry.offsetIndex)
+val spyTimeIndex = spy(entry.timeIndex)
+// remove this entry and replace with spied entry
+cache.internalCache.invalidateAll()
+cache.internalCache.put(metadataList.head.remoteLogSegmentId().id(), new 
Entry(spyOffsetIndex, spyTimeIndex, spyTxnIndex))
+
+// close the cache
+cache.close()
+
+// cleaner thread should be closed properly
+verify(spyCleanerThread).initiateShutdown()
+verify(spyCleanerThread).awaitShutdown()
+
+// close for all index entries must be invoked
+verify(spyTxnIndex).close()
+verify(spyOffsetIndex).close()
+verify(spyTimeIndex).close()
+
+// index files must not be deleted
+verify(spyTxnIndex, times(0)).deleteIfExists()
+verify(spyOffsetIndex, times(0)).deleteIfExists()
+verify(spyTimeIndex, times(0)).deleteIfExists()
+  }
+
+  @Test
+  def testConcurrentReadWriteAccessForCache(): Unit = {
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, 
IndexType.TIMESTAMP))
+reset(rsm)
+
+// Simulate a concurrency situation where one thread is reading the entry 
already present in the cache (cache hit)
+// and the other thread is reading an entry which is not available in the 
cache (cache miss). The expected behaviour
+// is for the former thread to succeed while the latter is fetching from rsm.
+// In this test we simulate the situation using latches. We perform 
the following operations:
+// 1. Start the CacheMiss thread and wait until it starts executing the 
rsm.fetchIndex
+// 2. Block the CacheMiss thread inside the call to rsm.fetchIndex.
+// 3. Start the CacheHit thread. Assert that it performs a successful read.
+// 4. On completion of successful read by CacheHit thread, signal the 
CacheMiss thread to release its block.
+// 5. Validate that the test passes. If the CacheMiss thread was blocking 
the CacheHit thread, the test will fail.
+//
+val latchForCacheHit = new CountDownLatch(1)
+val latchForCacheMiss = new CountDownLatch(1)
+
+val readerCacheHit = (() => {
+  // Wait for signal to start executing the read
+  logger.debug(s"Waiting for signal to begin read from 
${Thread.currentThread()}")
+  latchForCacheHit.await()
+  val entry = cache.getIndexEntry(metadataList.head)
+  assertNotNull(entry)
+  // Signal the CacheMiss to unblock itself
+  logger.debug(s"Signaling CacheMiss to unblock from 
${Thread.currentThread()}")
+  latchForCacheMiss.countDown()
+}): Runnable
+
+when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+  .thenAnswer(_ => {
+logger.debug(s"Signaling CacheHit to begin read from 
${Thread.currentThread()}")
+latchForCacheHit.countDown()
+logger.debug("Waiting for signal to complete rsm fetch from" + 
Thread.currentThread())
+latchForCacheMiss.await()
+  })
+
+val readerCacheMiss = (() => {
+  val entry = cache.getIndexEntry(metadataList.last)
+  assertNotNull(entry)
+}): Runnable
+
+val executor = Executors.newFixedThreadPool(2)
+try {
+  executor.submit(readerCacheMiss: Runnable)
+  executor.submit(readerCacheHit: Runnable)
+  assertTrue(latchForCacheMiss.await(30, TimeUnit.SECONDS))
+} finally {
+  executor.shutdownNow()
+}
+  }
+
   @Test
   def testReloadCacheAfterClose(): Unit = {
-val cache = new RemoteIndexCache(maxSi

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231931509


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -167,14 +210,14 @@ class RemoteIndexCache(maxSize: Int = 1024, 
remoteStorageManager: RemoteStorageM
   init()
 
   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new 
ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new 
ShutdownableThread("remote-log-index-cleaner") {

Review Comment:
   We need it as a var for unit tests where we want to override it with a spy 
implementation so that we can verify the invocations. I could alternatively add a 
setter for this thread to be used in unit tests, but I preferred using a var to 
keep it simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13850: KAFKA-15084: Remove lock contention from RemoteIndexCache

2023-06-16 Thread via GitHub


satishd commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1231867641


##
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##
@@ -37,88 +40,125 @@ object RemoteIndexCache {
   val TmpFileSuffix = ".tmp"
 }
 
-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) {
+class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val 
txnIndex: TransactionIndex) extends AutoCloseable {
   private var markedForCleanup: Boolean = false
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
+  private val entryLock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
 
   def lookupOffset(targetOffset: Long): OffsetPosition = {
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
   else offsetIndex.lookup(targetOffset)
 }
   }
 
   def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = 
{
-CoreUtils.inLock(lock.readLock()) {
+inReadLock(entryLock) {
   if (markedForCleanup) throw new IllegalStateException("This entry is 
marked for cleanup")
-
   val timestampOffset = timeIndex.lookup(timestamp)
   offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
 }
   }
 
   def markForCleanup(): Unit = {
-CoreUtils.inLock(lock.writeLock()) {
+inWriteLock(entryLock) {
   if (!markedForCleanup) {
 markedForCleanup = true
 Array(offsetIndex, timeIndex).foreach(index =>
   index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", 
LogFileUtils.DELETED_FILE_SUFFIX
+// txn index needs to be renamed separately since it's not of type 
AbstractIndex
 txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, 
"",
   LogFileUtils.DELETED_FILE_SUFFIX)))
   }
 }
   }
 
+  /**
+   * Deletes the index files from the disk. Invoking #close is not required 
prior to this function.
+   */
   def cleanup(): Unit = {
 markForCleanup()
 CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => 
timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
   }
 
+  /**
+   * Calls the underlying close method for each index which may lead to 
releasing resources such as mmap.
+   * This function does not delete the index files.
+   */
   def close(): Unit = {
-Array(offsetIndex, timeIndex).foreach(index => try {
-  index.close()
-} catch {
-  case _: Exception => // ignore error.
-})
+Utils.closeQuietly(offsetIndex, "Closing the offset index.")
+Utils.closeQuietly(timeIndex, "Closing the time index.")
 Utils.closeQuietly(txnIndex, "Closing the transaction index.")
   }
 }
 
 /**
  * This is a LRU cache of remote index files stored in 
`$logdir/remote-log-index-cache`. This is helpful to avoid
- * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call.
+ * re-fetching the index files like offset, time indexes from the remote 
storage for every fetch call. The cache is
+ * re-initialized from the index files on disk on startup, if the index files 
are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that this cache is not strictly based on a LRU policy. It is based on 
the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window 
TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
  *
  * @param maxSize  maximum number of segment index entries to be 
cached.
  * @param remoteStorageManager RemoteStorageManager instance, to be used in 
fetching indexes.
  * @param logDir   log directory
  */
+@threadsafe
 class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: 
RemoteStorageManager, logDir: String)
-  extends Logging with Closeable {
-
-  val cacheDir = new File(logDir, DirName)
-  @volatile var closed = false
-
-  val expiredIndexes = new LinkedBlockingQueue[Entry]()
-  val lock = new Object()
-
-  val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, 
Entry](maxSize / 2,
-0.75f, true) {
-override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): 
Boolean = {
-  if (this.size() > maxSize) {
-val entry = eldest.getValue
-// Mark the entries for cleanup, background thread will clean them 
later.
-entry.markForCleanup()
-expiredIndexes.add(entry)
-true
-  } else {
-false
-  }
-}
-  }
+  extends Logging with AutoCloseable {
+  /**
+   * Directory where the index files will be stored on disk.
+   */
+  private val cacheDir 

[GitHub] [kafka] jlprat commented on pull request #13824: MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde

2023-06-16 Thread via GitHub


jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-1594214535

   Hi @satishd or @showuon any opinions on this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org