Hi Martijn,

That's correct. We have a forked version of the PR from Ryan Skraba (
https://github.com/apache/flink/pull/18823), which itself is originally
based on the PR from Jakob Edding (
https://github.com/apache/flink/pull/15152), and have incorporated an old
PR (https://github.com/apache/flink/pull/9388) for retries from Richard
Deurwaarder, the main author of the non-FLIP-27 PubSub source. It also
includes some changes suggested in code review and a few cosmetic changes.

I'm aware of the ongoing efforts to split out connectors into separate
repositories, and I'd planned on opening a new PR somewhere -- either the
flink repository, or a new one -- to contribute it upstream. Any
suggestions on how to proceed? The forked code runs without issues on our
production traffic.

As far as HybridSource and watermarking are concerned, I realized that I
can use the switch timestamp to determine whether the underlying Iceberg or
PubSub source is being used, within the same WatermarkGenerator. This way,
a single WatermarkGenerator can be applied to the HybridSource that has the
correct behavior for each underlying source. I haven't confirmed everything
works as expected, but I am pretty optimistic.
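
A rough sketch of the idea, in plain Java with no Flink dependency so it
can be read on its own. The class name, parameter names, and the rule
"the largest event timestamp seen so far is below the switch timestamp, so
we are still reading Iceberg" are my assumptions for illustration, not
something taken from the Flink or Iceberg code. In a real job the two
methods would sit behind Flink's WatermarkGenerator callbacks (onEvent and
onPeriodicEmit).

```java
// Hypothetical helper: picks a watermark lag based on a switch timestamp.
// Before the switch (historical/Iceberg phase) it applies a large lag,
// after the switch (live/PubSub phase) a small bounded out-of-orderness.
final class SwitchTimestampWatermarkTracker {
    private final long switchTimestampMs;      // assumed HybridSource switch point
    private final long historicalLagMs;        // e.g. about one day for daily partitions
    private final long liveOutOfOrdernessMs;   // e.g. a few seconds for PubSub

    private long maxTimestampMs = Long.MIN_VALUE;
    private long lastWatermarkMs = Long.MIN_VALUE;

    SwitchTimestampWatermarkTracker(
            long switchTimestampMs, long historicalLagMs, long liveOutOfOrdernessMs) {
        this.switchTimestampMs = switchTimestampMs;
        this.historicalLagMs = historicalLagMs;
        this.liveOutOfOrdernessMs = liveOutOfOrdernessMs;
    }

    // Would be called from WatermarkGenerator#onEvent with the event timestamp.
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // Would be called from WatermarkGenerator#onPeriodicEmit; returns the
    // watermark to emit, or Long.MIN_VALUE if no event has been seen yet.
    long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return lastWatermarkMs;
        }
        long lagMs = maxTimestampMs < switchTimestampMs
                ? historicalLagMs
                : liveOutOfOrdernessMs;
        // Same "max - lag - 1" convention as a bounded out-of-orderness strategy.
        long candidate = maxTimestampMs - lagMs - 1;
        // Never let the watermark move backwards.
        lastWatermarkMs = Math.max(lastWatermarkMs, candidate);
        return lastWatermarkMs;
    }
}
```

With a switch timestamp of 1000, a historical lag of 100 and a live
out-of-orderness of 5 (all in ms), an event at 500 gives a watermark of
399, and a later event at 1200 gives 1194.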

Also, in my last message, I was mistaken about the Iceberg source creating
watermarks internally. In a conversation with Steven Wu, he mentioned that
the Iceberg source does not support generating watermarks internally. I don't
quite have a full understanding of how watermarks are generated within
sources & was confused by the event-time-aligned assigner code's in-memory
"watermark" tracker -- it may be that these aren't watermarks, but instead
are just timestamp trackers used for aligning splits.

Kind regards,
David

On Tue, Jan 17, 2023 at 6:01 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi David,
>
> While I don't immediately have an answer to your question, I was triggered
> by your inclusion of "an unbounded FLIP-27 PubSub source". Do you mean your
> own (forked) version of GCP using the new interfaces, since the current GCP
> connector is not a FLIP-27 compatible source?
>
> Best regards,
>
> Martijn
>
> On Fri, Jan 13, 2023 at 08:03, David Christle via user <
> user@flink.apache.org> wrote:
>
>> Hello,
>>
>> I'm trying to use the HybridSource to read from a bounded FLIP-27 Iceberg
>> source and an unbounded FLIP-27 PubSub source. Based on my searches, the
>> most common patterns for assigning timestamps and watermarks with FLIP-27
>> sources are either via the fromSource method in
>> StreamExecutionEnvironment, which returns a subclass of DataStream, or
>> using assignTimestampsAndWatermarks(WatermarkStrategy wms) on an already
>> existing DataStream.
>>
>> But the only way to build a HybridSource, it seems, is via its addSource
>> method, which accepts only Source types. We cannot call an analogue of
>> assignTimestampsAndWatermarks on a Source.
>>
>> We could perhaps call that method on the DataStream produced by
>> env.fromSource(hybridSource), but I believe that would use the same
>> timestamp & watermark assignment strategy for both. We need the watermarks
>> to be assigned differently based on the source. Our Iceberg tables are
>> partitioned by day, so the watermark lags by about 1 day when we use
>> event-time aligned assignment until it finishes. Those watermarks originate
>> within the Source. But for PubSub, we'd like to use the generic bounded
>> out-of-orderness watermark strategy for just a few seconds of
>> out-of-orderness. The PubSub source doesn't generate watermarks within the
>> source -- there is no per-split information exposed by PubSub that would
>> make a better watermark, like in the Kafka connector. The only way to
>> assign it a watermark strategy is via the DataStream-level assignment
>> methods.
>>
>> Is the only way to use different watermark strategies to modify the
>> PubSub Source to do the assignment within the Source?
>>
>> I wonder whether, given that it seems much more common to assign watermarks at
>> the DataStream level (e.g. FLIP-182's examples), HybridSource could be
>> modified to accept an optional WatermarkStrategy for each underlying
>> source, similar to how fromSource works.
>>
>> Kind regards,
>> David
>>
>>
