yuzawa-san opened a new pull request #9499:
URL: https://github.com/apache/kafka/pull/9499


   Zstd-jni 1.4.5-6 allocates large internal buffers inside ZstdInputStream 
and ZstdOutputStream, which causes a lot of allocation and GC activity when 
creating and closing the streams. It also does not buffer reads or writes. 
This is inefficient when DefaultRecord.writeTo() does a series of small 
single-byte writes using various ByteUtils methods. The JNI path is more 
efficient when uncompressed data is flushed in large pieces rather than byte 
by byte, because every call pays the cost of transitioning between the Java 
code and the native code. The same applies to reads. Per 
https://github.com/luben/zstd-jni/issues/141 the maintainer of zstd-jni and I 
agreed not to buffer reads and writes inside the library and to have the 
caller do that instead, so here we are updating the caller. Here is part of a 
flame graph of CPU time spent in the single-byte writes from Kafka:
   
   
![image](https://user-images.githubusercontent.com/1082334/97130235-3113cd00-1717-11eb-835c-cc145d549604.png)
   
   In this patch, I upgraded to the most recent zstd-jni version, which has 
buffer reuse built in. This was done in 
https://github.com/luben/zstd-jni/pull/143 and 
https://github.com/luben/zstd-jni/pull/146. Since we decided not to add 
additional buffering of input/output to zstd-jni itself, I added 
BufferedInputStream and BufferedOutputStream to CompressionType.ZSTD, just as 
we currently do for CompressionType.GZIP, which is also inefficient for 
single-byte reads and writes. I used the same buffer sizes as that existing 
implementation.
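   
   To illustrate why the extra buffering helps, here is a minimal sketch 
(not the code from this patch). CountingOutputStream is a hypothetical 
stand-in for a JNI-backed stream such as ZstdOutputStream: it only counts how 
many calls reach it, where each call would correspond to one Java-to-native 
transition. The 16 KB buffer size is an illustrative assumption, not 
necessarily the size used in the patch.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a JNI-backed stream: counts calls instead of compressing.
class CountingOutputStream extends OutputStream {
    long calls = 0;
    long bytes = 0;

    @Override
    public void write(int b) {
        calls++;
        bytes++;
    }

    @Override
    public void write(byte[] buf, int off, int len) {
        calls++;
        bytes += len;
    }
}

class BufferingDemo {
    // Writes `count` single bytes through `out`, then flushes.
    static void writeSingleBytes(OutputStream out, int count) {
        try {
            for (int i = 0; i < count; i++) {
                out.write(i & 0xFF);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Number of calls that reach the underlying stream when nothing buffers.
    static long unbufferedCalls(int count) {
        CountingOutputStream sink = new CountingOutputStream();
        writeSingleBytes(sink, count);
        return sink.calls;
    }

    // Number of calls that reach the underlying stream behind a BufferedOutputStream.
    static long bufferedCalls(int count, int bufferSize) {
        CountingOutputStream sink = new CountingOutputStream();
        writeSingleBytes(new BufferedOutputStream(sink, bufferSize), count);
        return sink.calls;
    }

    public static void main(String[] args) {
        System.out.println("unbuffered calls: " + unbufferedCalls(100_000));
        System.out.println("buffered calls:   " + bufferedCalls(100_000, 16 * 1024));
    }
}
```

   Without the wrapper every single-byte write reaches the underlying stream; 
with it, only a handful of bulk writes do.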
   
   NOTE: if so desired, we could pass a wrapped BufferSupplier into the 
Zstd*Stream classes to have Kafka decide how the buffer recycling occurs. This 
functionality was added in the latter PR linked above. I am holding off on 
this because, based on JMH benchmarking, the performance gains were not clear, 
and personally I don't know if it is worth the complexity of trying to hack 
around the reflection at this point in time. zstd-jni uses a default recycler 
very similar to the one snappy currently uses, which seems to provide decent 
efficiency. While this PR fixes the defect, I feel that using BufferSupplier 
in both zstd-jni and snappy is outside the scope of this bugfix and should be 
considered a separate improvement. I would prefer that this change be merged 
on its own, since the performance gains here are very significant relative to 
the more incremental and minor optimizations that could be achieved by that 
separate work.
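   
   For readers unfamiliar with buffer recycling, the general idea can be 
sketched as below. SimpleRecycler is a hypothetical class written for this 
illustration; it is neither the zstd-jni recycler nor Kafka's BufferSupplier, 
and it assumes a single fixed buffer size.

```java
import java.util.ArrayDeque;

// Hypothetical recycler for illustration only; not the zstd-jni or Kafka API.
class SimpleRecycler {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int bufferSize;
    int allocations = 0; // how many buffers were actually allocated

    SimpleRecycler(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Hands out a previously released buffer if one is available, else allocates.
    synchronized byte[] get() {
        byte[] buf = free.pollFirst();
        if (buf == null) {
            allocations++;
            buf = new byte[bufferSize];
        }
        return buf;
    }

    // Returns a buffer to the pool so the next get() can reuse it.
    synchronized void release(byte[] buf) {
        if (buf.length == bufferSize) {
            free.addFirst(buf);
        }
    }

    public static void main(String[] args) {
        SimpleRecycler recycler = new SimpleRecycler(128 * 1024);
        // Simulates opening and closing 1000 short-lived streams.
        for (int i = 0; i < 1000; i++) {
            byte[] buf = recycler.get();
            recycler.release(buf);
        }
        System.out.println("allocations: " + recycler.allocations);
    }
}
```

   When streams are opened and closed one after another, the same buffer is 
handed out repeatedly instead of being reallocated each time.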
   
   There are some noticeable improvements in the JMH benchmarks (excerpt):
   
   BEFORE:
   ```
   Benchmark  (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)  Mode  Cnt  Score  Error  Units
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  27743.260 ± 673.869  ops/s
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  3399.966 ± 82.608  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  134968.010 ± 0.012  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  3850.985 ± 84.476  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  152881.128 ± 942.189  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  174.241 ± 3.486  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  6917.758 ± 82.522  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  1689.000  counts
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  82621.000  ms
   JMH benchmarks done
   
   Benchmark  (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)  Mode  Cnt  Score  Error  Units
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  24095.711 ± 895.866  ops/s
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2932.289 ± 109.465  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  134032.012 ± 0.013  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  3282.912 ± 115.042  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  150073.914 ± 1342.235  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  149.697 ± 5.786  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  6842.462 ± 64.515  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  1449.000  counts
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  82518.000  ms
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  1449.060 ± 230.498  ops/s
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  198.051 ± 31.532  MB/sec
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  150502.519 ± 0.186  B/op
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  200.064 ± 31.879  MB/sec
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  152569.341 ± 13826.686  B/op
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  91.000  counts
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  75869.000  ms
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2609.660 ± 1145.160  ops/s
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  815.441 ± 357.818  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  344309.097 ± 0.238  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  808.952 ± 354.975  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  345712.061 ± 51434.034  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.019 ± 0.042  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  18.615 ± 42.045  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  24.132 ± 12.254  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  13540.960 ± 14649.192  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  148.000  counts
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  23848.000  ms
   JMH benchmarks done
   ```
   
   AFTER:
   ```
   Benchmark  (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)  Mode  Cnt  Score  Error  Units
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  147792.454 ± 2721.318  ops/s
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2708.481 ± 50.012  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  20184.002 ± 0.002  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2732.667 ± 59.258  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  20363.460 ± 120.585  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.042 ± 0.033  MB/sec
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.316 ± 0.249  B/op
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  833.000  counts
   CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  8390.000  ms
   JMH benchmarks done
   
   Benchmark  (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)  Mode  Cnt  Score  Error  Units
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  166786.092 ± 3285.702  ops/s
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2926.914 ± 57.464  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  19328.002 ± 0.002  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2938.541 ± 66.850  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  19404.357 ± 177.485  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.516 ± 0.100  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  3.409 ± 0.657  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.032 ± 0.131  MB/sec
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.207 ± 0.858  B/op
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  834.000  counts
   RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  9370.000  ms
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  15988.116 ± 137.427  ops/s
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  448.636 ± 3.851  MB/sec
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  30907.698 ± 0.020  B/op
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  450.905 ± 5.587  MB/sec
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  31064.113 ± 291.190  B/op
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.043 ± 0.007  MB/sec
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2.931 ± 0.493  B/op
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  790.000  counts
   RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  999.000  ms
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  11345.169 ± 206.528  ops/s
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2314.800 ± 42.094  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  224714.266 ± 0.028  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2320.213 ± 45.521  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  225235.965 ± 803.309  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  0.026 ± 0.005  MB/sec
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  2.551 ± 0.455  B/op
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  994.000  counts
   RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time  CREATE  RANDOM  ZSTD  200  1000  2  thrpt  15  1189.000  ms
   JMH benchmarks done
   ```
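   
   For a quick read of the two excerpts, the throughput ratios can be computed 
from the thrpt scores quoted above. The sketch below does only that; the 
class and method names are made up for this illustration, and the error 
margins reported by JMH are ignored.

```java
import java.util.Locale;

// Illustrative helper: after/before throughput ratios from the scores quoted above.
class Speedup {
    static double ratio(double beforeOpsPerSec, double afterOpsPerSec) {
        return afterOpsPerSec / beforeOpsPerSec;
    }

    public static void main(String[] args) {
        String[] names = {
            "measureValidateMessagesAndAssignOffsetsCompressed",
            "measureIteratorForBatchWithSingleMessage",
            "measureSkipIteratorForVariableBatchSize",
            "measureStreamingIteratorForVariableBatchSize"
        };
        // thrpt scores in ops/s, copied from the BEFORE and AFTER excerpts.
        double[] before = {27743.260, 24095.711, 1449.060, 2609.660};
        double[] after = {147792.454, 166786.092, 15988.116, 11345.169};
        for (int i = 0; i < names.length; i++) {
            System.out.println(String.format(Locale.ROOT, "%-52s %.1fx",
                    names[i], ratio(before[i], after[i])));
        }
    }
}
```

   On these numbers the throughput gains range from roughly 4x to 11x.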
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



