Thank you, Lukasz!

On Wed, Oct 17, 2018 at 9:22 AM Lukasz Cwik <[email protected]> wrote:
> Yes
>
> On Tue, Oct 16, 2018 at 4:34 PM [email protected] <[email protected]> wrote:
>
>> OK, thanks, but I still have to manually specify how to extract the
>> messages of a specific stream (previously flattened), e.g. via
>> "instanceof". Right?
>>
>> On 2018/10/16 22:41:36, Lukasz Cwik <[email protected]> wrote:
>> > Use Flatten to "merge" two PCollections and then GBK (GroupByKey) to
>> > group all the records by key.
>> >
>> > On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:
>> >
>> > > I realized I can't even follow that approach with messages of the same
>> > > type merged, since I'm getting an exception: Multiple entries with same
>> > > key.
>> > >
>> > > I'd appreciate any advice.
>> > >
>> > > On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
>> > > > Hi there!
>> > > >
>> > > > I need to merge 2 streams: the first stream contains metric updates
>> > > > and the second contains customer subscriptions to metrics, so that
>> > > > each customer receives only the metrics she subscribed to.
>> > > > I previously used Flink, which has a CoFlatMapFunction with flatMap1
>> > > > and flatMap2 methods that process each stream respectively.
>> > > > In Beam I've learned that I can merge collection values into a
>> > > > CoGbkResult collection and then use TupleTags to select the messages
>> > > > of a particular stream, e.g.:
>> > > >
>> > > > ```
>> > > > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
>> > > > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
>> > > > ..
>> > > > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
>> > > >     .and(subscriptionObjectTag, subscriptionStream)
>> > > >     .apply(CoGroupByKey.create())
>> > > >     // logic to join streams
>> > > >     .apply(ParDo.of(new JoinDoFn()))
>> > > >     // ... etc.
>> > > > ```
>> > > >
>> > > > and in JoinDoFn:
>> > > >
>> > > > ```
>> > > > Iterable<MetricObjectType> metricStreamMessages =
>> > > >     context.element().getValue().getAll(metricObjectTag);
>> > > > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
>> > > >     context.element().getValue().getAll(subscriptionObjectTag);
>> > > >
>> > > > for (MetricObjectType message : metricStreamMessages) {
>> > > >     processMetricStreamMessages(message); // like flatMap1 in Flink
>> > > > }
>> > > >
>> > > > for (SubscriptionObjectType message : subscriptionStreamMessages) {
>> > > >     processSubscriptionStreamMessages(message); // like flatMap2 in Flink
>> > > > }
>> > > > ```
>> > > >
>> > > > However, in my case I have a wrapper around MetricObjectType and
>> > > > SubscriptionObjectType that I use to store additional information
>> > > > about the event. It looks like this:
>> > > >
>> > > > MyEvent: {
>> > > >   header: {
>> > > >     timestamp: 123456789,
>> > > >     orgId: 234
>> > > >   },
>> > > >   body: | can be either MetricObjectType or SubscriptionObjectType |
>> > > > }
>> > > >
>> > > > Since I no longer have a way to select the messages of one stream or
>> > > > the other by TupleTag, I'm joining them with the same TupleTag:
>> > > >
>> > > > ```
>> > > > TupleTag<MyEvent> myEventTag = new TupleTag<>();
>> > > >
>> > > > KeyedPCollectionTuple.of(myEventTag, metricStream)
>> > > >     .and(myEventTag, subscriptionStream)
>> > > >     .apply(CoGroupByKey.create())
>> > > >     // logic to join streams
>> > > >     .apply(ParDo.of(new JoinDoFn()))
>> > > >     // ... etc.
>> > > > ```
>> > > >
>> > > > Then I separate the streams by checking the type of the message body
>> > > > with instanceof:
>> > > >
>> > > > ```
>> > > > // get all messages from both streams
>> > > > Iterable<MyEvent> streamMessages =
>> > > >     context.element().getValue().getAll(myEventTag);
>> > > >
>> > > > for (MyEvent message : streamMessages) {
>> > > >     if (message.getBody() instanceof MetricObjectType) {
>> > > >         processMetricStreamMessages(message);
>> > > >     } else if (message.getBody() instanceof SubscriptionObjectType) {
>> > > >         processSubscriptionStreamMessages(message);
>> > > >     } else {
>> > > >         // unknown type of message received
>> > > >     }
>> > > > }
>> > > > ```
>> > > >
>> > > > Question: is there a better way to join streams that carry the same
>> > > > type of messages?
>> > > >
>> > > > Thanks,
>> > > > Dmitry
>> > > >
>> > > > --
>> > > > Dmitry

--
Dmitry
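For illustration, here is a minimal plain-Java sketch of the per-key logic that the `instanceof` approach implies once the two streams are combined with `PCollectionList.of(metricStream).and(subscriptionStream).apply(Flatten.pCollections())` followed by `GroupByKey.create()`. It assumes both inputs are `PCollection<KV<String, MyEvent>>` keyed by metric name. The class `JoinSketch`, the method `routeMetrics`, the fields `metricName`, `value` and `customerId`, and the `customerId:value` output format are all hypothetical, and the three event types are minimal stand-ins for the real ones. Keeping this logic outside the `DoFn` lets it be tested without a pipeline; the `DoFn` would call something like `routeMetrics(c.element().getValue())` and emit the results.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinSketch {

    // Hypothetical stand-in for the metric payload; the real class will differ.
    static class MetricObjectType {
        final String metricName;
        final double value;

        MetricObjectType(String metricName, double value) {
            this.metricName = metricName;
            this.value = value;
        }
    }

    // Hypothetical stand-in for one customer's subscription to one metric.
    static class SubscriptionObjectType {
        final String customerId;
        final String metricName;

        SubscriptionObjectType(String customerId, String metricName) {
            this.customerId = customerId;
            this.metricName = metricName;
        }
    }

    // Wrapper from the thread; the header is omitted and the body holds either type.
    static class MyEvent {
        private final Object body;

        MyEvent(Object body) {
            this.body = body;
        }

        Object getBody() {
            return body;
        }
    }

    // Per-key join logic, as it might run inside the DoFn after Flatten + GroupByKey.
    // All events passed in share one key (assumed here to be the metric name).
    static List<String> routeMetrics(Iterable<MyEvent> grouped) {
        List<MetricObjectType> metrics = new ArrayList<>();
        List<SubscriptionObjectType> subscriptions = new ArrayList<>();

        // Step 1: split the single grouped Iterable back into the two logical streams.
        for (MyEvent event : grouped) {
            Object body = event.getBody();
            if (body instanceof MetricObjectType) {
                metrics.add((MetricObjectType) body);
            } else if (body instanceof SubscriptionObjectType) {
                subscriptions.add((SubscriptionObjectType) body);
            }
            // Any other body type is skipped, like the empty else branch in the thread.
        }

        // Step 2: deliver every metric update under this key to every subscriber of it.
        List<String> deliveries = new ArrayList<>();
        for (SubscriptionObjectType sub : subscriptions) {
            for (MetricObjectType metric : metrics) {
                deliveries.add(sub.customerId + ":" + metric.value);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<MyEvent> grouped = new ArrayList<>();
        grouped.add(new MyEvent(new MetricObjectType("cpu", 0.75)));
        grouped.add(new MyEvent(new SubscriptionObjectType("alice", "cpu")));
        grouped.add(new MyEvent(new SubscriptionObjectType("bob", "cpu")));
        System.out.println(routeMetrics(grouped)); // prints [alice:0.75, bob:0.75]
    }
}
```

The sketch buffers all of one key's events in memory before pairing them, which keeps it short; with very many events per key, a real pipeline may need a different layout.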
