[ 
https://issues.apache.org/jira/browse/FLINK-11859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-11859.
----------------------------------
    Resolution: Fixed

Fixed via 35e57a8460c7f03010972f587bb24052ea694cce

> Improve SpanningRecordSerializer performance by serializing record length to 
> serialization buffer directly
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11859
>                 URL: https://issues.apache.org/jira/browse/FLINK-11859
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Assignee: Yingjie Cao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> In the current implementation of SpanningRecordSerializer, the length of a 
> record is serialized to an intermediate length buffer and then copied to the 
> target buffer. The length field can instead be serialized directly to the 
> data buffer (serializationBuffer), which avoids copying the length buffer. 
> Although the total number of bytes copied remains unchanged, this saves one 
> copy of a small piece of data, and such small copies incur high overhead. The 
> flink-benchmarks results show that it improves performance; the test results 
> are as follows.
> Result with the optimization:
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |
> Result without the optimization:
>  
> |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
> |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
> |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
> |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
> |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
> |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
> |SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
> |StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|20788.34364|1146.470652|ops/ms|1000,1ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|24412.00941|981.98882|ops/ms|1000,100ms| |
> |StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8284.336114|177.482373|ops/ms|1000,100ms,SSL| |
> |SumLongsBenchmark.benchmarkCount|thrpt|1|30|7846.800667|127.321584|ops/ms| | |
> |WindowBenchmarks.globalWindow|thrpt|1|30|4837.270101|94.519852|ops/ms| | |
> |WindowBenchmarks.sessionWindow|thrpt|1|30|591.304589|5.324132|ops/ms| | |
> |WindowBenchmarks.slidingWindow|thrpt|1|30|446.605784|2.53677|ops/ms| | |
> |WindowBenchmarks.tumblingWindow|thrpt|1|30|2878.885056|64.035709|ops/ms| | |
> |StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.705601|0.164959|ms/op| | |
>  
> The optimization is especially useful for small records.
>  
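The idea in the quoted description can be illustrated with a minimal sketch. It uses plain java.nio.ByteBuffer as a stand-in for Flink's serialization buffer, so it is not the code from the commit. The class and method names (LengthPrefixSketch, frameWithLengthBuffer, frameInPlace) are hypothetical, and the 4-byte int length prefix is an assumption made for the example. Both variants produce the same framed bytes; the second needs one copy into the target instead of two.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; ByteBuffer stands in for Flink's buffers.
class LengthPrefixSketch {

    // Before: the length lives in its own small buffer, so framing a record
    // takes two separate copies into the target (length, then payload).
    static void frameWithLengthBuffer(byte[] payload, ByteBuffer target) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        lengthBuffer.putInt(payload.length);
        lengthBuffer.flip();

        ByteBuffer dataBuffer = ByteBuffer.wrap(payload);

        target.put(lengthBuffer); // copy 1: the small length buffer
        target.put(dataBuffer);   // copy 2: the record bytes
    }

    // After: reserve the length slot at the head of the data buffer, write
    // the record behind it, then backfill the length. One copy to the target.
    static void frameInPlace(byte[] payload, ByteBuffer target) {
        ByteBuffer serializationBuffer = ByteBuffer.allocate(4 + payload.length);
        serializationBuffer.position(4);  // skip the length slot
        serializationBuffer.put(payload); // stands in for serializing the record
        int recordLength = serializationBuffer.position() - 4;
        serializationBuffer.putInt(0, recordLength); // absolute write, position unchanged
        serializationBuffer.flip();

        target.put(serializationBuffer);  // single copy: length + record
    }

    public static void main(String[] args) {
        byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
        ByteBuffer a = ByteBuffer.allocate(16);
        ByteBuffer b = ByteBuffer.allocate(16);
        frameWithLengthBuffer(payload, a);
        frameInPlace(payload, b);
        System.out.println("framed bytes identical: " + Arrays.equals(a.array(), b.array()));
    }
}
```

The number of bytes reaching the target is the same in both variants, which matches the description's point that only the number of copy operations changes.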



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
