Hello Flink community,
When using a KeyedProcessFunction<K, IN, OUT>, what is the *processing
order* of the records *belonging to the same key* within a single batch job?
Concretely:
1. With *no event time* defined and running in *batch mode*, does
KeyedProcessFunction process all elements of the same key in a
*deterministic order*?
2. Is that order tied to the *upstream partition/file order*, to any
*internal shuffle/sort* guarantees of batch execution (e.g., sort-shuffle /
hash-shuffle), or is it *unspecified*?
3. Is the per-key order *stable across runs* (same input & parallelism),
or can it vary?
4. If a stable/ascending order is required (e.g., by a field like ts or
seq), what is the *recommended way* to enforce it before/inside
KeyedProcessFunction in batch mode? (e.g., explicit sortPartition,
pre-aggregation, or other operator/setting)
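To make questions 2 and 3 concrete, here is a small framework-free model of what I am worried about. It is NOT Flink's actual sorter; `Event`, `seq` and the two "runs" are hypothetical. If the shuffle only ordered records by key and otherwise kept arrival order (a stable sort), the per-key order would simply follow the arrival order, which could differ between runs when several upstream channels interleave differently:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class KeyOnlySortDemo {

    // Hypothetical event: a key plus a sequence number we would like to see in ascending order.
    record Event(String userId, long seq) {}

    // Sorts by key only. List.sort is stable, so events with equal keys keep their arrival order.
    static List<Event> sortByKeyOnly(List<Event> arrived) {
        List<Event> copy = new ArrayList<>(arrived);
        copy.sort(Comparator.comparing(Event::userId));
        return copy;
    }

    // Returns the seq values of one key in the order a downstream keyed operator would receive them.
    static List<Long> seqsFor(List<Event> sorted, String userId) {
        return sorted.stream()
                .filter(e -> e.userId().equals(userId))
                .map(Event::seq)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same data, two different interleavings of the upstream channels.
        List<Event> runA = List.of(new Event("u1", 1), new Event("u2", 1), new Event("u1", 2));
        List<Event> runB = List.of(new Event("u1", 2), new Event("u2", 1), new Event("u1", 1));

        System.out.println(seqsFor(sortByKeyOnly(runA), "u1")); // [1, 2]
        System.out.println(seqsFor(sortByKeyOnly(runB), "u1")); // [2, 1]
    }
}
```

Whether Flink's batch sort compares anything beyond the key (and so whether this model is too pessimistic) is part of what I am asking.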
*Minimal sketch:*
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    DataStream<MyEvent> input = env.fromSource(...); // no timestamps/watermarks

    input
        .keyBy(e -> e.userId)
        .process(new KeyedProcessFunction<String, MyEvent, MyOut>() {
            @Override
            public void processElement(MyEvent value, Context ctx,
                                       Collector<MyOut> out) {
                // Question: in what order will events with the same userId arrive here?
            }
        });
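The workaround I am currently considering (question 4) is to buffer all elements of a key and sort them myself before emitting. Below is a framework-free sketch of that idea, assuming hypothetical `ts` and `seq` fields on the event; in a real KeyedProcessFunction the buffer would live in keyed state (e.g. ListState), and how to reliably trigger the "key is complete" point in batch mode is exactly what I am unsure about:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerKeySortSketch {

    // Hypothetical event shape; ts and seq stand in for the ordering fields from question 4.
    record MyEvent(String userId, long ts, long seq) {}

    // Total order within a key: ts first, seq as tie-breaker for equal timestamps.
    static final Comparator<MyEvent> BY_TS_THEN_SEQ =
            Comparator.comparingLong(MyEvent::ts).thenComparingLong(MyEvent::seq);

    // Buffers every element per key (a stand-in for keyed state), then sorts each
    // buffer once all input has been seen, so the result no longer depends on arrival order.
    static Map<String, List<MyEvent>> bufferAndSort(List<MyEvent> input) {
        Map<String, List<MyEvent>> buffers = new LinkedHashMap<>();
        for (MyEvent e : input) {
            buffers.computeIfAbsent(e.userId(), k -> new ArrayList<>()).add(e);
        }
        buffers.values().forEach(list -> list.sort(BY_TS_THEN_SEQ));
        return buffers;
    }

    public static void main(String[] args) {
        List<MyEvent> arrived = List.of(
                new MyEvent("u1", 20, 0), new MyEvent("u1", 10, 1), new MyEvent("u1", 10, 0));
        // Prints 10/0, 10/1, 20/0 (one per line), regardless of the arrival order above.
        for (MyEvent e : bufferAndSort(arrived).get("u1")) {
            System.out.println(e.ts() + "/" + e.seq());
        }
    }
}
```

The obvious cost is holding a whole key in state, which is why I would prefer a built-in ordering guarantee if one exists.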
Any pointers to *docs / source code locations* (for Flink 1.19, batch mode)
that describe the per-key ordering semantics would be highly appreciated. Thanks!
Best!