Guillaume Mallet created KAFKA-17062:
----------------------------------------
Summary: RemoteLogManager - RemoteStorageException causes data loss
Key: KAFKA-17062
URL: https://issues.apache.org/jira/browse/KAFKA-17062
Project: Kafka
Issue Type: Bug
Components: Tiered-Storage
Reporter: Guillaume Mallet
When Tiered Storage is configured, {{retention.bytes}} defines the combined limit on the amount of data stored on the local filesystem and in remote storage. However, a failure while offloading segments to remote storage can cause segments to be dropped before the retention limit is reached.
What happens
Assuming a topic configured with {{retention.bytes=4294967296}} (4 GB) and {{local.retention.bytes=1073741824}} (1 GB, equal to {{segment.bytes}}), we would expect Kafka to keep up to 3 segments (3 GB) in the remote store and 1 segment locally (the active segment), and possibly more if the remote storage is offline, i.e. the RemoteLogMetadataManager (RLMM) would hold segments in the following RemoteLogSegmentStates:
* Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
* Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
* Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
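The expected steady state under these settings follows from simple arithmetic; a minimal sketch using the ticket's numbers (class and method names are hypothetical):

```java
class ExpectedRetentionSketch {
    // How many segments should remain in remote storage once retention kicks in.
    static long expectedRemoteSegments(long retentionBytes, long localRetentionBytes,
                                       long segmentBytes) {
        return (retentionBytes - localRetentionBytes) / segmentBytes;
    }

    public static void main(String[] args) {
        long retentionBytes = 4_294_967_296L;       // retention.bytes (4 GB)
        long localRetentionBytes = 1_073_741_824L;  // local.retention.bytes (1 GB)
        long segmentBytes = 1_073_741_824L;         // segment.bytes (1 GB)
        // prints: 3 remote segments + 1 local segment
        System.out.println(expectedRemoteSegments(retentionBytes, localRetentionBytes, segmentBytes)
                + " remote segments + " + (localRetentionBytes / segmentBytes) + " local segment");
    }
}
```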
Let's assume the remote storage starts failing when segment 4 rolls. At the first iteration of an RLMTask we will have:
* [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773] is called first
** RLMM becomes aware of Segment 4 and adds it to the metadata:
*** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
*** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
*** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
*** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
** An exception is raised during the copy operation
([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
in RemoteStorageManager) which is caught with the error message “{{Error
occurred while copying log segments of partition}}” and no further copy will be
attempted for the duration of this RLMTask.
** At that point the segment will never move to {{COPY_SEGMENT_FINISHED}}; it will eventually transition to {{DELETE_SEGMENT_STARTED}} before being cleaned up when the associated segment is deleted.
*
[{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
is then called
** Retention size is computed in
[{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
as the sum of all segment sizes regardless of their state, so the computed size of the topic is 5 GB: 1 GB (local) + 4 GB (remote), which exceeds {{retention.bytes}}.
** Segment 1, being the oldest, will be dropped.
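The size computation in the steps above can be sketched as follows (a simplified model, not the actual Kafka code; class and method names are hypothetical): the retention check sums every segment recorded in the metadata, including the entry stuck in {{COPY_SEGMENT_STARTED}}, so the failed copy still counts against {{retention.bytes}}.

```java
import java.util.List;

// Simplified model of the size check in cleanupExpiredRemoteLogSegments /
// buildRetentionSizeData (hypothetical names and shapes, not Kafka's code).
class RetentionSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    static class Segment {
        final int id; final long bytes; final State state;
        Segment(int id, long bytes, State state) {
            this.id = id; this.bytes = bytes; this.state = state;
        }
    }

    // The described bug: the sum ignores segment state, so an entry whose copy
    // failed (stuck in COPY_SEGMENT_STARTED) still counts toward retention.
    static long remoteSizeRegardlessOfState(List<Segment> segments) {
        return segments.stream().mapToLong(s -> s.bytes).sum();
    }

    public static void main(String[] args) {
        long GB = 1L << 30;
        List<Segment> rlmm = List.of(
            new Segment(4, GB, State.COPY_SEGMENT_STARTED),   // copy failed
            new Segment(3, GB, State.COPY_SEGMENT_FINISHED),
            new Segment(2, GB, State.COPY_SEGMENT_FINISHED),
            new Segment(1, GB, State.COPY_SEGMENT_FINISHED));

        long total = GB /* local segment */ + remoteSizeRegardlessOfState(rlmm);
        // 5 GB > retention.bytes (4 GB), so the oldest remote segment
        // (Segment 1) is deleted even though only 3 GB were successfully copied.
        System.out.println("over retention: " + (total > 4 * GB));
        // prints: over retention: true
    }
}
```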
At the second iteration, after
[{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
(default: 30s), the same will happen. The RLMM will now hold two copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state, each with a different {{RemoteLogSegmentId}}, and Segment 2 will be dropped. The same happens to Segment 3 after another iteration.
At that point the RLMM contains 4 copies of Segment 4 in {{COPY_SEGMENT_STARTED}} state. Segment 4 is then marked for deletion, which advances the log start offset (LSO) and causes the UnifiedLog to delete the local and remote data for Segment 4, including its metadata. Under these circumstances Kafka can quickly delete segments that were not meant for deletion, causing data loss.
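The iteration-by-iteration loss can be simulated with a toy model (hypothetical code, not Kafka's): each failing RLMTask run leaves one more {{COPY_SEGMENT_STARTED}} entry for Segment 4 behind, pushing the computed size over {{retention.bytes}} and forcing deletion of the oldest successfully copied segment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy simulation of successive RLMTask iterations while the remote copy of
// segment 4 keeps failing (a model of the described behaviour, not Kafka code).
class IterationSketch {
    static final long GB = 1L << 30;

    // Returns the ids of successfully-copied segments deleted across the given
    // number of failing iterations, oldest first.
    static List<Integer> deletedSegments(int iterations) {
        long retentionBytes = 4 * GB, localBytes = GB;
        Deque<Integer> finished = new ArrayDeque<>(List.of(1, 2, 3));
        List<Integer> deleted = new ArrayList<>();
        int startedCopies = 0;  // failed COPY_SEGMENT_STARTED entries for segment 4
        for (int i = 0; i < iterations; i++) {
            startedCopies++;    // copy fails again, another entry is left behind
            // cleanup counts every metadata entry, regardless of state
            long remoteBytes = (finished.size() + startedCopies) * GB;
            while (localBytes + remoteBytes > retentionBytes && !finished.isEmpty()) {
                deleted.add(finished.pollFirst());  // oldest segment is dropped
                remoteBytes -= GB;
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        // prints: deleted after 3 iterations: [1, 2, 3]
        System.out.println("deleted after 3 iterations: " + deletedSegments(3));
    }
}
```

After three iterations every successfully copied segment has been deleted, matching the sequence described above.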
Steps to reproduce the problem:
1. Enable tiered storage
{code:bash}
mkdir -p /tmp/tieredStorage/kafka-tiered-storage/
cat <<EOF >> config/kraft/server.properties
remote.log.storage.system.enable=true
remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
remote.log.manager.task.interval.ms=5000
remote.log.metadata.manager.listener.name=PLAINTEXT
rlmm.config.remote.log.metadata.topic.replication.factor=1
rsm.config.dir=/tmp/tieredStorage
EOF
{code}
2. Start a Kafka server with the following classpath. This is needed so the test class {{LocalTieredStorage}} can be used as the {{RemoteStorageManager}} implementation.
{code:bash}
export CLASSPATH="$(pwd)/storage/build/libs/*:$(pwd)/clients/build/libs/*"
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
{code}
3. In a separate shell, create the topic and produce enough records to fill the
remote log
{code:bash}
bin/kafka-topics.sh --create --topic bug-ts --bootstrap-server localhost:9092 \
--config retention.bytes=1000000000 --config segment.bytes=100000000 \
--config remote.storage.enable=true --config local.retention.bytes=1
bin/kafka-producer-perf-test.sh --topic bug-ts --num-records=1000000 \
--throughput -1 --record-size 1000 \
--producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}
4. In a separate shell, watch the remote log directory content
{code:bash}
watch -n 1 ls -R /tmp/tieredStorage/kafka-tiered-storage/
{code}
5. Once all segments have been offloaded to remote storage (when the server log output goes quiet, which should take around 2 minutes), stop the Kafka server
6. Edit {{LocalTieredStorage}} (line 309, in {{copyLogSegmentData()}}) to throw a {{RemoteStorageException}}, disabling the ability to store new remote segments.
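The modification can look like this (a hypothetical sketch; {{RemoteStorageException}} is stubbed here so the snippet compiles standalone, whereas the real edit goes inside Kafka's {{LocalTieredStorage}} test class):

```java
// Stub of org.apache.kafka.server.log.remote.storage.RemoteStorageException,
// included only so this sketch is self-contained.
class RemoteStorageException extends Exception {
    RemoteStorageException(String message) { super(message); }
}

class FailingCopySketch {
    // Stands in for the body of LocalTieredStorage#copyLogSegmentData():
    // fail every copy attempt, simulating an unavailable remote store.
    static void copyLogSegmentData() throws RemoteStorageException {
        throw new RemoteStorageException("injected failure for KAFKA-17062 repro");
    }

    public static void main(String[] args) {
        try {
            copyLogSegmentData();
        } catch (RemoteStorageException e) {
            // prints: copy failed: injected failure for KAFKA-17062 repro
            System.out.println("copy failed: " + e.getMessage());
        }
    }
}
```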
7. Rebuild Kafka
{code:bash}
./gradlew testJar
{code}
8. Restart the Kafka server
{code:bash}
bin/kafka-server-start.sh config/kraft/server.properties
{code}
9. Send enough data to roll one segment
{code:bash}
bin/kafka-producer-perf-test.sh \
--topic bug-ts --num-records=10000 --throughput -1 --record-size 10000 \
--producer-props acks=1 batch.size=100000 bootstrap.servers=localhost:9092
{code}
All data in the remote directory will start getting deleted, whereas the expected behaviour is simply that no new writes happen to the remote storage.