[ 
https://issues.apache.org/jira/browse/KAFKA-17771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bingkun updated KAFKA-17771:
----------------------------
    Attachment: image-2024-10-14-01-42-23-014.png

> Local segment doesn't get deleted when exceeding retention
> ----------------------------------------------------------
>
>                 Key: KAFKA-17771
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17771
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0, 3.7.1
>         Environment: Kafka Version 3.7.0 and 3.7.1
>            Reporter: Bingkun
>            Priority: Major
>         Attachments: image-2024-10-14-01-41-52-008.png, 
> image-2024-10-14-01-42-23-014.png
>
>
> Background:
> We're using tiered storage with Kafka version 3.7.0 and saw the issue
> described below.
> Locally:
> Normally each partition has about 9~11 segments stored locally, but
> sometimes, for a certain partition, the cluster seems to 'forget' to delete
> the local segments that are past the retention policy. As a result, the
> number of segments keeps growing and the broker's data size goes up
> non-stop, causing high local disk utilization. It happened on 2 brokers:
> one is the leader of the partition and the other is the follower. After
> observing the issue, we restarted the Kafka service on the broker that is
> the leader for the partition, and the out-of-retention segments were purged
> afterwards.
> Tiered Storage: (S3)
> Another observation when the issue happened (the cluster "forgets" to delete
> the segments locally) is that the partition has far fewer segments in S3
> than the other partitions. Starting from the "stuck" segment, none of the
> later segments are uploaded to S3. After I restarted the broker, the stuck
> segment was uploaded to S3 again and the segments that had "piled up"
> locally were also uploaded to S3 normally.
> Logs:
> When the stuck segment was first created, we can see the following logs:
> Roll log:
> {code:java}
> [2024-10-12 13:19:11,891] INFO [LocalLog partition=topic-5, dir=/data/kafka] 
> Rolled new log segment at offset 100740258 in 3 ms. (kafka.log.LocalLog){code}
> The log for the attempt to copy the segment to S3:
>  
> {code:java}
> [2024-10-12 13:19:20,331] INFO [RemoteLogManager=5 
> partition=xKbfoDniSgCvS4uoQV-kXg:topic-5] Copying 00000000000095914682.log to 
> remote storage. (kafka.log.remote.RemoteLogManager$RLMTask)
> [2024-10-12 13:19:20,360] INFO Copying log segment data, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=xKbfoDniSgCvS4uoQV-kXg:topic-5,
>  id=LWqV0RWKQjCbpA11QoqRHg}, startOffset=95914682, endOffset=100740257, 
> brokerId=5, maxTimestampMs=1728739151302, eventTimestampMs=1728739160332, 
> segmentLeaderEpochs={3=95914682}, segmentSizeInBytes=1073596272, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED} 
> (io.aiven.kafka.tieredstorage.RemoteStorageManager)
> {code}
> However, I couldn't find the successful-upload log for this segment, like
> this one (a successful upload log for another topic):
> {code:java}
> Copying log segment data completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=xTGOd9foR8GBg-qtYJx20g:other-successful-topic-128,
>  id=7Qs1o_1KSeOM88UP0m-Lsg}, startOffset=354054408, endOffset=356669503, 
> brokerId=5, maxTimestampMs=1728739112406, eventTimestampMs=1728739132877, 
> segmentLeaderEpochs={0=354054408}, segmentSizeInBytes=1073739734, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED} 
> (io.aiven.kafka.tieredstorage.RemoteStorageManager){code}
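
One way to look for other segments in the same state is to pair up the two
kinds of entries quoted above by segment id. The following is only a minimal
sketch: it assumes each log entry sits on a single line in the broker log and
that the message texts match the excerpts in this report. The function name
and the shortened sample lines are illustrative, not part of Kafka or the
Aiven plugin.

```python
import re

# Assumed message texts, copied from the log excerpts quoted in this report.
START = "Copying log segment data, metadata:"
DONE = "Copying log segment data completed successfully"
# Pulls the segment id out of "RemoteLogSegmentId{topicIdPartition=..., id=...}".
SEGMENT_ID = re.compile(r"RemoteLogSegmentId\{topicIdPartition=[^,]+,\s*id=([^}\s]+)\}")


def unfinished_copies(lines):
    """Return segment ids that have a copy-start entry but no completion entry."""
    started, finished = [], set()
    for line in lines:
        match = SEGMENT_ID.search(line)
        if match is None:
            continue
        if DONE in line:
            finished.add(match.group(1))
        elif START in line and match.group(1) not in started:
            started.append(match.group(1))
    return [segment_id for segment_id in started if segment_id not in finished]


# Two entries shortened from the excerpts above (trailing fields dropped).
sample = [
    "INFO Copying log segment data, metadata: RemoteLogSegmentMetadata{"
    "remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition="
    "xKbfoDniSgCvS4uoQV-kXg:topic-5, id=LWqV0RWKQjCbpA11QoqRHg}, "
    "startOffset=95914682, endOffset=100740257",
    "Copying log segment data completed successfully, metadata: "
    "RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{"
    "topicIdPartition=xTGOd9foR8GBg-qtYJx20g:other-successful-topic-128, "
    "id=7Qs1o_1KSeOM88UP0m-Lsg}, startOffset=354054408, endOffset=356669503",
]
print(unfinished_copies(sample))
```

Since the function accepts any iterable of lines, an open file object for the
broker log could be passed in place of the sample list.
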
> We didn't find any other logs that directly indicate the root cause. We
> only found that there is no mark-for-deletion log and no delete operation
> for the segment. The last deletion log is for the segment before it. Also,
> after I restarted the broker, all the out-of-date segments were deleted at
> once.
> {code:java}
> ......
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=95914682, 
> size=1073596272, lastModifiedTime=1728739151875, 
> largestRecordTimestamp=1728739151302) due to local log retention time 
> 17280000ms breach based on the largest record timestamp in the segment 
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, 
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=100740258, 
> size=1073736717, lastModifiedTime=1728740051726, 
> largestRecordTimestamp=1728740051724) due to local log retention time 
> 17280000ms breach based on the largest record timestamp in the segment 
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, 
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=105583179, 
> size=1073740941, lastModifiedTime=1728740947662, 
> largestRecordTimestamp=1728740947658) due to local log retention time 
> 17280000ms breach based on the largest record timestamp in the segment 
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, 
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=110383126, 
> size=1073738319, lastModifiedTime=1728741837169, 
> largestRecordTimestamp=1728741837162) due to local log retention time 
> 17280000ms breach based on the largest record timestamp in the segment 
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, 
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=115183488, 
> size=1073737392, lastModifiedTime=1728742728361, 
> largestRecordTimestamp=1728742728347) due to local log retention time 
> 17280000ms breach based on the largest record timestamp in the segment 
> (kafka.log.UnifiedLog)
> ......{code}
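
To put a number on how late the stuck segment was deleted, the values from
the first deletion entry above can be combined as in the sketch below. It
assumes the broker log timestamps are in UTC, which is consistent with the
roll log time and the segment's largestRecordTimestamp; the variable names
are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Values copied from the deletion entry for baseOffset=95914682.
largest_record_ts_ms = 1728739151302
local_retention_ms = 17280000

# Assumption: broker log timestamps are UTC.
deleted_at = datetime(2024, 10, 13, 2, 17, 42, 356000, tzinfo=timezone.utc)

# Earliest time at which the time-based local retention check could match.
epoch = datetime.fromtimestamp(0, tz=timezone.utc)
eligible_at = epoch + timedelta(milliseconds=largest_record_ts_ms + local_retention_ms)
overdue = deleted_at - eligible_at

print("retention breached at:", eligible_at.isoformat())
print("actually deleted at:  ", deleted_at.isoformat())
print("overdue by:           ", overdue)
```

Under that assumption the segment breached the logged local retention time a
little over eight hours before it was finally deleted after the restart.
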
> Configurations: (topic and broker configuration)
> We keep the local retention at 20% of the total retention, and the
> configuration looks like this:
> {code:java}
> config = { 
>     "retention.ms" = 86400000 / 8 # 3 hours 
>     "local.retention.ms" = 86400000 / 8 / 5 # 20% local data
>     "remote.storage.enable" = true 
> }{code}
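
As a quick check of the arithmetic in the topic configuration above, the
sketch below evaluates both expressions. The variable names are illustrative.
Note that the deletion log entries earlier in this report show a local
retention time of 17280000 ms, which equals 86400000 / 5; the sketch only
evaluates the expressions as written and does not explain that difference.

```python
MS_PER_DAY = 86_400_000

# Expressions as written in the topic configuration.
retention_ms = MS_PER_DAY // 8             # total retention
local_retention_ms = MS_PER_DAY // 8 // 5  # 20% of the total kept locally

# Value reported in the "local log retention time ... breach" log entries.
logged_local_retention_ms = 17_280_000

print("retention.ms       =", retention_ms, "ms =", retention_ms / 3_600_000, "hours")
print("local.retention.ms =", local_retention_ms, "ms =", local_retention_ms / 60_000, "minutes")
print("logged value equals 86400000 / 5:", logged_local_retention_ms == MS_PER_DAY // 5)
```
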
>  
> {code:java}
> listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
> num.network.threads=6
> num.io.threads=16
> queued.max.requests=1000
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> num.partitions=1
> default.replication.factor=2
> num.recovery.threads.per.data.dir=64
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=60000
> zookeeper.connection.timeout.ms=300000
> group.initial.rebalance.delay.ms=0
> auto.create.topics.enable=true
> delete.topic.enable=true
> controlled.shutdown.enable=true
> compression.type=snappy
> message.max.bytes=2000000
> log.dirs=/data/kafka
> metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
> # Kafka Auth and ACL configurations
> sasl.enabled.mechanisms=PLAIN
> listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
> listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
>  required ;
> authorizer.class.name=kafka.security.authorizer.AclAuthorizer
> super.users=User:admin
> allow.everyone.if.no.acl.found=true
> # Remote storage configurations
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
> remote.log.storage.manager.impl.prefix=rsm.config.
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.impl.prefix=rlmm.config.
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=3
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.chunk.size=4194304
> rsm.config.key.prefix.mask=true
> rsm.config.storage.s3.credentials.default=true
> rsm.config.storage.s3.region=us-east-1
> # Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
> rsm.config.storage.s3.multipart.upload.part.size=167772160
> rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
> rsm.config.fetch.chunk.cache.path=/data/cache
> # Pick some cache size, 16 GiB here:
> rsm.config.fetch.chunk.cache.size=17179869184
> # Prefetching size, 160 MiB here:
> rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
> {code}
> Cluster size:
> 70 brokers on c7g.xlarge instances.
> Traffic: 7 million messages/sec and 2.6GiB/sec.
> CPU: ~70%
> Memory: 20%
> Disk Throughput: 100MB/s
> IOPS: 3000
> The issue happens about once every 3~4 days and is really hard to debug,
> so I'm posting it here in the hope that we can have some discussion about
> it. Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
