[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869862#comment-16869862 ]
Guozhang Wang commented on KAFKA-8106:
--------------------------------------

The merged PR is inspired by the original work of #6699 by [~Flower.min]. It tries to achieve the same CPU savings by reducing unnecessary byte allocation and the corresponding GC. Unlike #6699, though, which depends on LZ4's skipBytes backed by a shared byte array, in this PR we create a skip buffer outside the compressed input stream. The reason is that not all compressed-input-stream implementations are optimized for skipping; more specifically:

1. GZIP uses BufferedInputStream, which has a shared buffer, sized 16KB.
2. SNAPPY uses its own SnappyInputStream -> InputStream, which allocates dynamically.
3. LZ4 uses its own KafkaLZ4BlockInputStream, which has a shared buffer of 64KB.
4. ZSTD uses its own ZstdInputStream, but its overridden skip also allocates dynamically.

The detailed implementation can be summarized as follows:

1. Add skipKeyValueIterator() to DefaultRecordBatch, used in LogValidator; also add PartialDefaultRecord, which extends DefaultRecord.
1.a. To make this optimization really effective, we also refactor LogValidator so that part of the per-record validation moves into the outer loop and we no longer need to update inPlaceAssignment inside the loop. Based on this boolean, we can then decide before the loop whether or not to use the skipKeyValueIterator.
1.b. Also use a streaming iterator when the skip iterator cannot be used.
2. With SkipKeyValueIterator, pre-allocate a skip byte array with a fixed size (2KB), and use this array to take the decompressed bytes of each record, validating the metadata and the key / value / header sizes while skipping over the key / value bytes.
3. Also tighten the unit tests of LogValidator to make sure scenarios like mismatched magic bytes, multiple batches per partition, discontinuous offsets, etc. are indeed validated.

> Reducing the allocation and copying of ByteBuffer when logValidator do validation.
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8106
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.2.0, 2.1.1
>         Environment: Server :
> cpu: 2*16 ;
> MemTotal : 256G;
> Ethernet controller: Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network Connection ;
> SSD.
>            Reporter: Flower.min
>            Assignee: Flower.min
>            Priority: Major
>              Labels: performance
>             Fix For: 2.4.0
>
>
> We did performance testing of Kafka in the specific scenario described below. We built a Kafka cluster with one broker and created topics with different numbers of partitions. Then we started many producer processes to send large amounts of messages to one of the topics in each test.
>
> *_Specific Scenario_*
>
> *_1. Main config of Kafka_*
> # Main config of the Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
> # Number of TopicPartition : 50~2000
> # Size of Single Message : 1024B
>
> *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
>
> *_3. The best result of performance testing_*
> ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of production||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
>
> *_4. Phenomenon and my doubt_*
> _The upper limit of CPU usage has been reached, but the upper limit of the server's network bandwidth has not.
> *We are doubtful about what costs so much CPU time, and we want to improve performance and reduce the CPU usage of the Kafka server.*_
>
> _*5. Analysis*_
> We analyzed the JFR recordings of the Kafka server during performance testing. After we checked and repeated the performance test, we located the code "*ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);*" (*Class: DefaultRecord, Function: readFrom()*), which consumed CPU resources and caused a lot of GC. Our modified code reduces the allocation and copying of ByteBuffer, so the test performance is greatly improved and the CPU's stable usage is *below 60%*. The following is a comparison of the test performance of the different code versions under the same conditions.
>
> *Result of performance testing*
> *Main config of Kafka: Single Message: 1024B; TopicPartitions: 200; linger.ms: 1000ms.*
> ||Single Message : 1024B||Network inflow rate||CPU (%)||Messages/s||
> |Source code|600MB/s|97%|25,000,000|
> |Modified code|1GB/s|<60%|41,660,000|
>
> *1. Before modified code (source code) GC:*
>
> *2. After modified code (removed allocation of ByteBuffer) GC:*

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
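The "skip buffer" idea described in the comment above can be sketched roughly as follows. This is a minimal, hypothetical Java example, not the actual Kafka code: the class name SkipBufferSketch, the skipFully helper, and the use of GZIPInputStream are all assumptions for illustration; only the fixed 2KB scratch-buffer size mirrors the description. The point is that the bytes to be skipped are drained into one buffer allocated once, instead of allocating a byte[] per record or relying on each compressed stream's own skip().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class SkipBufferSketch {
    // Fixed-size scratch buffer, mirroring the 2KB array described above.
    private static final int SKIP_BUFFER_SIZE = 2048;

    // Skip up to n decompressed bytes by draining them into the caller's
    // scratch buffer; returns the number of bytes actually skipped. No
    // per-call allocation happens here: the same scratch array is reused.
    static long skipFully(InputStream in, long n, byte[] scratch) throws IOException {
        long skipped = 0;
        while (skipped < n) {
            int chunk = (int) Math.min(scratch.length, n - skipped);
            int read = in.read(scratch, 0, chunk);
            if (read < 0) break; // end of stream reached early
            skipped += read;
        }
        return skipped;
    }

    public static void main(String[] args) throws IOException {
        // Build a small GZIP-compressed payload: 10,000 filler bytes, then "END".
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            for (int i = 0; i < 10_000; i++) gz.write('a');
            gz.write("END".getBytes());
        }

        byte[] scratch = new byte[SKIP_BUFFER_SIZE]; // allocated once, reused for every skip
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            long skipped = skipFully(in, 10_000, scratch);
            System.out.println("skipped=" + skipped);
            byte[] tail = new byte[3];
            int read = in.read(tail);
            System.out.println("tail=" + new String(tail, 0, Math.max(read, 0)));
        }
    }
}
```

In the real fix, as the comment explains, this kind of draining is done by a skip-variant of the record iterator so that key/value bytes of each record are decompressed for validation but never copied into a freshly allocated per-record ByteBuffer.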