If I understand this correctly, *Stream A* should be a changelog stream
and *Stream B* an append-only stream.
You then want to use *Stream A* as a *lookup table* for *Stream B*?
I don't think there is an out-of-the-box way to say "start reading
*Stream B* only after I have read up to this point in *Stream A*".
But I think a Temporal Join
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
should help, since the temporal condition should ensure your events are
matched against the latest available version of each key.
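
To make the temporal-join idea concrete, here is a minimal plain-Python
sketch of the matching rule only. It is not Flink code, and the names
(VersionedTable, upsert, as_of, temporal_join) are made up for
illustration. It assumes each change in *Stream A* and each event in
*Stream B* carries a comparable timestamp, and it pairs every B event
with the latest A version at or before that timestamp.

```python
from bisect import bisect_right
from collections import defaultdict


class VersionedTable:
    """Hypothetical stand-in for a versioned table built from Stream A."""

    def __init__(self):
        self._times = defaultdict(list)   # key -> sorted version timestamps
        self._values = defaultdict(list)  # key -> values aligned with _times

    def upsert(self, key, value, ts):
        # Insert in timestamp order so late changelog rows land correctly.
        i = bisect_right(self._times[key], ts)
        self._times[key].insert(i, ts)
        self._values[key].insert(i, value)

    def as_of(self, key, ts):
        # Latest version whose timestamp is <= ts, or None if none exists.
        times = self._times.get(key)
        if not times:
            return None
        i = bisect_right(times, ts)
        return self._values[key][i - 1] if i else None


def temporal_join(events, table):
    # events: iterable of (key, payload, ts) tuples from Stream B.
    return [(key, payload, ts, table.as_of(key, ts))
            for key, payload, ts in events]


table = VersionedTable()
table.upsert("A", "medium", 10)
table.upsert("A", "high", 20)

joined = temporal_join([("A", "e1", 15), ("A", "e2", 25), ("B", "e3", 5)], table)
# e1 sees "medium", e2 sees "high", e3 has no matching version (None)
```

Note that the match is "as of the event's timestamp", so an event that is
older than the newest update still sees the version that was valid at its
own time.
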
The other approach, I think (if you want to use the DataStream API, for
example), would be to buffer *Stream B* elements (inside a process function,
for example) until you have read up to the specific point in *Stream A*.
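
A rough sketch of that buffering logic, again in plain Python rather than
the Flink API. The class and method names (BufferUntilReady, on_a, on_b)
and the numeric "position" in *Stream A* are assumptions for illustration.
In a real job this logic would sit in a process function over the two
connected streams, with the buffer and lookup map kept in Flink state so
they survive restarts.

```python
class BufferUntilReady:
    """Holds back Stream B events until Stream A has reached `ready_at`."""

    def __init__(self, ready_at):
        self.ready_at = ready_at  # hypothetical position to reach in Stream A
        self.lookup = {}          # latest value per key, built from Stream A
        self.buffer = []          # Stream B events waiting for the lookup
        self.ready = False

    def on_a(self, key, value, position):
        # Every A record updates the lookup; later records overwrite earlier ones.
        self.lookup[key] = value
        out = []
        if not self.ready and position >= self.ready_at:
            # Target reached: flush the buffered B events in arrival order.
            self.ready = True
            out = [self._enrich(event) for event in self.buffer]
            self.buffer.clear()
        return out

    def on_b(self, key, payload):
        if not self.ready:
            self.buffer.append((key, payload))
            return []
        return [self._enrich((key, payload))]

    def _enrich(self, event):
        key, payload = event
        return (key, payload, self.lookup.get(key))


op = BufferUntilReady(ready_at=2)
first = op.on_b("A", "e1")          # [] because Stream A is not caught up yet
second = op.on_a("A", "medium", 1)  # [] because position 1 < ready_at
third = op.on_a("A", "high", 2)     # [("A", "e1", "high")], the buffer is flushed
fourth = op.on_b("A", "e2")         # [("A", "e2", "high")], emitted immediately
```

The buffer is unbounded here, so in practice you would want to think
about how much of *Stream B* can pile up before *Stream A* catches up.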


On Thu, Mar 9, 2023 at 9:49 AM Yuval Itzchakov <yuva...@gmail.com> wrote:

> Thank you Mason and Ken for your replies
>
> The source itself isn't actually hybrid. The requirement is to materialize
> a stream first since the stream itself may contain updates to existing
> keys, changing their meanings, i.e. we can have key A go from medium
> importance to high importance and vice versa, and we want the stream which
> performs the lookups to have the most up to date view of the data.
>
> On Thu, Mar 9, 2023, 02:34 Mason Chen <mas.chen6...@gmail.com> wrote:
>
>> Hi Yuval,
>>
>> It seems you are trying to perform bootstrapping on a Flink job by doing
>> the bounded read first. A good pattern to follow is to use HybridSource [1]
>> and the docs have some examples with File and Kafka sources. The point of
>> switching can be coordinated by the source so that you can dynamically
>> infer where to start from stream B. Please take a look at the link and let
>> me know if you have additional questions.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> Best,
>> Mason
>>
>> On Wed, Mar 8, 2023 at 11:12 AM Yuval Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a use-case where I have two streams, call them A and B. I need to
>>> consume stream A up to a certain point, and only then start processing on
>>> stream B.
>>>
>>> What could be a way to go about this?
>>>
>>
