Hi Łukasz,

First, we are planning to design and implement the BoundedStream story, which will be discussed further in 1.11 or 1.12.

SortedMapState was discussed in FLINK-6219 [1], but some problems could not be solved well, so it has not been introduced.

If it is a pure BoundedStream without checkpoints, using state is not recommended: state is usually used for checkpoints, so it causes more overhead.

SortOperator was introduced for the table BaseRow. I recommend that you use UnilateralSortMerger to construct your own SortOperator.

[1] https://issues.apache.org/jira/browse/FLINK-6219

Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski <[email protected]> wrote:

> Hi all,
>
> In Flink 1.9 a couple of changes were introduced to deal with bounded
> streams, e.g. the BoundedOneInput interface. I'm wondering whether it
> would be doable to do some kind of global sort after receiving the end
> input event on a finished data stream source, using only the
> DataStream API?
>
> We have made some experiments with BoundedOneInput: buffering elements,
> then sorting them after receiving the end input event, and finally
> emitting the sorted elements. It seems to be working as expected, though
> we are having trouble sorting a big stream efficiently. One problem is
> the missing appropriate state type, something like SortedMapState. While
> using MapState the elements are inserted in a kind of byte order. I
> think it could be possible to do some key modification to achieve the
> correct byte order, but it's not trivial for every type (string, int,
> tuples, and so on). Do you plan to add such a sorted state?
>
> In the Flink Table API there is SortOperator, but it is restricted to
> BinaryRow. Would it be possible to adapt this functionality in the
> streaming API for arbitrary types? What do you think?
>
> Thanks,
> Łukasz
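
P.S. To make the "sort without state" idea a bit more concrete, here is a minimal sketch in plain Java of the general pattern an external sorter follows: sort bounded-size runs, then k-way merge them. It is not Flink code and does not use UnilateralSortMerger's actual API. The class name RunMergeSortSketch, the runSize parameter and the Cursor helper are all hypothetical, and the sorted runs are kept in memory here only as a stand-in for the files a real sorter would spill to disk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch (not Flink API): sort bounded-size runs, then k-way merge them. */
final class RunMergeSortSketch {

    /** A cursor over one sorted run; the heap orders cursors by their current head. */
    private static final class Cursor<T> {
        final Iterator<T> rest;
        T head;

        Cursor(Iterator<T> run) {
            this.rest = run;
            this.head = run.next(); // runs are never empty, so this is safe
        }
    }

    static <T> List<T> sort(Iterator<T> input, Comparator<? super T> cmp, int runSize) {
        if (runSize <= 0) {
            throw new IllegalArgumentException("runSize must be positive");
        }

        // Phase 1: cut the input into runs of at most runSize elements and sort each run.
        // Assumption: a real external sorter would spill each sorted run to disk here.
        List<List<T>> runs = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (input.hasNext()) {
            current.add(input.next());
            if (current.size() == runSize) {
                current.sort(cmp);
                runs.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            current.sort(cmp);
            runs.add(current);
        }

        // Phase 2: k-way merge. The heap always exposes the smallest head among all runs.
        PriorityQueue<Cursor<T>> heap =
                new PriorityQueue<>((a, b) -> cmp.compare(a.head, b.head));
        for (List<T> run : runs) {
            heap.add(new Cursor<>(run.iterator()));
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor<T> smallest = heap.poll();
            out.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest); // re-insert with its new head
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 3, 9, 1, 7, 2, 8);
        List<Integer> sorted = sort(data.iterator(), Comparator.<Integer>naturalOrder(), 3);
        System.out.println(sorted); // prints [1, 2, 3, 5, 7, 8, 9]
    }
}
```

In an operator implementing BoundedOneInput, phase 1 would roughly correspond to what happens while elements arrive, and phase 2 to what happens once the end input event is received. How this maps onto a concrete operator is an assumption on my side, not something the sketch shows.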
