Hello,

I'm trying to use the HybridSource to read from a bounded FLIP-27 Iceberg
source and an unbounded FLIP-27 PubSub source. Based on my searches, the
most common patterns for assigning timestamps and watermarks with FLIP-27
sources are either via the fromSource method in
StreamExecutionEnvironment, which returns a subclass of DataStream, or
using assignTimestampsAndWatermarks(WatermarkStrategy wms) on an already
existing DataStream.

But the only way to build a HybridSource, it seems, is via its builder,
whose addSource method accepts only Source types. There is no analogue of
assignTimestampsAndWatermarks that we can call on a Source.

We could perhaps call that method on the DataStream produced by
env.fromSource(hybridSource), but I believe that would use the same
timestamp & watermark assignment strategy for both. We need the watermarks
to be assigned differently based on the source. Our Iceberg tables are
partitioned by day, so with event-time-aligned split assignment the
watermark lags by about 1 day until the Iceberg read finishes. Those
watermarks originate within the Source. For PubSub, we'd like to use the
generic bounded out-of-orderness watermark strategy with a bound of just a
few seconds. The PubSub source doesn't generate watermarks itself --
PubSub exposes no per-split information that would make for a better
watermark, the way Kafka partitions do in the Kafka connector. The only
way to assign it a watermark strategy is via the DataStream-level
assignment methods.
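
For reference, the PubSub behaviour we want is roughly the following. This
is a plain-Java sketch, not Flink code: the class name is made up, and the
formula assumes the usual "highest timestamp seen, minus the bound, minus
1 ms" rule that I understand the built-in bounded out-of-orderness
generator to follow.

```java
import java.time.Duration;

/** Hypothetical plain-Java model of a bounded out-of-orderness watermark. */
final class BoundedLatenessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestampMs;

    BoundedLatenessSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMs = maxOutOfOrderness.toMillis();
        // Start high enough that the first watermark does not underflow.
        this.maxSeenTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Record an event timestamp; late events never move the maximum back. */
    void onEvent(long eventTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
    }

    /** The watermark trails the highest timestamp seen by the allowed bound. */
    long currentWatermark() {
        return maxSeenTimestampMs - maxOutOfOrdernessMs - 1;
    }
}
```

In Flink itself this would just be
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) applied
at the DataStream level, which is exactly the part that has no obvious
place to go when the PubSub source is wrapped in a HybridSource.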

Is modifying the PubSub Source to do the assignment within the Source the
only way to use different watermark strategies?

I wonder whether, given that it seems much more common to assign
watermarks at the DataStream level (e.g. FLIP-182's examples),
HybridSource could be modified to accept an optional WatermarkStrategy for
each underlying source, similar to how fromSource works.

Kind regards,
David
