Interesting question. Maybe it's useful to compare this with what Flink SQL
does.

A strategy that sometimes works well is the one used by temporal joins in
Flink SQL. In this scenario there's an event that's being enriched, e.g.,
an Order being enriched with info about the Customer, Product, Payment,
Shipment, etc, using the versions of those enrichment tables that were in
effect at the time of the Order. The timestamp of the output record is the
timestamp of the Order, and the join will wait until the watermarks of the
other tables have advanced to that point.

The semantics of this event time temporal join are clean, and the output
stream is monotonic -- but things get messy when enrichment streams become
idle. Unfortunately, this is rather common.

Flink SQL's non-temporal joins don't set a timestamp on the output record.
This makes it impossible to use the output of a regular join as input to a
temporal operation without first materializing and re-ingesting the stream
of join results. An alternative that's been discussed would be to use the
timestamp of the latest processed record (your strategy #2), but I'm not
aware of any concrete plans to pursue this.

As for per-key watermarks, in my opinion we'll never see this in Flink.

On Wed, Mar 25, 2026 at 9:58 AM Salva Alcántara <[email protected]>
wrote:

> This is a conceptual question, and the answer will probably depend on the
> specific use case.
>
> But to make the context clear, I'm joining multiple input streams by a
> common (entity) id. Those inputs represent partial/disjoint updates for the
> same entity. E.g., consider device interfaces, device vms, device metrics,
> device vendor information, etc. I've personally worked on multiple such
> assemblers and the event time strategy was never super clear. That is, how
> to propagate the inputs' event times to the output, assembled entity.
> Again, conceptually those services emit entity snapshots at a given time.
>
> The strategies that I've commonly found in practice:
>
> 1. Use the max timestamp seen among the inputs (that has some nice
> properties like being monotonically increasing but it can potentially hide
> lag/delay in one of the inputs)
> 2. Use the timestamp of the latest processed event (this is how Flink
> would propagate the timestamp by default, e.g., when using a
> CoProcessFunction; this approach does not look semantically consistent plus
> the output timestamp wouldn't be monotonically increasing)
> 3. Use the watermark in conjunction with event time timers to emit
> periodic snapshots for example. This would recover the monotonicity at the
> expense of extra latency, exacerbated by the fact that watermarks are not
> calculated per-key so to say. On the flip side, it would immediately show
> if one source is delayed, which might be desirable—or not!
>
> Taking the chance for asking if there is something on the roadmap for
> supporting per-key watermarks along the lines of this:
> - https://github.com/TawfikYasser/Keyed-Watermarks-in-Apache-Flink
>
> Thanks!
>
> Salva
>
>

Reply via email to