[ https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872462#comment-17872462 ]

Colin McCabe commented on KAFKA-17062:
--------------------------------------

[~showuon]: If this isn't a regression in 3.9, then it isn't a 3.9 blocker. 
However, I agree that it would be very good to get it into the upcoming 3.9 
release (and also backport it to the older releases that were affected).

I will leave "Fix release" at 3.9 for now, under the assumption that we can get 
this done in the next few weeks.

Note that code freeze for 3.9 hasn't happened yet, so you can merge the fix 
into 3.9 without explicit approval. (Code freeze will happen at the end of this 
month)

> RemoteLogManager - RemoteStorageException causes data loss
> ----------------------------------------------------------
>
>                 Key: KAFKA-17062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17062
>             Project: Kafka
>          Issue Type: Bug
>          Components: Tiered-Storage
>    Affects Versions: 3.8.0, 3.7.1, 3.9.0
>            Reporter: Guillaume Mallet
>            Assignee: Guillaume Mallet
>            Priority: Critical
>              Labels: tiered-storage
>             Fix For: 3.9.0
>
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes), we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the active segment), and possibly more if the remote storage is 
> offline, i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM):
>  * Segment 3 ({{COPY_SEGMENT_FINISHED}})
>  * Segment 2 ({{COPY_SEGMENT_FINISHED}})
>  * Segment 1 ({{COPY_SEGMENT_FINISHED}})
> Let's assume the remote storage starts failing when segment 4 rolls. At the 
> first iteration of an RLMTask we will have:
>  * [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{COPY_SEGMENT_STARTED}}),
>  *** Segment 3 ({{COPY_SEGMENT_FINISHED}}),
>  *** Segment 2 ({{COPY_SEGMENT_FINISHED}}),
>  *** Segment 1 ({{COPY_SEGMENT_FINISHED}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error message “{{Error 
> occurred while copying log segments of partition}}” and no further copy will 
> be attempted for the duration of this RLMTask.
>  ** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}}; 
> it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being 
> cleaned up when the associated segment is deleted.
>  * [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122] is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296] 
> as the sum of all segment sizes regardless of their state, so the computed 
> size of the topic is 5GB (1 local + 4 remote), exceeding the 4GB retention 
> limit.
>  ** Segment 1, being the oldest, is dropped.
> At the second iteration, after 
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395] 
> (default: 30s), the same sequence repeats. The RLMM now has 2 copies of 
> Segment 4, each in {{COPY_SEGMENT_STARTED}} state with a different 
> {{RemoteLogSegmentId}}, and Segment 2 is dropped. The same happens to 
> Segment 3 after another iteration.
> At that point the RLMM contains 4 copies of Segment 4 in 
> {{COPY_SEGMENT_STARTED}} state. Segment 4 is marked for deletion, increasing 
> the LSO at the same time and causing the UnifiedLog to delete the local and 
> remote data for Segment 4, including its metadata.
> Under those circumstances Kafka can quickly delete segments that were not 
> meant for deletion, causing data loss.
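> The faulty accounting described above can be modeled with a short sketch 
> (hypothetical types and names, not the actual Kafka classes): if retention 
> accounting sums every metadata entry regardless of its RemoteLogSegmentState, 
> each failed copy attempt stuck in {{COPY_SEGMENT_STARTED}} inflates the total 
> and pushes healthy segments over the retention limit.

```java
import java.util.List;

// Minimal model (hypothetical names) of how summing every metadata entry,
// regardless of state, over-counts the topic size after failed copies.
public class RetentionSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    record Segment(String name, State state, long sizeGb) {}

    // Buggy accounting: sums every metadata entry regardless of state.
    static long totalSizeAllStates(List<Segment> segments) {
        return segments.stream().mapToLong(Segment::sizeGb).sum();
    }

    // Safer accounting: only count segments whose copy actually finished.
    static long totalSizeFinishedOnly(List<Segment> segments) {
        return segments.stream()
                .filter(s -> s.state() == State.COPY_SEGMENT_FINISHED)
                .mapToLong(Segment::sizeGb)
                .sum();
    }

    public static void main(String[] args) {
        // Remote metadata after segment 4's copy failed mid-way; the 1GB
        // active segment is still local.
        List<Segment> remote = List.of(
                new Segment("segment-4", State.COPY_SEGMENT_STARTED, 1),
                new Segment("segment-3", State.COPY_SEGMENT_FINISHED, 1),
                new Segment("segment-2", State.COPY_SEGMENT_FINISHED, 1),
                new Segment("segment-1", State.COPY_SEGMENT_FINISHED, 1));
        long localGb = 1;
        long retentionGb = 4; // retention.bytes = 4GB

        long buggy = localGb + totalSizeAllStates(remote);   // 5GB: over limit
        long safe = localGb + totalSizeFinishedOnly(remote); // 4GB: at limit

        // The over-count makes the cleaner delete Segment 1 even though only
        // 4GB of real data exists.
        System.out.println("buggy total=" + buggy + "GB, over limit: " + (buggy > retentionGb));
        System.out.println("safe total=" + safe + "GB, over limit: " + (safe > retentionGb));
    }
}
```

> Under this model, only entries that reached {{COPY_SEGMENT_FINISHED}} should 
> count toward retention; the repeated {{COPY_SEGMENT_STARTED}} attempts are 
> what push the total past the limit on every iteration.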
> Steps to reproduce the problem:
> 1. Enable tiered storage
> {code:bash}
> mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
> cat <<EOF >> config/kraft/server.properties
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> remote.log.manager.task.interval.ms=5000
> remote.log.metadata.manager.listener.name=PLAINTEXT
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rsm.config.dir=/tmp/tieredStorage
> EOF
> {code}
> 2. Start a Kafka server with the following classpath. This is needed so we 
> can use test class LocalTieredStorage as an implementation of 
> RemoteStorageManager.
> {code:bash}
> export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
> export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
> bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 3. In a separate shell, create the topic and produce enough records to fill 
> the remote log
> {code:bash}
> bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
>    --config retention.bytes=1000000000 --config segment.bytes=100000000 \
>    --config remote.storage.enable=true --config local.retention.bytes=1
> bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
>    --throughput -1 --record-size 1000 \
>    --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> 4. In a separate shell, watch the remote log directory content
> {code:bash}
> watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
> {code}
> 5. Once all segments have been offloaded to the remote storage (when the 
> server log output stops, which should take around 2 minutes), stop the Kafka 
> server.
> 6. Edit {{copyLogSegmentData()}} in LocalTieredStorage (around line 309) so 
> that it throws a {{RemoteStorageException}}, disabling the ability to store 
> new remote segments.
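> The step-6 edit only needs to make the copy path fail unconditionally. 
> Roughly (a standalone sketch with a stubbed exception class, not the real 
> {{org.apache.kafka.server.log.remote.storage}} types):

```java
// Sketch of the step-6 edit: make the copy path fail unconditionally.
// RemoteStorageException is stubbed here so the snippet compiles standalone;
// in the real edit you would throw Kafka's own exception type from
// LocalTieredStorage#copyLogSegmentData.
public class FailingCopySketch {
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message) { super(message); }
    }

    // Stand-in for copyLogSegmentData: instead of writing the segment to the
    // local "remote" directory, always fail.
    static void copyLogSegmentData() throws RemoteStorageException {
        throw new RemoteStorageException("Simulated remote storage failure");
    }

    public static void main(String[] args) {
        try {
            copyLogSegmentData();
        } catch (RemoteStorageException e) {
            // The RLMTask catch block logs this and stops copying for the
            // rest of the task iteration.
            System.out.println("copy failed as intended: " + e.getMessage());
        }
    }
}
```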
> 7. Rebuild Kafka
> {code:bash}
> ./gradlew testJar
> {code}
> 8. Restart the Kafka server
> {code:bash}
> bin/kafka-server-start.sh config/kraft/server.properties
> {code}
> 9. Send enough data to roll one segment
> {code:bash}
> bin/kafka-producer-perf-test.sh \
>   --topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
>   --producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
> {code}
> All data in the remote directory will start getting deleted, when the 
> expected behavior would simply be that no new writes happen to the remote 
> storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
