[ https://issues.apache.org/jira/browse/KAFKA-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Brutschy reassigned KAFKA-19280: -------------------------------------- Assignee: Lucas Brutschy > NoSuchElementException in UnifiedLog.fetchLastStableOffsetMetadata > ------------------------------------------------------------------ > > Key: KAFKA-19280 > URL: https://issues.apache.org/jira/browse/KAFKA-19280 > Project: Kafka > Issue Type: Bug > Affects Versions: 4.1.0 > Reporter: Lucas Brutschy > Assignee: Lucas Brutschy > Priority: Major > > In {{FETCH}} requests and {{TXN_OFFSET_COMMIT}} requests, on the current trunk we > run into a race condition inside {{UnifiedLog}}: > > TxnOffsetCommit: > > {code:java} > [2025-05-13 22:21:39,627] ERROR [GroupCoordinator id=7061] Operation > txn-commit-offset with > TxnOffsetCommitRequestData(transactionalId='stream-soak-test-f3787ccf-b34d-4215-b51a-6ebd15406537-3', > groupId='stream-soak-test', producerId=2001, producerEpoch=25732, > generationId=39, memberId='fD-cjsEQQQeTsn1poFtVAw', > groupInstanceId='ip-172-31-14-0.us-west-2.compute.internal-3', > topics=[TxnOffsetCommitRequestTopic(name='windowed-node-counts', > partitions=[TxnOffsetCommitRequestPartition(partitionIndex=2, > committedOffset=10144990, committedLeaderEpoch=-1, > committedMetadata='AgAAAZbLvT4a')]), > TxnOffsetCommitRequestTopic(name='logs.syslog', > partitions=[TxnOffsetCommitRequestPartition(partitionIndex=0, > committedOffset=794827, committedLeaderEpoch=0, > committedMetadata='AgAAAZbLvT9a')]), > TxnOffsetCommitRequestTopic(name='stream-soak-test-node-name-repartition', > partitions=[TxnOffsetCommitRequestPartition(partitionIndex=2, > committedOffset=852193, committedLeaderEpoch=0, > committedMetadata='AgAAAZbLvT6S')])]) hit an unexpected exception: > java.util.NoSuchElementException: No value present. 
> (org.apache.kafka.coordinator.group.GroupCoordinatorService) > java.util.concurrent.CompletionException: java.util.NoSuchElementException: > No value present > at > java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1097) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > ~[?:?] > at > java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) > ~[?:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.complete(CoordinatorRuntime.java:1439) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$DeferredEventCollection.complete(CoordinatorRuntime.java:1184) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.failCurrentBatch(CoordinatorRuntime.java:844) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.flushCurrentBatch(CoordinatorRuntime.java:819) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.maybeFlushCurrentBatch(CoordinatorRuntime.java:832) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.append(CoordinatorRuntime.java:1077) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] 
> at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:1395) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:2151) > [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:1390) > [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:147) > [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:179) > [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > Caused by: java.util.NoSuchElementException: No value present > at java.base/java.util.Optional.get(Optional.java:143) ~[?:?] > at > org.apache.kafka.storage.internals.log.UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:650) > ~[kafka-storage-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.storage.internals.log.UnifiedLog.fetchOffsetSnapshot(UnifiedLog.java:691) > ~[kafka-storage-4.1.0-c60c83aaba.jar:?] > at > kafka.cluster.Partition.$anonfun$fetchOffsetSnapshot$1(Partition.scala:1640) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1637) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:86) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619) > ~[scala-library-2.13.15.jar:?] 
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617) > ~[scala-library-2.13.15.jar:?] > at scala.collection.AbstractIterable.foreach(Iterable.scala:935) > ~[scala-library-2.13.15.jar:?] > at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:137) > ~[kafka-server-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:335) > ~[kafka-server-common-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:186) > ~[kafka-server-common-4.1.0-c60c83aaba.jar:?] > at > kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:949) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at scala.collection.immutable.Map$Map1.foreach(Map.scala:278) > ~[scala-library-2.13.15.jar:?] > at > kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:937) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.coordinator.group.CoordinatorPartitionWriter$$anon$1.add(CoordinatorPartitionWriter.scala:67) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.ReplicaManager.addCompletePurgatoryAction(ReplicaManager.scala:937) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.ReplicaManager.appendRecordsToLeader(ReplicaManager.scala:693) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.coordinator.group.CoordinatorPartitionWriter.append(CoordinatorPartitionWriter.scala:149) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.flushCurrentBatch(CoordinatorRuntime.java:788) > ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?] > ... 
7 more{code} > > > > Fetch: > > {code:java} > [2025-05-13 22:09:25,296] ERROR [KafkaApi-1107] Unexpected error handling > request RequestHeader(apiKey=FETCH, apiVersion=17, > clientId=broker-7061-fetcher-0, correlationId=1803880, headerVersion=2) – > FetchRequestData(clusterId=null, replicaId=-1, > replicaState=ReplicaState(replicaId=7061, replicaEpoch=32), maxWaitMs=500, > minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=2088504203, > sessionEpoch=62020, topics=[FetchTopic(topic='', > topicId=mv_Gf9ZRR2iI2Ipw7wD8Yw, partitions=[FetchPartition(partition=1, > currentLeaderEpoch=0, fetchOffset=626722, lastFetchedEpoch=0, > logStartOffset=0, partitionMaxBytes=1048576, > replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), FetchTopic(topic='', > topicId=gN7x55dzSdON4zENjOh6lg, partitions=[FetchPartition(partition=0, > currentLeaderEpoch=0, fetchOffset=795768, lastFetchedEpoch=0, > logStartOffset=793733, partitionMaxBytes=1048576, > replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), FetchTopic(topic='', > topicId=W7B2AdLbSfa6Yxc86A0q_A, partitions=[FetchPartition(partition=2, > currentLeaderEpoch=0, fetchOffset=709470, lastFetchedEpoch=0, > logStartOffset=0, partitionMaxBytes=1048576, > replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), FetchTopic(topic='', > topicId=kS6Qta42QU-VQUoT0oBKWw, partitions=[FetchPartition(partition=2, > currentLeaderEpoch=0, fetchOffset=795369, lastFetchedEpoch=0, > logStartOffset=794609, partitionMaxBytes=1048576, > replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), FetchTopic(topic='', > topicId=GFuXy3UOSaKBKv4y4CFJ5g, partitions=[FetchPartition(partition=1, > currentLeaderEpoch=0, fetchOffset=626722, lastFetchedEpoch=0, > logStartOffset=0, partitionMaxBytes=1048576, > replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], > rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, > apiVersion=17, clientId=broker-7061-fetcher-0, correlationId=1803880, > headerVersion=2), 
connectionId='172.31.2.41:9092-172.31.13.202:48758-2-64', > clientAddress=/172.31.13.202, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=unknown, > softwareVersion=unknown), fromPrivilegedListener=true, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@43ebd30b]) > (kafka.server.KafkaApis) > java.util.NoSuchElementException: No value present > at java.base/java.util.Optional.get(Optional.java:143) ~[?:?] > at > org.apache.kafka.storage.internals.log.UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:651) > ~[kafka-storage-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.storage.internals.log.UnifiedLog.fetchOffsetSnapshot(UnifiedLog.java:691) > ~[kafka-storage-4.1.0-c60c83aaba.jar:?] > at > kafka.cluster.Partition.$anonfun$fetchOffsetSnapshot$1(Partition.scala:1640) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1637) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:86) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619) > ~[scala-library-2.13.15.jar:?] > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617) > ~[scala-library-2.13.15.jar:?] > at scala.collection.AbstractIterable.foreach(Iterable.scala:935) > ~[scala-library-2.13.15.jar:?] > at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at > org.apache.kafka.server.purgatory.DelayedOperation.safeTryCompleteOrElse(DelayedOperation.java:123) > ~[kafka-server-common-4.1.0-c60c83aaba.jar:?] 
> at > org.apache.kafka.server.purgatory.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperationPurgatory.java:148) > ~[kafka-server-common-4.1.0-c60c83aaba.jar:?] > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1768) > ~[kafka_2.13-4.1.0-c60c83aaba.jar:?] > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:763) > [kafka_2.13-4.1.0-c60c83aaba.jar:?] > at kafka.server.KafkaApis.handle(KafkaApis.scala:169) > [kafka_2.13-4.1.0-c60c83aaba.jar:?] > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) > [kafka_2.13-4.1.0-c60c83aaba.jar:?] > at java.base/java.lang.Thread.run(Thread.java:840) [?:?]{code} > > > > [~dajac] pointed out that a likely cause is the following line of code: > > {code:java} > if (firstUnstableOffsetMetadata.isPresent() && > firstUnstableOffsetMetadata.get().messageOffset < > highWatermarkMetadata.messageOffset) { > {code} > > This is not safe because `firstUnstableOffsetMetadata` is volatile and is read twice. Another thread can replace it with an empty Optional between the `isPresent()` check and the `get()` call. In that case `get()` throws the NoSuchElementException ("No value present") seen in both stack traces above. > > The line was introduced in > [https://github.com/apache/kafka/commit/759fbbba8b977206ad189172f429acb97e98a798] > cc [~mimaison] > -- This message was sent by Atlassian Jira (v8.20.10#820010)