Lucas Brutschy created KAFKA-19280:
--------------------------------------

             Summary: NoSuchElementException in UnifiedLog.fetchLastStableOffsetMetadata
                 Key: KAFKA-19280
                 URL: https://issues.apache.org/jira/browse/KAFKA-19280
             Project: Kafka
          Issue Type: Bug
            Reporter: Lucas Brutschy


On current trunk, both FETCH and TXN_OFFSET_COMMIT requests hit a race 
condition inside `UnifiedLog` that surfaces as a `NoSuchElementException`:

 

TxnOffsetCommit:

```

[2025-05-13 22:21:39,627] ERROR [GroupCoordinator id=7061] Operation 
txn-commit-offset with 
TxnOffsetCommitRequestData(transactionalId='stream-soak-test-f3787ccf-b34d-4215-b51a-6ebd15406537-3',
 groupId='stream-soak-test', producerId=2001, producerEpoch=25732, 
generationId=39, memberId='fD-cjsEQQQeTsn1poFtVAw', 
groupInstanceId='ip-172-31-14-0.us-west-2.compute.internal-3', 
topics=[TxnOffsetCommitRequestTopic(name='windowed-node-counts', 
partitions=[TxnOffsetCommitRequestPartition(partitionIndex=2, 
committedOffset=10144990, committedLeaderEpoch=-1, 
committedMetadata='AgAAAZbLvT4a')]), 
TxnOffsetCommitRequestTopic(name='logs.syslog', 
partitions=[TxnOffsetCommitRequestPartition(partitionIndex=0, 
committedOffset=794827, committedLeaderEpoch=0, 
committedMetadata='AgAAAZbLvT9a')]), 
TxnOffsetCommitRequestTopic(name='stream-soak-test-node-name-repartition', 
partitions=[TxnOffsetCommitRequestPartition(partitionIndex=2, 
committedOffset=852193, committedLeaderEpoch=0, 
committedMetadata='AgAAAZbLvT6S')])]) hit an unexpected exception: 
java.util.NoSuchElementException: No value present. 
(org.apache.kafka.coordinator.group.GroupCoordinatorService)
java.util.concurrent.CompletionException: java.util.NoSuchElementException: No 
value present
    at 
java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
 ~[?:?]
    at 
java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
 ~[?:?]
    at 
java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1097)
 ~[?:?]
    at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
 ~[?:?]
    at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
 ~[?:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.complete(CoordinatorRuntime.java:1439)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$DeferredEventCollection.complete(CoordinatorRuntime.java:1184)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.failCurrentBatch(CoordinatorRuntime.java:844)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.flushCurrentBatch(CoordinatorRuntime.java:819)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.maybeFlushCurrentBatch(CoordinatorRuntime.java:832)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.append(CoordinatorRuntime.java:1077)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:1395)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:2151)
 [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:1390)
 [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:147)
 [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:179)
 [kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
Caused by: java.util.NoSuchElementException: No value present
    at java.base/java.util.Optional.get(Optional.java:143) ~[?:?]
    at 
org.apache.kafka.storage.internals.log.UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:650)
 ~[kafka-storage-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.storage.internals.log.UnifiedLog.fetchOffsetSnapshot(UnifiedLog.java:691)
 ~[kafka-storage-4.1.0-c60c83aaba.jar:?]
    at 
kafka.cluster.Partition.$anonfun$fetchOffsetSnapshot$1(Partition.scala:1640) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1637) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:86) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619) 
~[scala-library-2.13.15.jar:?]
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617) 
~[scala-library-2.13.15.jar:?]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935) 
~[scala-library-2.13.15.jar:?]
    at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:137)
 ~[kafka-server-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:335)
 ~[kafka-server-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:186)
 ~[kafka-server-common-4.1.0-c60c83aaba.jar:?]
    at 
kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:949)
 ~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:278) 
~[scala-library-2.13.15.jar:?]
    at 
kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:937)
 ~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.coordinator.group.CoordinatorPartitionWriter$$anon$1.add(CoordinatorPartitionWriter.scala:67)
 ~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.server.ReplicaManager.addCompletePurgatoryAction(ReplicaManager.scala:937)
 ~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.server.ReplicaManager.appendRecordsToLeader(ReplicaManager.scala:693) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.coordinator.group.CoordinatorPartitionWriter.append(CoordinatorPartitionWriter.scala:149)
 ~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime$CoordinatorContext.flushCurrentBatch(CoordinatorRuntime.java:788)
 ~[kafka-coordinator-common-4.1.0-c60c83aaba.jar:?]
    ... 7 more

```

 

Fetch:

```

[2025-05-13 22:09:25,296] ERROR [KafkaApi-1107] Unexpected error handling 
request RequestHeader(apiKey=FETCH, apiVersion=17, 
clientId=broker-7061-fetcher-0, correlationId=1803880, headerVersion=2) -- 
FetchRequestData(clusterId=null, replicaId=-1, 
replicaState=ReplicaState(replicaId=7061, replicaEpoch=32), maxWaitMs=500, 
minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=2088504203, 
sessionEpoch=62020, topics=[FetchTopic(topic='', 
topicId=mv_Gf9ZRR2iI2Ipw7wD8Yw, partitions=[FetchPartition(partition=1, 
currentLeaderEpoch=0, fetchOffset=626722, lastFetchedEpoch=0, logStartOffset=0, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), 
FetchTopic(topic='', topicId=gN7x55dzSdON4zENjOh6lg, 
partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, 
fetchOffset=795768, lastFetchedEpoch=0, logStartOffset=793733, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), 
FetchTopic(topic='', topicId=W7B2AdLbSfa6Yxc86A0q_A, 
partitions=[FetchPartition(partition=2, currentLeaderEpoch=0, 
fetchOffset=709470, lastFetchedEpoch=0, logStartOffset=0, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), 
FetchTopic(topic='', topicId=kS6Qta42QU-VQUoT0oBKWw, 
partitions=[FetchPartition(partition=2, currentLeaderEpoch=0, 
fetchOffset=795369, lastFetchedEpoch=0, logStartOffset=794609, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)]), 
FetchTopic(topic='', topicId=GFuXy3UOSaKBKv4y4CFJ5g, 
partitions=[FetchPartition(partition=1, currentLeaderEpoch=0, 
fetchOffset=626722, lastFetchedEpoch=0, logStartOffset=0, 
partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], 
forgottenTopicsData=[], rackId='') with context 
RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, 
clientId=broker-7061-fetcher-0, correlationId=1803880, headerVersion=2), 
connectionId='172.31.2.41:9092-172.31.13.202:48758-2-64', 
clientAddress=/172.31.13.202, principal=User:ANONYMOUS, 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=unknown, 
softwareVersion=unknown), fromPrivilegedListener=true, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@43ebd30b])
 (kafka.server.KafkaApis)
java.util.NoSuchElementException: No value present
    at java.base/java.util.Optional.get(Optional.java:143) ~[?:?]
    at 
org.apache.kafka.storage.internals.log.UnifiedLog.fetchLastStableOffsetMetadata(UnifiedLog.java:651)
 ~[kafka-storage-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.storage.internals.log.UnifiedLog.fetchOffsetSnapshot(UnifiedLog.java:691)
 ~[kafka-storage-4.1.0-c60c83aaba.jar:?]
    at 
kafka.cluster.Partition.$anonfun$fetchOffsetSnapshot$1(Partition.scala:1640) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:1637) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:86) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619) 
~[scala-library-2.13.15.jar:?]
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617) 
~[scala-library-2.13.15.jar:?]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935) 
~[scala-library-2.13.15.jar:?]
    at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.server.purgatory.DelayedOperation.safeTryCompleteOrElse(DelayedOperation.java:123)
 ~[kafka-server-common-4.1.0-c60c83aaba.jar:?]
    at 
org.apache.kafka.server.purgatory.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperationPurgatory.java:148)
 ~[kafka-server-common-4.1.0-c60c83aaba.jar:?]
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1768) 
~[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:763) 
[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.server.KafkaApis.handle(KafkaApis.scala:169) 
[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) 
[kafka_2.13-4.1.0-c60c83aaba.jar:?]
    at java.base/java.lang.Thread.run(Thread.java:840) [?:?]

```

[~dajac] pointed out that a likely cause is the following line of code:

```
if (firstUnstableOffsetMetadata.isPresent() && 
firstUnstableOffsetMetadata.get().messageOffset < 
highWatermarkMetadata.messageOffset) {
```
 
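This check is not safe because `firstUnstableOffsetMetadata` is a volatile 
field that is read twice: another thread can reset it to empty between the 
`isPresent()` call and the `get()` call, at which point `get()` throws the 
`NoSuchElementException` seen in both stack traces.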
[https://github.com/apache/kafka/commit/759fbbba8b977206ad189172f429acb97e98a798]
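
A minimal sketch of the usual remedy for this kind of check-then-act race, assuming the method returns the first unstable offset when it lies below the high watermark and the high watermark otherwise. `LastStableOffsetSketch`, `OffsetMetadata` and the setter methods are hypothetical stand-ins invented for illustration; only the two field names and `messageOffset` come from the line quoted above. This is not necessarily the fix that will be applied to `UnifiedLog`. The idea is to read each volatile field once into a local variable, so that `isPresent()` and `get()` operate on the same `Optional` instance:

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for UnifiedLog. Only the two fields
// named in the issue are modelled; every other name is illustrative.
class LastStableOffsetSketch {

    // Simplified stand-in for the real offset metadata type.
    static final class OffsetMetadata {
        final long messageOffset;

        OffsetMetadata(long messageOffset) {
            this.messageOffset = messageOffset;
        }
    }

    // Both fields may be replaced by other threads at any time.
    private volatile Optional<OffsetMetadata> firstUnstableOffsetMetadata = Optional.empty();
    private volatile OffsetMetadata highWatermarkMetadata = new OffsetMetadata(0L);

    void setFirstUnstableOffset(long offset) {
        firstUnstableOffsetMetadata = Optional.of(new OffsetMetadata(offset));
    }

    void clearFirstUnstableOffset() {
        firstUnstableOffsetMetadata = Optional.empty();
    }

    void setHighWatermark(long offset) {
        highWatermarkMetadata = new OffsetMetadata(offset);
    }

    // Assumed semantics: first unstable offset if below the high watermark,
    // otherwise the high watermark.
    OffsetMetadata fetchLastStableOffsetMetadata() {
        // Read each volatile field exactly once. The locals cannot change
        // underneath us, so isPresent() and get() see the same Optional.
        Optional<OffsetMetadata> firstUnstable = firstUnstableOffsetMetadata;
        OffsetMetadata highWatermark = highWatermarkMetadata;

        if (firstUnstable.isPresent()
                && firstUnstable.get().messageOffset < highWatermark.messageOffset) {
            return firstUnstable.get();
        }
        return highWatermark;
    }

    public static void main(String[] args) {
        LastStableOffsetSketch log = new LastStableOffsetSketch();
        log.setHighWatermark(100L);
        log.setFirstUnstableOffset(40L);
        System.out.println(log.fetchLastStableOffsetMetadata().messageOffset);
        log.clearFirstUnstableOffset();
        System.out.println(log.fetchLastStableOffsetMetadata().messageOffset);
    }
}
```

With the local copy, a concurrent writer that clears the field after the snapshot is taken can no longer cause `get()` to throw; the caller simply observes the value that was current at the time of the read.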

 

cc [~mimaison] 

 



