[https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862464#comment-17862464]
Guillaume Mallet commented on KAFKA-17062:
------------------------------------------
Hey [~divijvaidya], thanks for checking this!
I was thinking about how to fix this. As you said, we could stop accounting for
segments in a state other than {{COPY_SEGMENT_FINISHED}} and/or
{{DELETE_SEGMENT_STARTED}}, but this means we could potentially breach the
{{retention.bytes}} limit we initially planned for.
For example, since each new copy attempt gets a fresh {{RemoteLogSegmentId}}
(and therefore a new file name on the remote system), failed copies could
remain there for an extended period of time (while the segment itself hasn't
been cleaned up), causing the overall size of the topic to go above what we
would expect.
I wonder if moving a failed segment to {{DELETE_SEGMENT_STARTED}} when we
handle the exception could help clean things up a bit earlier and reduce the
drift in storage.
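For the size computation side, something like this is what I have in mind. A
rough sketch only, assuming the RLMM listing API; this is not the actual
{{buildRetentionSizeData}} code:
{code:java}
import java.util.Iterator;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

// Sketch: count only segments that safely reached the remote store (or are
// already being deleted) towards retention.bytes, so stuck
// COPY_SEGMENT_STARTED entries from failed copies cannot push
// fully-copied segments out of retention.
class RetentionSizeSketch {
    static long remoteSizeBytes(RemoteLogMetadataManager rlmm,
                                TopicIdPartition partition) throws RemoteStorageException {
        long size = 0;
        Iterator<RemoteLogSegmentMetadata> segments = rlmm.listRemoteLogSegments(partition);
        while (segments.hasNext()) {
            RemoteLogSegmentMetadata metadata = segments.next();
            RemoteLogSegmentState state = metadata.state();
            // Skip COPY_SEGMENT_STARTED: possibly a failed copy that never
            // made it to the remote store.
            if (state == RemoteLogSegmentState.COPY_SEGMENT_FINISHED
                    || state == RemoteLogSegmentState.DELETE_SEGMENT_STARTED) {
                size += metadata.segmentSizeInBytes();
            }
        }
        return size;
    }
}
{code}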
Another concern moving failed segments to {{DELETE_SEGMENT_STARTED}} could help
with is keeping the size of the metadata bounded if the failure happens only on
the write path. In case of a complete failure we still have unbounded growth,
with one new segment metadata entry every
[{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
(at the default of 30s, that is 2,880 stale entries per partition per day).
What do you think?
> RemoteLogManager - RemoteStorageException causes data loss
> ----------------------------------------------------------
>
> Key: KAFKA-17062
> URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.8.0, 3.7.1, 3.9.0
> Reporter: Guillaume Mallet
> Assignee: Guillaume Mallet
> Priority: Major
> Labels: tiered-storage
>
> When Tiered Storage is configured, {{retention.bytes}} defines the limit on the
> amount of data stored in the local filesystem and in remote storage combined.
> However, a failure while offloading to remote storage can cause segments to be
> dropped before the retention limit is reached.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4 GB) and
> {{local.retention.bytes=1073741824}} (1 GB, equal to {{segment.bytes}}), we
> would expect Kafka to keep up to 3 segments (3 GB) in the remote store and 1
> segment (1 GB) locally, and possibly more if the remote storage is offline,
> i.e. segments in the following {{RemoteLogSegmentState}}s in the
> RemoteLogMetadataManager (RLMM):
> * Segment 3 ({{COPY_SEGMENT_FINISHED}})
> * Segment 2 ({{COPY_SEGMENT_FINISHED}})
> * Segment 1 ({{COPY_SEGMENT_FINISHED}})
> Let's assume the remote storage starts failing when segment 4 rolls. At the
> first iteration of an RLMTask we will have:
> * [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
> ** RLMM becomes aware of Segment 4 and adds it to the metadata:
> *** Segment 4 ({{COPY_SEGMENT_STARTED}}),
> *** Segment 3 ({{COPY_SEGMENT_FINISHED}}),
> *** Segment 2 ({{COPY_SEGMENT_FINISHED}}),
> *** Segment 1 ({{COPY_SEGMENT_FINISHED}})
> ** An exception is raised during the copy operation
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
> in RemoteStorageManager). It is caught with the error message “{{Error
> occurred while copying log segments of partition}}” and no further copy is
> attempted for the duration of this RLMTask.
> ** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}};
> it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being
> cleaned up when the associated segment is deleted.
> * [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called
> ** Retention size is computed in
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
> as the sum of all segment sizes regardless of their state, so the computed
> size of the topic is 5 segments: 1 (local) + 4 (remote)
> ** Segment 1, being the oldest, is dropped
> At the second iteration, after
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
> (default: 30s), the same happens. The RLMM now has 2 copies of Segment 4 in a
> {{COPY_SEGMENT_STARTED}} state, each with a different {{RemoteLogSegmentId}},
> and Segment 2 is dropped. The same happens to Segment 3 after another
> iteration.
> At that point the RLMM contains 4 copies of Segment 4, all in
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion,
> increasing the log start offset (LSO) at the same time and causing the
> UnifiedLog to delete the local and remote data for Segment 4, including its
> metadata.
> Under those circumstances Kafka can quickly delete segments that were not
> meant for deletion, causing data loss.
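> The over-count can be illustrated with a toy computation (hypothetical sizes
> in plain Java, not the actual {{buildRetentionSizeData}} code):
> {code:java}
> // Toy illustration of the over-count. Remote segments 1-3 are
> // COPY_SEGMENT_FINISHED (1 GB each); segment 4 is a failed copy stuck in
> // COPY_SEGMENT_STARTED, but its size is summed all the same.
> public class RetentionOvercount {
>     public static void main(String[] args) {
>         long gb = 1L << 30;
>         long retentionBytes = 4 * gb;             // retention.bytes
>         long localBytes = gb;                     // one local segment
>         long[] remoteSegments = {gb, gb, gb, gb}; // segments 1-4, any state
>
>         long totalBytes = localBytes;
>         for (long size : remoteSegments) totalBytes += size;
>
>         // 5 GB > 4 GB, so the oldest fully-copied segment (Segment 1) is
>         // dropped even though Segment 4 never reached the remote store.
>         System.out.println(totalBytes > retentionBytes); // prints: true
>     }
> }
> {code}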
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> remote.log.manager.task.interval.ms=5000
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rsm.config.dir=/tmp/tieredStorage
> EOF
> {code}
> 2. Start a Kafka server with the following classpath. This is needed so we
> can use the test class {{LocalTieredStorage}} as an implementation of
> RemoteStorageManager.
> {code:bash}
> export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
> export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 3. In a separate shell, create the topic and produce enough records to fill
> the remote log
> {code:bash}
> bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
> --config retention.bytes=1000000000 --config segment.bytes=100000000 \
> --config remote.storage.enable=true --config local.retention.bytes=1
> bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
> --throughput -1 --record-size 1000 \
> --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> 4. In a separate shell, watch the remote log directory content
> {code:bash}
> watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
> {code}
> 5. Once all logs are sent to the remote storage (when the server log output
> stops; this should take around 2 min), stop the Kafka server
> 6. Edit {{LocalTieredStorage}} at line 309, in {{copyLogSegmentData()}}, to
> throw a {{RemoteStorageException}} and disable the ability to store new
> remote segments.
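> For reference, the injected failure can look like this (a sketch against the
> 3.8 {{RemoteStorageManager}} signature; the original method body is elided):
> {code:java}
> // Fault injection at the top of copyLogSegmentData() in LocalTieredStorage
> // (around L309). The if (true) guard keeps the original body below it
> // compilable; a bare throw would make the rest of the method unreachable.
> @Override
> public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
>                                                    LogSegmentData logSegmentData)
>         throws RemoteStorageException {
>     if (true) {
>         throw new RemoteStorageException("Injected failure: simulating a broken write path");
>     }
>     // ... original LocalTieredStorage copy logic, unchanged ...
> }
> {code}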
> 7. Rebuild Kafka
> {code:bash}
> ./gradlew testJar
> {code}
> 8. Restart the Kafka server
> {code:bash}
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 9. Send enough data to roll one segment
> {code:bash}
> bin/kafka-producer-perf-test.sh \
> --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
> --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> All data in the remote directory will start getting deleted, when all we
> would expect is that no further writes happen to the remote storage.