Hello Flink community,

When using a KeyedProcessFunction<K, IN, OUT>, what is the *processing
order* of the records *belonging to the same key* within a single batch?

Concretely:

   1. With *no event-time* defined, and running in *batch mode*, does
   KeyedProcessFunction process all elements of the same key in a
   *deterministic order*?
   2. Is that order tied to the *upstream partition/file order*, or to
   any *internal shuffle/sort* guarantees in batch execution (e.g.,
   sort-shuffle / hash-shuffle), or is it *unspecified*?
   3. Is the per-key order *stable across runs* (same input & parallelism),
   or can it vary?
   4. If a stable/ascending order is required (e.g., by a field like ts or
   seq), what is the *recommended way* to enforce it before/inside
   KeyedProcessFunction in batch mode? (e.g., explicit sortPartition,
   pre-aggregation, or other operator/setting)

*Minimal sketch:*

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<MyEvent> input = env.fromSource(...); // no timestamps/watermarks

input
  .keyBy(e -> e.userId)
  .process(new KeyedProcessFunction<String, MyEvent, MyOut>() {
    @Override
    public void processElement(MyEvent value, Context ctx,
        Collector<MyOut> out) {
      // Question: in what order will events with the same userId arrive here?
    }
  });
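
To make question 4 concrete, the per-key order being asked for can be stated as the plain-Java sketch below. It does not use Flink and is not a proposed solution. It only shows the expected result: events grouped by userId, each group in ascending seq. The class name PerKeyOrderSketch, the method sortWithinKey, and the seq field are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerKeyOrderSketch {

    // Hypothetical event shape; userId and seq are assumed field names.
    static final class MyEvent {
        final String userId;
        final long seq;

        MyEvent(String userId, long seq) {
            this.userId = userId;
            this.seq = seq;
        }

        @Override
        public String toString() {
            return userId + "#" + seq;
        }
    }

    // Groups events by userId (keys kept in first-seen order) and sorts
    // each group by seq ascending. This is the order the question would
    // like processElement to observe for a given key.
    static Map<String, List<MyEvent>> sortWithinKey(List<MyEvent> input) {
        Map<String, List<MyEvent>> byKey = new LinkedHashMap<>();
        for (MyEvent e : input) {
            byKey.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        for (List<MyEvent> events : byKey.values()) {
            // List.sort is stable: events with equal seq keep arrival order.
            events.sort(Comparator.comparingLong((MyEvent e) -> e.seq));
        }
        return byKey;
    }

    public static void main(String[] args) {
        List<MyEvent> input = List.of(
            new MyEvent("a", 3), new MyEvent("b", 1),
            new MyEvent("a", 1), new MyEvent("a", 2));
        System.out.println(sortWithinKey(input));
    }
}
```

The open question is which Flink operator or setting, if any, yields this per-key order in batch mode without buffering a whole key in user code.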

Any pointers to *docs / source code locations* (for 1.19 batch) that
describe the per-key ordering semantics would be highly appreciated. Thanks!

Best!
