[ https://issues.apache.org/jira/browse/KAFKA-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17898616#comment-17898616 ]
Edoardo Comar commented on KAFKA-18020: --------------------------------------- Client snippet to reproduce. Start a local Kafka broker, create a topic named 'mytopic', then run the snippet below. Each iteration APPENDs another 1000 partition:broker entries to both the leader and follower throttled-replicas configs, so the stored value keeps growing until the controller can no longer serialize it (here the fourth call, i=3, fails). {code:java} package mysamples.jira.kafka18020; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.server.config.QuotaConfig; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; public class MyAdminKafka18020 { public static void main(String[] args) throws Exception { String topicName = "mytopic"; try (AdminClient adminClient = AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"))) { for (int i=0; i<10; i++) { StringBuilder sb = new StringBuilder(); for(int j=1000*i; j<1000*(i+1); j++) { sb.append(j).append(":").append(j).append(","); } sb.setLength(sb.length()-1); String throttles = sb.toString(); //"0:0,...,999:999" for i=0; then "1000:1000,...,1999:1999" and so on Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(); List<AlterConfigOp> ops = new ArrayList<>(); ops.add(new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, throttles), AlterConfigOp.OpType.APPEND)); ops.add(new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, throttles), AlterConfigOp.OpType.APPEND)); configs.put(new ConfigResource(ConfigResource.Type.TOPIC, topicName), ops); System.out.println(new StringBuilder().append("i=").append(i).append(" throttles=").
append(throttles.substring(0, 10)).append("...").append(throttles.substring(throttles.length()-10, throttles.length()))); adminClient.incrementalAlterConfigs(configs).all().get(); Thread.sleep(1000); } } } } {code} output : {code:java} i=0 throttles=0:0,1:1,2:...98,999:999 i=1 throttles=1000:1000,...,1999:1999 i=2 throttles=2000:2000,...,2999:2999 i=3 throttles=3000:3000,...,3999:3999 Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155) at mysamples.jira.kafka18020.MyAdminKafka18020.main(MyAdminKafka18020.java:36) Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request {code} server logs: {code:java} [2024-11-15 13:05:44,001] ERROR Encountered quorum controller fault: incrementalAlterConfigs: event failed with RuntimeException (treated as UnknownServerException) at epoch 5 in 40005 microseconds. Renouncing leadership and reverting to the last committed offset 7788. 
(org.apache.kafka.server.fault.LoggingFaultHandler) java.lang.RuntimeException: 'value' field is too long to be serialized at org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192) at org.apache.kafka.common.protocol.Message.size(Message.java:51) at org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66) at org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43) at org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340) at org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136) at org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186) at org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146) at org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334) at org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806) at org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891) at org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186) at java.base/java.lang.Thread.run(Thread.java:840) [2024-11-15 13:05:44,003] INFO [RaftManager id=1] Received user request to resign from the current epoch 5 (org.apache.kafka.raft.KafkaRaftClient) [2024-11-15 13:05:44,004] INFO [RaftManager id=1] Completed transition to ResignedState(localId=1, epoch=5, voters=[1], electionTimeoutMs=1906, unackedVoters=[], preferredSuccessors=[]) from 
Leader(localReplicaKey=ReplicaKey(id=1, directoryId=Optional[rT810KS9zfczOsLnqapVBQ]), epoch=5, epochStartOffset=6568, highWatermark=Optional[LogOffsetMetadata(offset=7789, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])], voterStates={1=ReplicaState(replicaKey=ReplicaKey(id=1, directoryId=Optional.empty), endOffset=Optional[LogOffsetMetadata(offset=7789, metadata=Optional[(segmentBaseOffset=0,relativePositionInSegment=777628)])], lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) (org.apache.kafka.raft.QuorumState) [2024-11-15 13:05:45,564] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1, clientId=1, correlationId=1915, headerVersion=2) -- BrokerHeartbeatRequestData(brokerId=1, brokerEpoch=7, currentMetadataOffset=7788, wantFence=false, wantShutDown=false, offlineLogDirs=[]) with context RequestContext(header=RequestHeader(apiKey=BROKER_HEARTBEAT, apiVersion=1, clientId=1, correlationId=1915, headerVersion=2), connectionId='127.0.0.1:9093-127.0.0.1:54455-0-25', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=4.0.0-SNAPSHOT), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@16a3a3da]) (kafka.server.ControllerApis) org.apache.kafka.common.errors.NotControllerException: The active controller appears to be node 1. 
[2024-11-15 13:05:45,565] INFO [NodeToControllerChannelManager id=1 name=heartbeat] Client requested disconnect from node 1 (org.apache.kafka.clients.NetworkClient) [2024-11-15 13:05:45,565] INFO [broker-1-to-controller-heartbeat-channel-manager]: Recorded new KRaft controller, from now on will use node localhost:9093 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread) {code} > Encountered quorum controller fault: incrementalAlterConfigs .. > RuntimeException: 'value' field is too long to be serialized > ---------------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-18020 > URL: https://issues.apache.org/jira/browse/KAFKA-18020 > Project: Kafka > Issue Type: Bug > Components: kraft > Affects Versions: 4.0.0 > Reporter: Edoardo Comar > Priority: Major > > On a topic with a large number of partitions (5000), repeatedly invoking > {{ReassignPartitionsCommand.modifyTopicThrottles}}, > each time with a new set of replica throttles, > causes the quorum controller to attempt to write a ConfigRecord that is too large: > > {{[2024-11-14 15:34:41,612] ERROR Encountered quorum controller fault: > incrementalAlterConfigs: event failed with RuntimeException (treated as > UnknownServerException) at epoch 24 in 75784 microseconds. Renouncing > leadership and reverting to the last committed offset 214588. 
> (org.apache.kafka.server.fault.LoggingFaultHandler)}} > {{java.lang.RuntimeException: 'value' field is too long to be serialized}} > {{ at > org.apache.kafka.common.metadata.ConfigRecord.addSize(ConfigRecord.java:192)}} > {{ at org.apache.kafka.common.protocol.Message.size(Message.java:51)}} > {{ at > org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:66)}} > {{ at > org.apache.kafka.server.common.serialization.AbstractApiMessageSerde.recordSize(AbstractApiMessageSerde.java:43)}} > {{ at > org.apache.kafka.raft.internals.BatchBuilder.bytesNeededForRecords(BatchBuilder.java:340)}} > {{ at > org.apache.kafka.raft.internals.BatchBuilder.bytesNeeded(BatchBuilder.java:136)}} > {{ at > org.apache.kafka.raft.internals.BatchAccumulator.maybeAllocateBatch(BatchAccumulator.java:186)}} > {{ at > org.apache.kafka.raft.internals.BatchAccumulator.append(BatchAccumulator.java:146)}} > {{ at > org.apache.kafka.raft.KafkaRaftClient.append(KafkaRaftClient.java:3334)}} > {{ at > org.apache.kafka.raft.KafkaRaftClient.prepareAppend(KafkaRaftClient.java:3320)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.lambda$run$0(QuorumController.java:806)}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:891)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:800)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:132)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:215)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:186)}} > {{ at java.base/java.lang.Thread.run(Thread.java:840)}} > > the adminClient receives an UnknownServerException: > > {{Error: org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request.}} > 
{{java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request.}} > {{ at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)}} > {{ at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)}} > {{ at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:155)}} > {{ at > org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles(ReassignPartitionsCommand.java:1112)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)