[ 
https://issues.apache.org/jira/browse/KAFKA-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898616#comment-17898616
 ] 

Edoardo Comar commented on KAFKA-18020:
---------------------------------------

Client snippet to reproduce.

Start a local Kafka and create a topic 'mytopic'

 
 
{code:java}
package mysamples.jira.kafka18020;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.config.QuotaConfig;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MyAdminKafka18020 {
    public static void main(String[] args) throws Exception {
        String topicName = "mytopic";
        try (AdminClient adminClient = 
AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"))) {
            for (int i=0; i<10; i++) {
                StringBuilder sb = new StringBuilder();
                for(int j=1000*i; j<1000*(i+1); j++) {
                    sb.append(j).append(":").append(j).append(",");
                }
                sb.setLength(sb.length()-1);
                String throttles = sb.toString(); //"0:0,...,999:999" for i=0; 
then "1000:1000,...,1999:1999" and so on

                Map<ConfigResource, Collection<AlterConfigOp>> configs = new 
HashMap<>();
                List<AlterConfigOp> ops = new ArrayList<>();
                ops.add(new AlterConfigOp(new 
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, 
throttles), AlterConfigOp.OpType.APPEND));
                ops.add(new AlterConfigOp(new 
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, 
throttles), AlterConfigOp.OpType.APPEND));
                configs.put(new ConfigResource(ConfigResource.Type.TOPIC, 
topicName), ops);

                System.out.println(new 
StringBuilder().append("i=").append(i).append(" throttles=").
                        append(throttles.substring(0, 
10)).append("...").append(throttles.substring(throttles.length()-10, 
throttles.length())));

                adminClient.incrementalAlterConfigs(configs).all().get();
                Thread.sleep(1000);
            }
        }
    }
}
  {code}
output :
{code:java}
i=0 throttles=0:0,1:1,2:...98,999:999
i=1 throttles=1000:1000,...,1999:1999
i=2 throttles=2000:2000,...,2999:2999
i=3 throttles=3000:3000,...,3999:3999
Exception in thread "main" java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
    at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155)
    at 
mysamples.jira.kafka18020.MyAdminKafka18020.main(MyAdminKafka18020.java:36)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server 
experienced an unexpected error when processing the request
 {code}
server logs:
{code:java}
[2024-11-15 13:05:44,001] ERROR Encountered quorum controller fault: 
incrementalAlterConfigs: event failed with RuntimeException (treated as 
UnknownServerException) at epoch 5 in 40005 microseconds. Renouncing leadership 
and reverting to the last committed offset 7788. 
(org.apache.kafka.server.fault.LoggingFaultHandler)
java.lang.RuntimeException: 'value' field is too long to be serialized
    at 
org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192)
    at org.apache.kafka.common.protocol.Message.size(Message.java:51)
    at 
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66)
    at 
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43)
    at 
org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340)
    at 
org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136)
    at 
org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186)
    at 
org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146)
    at org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334)
    at 
org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320)
    at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806)
    at 
org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891)
    at 
org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)
    at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)
    at java.base/java.lang.Thread.run(Thread.java:840)
[2024-11-15 13:05:44,003] INFO [RaftManager id=1] Received user request to 
resign from the current epoch 5 (org.apache.kafka.raft.KafkaRaftClient)
[2024-11-15 13:05:44,004] INFO [RaftManager id=1] Completed transition to 
ResignedState(localId=1, epoch=5, voters=[1], electionTimeoutMs=1906, 
unackedVoters=[], preferredSuccessors=[]) from 
Leader(localReplicaKey=ReplicaKey(id=1, 
directoryId=Optional[rT810KS9zfczOsLnqapVBQ]), epoch=5, epochStartOffset=6568, 
highWatermark=Optional[LogOffsetMetadata(offset=7789, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])], 
voterStates={1=ReplicaState(replicaKey=ReplicaKey(id=1, 
directoryId=Optional.empty), endOffset=Optional[LogOffsetMetadata(offset=7789, 
metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])], 
lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) 
(org.apache.kafka.raft.QuorumState)
[2024-11-15 13:05:45,564] ERROR [ControllerApis nodeId=1] Unexpected error 
handling request RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1, 
clientId=1, correlationId=1915, headerVersion=2) -- 
BrokerHeartbeatRequestData(brokerId=1, brokerEpoch=7, 
currentMetadataOffset=7788, wantFence=false, wantShutDown=false, 
offlineLogDirs=[]) with context 
RequestContext(header=RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1, 
clientId=1, correlationId=1915, headerVersion=2), 
connectionId='127.0.0.1:9093-127.0.0.1:54455-0-25', clientAddress=/127.0.0.1, 
principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), 
securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=4.0.0-SNAPSHOT), fromPrivilegedListener=false, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@16a3a3da])
 (kafka.server.ControllerApis)
org.apache.kafka.common.errors.NotControllerException: The active controller 
appears to be node 1.
[2024-11-15 13:05:45,565] INFO [NodeToControllerChannelManager id=1 
name=heartbeat] Client requested disconnect from node 1 
(org.apache.kafka.clients.NetworkClient)
[2024-11-15 13:05:45,565] INFO 
[broker-1-to-controller-heartbeat-channel-manager]: Recorded new KRaft 
controller, from now on will use node localhost:9093 (id: 1 rack: null) 
(kafka.server.NodeToControllerRequestThread)
 {code}

> Encountered quorum controller fault: incrementalAlterConfigs .. 
> RuntimeException: 'value' field is too long to be serialized
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18020
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18020
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>    Affects Versions: 4.0.0
>            Reporter: Edoardo Comar
>            Priority: Major
>
> On a topic with a large number of partitions (5000) by repeatedly invoking
> {color:#000000}ReassignPartitionsCommand{color}{color:#00627a}.modifyTopicThrottles
>  {color}
> each time with a new set of replica throttles,
> the Quorum controller attempts to write a ConfigRecord that is too large :
>  
> {{[2024-11-14 15:34:41,612] ERROR Encountered quorum controller fault: 
> incrementalAlterConfigs: event failed with RuntimeException (treated as 
> UnknownServerException) at epoch 24 in 75784 microseconds. Renouncing 
> leadership and reverting to the last committed offset 214588. 
> (org.apache.kafka.server.fault.LoggingFaultHandler)}}
> {{java.lang.RuntimeException: 'value' field is too long to be serialized}}
> {{    at 
> org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192)}}
> {{    at org.apache.kafka.common.protocol.Message.size(Message.java:51)}}
> {{    at 
> org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66)}}
> {{    at 
> org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43)}}
> {{    at 
> org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340)}}
> {{    at 
> org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136)}}
> {{    at 
> org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186)}}
> {{    at 
> org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146)}}
> {{    at 
> org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334)}}
> {{    at 
> org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320)}}
> {{    at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806)}}
> {{    at 
> org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891)}}
> {{    at 
> org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)}}
> {{    at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)}}
> {{    at java.base/java.lang.Thread.run(Thread.java:840)}}
>  
> the adminClient receives an  UnknownServerException:
>  
> {{Error: org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request.}}
> {{java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.}}
> {{    at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)}}
> {{    at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)}}
> {{    at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155)}}
> {{    at 
> org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles(ReassignPartitionsCommand.java:1112)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to