[
https://issues.apache.org/jira/browse/KAFKA-17771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bingkun updated KAFKA-17771:
----------------------------
Description:
Background:
We're using tiered storage with Kafka version 3.7.0 and saw an issue as
describe below.
Locally:
Normally for each partition there's about 9~11 segments stored locally, but
sometimes for a certain partition, the cluster seems to 'forget' to delete the
local segments that is out of the retention policy. As a result, the number of
segments can grow excessively and the data size for the broker would go up
non-stop as well, causing the issue of high disk utilization locally. It
happened to 2 brokers and one of them of the leader of the partition and the
other one is the follower of the partition. After observing the issue, restart
the Kafka service in the broker who is the leader for the partition and the
out-of-retention segments would be purged afterwards.
Tiered Storage: (S3)
Another observation when the issue happened (the cluster "forget" to delete the
segment locally) is that the partition has way less segments in S3, compared to
the other partitions. From the "stuck" segments, all the segments after it are
not uploaded to S3. After I ran restart broker, the segment with issue will be
uploaded to S3 again and the segments that "piled up" locally would also be
uploaded to S3 normally.
Logs:
When the stuck segment firstly created, we can see the logs as follows:
Roll log:
{code:java}
[2024-10-12 13:19:11,891] INFO [LocalLog partition=topic-5, dir=/data/kafka]
Rolled new log segment at offset 100740258 in 3 ms. (kafka.log.LocalLog){code}
The log trying to copy to S3:
{code:java}
[2024-10-12 13:19:20,331] INFO [RemoteLogManager=5
partition=xKbfoDniSgCvS4uoQV-kXg:topic-5] Copying 00000000000095914682.log to
remote storage. (kafka.log.remote.RemoteLogManager$RLMTask)
[2024-10-12 13:19:20,360] INFO Copying log segment data, metadata:
RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=xKbfoDniSgCvS4uoQV-kXg:topic-5,
id=LWqV0RWKQjCbpA11QoqRHg}, startOffset=95914682, endOffset=100740257,
brokerId=5, maxTimestampMs=1728739151302, eventTimestampMs=1728739160332,
segmentLeaderEpochs={3=95914682}, segmentSizeInBytes=1073596272,
customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}
(io.aiven.kafka.tieredstorage.RemoteStorageManager)
{code}
However I couldn't find the Successful upload log to S3 for this segment.
We didn't find any other logs that can directly indicate the what the root
cause and we just found that there's no mark delete for the segment and no
delete operation for the segment as well. The last deletion log is to delete
the segment before it. Also after I restart the broker, all the out-of date
segments are deleted all at once.
{code:java}
......
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topci-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=95914682, size=1073596272,
lastModifiedTime=1728739151875, largestRecordTimestamp=1728739151302) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=100740258, size=1073736717,
lastModifiedTime=1728740051726, largestRecordTimestamp=1728740051724) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=105583179, size=1073740941,
lastModifiedTime=1728740947662, largestRecordTimestamp=1728740947658) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=110383126, size=1073738319,
lastModifiedTime=1728741837169, largestRecordTimestamp=1728741837162) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=115183488, size=1073737392,
lastModifiedTime=1728742728361, largestRecordTimestamp=1728742728347) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
......{code}
Configurations: (topic and broker configuration)
We keep the local retention to be 20% of the total retention and the
configuration looks like this:{{ }}
{code:java}
config = {
"retention.ms" = 86400000 / 8 # 3 hours
"local.retention.ms" = 86400000 / 8 / 5 # 20% local data
"remote.storage.enable" = true
}{code}
{code:java}
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
num.network.threads=6
num.io.threads=16
queued.max.requests=1000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=64
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connection.timeout.ms=300000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true
controlled.shutdown.enable=true
compression.type=snappy
message.max.bytes=2000000
log.dirs=/data/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
# Kafka Auth and ACL configurations
sasl.enabled.mechanisms=PLAIN
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required ;
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
# Remote storage configurations
remote.log.storage.system.enable=true
remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
remote.log.storage.manager.impl.prefix=rsm.config.
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.metadata.manager.impl.prefix=rlmm.config.
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=3
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.chunk.size=4194304
rsm.config.key.prefix.mask=true
rsm.config.storage.s3.credentials.default=true
rsm.config.storage.s3.region=us-east-1
# Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
rsm.config.storage.s3.multipart.upload.part.size=167772160
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/data/cache
# Pick some cache size, 16 GiB here:
rsm.config.fetch.chunk.cache.size=17179869184
# Prefetching size, 160 MiB here:
rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
{code}
Cluster size:
70 brokers with c7g.xlarge instance.
Traffic: 7 million messages/sec and 2.6GiB/sec.
CPU: ~70%
Memory: 20%
Disk Throughtput: 100MB/s
IOPS: 3000 IOPS/s
The issue happened like once every 3~4 days and it's really hard to debug the
issue. So I'm posting it here and maybe we can have some discussion for the
issue. Thank you.
was:
Background:
We're using tiered storage with Kafka version 3.7.0 and saw an issue as
describe below.
Locally:
Normally for each partition there's about 9~11 segments stored locally, but
sometimes for a certain partition, the cluster seems to 'forget' to delete the
local segments that is out of the retention policy. As a result, the number of
segments can grow excessively and the data size for the broker would go up
non-stop as well, causing the issue of high disk utilization locally. It
happened to 2 brokers and one of them of the leader of the partition and the
other one is the follower of the partition. After observing the issue, restart
the Kafka service in the broker who is the leader for the partition and the
out-of-retention segments would be purged afterwards.
Tiered Storage: (S3)
Another observation when the issue happened (the cluster "forget" to delete the
segment locally) is that the partition has way less segments in S3, compared to
the other partitions. From the "stuck" segments, all the segments after it are
not uploaded to S3. After I ran restart broker, the segment with issue will be
uploaded to S3 again and the segments that "piled up" locally would also be
uploaded to S3 normally.
Logs:
Currently for the logs, we didn't find any logs that can directly indicate the
what the root cause and we just found that there's no mark delete for the
segment and no delete operation for the segment as well. The last deletion log
is to delete the segment before it. Also after I restart the broker, all the
out-of date segments are deleted all at once.
{code:java}
......
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topci-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=95914682, size=1073596272,
lastModifiedTime=1728739151875, largestRecordTimestamp=1728739151302) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=100740258, size=1073736717,
lastModifiedTime=1728740051726, largestRecordTimestamp=1728740051724) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=105583179, size=1073740941,
lastModifiedTime=1728740947662, largestRecordTimestamp=1728740947658) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=110383126, size=1073738319,
lastModifiedTime=1728741837169, largestRecordTimestamp=1728741837162) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
[2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5, dir=/data/kafka]
Deleting segment LogSegment(baseOffset=115183488, size=1073737392,
lastModifiedTime=1728742728361, largestRecordTimestamp=1728742728347) due to
local log retention time 17280000ms breach based on the largest record
timestamp in the segment (kafka.log.UnifiedLog)
......{code}
Configurations: (topic and broker configuration)
We keep the local retention to be 20% of the total retention and the
configuration looks like this:{{ }}
{code:java}
config = {
"retention.ms" = 86400000 / 8 # 3 hours
"local.retention.ms" = 86400000 / 8 / 5 # 20% local data
"remote.storage.enable" = true
}{code}
{code:java}
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
num.network.threads=6
num.io.threads=16
queued.max.requests=1000
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=64
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=60000
zookeeper.connection.timeout.ms=300000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable=true
delete.topic.enable=true
controlled.shutdown.enable=true
compression.type=snappy
message.max.bytes=2000000
log.dirs=/data/kafka
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
# Kafka Auth and ACL configurations
sasl.enabled.mechanisms=PLAIN
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required ;
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
# Remote storage configurations
remote.log.storage.system.enable=true
remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
remote.log.storage.manager.impl.prefix=rsm.config.
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.metadata.manager.impl.prefix=rlmm.config.
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=3
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.chunk.size=4194304
rsm.config.key.prefix.mask=true
rsm.config.storage.s3.credentials.default=true
rsm.config.storage.s3.region=us-east-1
# Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
rsm.config.storage.s3.multipart.upload.part.size=167772160
rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
rsm.config.fetch.chunk.cache.path=/data/cache
# Pick some cache size, 16 GiB here:
rsm.config.fetch.chunk.cache.size=17179869184
# Prefetching size, 160 MiB here:
rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
{code}
Cluster size:
70 brokers with c7g.xlarge instance.
Traffic: 7 million messages/sec and 2.6GiB/sec.
CPU: ~70%
Memory: 20%
Disk Throughtput: 100MB/s
IOPS: 3000 IOPS/s
The issue happened like once every 3~4 days and it's really hard to debug the
issue. So I'm posting it here and maybe we can have some discussion for the
issue. Thank you.
> Local segment doesn't gets deleted when exceeding retention
> -----------------------------------------------------------
>
> Key: KAFKA-17771
> URL: https://issues.apache.org/jira/browse/KAFKA-17771
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0, 3.7.1
> Environment: Kafka Version 3.7.0 and 3.7.1
> Reporter: Bingkun
> Priority: Major
>
> Background:
> We're using tiered storage with Kafka version 3.7.0 and saw an issue as
> describe below.
> Locally:
> Normally for each partition there's about 9~11 segments stored locally, but
> sometimes for a certain partition, the cluster seems to 'forget' to delete
> the local segments that is out of the retention policy. As a result, the
> number of segments can grow excessively and the data size for the broker
> would go up non-stop as well, causing the issue of high disk utilization
> locally. It happened to 2 brokers and one of them of the leader of the
> partition and the other one is the follower of the partition. After observing
> the issue, restart the Kafka service in the broker who is the leader for the
> partition and the out-of-retention segments would be purged afterwards.
> Tiered Storage: (S3)
> Another observation when the issue happened (the cluster "forget" to delete
> the segment locally) is that the partition has way less segments in S3,
> compared to the other partitions. From the "stuck" segments, all the segments
> after it are not uploaded to S3. After I ran restart broker, the segment with
> issue will be uploaded to S3 again and the segments that "piled up" locally
> would also be uploaded to S3 normally.
> Logs:
> When the stuck segment firstly created, we can see the logs as follows:
> Roll log:
> {code:java}
> [2024-10-12 13:19:11,891] INFO [LocalLog partition=topic-5, dir=/data/kafka]
> Rolled new log segment at offset 100740258 in 3 ms. (kafka.log.LocalLog){code}
> The log trying to copy to S3:
>
> {code:java}
> [2024-10-12 13:19:20,331] INFO [RemoteLogManager=5
> partition=xKbfoDniSgCvS4uoQV-kXg:topic-5] Copying 00000000000095914682.log to
> remote storage. (kafka.log.remote.RemoteLogManager$RLMTask)
> [2024-10-12 13:19:20,360] INFO Copying log segment data, metadata:
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=xKbfoDniSgCvS4uoQV-kXg:topic-5,
> id=LWqV0RWKQjCbpA11QoqRHg}, startOffset=95914682, endOffset=100740257,
> brokerId=5, maxTimestampMs=1728739151302, eventTimestampMs=1728739160332,
> segmentLeaderEpochs={3=95914682}, segmentSizeInBytes=1073596272,
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}
> (io.aiven.kafka.tieredstorage.RemoteStorageManager)
> {code}
>
> However I couldn't find the Successful upload log to S3 for this segment.
> We didn't find any other logs that can directly indicate the what the root
> cause and we just found that there's no mark delete for the segment and no
> delete operation for the segment as well. The last deletion log is to delete
> the segment before it. Also after I restart the broker, all the out-of date
> segments are deleted all at once.
> {code:java}
> ......
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topci-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=95914682,
> size=1073596272, lastModifiedTime=1728739151875,
> largestRecordTimestamp=1728739151302) due to local log retention time
> 17280000ms breach based on the largest record timestamp in the segment
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=100740258,
> size=1073736717, lastModifiedTime=1728740051726,
> largestRecordTimestamp=1728740051724) due to local log retention time
> 17280000ms breach based on the largest record timestamp in the segment
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=105583179,
> size=1073740941, lastModifiedTime=1728740947662,
> largestRecordTimestamp=1728740947658) due to local log retention time
> 17280000ms breach based on the largest record timestamp in the segment
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=110383126,
> size=1073738319, lastModifiedTime=1728741837169,
> largestRecordTimestamp=1728741837162) due to local log retention time
> 17280000ms breach based on the largest record timestamp in the segment
> (kafka.log.UnifiedLog)
> [2024-10-13 02:17:42,356] INFO [UnifiedLog partition=topic-5,
> dir=/data/kafka] Deleting segment LogSegment(baseOffset=115183488,
> size=1073737392, lastModifiedTime=1728742728361,
> largestRecordTimestamp=1728742728347) due to local log retention time
> 17280000ms breach based on the largest record timestamp in the segment
> (kafka.log.UnifiedLog)
> ......{code}
> Configurations: (topic and broker configuration)
> We keep the local retention to be 20% of the total retention and the
> configuration looks like this:{{ }}
> {code:java}
> config = {
> "retention.ms" = 86400000 / 8 # 3 hours
> "local.retention.ms" = 86400000 / 8 / 5 # 20% local data
> "remote.storage.enable" = true
> }{code}
>
> {code:java}
> listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
> num.network.threads=6
> num.io.threads=16
> queued.max.requests=1000
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> num.partitions=1
> default.replication.factor=2
> num.recovery.threads.per.data.dir=64
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=60000
> zookeeper.connection.timeout.ms=300000
> group.initial.rebalance.delay.ms=0
> auto.create.topics.enable=true
> delete.topic.enable=true
> controlled.shutdown.enable=true
> compression.type=snappy
> message.max.bytes=2000000
> log.dirs=/data/kafka
> metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
> # Kafka Auth and ACL configurations
> sasl.enabled.mechanisms=PLAIN
> listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=liftoff.kafka.auth.PlainServerCustomizedCallbackHandler
> listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
> required ;
> authorizer.class.name=kafka.security.authorizer.AclAuthorizer
> super.users=User:admin
> allow.everyone.if.no.acl.found=true
> # Remote storage configurations
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/etc/kafka/libs/remote_storage_libs/*
> remote.log.storage.manager.impl.prefix=rsm.config.
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.impl.prefix=rlmm.config.
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=3
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.chunk.size=4194304
> rsm.config.key.prefix.mask=true
> rsm.config.storage.s3.credentials.default=true
> rsm.config.storage.s3.region=us-east-1
> # Size of parts in bytes to use when uploading, 160MB here - default is 5MB:
> rsm.config.storage.s3.multipart.upload.part.size=167772160
> rsm.config.fetch.chunk.cache.class=io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache
> rsm.config.fetch.chunk.cache.path=/data/cache
> # Pick some cache size, 16 GiB here:
> rsm.config.fetch.chunk.cache.size=17179869184
> # Prefetching size, 160 MiB here:
> rsm.config.fetch.chunk.cache.prefetch.max.size=167772160
> {code}
> Cluster size:
> 70 brokers with c7g.xlarge instance.
> Traffic: 7 million messages/sec and 2.6GiB/sec.
> CPU: ~70%
> Memory: 20%
> Disk Throughtput: 100MB/s
> IOPS: 3000 IOPS/s
> The issue happened like once every 3~4 days and it's really hard to debug the
> issue. So I'm posting it here and maybe we can have some discussion for the
> issue. Thank you.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)