yuzawa-san opened a new pull request #9499: URL: https://github.com/apache/kafka/pull/9499
Zstd-jni 1.4.5-6 allocates large internal buffers inside of ZstdInputStream and ZstdOutputStream. This caused a lot of allocation and GC activity when creating and closing the streams. It also does not buffer reads or writes. This is inefficient when DefaultRecord.writeTo() performs a series of small single-byte writes through the various ByteUtils methods. JNI is more efficient when uncompressed data is flushed in large chunks rather than one byte at a time, because each crossing between Java code and native code is expensive. The same applies to reads. Per https://github.com/luben/zstd-jni/issues/141 the maintainer of zstd-jni and I agreed not to buffer reads and writes inside the library and to have the caller do it instead, so here we are updating the caller.

A flame graph (image omitted) showed the CPU time spent in the single-byte writes from Kafka.

In this patch, I upgraded to the most recent zstd-jni version, which has buffer reuse built in. This was done in https://github.com/luben/zstd-jni/pull/143 and https://github.com/luben/zstd-jni/pull/146. Since we decided not to add additional buffering of input/output to zstd-jni, I added BufferedInputStream and BufferedOutputStream to CompressionType.ZSTD, just as we currently do for CompressionType.GZIP, which is also inefficient for single-byte reads and writes. I used the same buffer sizes as that existing implementation.

NOTE: if so desired, we could pass a wrapped BufferSupplier into the Zstd*Stream classes to have Kafka decide how the buffer recycling occurs. This functionality was added in the latter PR linked above. I am holding off on this since, based on JMH benchmarking, the performance gains were not clear, and personally I don't know if it is worth the complexity of trying to hack around the reflection at this point in time. zstd-jni uses a default recycler very similar to the one snappy currently uses, which seems to provide decent efficiency.
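To illustrate why wrapping the streams helps, here is a minimal sketch rather than the actual patch. It uses only the JDK: `CountingOutputStream` and `CountingInputStream` are hypothetical stand-ins for the JNI-backed zstd streams, counting how many calls reach them (each such call would be one Java-to-native crossing). The 16 KiB buffer size is an illustrative assumption, not necessarily the value used in Kafka.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class BufferingSketch {

    // Illustrative buffer size (assumption, not taken from the patch).
    static final int BUFFER_SIZE = 16 * 1024;

    /** Hypothetical stand-in for a JNI-backed output stream: counts incoming calls. */
    static final class CountingOutputStream extends OutputStream {
        int calls = 0;

        @Override
        public void write(int b) {
            calls++;
        }

        @Override
        public void write(byte[] buf, int off, int len) {
            calls++;
        }
    }

    /** Hypothetical stand-in for a JNI-backed input stream: counts incoming calls. */
    static final class CountingInputStream extends InputStream {
        private final InputStream in;
        int calls = 0;

        CountingInputStream(byte[] data) {
            this.in = new ByteArrayInputStream(data);
        }

        @Override
        public int read() throws IOException {
            calls++;
            return in.read();
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            calls++;
            return in.read(buf, off, len);
        }
    }

    /** Writes n single bytes and returns how many calls reached the underlying stream. */
    static int writeCalls(int n, boolean buffered) throws IOException {
        CountingOutputStream sink = new CountingOutputStream();
        OutputStream out = buffered ? new BufferedOutputStream(sink, BUFFER_SIZE) : sink;
        for (int i = 0; i < n; i++) {
            out.write(i); // single-byte write, as a varint encoder would issue
        }
        out.flush();
        return sink.calls;
    }

    /** Reads n single bytes and returns how many calls reached the underlying stream. */
    static int readCalls(int n, boolean buffered) throws IOException {
        CountingInputStream source = new CountingInputStream(new byte[n]);
        InputStream in = buffered ? new BufferedInputStream(source, BUFFER_SIZE) : source;
        for (int i = 0; i < n; i++) {
            in.read(); // single-byte read
        }
        return source.calls;
    }

    public static void main(String[] args) throws IOException {
        int n = 1000;
        System.out.println("unbuffered write calls: " + writeCalls(n, false));
        System.out.println("buffered write calls:   " + writeCalls(n, true));
        System.out.println("unbuffered read calls:  " + readCalls(n, false));
        System.out.println("buffered read calls:    " + readCalls(n, true));
    }
}
```

With the values in `main`, the unbuffered paths reach the stand-in stream once per byte, while the buffered paths reach it once in total, since 1000 bytes fit in a single buffer. In the real code the stand-ins would be the zstd-jni streams, so the same wrapping reduces the number of native calls.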
While this PR fixes the defect, I feel that using BufferSupplier in both zstd-jni and snappy is outside the scope of this bugfix and should be considered a separate improvement. I would prefer that this change be merged on its own, since the performance gains here are very significant relative to the more incremental and minor optimizations which could be achieved by doing that separate work.

There are some noticeable improvements in the JMH benchmarks (excerpt):

BEFORE:
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (maxBatchSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed CREATE RANDOM ZSTD 200 1000 2 thrpt 15 27743.260 ± 673.869 ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 3399.966 ± 82.608 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 134968.010 ± 0.012 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 3850.985 ± 84.476 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 152881.128 ± 942.189 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 174.241 ± 3.486 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 6917.758 ± 82.522 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 1689.000 counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 82621.000 ms
JMH benchmarks done

Benchmark (bufferSupplierStr) (bytes) (compressionType) (maxBatchSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage CREATE RANDOM ZSTD 200 1000 2 thrpt 15 24095.711 ± 895.866 ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2932.289 ± 109.465 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 134032.012 ± 0.013 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 3282.912 ± 115.042 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 150073.914 ± 1342.235 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 149.697 ± 5.786 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 6842.462 ± 64.515 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 1449.000 counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 82518.000 ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize CREATE RANDOM ZSTD 200 1000 2 thrpt 15 1449.060 ± 230.498 ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 198.051 ± 31.532 MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 150502.519 ± 0.186 B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 200.064 ± 31.879 MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 152569.341 ± 13826.686 B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 91.000 counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 75869.000 ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2609.660 ± 1145.160 ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 815.441 ± 357.818 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 344309.097 ± 0.238 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 808.952 ± 354.975 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 345712.061 ± 51434.034 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.019 ± 0.042 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 18.615 ± 42.045 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 24.132 ± 12.254 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 13540.960 ± 14649.192 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 148.000 counts
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 23848.000 ms
JMH benchmarks done
```

AFTER:
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (maxBatchSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed CREATE RANDOM ZSTD 200 1000 2 thrpt 15 147792.454 ± 2721.318 ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2708.481 ± 50.012 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 20184.002 ± 0.002 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2732.667 ± 59.258 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 20363.460 ± 120.585 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.042 ± 0.033 MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.316 ± 0.249 B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 833.000 counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 8390.000 ms
JMH benchmarks done

Benchmark (bufferSupplierStr) (bytes) (compressionType) (maxBatchSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage CREATE RANDOM ZSTD 200 1000 2 thrpt 15 166786.092 ± 3285.702 ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2926.914 ± 57.464 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 19328.002 ± 0.002 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2938.541 ± 66.850 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 19404.357 ± 177.485 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.516 ± 0.100 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 3.409 ± 0.657 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.032 ± 0.131 MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.207 ± 0.858 B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 834.000 counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 9370.000 ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize CREATE RANDOM ZSTD 200 1000 2 thrpt 15 15988.116 ± 137.427 ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 448.636 ± 3.851 MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 30907.698 ± 0.020 B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 450.905 ± 5.587 MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 31064.113 ± 291.190 B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.043 ± 0.007 MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2.931 ± 0.493 B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 790.000 counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 999.000 ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize CREATE RANDOM ZSTD 200 1000 2 thrpt 15 11345.169 ± 206.528 ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2314.800 ± 42.094 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 224714.266 ± 0.028 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2320.213 ± 45.521 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 225235.965 ± 803.309 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen CREATE RANDOM ZSTD 200 1000 2 thrpt 15 0.026 ± 0.005 MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm CREATE RANDOM ZSTD 200 1000 2 thrpt 15 2.551 ± 0.455 B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count CREATE RANDOM ZSTD 200 1000 2 thrpt 15 994.000 counts
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time CREATE RANDOM ZSTD 200 1000 2 thrpt 15 1189.000 ms
JMH benchmarks done
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)