Haibo Sun created FLINK-12529:
---------------------------------

             Summary: Release buffers of the record deserializer timely to improve the efficiency of heap memory usage on taskmanager
                 Key: FLINK-12529
                 URL: https://issues.apache.org/jira/browse/FLINK-12529
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.8.0
            Reporter: Haibo Sun
            Assignee: Haibo Sun


In the input processors (`StreamInputProcessor` and `StreamTwoInputProcessor`), 
each input channel has a corresponding record deserializer. Currently, these 
record deserializers are cleaned up only at the end of the task (see 
`StreamInputProcessor#cleanup()` and `StreamTwoInputProcessor#cleanup()`). This 
is not a problem for unbounded streams, but it can reduce the efficiency of 
heap memory usage on the TaskManager when an input is a bounded stream.

For example, suppose all inputs are bounded streams: some end very early 
because they carry little data, while others end very late because they carry 
a lot. The buffers of the record deserializers for the channels that finished 
early then sit idle for a long time and are never used again.

In another case, when the inputs include both unbounded and bounded streams, 
the buffers of the record deserializers for the bounded streams stay idle 
forever (they are never used again) once those streams have finished. This 
matters most when records are large and the upstream parallelism is high, 
because the total size of the `SpanningWrapper#buffer` instances can then be 
very large. A single `SpanningWrapper#buffer` can grow to 5 MB, so with an 
upstream parallelism of 100 the total can reach 500 MB (in our production 
environment, some jobs have records of up to hundreds of KB and an upstream 
parallelism of up to 1000).
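The worst-case figure above is a simple product of the per-buffer cap and the 
number of upstream channels. The sketch below reproduces that arithmetic; it 
assumes one input channel (and therefore one deserializer) per upstream 
subtask, and that every buffer has grown to the 5 MB cap stated above. The 
class and method names are illustrative only, not Flink code.

```java
public final class DeserializerBufferEstimate {

    // Per-buffer cap for SpanningWrapper#buffer, as stated in the issue (5 MB).
    static final long MAX_SPANNING_BUFFER_MB = 5;

    // Worst case, assuming one channel per upstream subtask and that every
    // channel's buffer has grown to the cap and is held until the task ends.
    static long worstCaseRetainedMb(long upstreamParallelism) {
        return MAX_SPANNING_BUFFER_MB * upstreamParallelism;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseRetainedMb(100));  // 500
        System.out.println(worstCaseRetainedMb(1000)); // 5000
    }
}
```

Under the same assumptions, the upstream parallelism of 1000 mentioned above 
would put the upper bound at roughly 5000 MB per consuming subtask.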

Overall, after an `EndOfPartitionEvent` is received from an input channel, the 
corresponding record deserializer should be cleared immediately to improve the 
efficiency of heap memory usage on the TaskManager.
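A minimal, self-contained sketch of the proposed behavior follows. It does not 
use Flink's classes: `ChannelDeserializer`, `ChannelEvent.END_OF_PARTITION` and 
`EagerDeserializerRelease` are hypothetical stand-ins for a per-channel record 
deserializer, `EndOfPartitionEvent` and an input processor, respectively. The 
point is only the lifecycle: a channel's deserializer is released as soon as 
its partition ends, while `cleanup()` still handles channels that never end.

```java
import java.util.Arrays;

public final class EagerDeserializerRelease {

    // Hypothetical stand-in for a per-channel record deserializer owning a heap buffer.
    static final class ChannelDeserializer {
        private byte[] spanningBuffer = new byte[0];

        // Simulates the buffer growing while a large record spans several network buffers.
        void grow(int bytes) {
            if (bytes > spanningBuffer.length) {
                spanningBuffer = Arrays.copyOf(spanningBuffer, bytes);
            }
        }

        int retainedBytes() {
            return spanningBuffer.length;
        }

        // Drops the large array so the garbage collector can reclaim it.
        void clear() {
            spanningBuffer = new byte[0];
        }
    }

    // Hypothetical event marker standing in for EndOfPartitionEvent.
    enum ChannelEvent { END_OF_PARTITION }

    private final ChannelDeserializer[] deserializers;

    EagerDeserializerRelease(int numChannels) {
        deserializers = new ChannelDeserializer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            deserializers[i] = new ChannelDeserializer();
        }
    }

    void onRecordData(int channel, int spanningBytes) {
        deserializers[channel].grow(spanningBytes);
    }

    // Proposed behavior: release the channel's deserializer as soon as its
    // partition ends, instead of waiting for cleanup() at the end of the task.
    void onEvent(int channel, ChannelEvent event) {
        if (event == ChannelEvent.END_OF_PARTITION && deserializers[channel] != null) {
            deserializers[channel].clear();
            deserializers[channel] = null;
        }
    }

    long totalRetainedBytes() {
        long total = 0;
        for (ChannelDeserializer d : deserializers) {
            if (d != null) {
                total += d.retainedBytes();
            }
        }
        return total;
    }

    // Still required for channels that never end (unbounded inputs).
    void cleanup() {
        for (int i = 0; i < deserializers.length; i++) {
            if (deserializers[i] != null) {
                deserializers[i].clear();
                deserializers[i] = null;
            }
        }
    }

    public static void main(String[] args) {
        EagerDeserializerRelease processor = new EagerDeserializerRelease(2);
        processor.onRecordData(0, 1024);
        processor.onRecordData(1, 4096);
        processor.onEvent(0, ChannelEvent.END_OF_PARTITION);
        System.out.println(processor.totalRetainedBytes()); // 4096
        processor.cleanup();
        System.out.println(processor.totalRetainedBytes()); // 0
    }
}
```

Nulling the array slot after `clear()` makes it explicit that the finished 
channel holds no further state; the end-of-task `cleanup()` then only touches 
channels that are still open.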



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
