AnatolyPopov opened a new pull request, #15165:
URL: https://github.com/apache/kafka/pull/15165

   When partition reassignment is happening for a tiered topic, in most cases it gets stuck with RemoteStorageExceptions on the follower nodes, saying that the remote log auxiliary state cannot be built:
   
   ```
   [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread)
   org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found
       at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259)
       at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106)
       at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413)
       at scala.Option.foreach(Option.scala:437)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
       at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
       at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
       at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
       at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
       at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
       at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
       at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
       at scala.Option.foreach(Option.scala:437)
       at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
       at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
       at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
   ```
    
   
   
   
   
   Scenario:
   
   A cluster of 3 nodes with a single topic of 30 partitions; all partitions have tiered segments. 3 more nodes are added to the cluster and a reassignment is started to move all the data to the new nodes.
   
   Behavior:
   For most of the partitions the reassignment proceeds smoothly.
   For some of the partitions, when a new node starts to receive assignments it reads the __remote_log_metadata topic and tries to initialize the metadata cache on records with COPY_SEGMENT_STARTED. If it reads such a message for a partition before that partition has been assigned to this specific node, it ignores the message, thus skipping the cache initialization, and still marks the partition as assigned. As a result the reassignment gets stuck, because COPY_SEGMENT_STARTED is never properly processed.
   
   Expected behavior:
   The partitions should not be marked as assigned until the cache is initialized, so that the COPY_SEGMENT_STARTED message can be re-read and the cache initialized.
   
   
   
   
   Some notes:
   This most probably happens when the messages for several user partitions land in a single metadata partition and their order does not correspond to the order of assignment. The follower reads the COPY_SEGMENT_STARTED message, sees that the user partition is not yet assigned to this node, skips the message, and marks the user partition as assigned. On the next iteration it resets to the beginning ONLY the metadata partitions backing user partitions that were not yet assigned; the remaining metadata partitions are read from the offsets remembered from the previous step, which does not help here. The skipped COPY_SEGMENT_STARTED message is therefore never re-read, and the metadata cache is never initialized.
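   A rough sketch of that reset behavior, again with hypothetical names and assuming the metadata partitions are already assigned to the internal consumer: only the metadata partitions backing still-unassigned user partitions are sought back to the beginning, everything else resumes from the remembered offsets, so the previously skipped COPY_SEGMENT_STARTED stays unread.
   
   ```java
   import java.util.Map;
   import java.util.Set;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical sketch only: not the actual reset logic in the codebase.
   class PartialResetSketch {
       void resetForNextIteration(Consumer<byte[], byte[]> consumer,
                                  Set<TopicPartition> partitionsOfUnassignedUserPartitions,
                                  Map<TopicPartition, Long> rememberedOffsets) {
           for (Map.Entry<TopicPartition, Long> entry : rememberedOffsets.entrySet()) {
               if (partitionsOfUnassignedUserPartitions.contains(entry.getKey())) {
                   consumer.seekToBeginning(Set.of(entry.getKey()));  // re-read from the start
               } else {
                   consumer.seek(entry.getKey(), entry.getValue());   // resume where it left off
               }
           }
       }
   }
   ```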
   
   
   
   One possible solution is not to track the last read offset for the metadata partitions but to always reset them to the beginning whenever the assignment has changed. This does not appear to cause any harm, since messages that were already processed are simply skipped, and it seems to allow the reassignment to finish properly.
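   A minimal sketch of that idea (hypothetical names, not necessarily the exact change in this PR): whenever the assignment changes, forget the remembered offsets and re-read all tracked metadata partitions from the beginning.
   
   ```java
   import java.util.Set;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical sketch: on any assignment change, re-read all tracked metadata
   // partitions from the beginning; records seen before are simply skipped again
   // by the downstream processing.
   class FullResetSketch {
       void onAssignmentChanged(Consumer<byte[], byte[]> consumer,
                                Set<TopicPartition> trackedMetadataPartitions) {
           consumer.assign(trackedMetadataPartitions);
           consumer.seekToBeginning(trackedMetadataPartitions);
       }
   }
   ```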
   

