[
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862925#comment-17862925
]
Luke Chen commented on KAFKA-17062:
-----------------------------------
Good question: should we create a new
[RemoteLogSegmentId|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L850]
when a failed segment copy is retried?
[~satishd] [~ckamal], we'd like to hear your thoughts about it.
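For context, today each copy attempt generates a brand-new id, so a retried segment shows up in the RLMM as a separate entry. A rough sketch of that step (simplified, not the exact code at the link above):
{code:java}
// Sketch of the copy path in the RLMTask (simplified): a fresh id is minted on
// every attempt, so retrying the same log segment after a RemoteStorageException
// registers a second COPY_SEGMENT_STARTED entry with a different id.
RemoteLogSegmentId segmentId = RemoteLogSegmentId.generateNew(topicIdPartition);
// Metadata for segmentId is added as COPY_SEGMENT_STARTED, then
// RemoteStorageManager#copyLogSegmentData is called and may throw; on failure
// the entry is left behind and the next attempt starts over with a new id.
{code}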
> RemoteLogManager - RemoteStorageException causes data loss
> ----------------------------------------------------------
>
> Key: KAFKA-17062
> URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.8.0, 3.7.1, 3.9.0
> Reporter: Guillaume Mallet
> Assignee: Guillaume Mallet
> Priority: Major
> Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the
> combined amount of data stored on the local filesystem and in remote storage.
> However, a failure while offloading to remote storage can cause segments to be
> dropped before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4 GB) and
> {{local.retention.bytes=1073741824}} (1 GB, equal to segment.bytes), we would
> expect Kafka to keep up to 3 segments (3 GB) in the remote store and 1 segment
> locally (the active segment), and possibly more if the remote storage is
> offline. That is, segments in the following RemoteLogSegmentStates in the
> RemoteLogMetadataManager (RLMM), with the arithmetic sketched after the list:
> * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
> * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
> * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
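> The expected sizing works out as follows; a minimal sketch with the numbers from the configuration above (hypothetical arithmetic, not Kafka code):
> {code:java}
> // Hypothetical arithmetic for the configuration above: how many segments we
> // expect to survive locally and in remote storage when everything is healthy.
> long retentionBytes      = 4_294_967_296L; // retention.bytes       = 4 GB
> long localRetentionBytes = 1_073_741_824L; // local.retention.bytes = 1 GB
> long segmentBytes        = 1_073_741_824L; // segment.bytes         = 1 GB
>
> long totalSegments  = retentionBytes / segmentBytes;       // 4 segments overall
> long localSegments  = localRetentionBytes / segmentBytes;  // 1 segment kept locally
> long remoteSegments = totalSegments - localSegments;       // up to 3 segments offloaded
> {code}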
> Let's assume copies to the remote storage start failing when segment 4 rolls.
> At the first iteration of an RLMTask we will have:
> * [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
> ** RLMM becomes aware of Segment 4 and adds it to the metadata:
> *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
> *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
> *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
> *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> ** An exception is raised during the copy operation
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
> in RemoteStorageManager). It is caught with the error message “{{Error
> occurred while copying log segments of partition}}” and no further copies are
> attempted for the remainder of this RLMTask run.
> ** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}};
> it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being
> cleaned up when the associated segment is deleted.
> * [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called
> ** Retention size is computed in
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
> as the sum of all segment sizes regardless of their state, so the computed
> size of the topic is 1 segment (local) + 4 segments (remote) = 5 GB, which
> exceeds the 4 GB retention limit (see the sketch after this list).
> ** Segment 1, being the oldest, is therefore dropped.
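> A minimal sketch of why the stuck copy still counts toward retention (simplified and paraphrased from the linked code; names and structure are not exact):
> {code:java}
> // Sketch, not the actual RemoteLogManager code: the retention-size check sums
> // every remote metadata entry without filtering on RemoteLogSegmentState, so a
> // COPY_SEGMENT_STARTED entry left behind by a failed copy counts as if the data
> // had been offloaded successfully.
> long remoteSize = 0L;
> Iterator<RemoteLogSegmentMetadata> segments =
>         remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
> while (segments.hasNext()) {
>     remoteSize += segments.next().segmentSizeInBytes(); // state is never checked here
> }
> // remoteSize is 4 GB in the example above (3 finished segments + 1 started copy);
> // adding the 1 GB local log gives 5 GB > retention.bytes, so the oldest remote
> // segment (Segment 1) is scheduled for deletion even though its data is still needed.
> {code}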
> At the second iteration, after
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
> (default: 30s), the same thing happens. The RLMM now has 2 copies of Segment 4
> in {{COPY_SEGMENT_STARTED}} state, each with a different
> {{RemoteLogSegmentId}}, and Segment 2 is dropped. The same happens to
> Segment 3 after another iteration.
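> To make the drift explicit, a back-of-the-envelope simulation of the computed size per failed iteration (hypothetical helper, 1 GB segments as above):
> {code:java}
> // Hypothetical simulation of the retention check across failed-copy iterations:
> // each iteration adds one more 1 GB COPY_SEGMENT_STARTED duplicate of Segment 4,
> // keeping the computed size above retention.bytes, so one real segment is
> // deleted every remote.log.manager.task.interval.ms.
> int finishedRemoteSegments = 3; // Segments 1-3, successfully offloaded earlier
> for (int failedAttempts = 1; failedAttempts <= 3; failedAttempts++) {
>     long remoteGb = finishedRemoteSegments + failedAttempts; // duplicates counted too
>     long totalGb = 1 /* local */ + remoteGb;                 // stays at 5 GB each time
>     System.out.printf("iteration %d: computed size %d GB > 4 GB -> drop oldest finished segment%n",
>             failedAttempts, totalGb);
>     finishedRemoteSegments--; // Segment 1, then 2, then 3 is deleted
> }
> // After three iterations only the duplicated Segment 4 entries remain in the RLMM.
> {code}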
> At that point the RLMM contains 4 copies of Segment 4, all in
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, which
> increases the log start offset (LSO) and causes the UnifiedLog to delete the
> local and remote data for Segment 4, including its metadata.
> Under those circumstances Kafka can quickly delete segments that were not
> meant for deletion, causing data loss.
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=True
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> remote.log.manager.task.interval.ms=5000
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rsm.config.dir=/tmp/tieredStorage
> EOF
> {code}
> 2. Start a Kafka server with the following classpath. This is needed so we
> can use the test class LocalTieredStorage as the implementation of
> RemoteStorageManager.
> {code:bash}
> export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
> export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 3. In a separate shell, create the topic and produce enough records to fill
> the remote log
> {code:bash}
> bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
>   --config retention.bytes=1000000000 --config segment.bytes=100000000 \
>   --config remote.storage.enable=true --config local.retention.bytes=1
> bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
> --throughput -1 --record-size 1000 \
> --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> 4. In a separate shell, watch the remote log directory content
> {code:bash}
> watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
> {code}
> 5. Once all segments have been sent to the remote storage (i.e. once the
> server log output goes quiet, which should take around 2 minutes), stop the
> Kafka server
> 6. Edit LocalTieredStorage (around line 309, inside {{copyLogSegmentData()}})
> so that it throws a {{RemoteStorageException}}, disabling the ability to store
> new remote segments.
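> The edit is roughly the following (hypothetical illustration; the exact method body and line number in LocalTieredStorage may differ):
> {code:java}
> // Hypothetical change inside LocalTieredStorage for the repro: make every copy
> // attempt fail so no new segment can ever be offloaded.
> @Override
> public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
>                                                    LogSegmentData logSegmentData)
>         throws RemoteStorageException {
>     throw new RemoteStorageException("Simulated remote storage failure for KAFKA-17062 repro");
> }
> {code}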
> 7. Rebuild Kafka
> {code:bash}
> ./gradlew testJar
> {code}
> 8. Restart the Kafka server
> {code:bash}
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 9. Send enough data to roll one more segment
> {code:bash}
> bin/kafka-producer-perf-test.sh \
> --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
> --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> All data in the remote directory starts getting deleted, whereas we would only
> expect no new writes to reach the remote storage.