Hi David,

That's good to know. It would be nice to see if we can move all of this forward into a new version of the connector. The externalization is nearly finished (I pushed out a release candidate for the version that's currently bundled with Flink 1.16), so let's see if we can get your changes in. I'm also looping in Ryan, who has been helping out here too.
If you're able to confirm that it works, do let us know. It could also make an interesting blog post.

Best regards,

Martijn

On Wed, Jan 18, 2023 at 07:43, David Christle <david.chris...@discordapp.com> wrote:

> Hi Martijn,
>
> That's correct. We have a forked version of the PR from Ryan Skraba
> (https://github.com/apache/flink/pull/18823), which is itself based on the
> PR from Jakob Edding (https://github.com/apache/flink/pull/15152), and we
> have incorporated an old PR (https://github.com/apache/flink/pull/9388) for
> retries from Richard Deurwaarder, the main author of the non-FLIP-27 PubSub
> source. It also includes some changes suggested in code review, plus other
> cosmetic changes.
>
> I'm aware of the ongoing efforts to split connectors out into separate
> repositories, and I'd planned on opening a new PR somewhere (either the
> flink repository or a new one) to contribute it upstream. Any suggestions
> on how to proceed? The forked code runs without issues on our production
> traffic.
>
> As for the HybridSource and watermarking, I realized that I can use the
> switch timestamp to determine, within the same WatermarkGenerator, whether
> the underlying Iceberg or PubSub source is being used. This way, a single
> WatermarkGenerator with the correct behavior for each underlying source
> can be applied to the HybridSource. I haven't confirmed that everything
> works as expected, but I'm fairly optimistic.
>
> Also, in my last message I was mistaken about the Iceberg source creating
> watermarks internally. In a conversation with Steven Wu, he mentioned that
> the Iceberg source does not support generating watermarks internally. I
> don't fully understand how watermarks are generated within sources, and I
> was confused by the in-memory "watermark" tracker in the event-time-aligned
> assigner code. It may be that these aren't watermarks at all, but just
> timestamp trackers used for aligning splits.
>
> Kind regards,
> David
>
> On Tue, Jan 17, 2023 at 6:01 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi David,
>>
>> While I don't immediately have an answer to your question, my attention
>> was caught by your mention of "an unbounded FLIP-27 PubSub source". Do you
>> mean your own (forked) version of the GCP PubSub connector using the new
>> interfaces, since the current GCP connector is not a FLIP-27 compatible
>> source?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Jan 13, 2023 at 08:03, David Christle via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to use the HybridSource to read from a bounded FLIP-27
>>> Iceberg source and an unbounded FLIP-27 PubSub source. Based on my
>>> searches, the most common patterns for assigning timestamps and watermarks
>>> with FLIP-27 sources are either the fromSource method in
>>> StreamExecutionEnvironment, which returns a subclass of DataStream, or
>>> assignTimestampsAndWatermarks(WatermarkStrategy wms) on an existing
>>> DataStream.
>>>
>>> But the only way to build a HybridSource, it seems, is via its addSource
>>> method, which accepts only Source types. There is no analogue of
>>> assignTimestampsAndWatermarks that we can call on a Source.
>>>
>>> We could perhaps call that method on the DataStream produced by
>>> env.fromSource(hybridSource), but I believe that would apply the same
>>> timestamp and watermark assignment strategy to both sources. We need the
>>> watermarks to be assigned differently depending on the source. Our Iceberg
>>> tables are partitioned by day, so when we use event-time aligned
>>> assignment, the watermark lags by about 1 day until the Iceberg source
>>> finishes. Those watermarks originate within the Source. But for PubSub,
>>> we'd like to use the generic bounded out-of-orderness watermark strategy
>>> with just a few seconds of out-of-orderness.
>>> The PubSub source doesn't generate watermarks within the source:
>>> PubSub exposes no per-split information that would allow a better
>>> watermark, as the Kafka connector does. The only way to assign it a
>>> watermark strategy is via the DataStream-level assignment methods.
>>>
>>> Is the only way to use different watermark strategies to modify the
>>> PubSub Source to do the assignment within the Source?
>>>
>>> Given that it seems much more common to assign watermarks at the
>>> DataStream level (e.g. FLIP-182's examples), I wonder if HybridSource
>>> could be modified to accept an optional WatermarkStrategy for each
>>> underlying source, similar to how fromSource works.
>>>
>>> Kind regards,
>>> David
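David's idea of using the switch timestamp inside a single WatermarkGenerator can be sketched roughly as follows. This is a plain-Java sketch so it runs without Flink on the classpath: the class name `SwitchingWatermarkLogic`, its millisecond parameters, and the assumption that every Iceberg record's event time falls before the switch timestamp (and every PubSub record's at or after it) are hypothetical, not part of Flink or of David's fork. In a real job the same logic would presumably live in a class implementing Flink's `WatermarkGenerator<T>` (`onEvent(T, long, WatermarkOutput)` and `onPeriodicEmit(WatermarkOutput)`), passed via `WatermarkStrategy.forGenerator(...)` to `env.fromSource(hybridSource, strategy, name)`.

```java
import java.time.Duration;

/**
 * Hypothetical, Flink-free sketch of one watermark policy for a HybridSource
 * that reads Iceberg (bounded) first and PubSub (unbounded) afterwards.
 * Method names echo Flink's WatermarkGenerator, but this class does NOT
 * implement that interface.
 */
final class SwitchingWatermarkLogic {
    // Assumed: event time (epoch millis) at which the HybridSource switches to PubSub.
    private final long switchTimestampMillis;
    // Assumed: ~1 day of lag while reading day-partitioned Iceberg data.
    private final long backfillLagMillis;
    // Assumed: a few seconds of out-of-orderness for live PubSub data.
    private final long liveOutOfOrdernessMillis;

    // Largest event timestamp seen so far; MIN_VALUE means "no events yet".
    private long maxTimestamp = Long.MIN_VALUE;

    SwitchingWatermarkLogic(long switchTimestampMillis, long backfillLagMillis, long liveOutOfOrdernessMillis) {
        this.switchTimestampMillis = switchTimestampMillis;
        this.backfillLagMillis = backfillLagMillis;
        this.liveOutOfOrdernessMillis = liveOutOfOrdernessMillis;
    }

    /** Per-record hook: only tracks the maximum event timestamp. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /**
     * Periodic hook: returns the watermark to emit. Timestamps before the
     * switch are assumed to come from Iceberg, later ones from PubSub.
     * Assuming backfillLag >= liveOutOfOrderness, the result never moves
     * backwards, because maxTimestamp never decreases and the lag only
     * shrinks once the switch is crossed.
     */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet; avoids underflow below
        }
        long lag = maxTimestamp < switchTimestampMillis ? backfillLagMillis : liveOutOfOrdernessMillis;
        // The "- 1" follows the convention of Flink's bounded-out-of-orderness generator.
        return maxTimestamp - lag - 1;
    }
}

class SwitchingWatermarkDemo {
    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long switchTs = 10 * day; // hypothetical switch point
        SwitchingWatermarkLogic wm =
                new SwitchingWatermarkLogic(switchTs, day, Duration.ofSeconds(5).toMillis());

        wm.onEvent(5 * day);           // Iceberg backfill record
        System.out.println(wm.currentWatermark()); // 345599999 (5 days - 1 day - 1 ms)

        wm.onEvent(switchTs + 60_000); // PubSub record one minute after the switch
        System.out.println(wm.currentWatermark()); // 864054999 (switch + 60 s - 5 s - 1 ms)
    }
}
```

The design choice here is that the generator never needs to know which source is active; it infers the regime from event time alone. That only holds if the switch timestamp cleanly separates the two data sets, so records near the boundary (or a PubSub backlog older than the switch point) would need extra care.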