If I understand this correctly, Stream A should be a changelog stream and Stream B an append-only stream, and you want to use Stream A as a lookup table for Stream B?

I don't think there is an out-of-the-box way of saying "start reading Stream B only after I have read up to this point from Stream A". But I think using a Temporal Join (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins) should help, since the temporal condition should ensure your events are matched against the latest available version of each key.

The other approach, I think (if you want to use the DataStream API, for example), would be to buffer Stream B elements (inside a process function, for example) until you have read up to the specific point from Stream A.
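To make the buffering idea a bit more concrete, here is a minimal plain-Java sketch of the logic only. It deliberately does not use any Flink classes, so it runs on its own. In a real job I would expect the same logic to sit in something like a KeyedCoProcessFunction on the two connected streams, with the map and the buffer kept in Flink state. The class name, the method names, and the idea that each Stream A record carries a numeric position are all assumptions for illustration, not something from your setup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of "buffer Stream B until Stream A has caught up".
 * All names here are hypothetical; no Flink API is used.
 */
public class BufferUntilCaughtUp {
    // Latest value per key, materialized from Stream A (the changelog).
    private final Map<String, String> latestByKey = new HashMap<>();
    // Keys of Stream B events that arrived before Stream A caught up.
    private final List<String> bufferedKeys = new ArrayList<>();
    // Enriched results, recorded as "key=value" strings.
    private final List<String> output = new ArrayList<>();
    // Position in Stream A that must be reached first (assumed to be known).
    private final long targetPosition;
    private boolean caughtUp = false;

    public BufferUntilCaughtUp(long targetPosition) {
        this.targetPosition = targetPosition;
    }

    /** Handles one Stream A record: upsert the key, then check the position. */
    public void onStreamA(long position, String key, String value) {
        latestByKey.put(key, value);
        if (!caughtUp && position >= targetPosition) {
            caughtUp = true;
            // Flush buffered Stream B events in arrival order.
            for (String bufferedKey : bufferedKeys) {
                emit(bufferedKey);
            }
            bufferedKeys.clear();
        }
    }

    /** Handles one Stream B record: buffer it until Stream A has caught up. */
    public void onStreamB(String key) {
        if (caughtUp) {
            emit(key);
        } else {
            bufferedKeys.add(key);
        }
    }

    // Looks up the latest known value for the key and records the result.
    private void emit(String key) {
        output.add(key + "=" + latestByKey.getOrDefault(key, "unknown"));
    }

    public List<String> getOutput() {
        return output;
    }

    public int bufferedCount() {
        return bufferedKeys.size();
    }
}
```

With a target position of 2, a Stream B event for key A that arrives after the first Stream A record is held back, and is only emitted (with the updated value) once the second Stream A record has been applied. Keep in mind that the buffer grows for as long as Stream A is behind, so in a real job it would have to be bounded or kept in state.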
On Thu, Mar 9, 2023 at 9:49 AM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Thank you Mason and Ken for your replies
>
> The source itself isn't actually hybrid. The requirement is to materialize
> a stream first since the stream itself may contain updates to existing
> keys, changing their meanings, i.e. we can have key A go from medium
> importance to high importance and vice versa, and we want the stream which
> performs the lookups to have the most up to date view of the data.
>
> On Thu, Mar 9, 2023, 02:34 Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Yuval,
>>
>> It seems you are trying to perform bootstrapping on a Flink job by doing
>> the bounded read first. A good pattern to follow is to use HybridSource [1]
>> and the docs have some examples with File and Kafka sources. The point of
>> switching can be coordinated by the source so that you can dynamically
>> infer where to start from stream B. Please take a look at the link and let
>> me know if you have additional questions.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> Best,
>> Mason
>>
>> On Wed, Mar 8, 2023 at 11:12 AM Yuval Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a use-case where I have two streams, call them A and B. I need to
>>> consume stream A up to a certain point, and only then start processing on
>>> stream B.
>>>
>>> What could be a way to go about this?
>>>