Alexandre Dupriez created KAFKA-14190:
-----------------------------------------

             Summary: Corruption of Topic IDs with pre-2.8.0 admin clients
                 Key: KAFKA-14190
                 URL: https://issues.apache.org/jira/browse/KAFKA-14190
             Project: Kafka
          Issue Type: Bug
          Components: admin, core, zkclient
    Affects Versions: 3.2.1, 3.1.1, 3.2.0, 3.0.1, 3.0.0, 2.8.1, 3.1.0
            Reporter: Alexandre Dupriez


h4. Scope

The problem reported below has been verified to occur with ZooKeeper 
controllers. It has not been attempted with KRaft controllers, although it is 
unlikely to be reproducible in KRaft mode given the nature of the issue and the 
clients involved.
h4. Problem Description

Topic IDs are lost when an AdminClient of version < 2.8.0 is used to increase 
the number of partitions of a topic on a cluster running version >= 2.8.0. The 
controller then re-creates the topic ID upon restart, and the new ID eventually 
conflicts with the topic ID recorded in the brokers' {{partition.metadata}} 
files in the partition directories of the impacted topic. This leads to an 
availability loss for those partitions, because brokers do not accept 
leadership / follower-ship when the topic ID indicated by a LeaderAndIsr 
request differs from their own locally cached ID.
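
For reference, on a 2.8.0+ cluster the topic ID is persisted in the topic's 
assignment znode, and a pre-2.8.0 client rewrites that znode in the older 
format it knows, which does not carry the ID. One way to inspect it is shown 
below; the payload is illustrative and its exact JSON layout depends on the 
broker version:
{noformat}
./bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/myTopic

{"removing_replicas":[],"partitions":{"0":[1,0],"1":[0,1]},"topic_id":"jKTRaM_TSNqocJeQI2aYOQ","adding_replicas":[],"version":3}{noformat}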

One mitigation post-corruption is to substitute the stale topic ID in the 
{{partition.metadata}} files with the new topic ID referenced by the 
controller, or alternatively, to delete the {{partition.metadata}} files 
altogether, as sketched below.
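
A minimal sketch of both options, assuming the default {{/tmp/kafka-logs}} log 
directory and the topic IDs used in the reproduction below; the broker should 
be stopped while the files are modified:
{noformat}
# Option 1: substitute the stale topic ID with the one referenced by the controller
sed -i 's/jKTRaM_TSNqocJeQI2aYOQ/nI-JQtPwQwGiylMfm8k13w/' /tmp/kafka-logs/myTopic-*/partition.metadata

# Option 2: delete the files; the broker should re-create them on restart
rm /tmp/kafka-logs/myTopic-*/partition.metadata{noformat}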
h4. Steps to reproduce

1. Set up and launch a two-node Kafka cluster in ZooKeeper mode.

2. Create a topic, e.g. via {{kafka-topics.sh}}:

 
{noformat}
./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic 
--partitions 2 --replication-factor 2{noformat}
3. Capture the topic ID using a 2.8.0+ client.

 
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic --describe

Topic: myTopic TopicId: jKTRaM_TSNqocJeQI2aYOQ PartitionCount: 2 
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1{noformat}
 

4. Restart one of the brokers. This makes each broker create the 
{{partition.metadata}} files in the partition directories; the files had not 
been created before because the {{Log}} instances were already loaded in 
memory.
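
The file is a small key-value text file; for example, assuming the default 
{{/tmp/kafka-logs}} log directory:
{noformat}
cat /tmp/kafka-logs/myTopic-0/partition.metadata

version: 0
topic_id: jKTRaM_TSNqocJeQI2aYOQ{noformat}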

 

5. Using a pre-2.8.0 client library, run the following command.

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic 
--partitions 3{noformat}
 

6. Using a 2.8.0+ client library, describe the topic via ZooKeeper and notice 
the absence of the topic ID from the output, where it would otherwise be 
expected.

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic

Topic: myTopic PartitionCount: 3 ReplicationFactor: 2 Configs: 
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
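
The loss can also be confirmed directly in ZooKeeper: the znode has been 
rewritten in an older format which no longer carries the {{topic_id}} field 
(payload illustrative):
{noformat}
./bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/myTopic

{"version":1,"partitions":{"2":[1,0],"1":[0,1],"0":[1,0]}}{noformat}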
 

7. Using a 2.8.0+ client library, describe the topic via a broker endpoint and 
notice that the topic ID has changed.

 
{noformat}
./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic

Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 
ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: myTopic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
 

8. Restart the controller.

9. Check the {{state-change.log}} file on the controller broker. Log entries of 
the following type will appear:

 
{noformat}
[2022-08-25 17:44:05,308] ERROR [Broker id=0] Topic Id in memory: 
jKTRaM_TSNqocJeQI2aYOQ does not match the topic Id for partition myTopic-0 
provided in the request: nI-JQtPwQwGiylMfm8k13w. (state.change.logger){noformat}
 

10. Restart the other broker.

11. Describe the topic via the broker endpoint or ZooKeeper with a 2.8.0+ 
client library:

 
{noformat}
./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic

Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 
ReplicationFactor: 2 Configs: 
Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0
Topic: myTopic Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0{noformat}
 

Notice the abnormal state the topic is in: the ISR is reduced to a single 
broker, which the controller claims is the leader (here, broker 0). The 
controller believes broker 0 is the leader because it does not handle the error 
responses from peer brokers when it sends them the requests to become a leader 
or follower of a partition.
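
The per-partition state znodes reflect this degraded view, for instance 
(payload illustrative, epoch values will differ):
{noformat}
./bin/zookeeper-shell.sh localhost:2181 get /brokers/topics/myTopic/partitions/0/state

{"controller_epoch":2,"leader":0,"version":1,"leader_epoch":3,"isr":[0]}{noformat}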

12. Verify that producing to the topic fails.

 
{noformat}
./kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic myTopic

[2022-08-25 17:52:59,962] ERROR Error when sending message to topic myTopic 
with key: null, value: 1 bytes with error: 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.
[2022-08-25 17:52:59,964] WARN [Producer clientId=console-producer] Received 
invalid metadata error in produce request on partition myTopic-1 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now 
(org.apache.kafka.clients.producer.internals.Sender){noformat}
13. Verify that consuming from the topic fails.

{noformat}
./kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic myTopic
[2022-08-25 17:53:49,416] DEBUG [Consumer 
clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008] 
Received LIST_OFFSETS response from node 0 for request with header 
RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, 
clientId=consumer-console-consumer-25008-1, correlationId=31): 
ListOffsetsResponseData(throttleTimeMs=0, 
topics=[ListOffsetsTopicResponse(name='myTopic', 
partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=75, 
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1), 
ListOffsetsPartitionResponse(partitionIndex=2, errorCode=75, 
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1), 
ListOffsetsPartitionResponse(partitionIndex=0, errorCode=75, 
oldStyleOffsets=[], timestamp=-1, offset=-1, leaderEpoch=-1)])]) 
(org.apache.kafka.clients.NetworkClient)
[2022-08-25 17:53:49,416] DEBUG [Consumer 
clientId=consumer-console-consumer-25008-1, groupId=console-consumer-25008] 
Attempt to fetch offsets for partition myTopic-1 failed due to 
UNKNOWN_LEADER_EPOCH, retrying. 
(org.apache.kafka.clients.consumer.internals.Fetcher)
{noformat}
 
h4. Follow-up

Currently, the ID of a topic is stored in the znode 
{{/brokers/topics/<topic-name>}} along with the partition assignment. This is a 
natural choice of location; however, the overwrite of the znode performed by an 
“old” admin client destroys information which is not subsequently recovered by 
the cluster.

It would be tempting to keep the topic ID in one single place, however each of 
the following approaches fails:
 # Keeping the information in ZooKeeper only. Without being stored alongside 
the partition's data locally on brokers, there would be no way for a broker to 
know that the data associated with a partition belongs to a topic which is 
different from the one currently referenced by the controller, defeating the 
purpose of topic IDs.
 # Keeping the information locally on brokers only. Ensuring consistency would 
then require an extreme level of complexity, if it is achievable at all without 
the use of a strongly consistent data store.

Therefore, topic IDs have to be maintained in both locations. Given that this 
information is immutable, this should not be a problem, except in the case 
encountered here. Additionally, note that any client which references the topic 
ID will also be impacted when the controller regenerates an ID it finds to be 
absent.

Since pre-2.8.0 clients, from the Kafka client or third-party libraries, are 
still widely used, it may be worth thinking about how to remediate the problem 
for as long as ZooKeeper controllers are supported.

One way to prevent destroying the information contained in the topic assignment 
znode could be to store it in an out-of-band znode, although the feasibility 
and correctness of that approach need to be assessed. A purely illustrative 
layout is sketched below.
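
For illustration only; the {{/brokers/topic_ids}} path is hypothetical and not 
part of any Kafka release:
{noformat}
/brokers/topics/myTopic       <- partition assignment, rewritten by pre-2.8.0 clients
/brokers/topic_ids/myTopic    <- topic ID, out of band, untouched by pre-2.8.0 clients{noformat}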

Such an approach would likely add a significant level of complexity without 
enough added benefit. Preserving consistency (especially if atomic reads and 
writes across the two znodes are not possible) could prove difficult. Keeping 
full backward compatibility with existing pre- and post-2.8.0 clients, and 
across version upgrades, would add to the obstacles.


