Yes

On Tue, Oct 16, 2018 at 4:34 PM [email protected] <[email protected]> wrote:
> Ok, thanks, but still I have to manually specify how to extract messages
> of a specific stream (previously flattened), e.g. via "instanceof". Right?
>
> On 2018/10/16 22:41:36, Lukasz Cwik <[email protected]> wrote:
> > Use Flatten to "merge" two PCollections and then GBK to group all the
> > records by key.
> >
> > On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:
> >
> > > I realized I even can't follow that approach with the same type of
> > > messages merged since I'm getting an exception: Multiple entries with
> > > same key.
> > >
> > > I appreciate any advice.
> > >
> > > On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
> > > > Hi there!
> > > >
> > > > I need to merge 2 streams: the first stream contains metric updates and
> > > > second contains customer subscriptions to metrics, so that customer
> > > > receives only those metrics she subscribed to.
> > > > I previously used Flink and there is a CoFlatMapFunction with flatMap1
> > > > and flatMap2 methods that process each stream correspondingly.
> > > > In Beam I've learned that I can merge collection values into a
> > > > CoGbkResult collection. And then use TupleTags to select messages of a
> > > > particular stream, e.g.:
> > > >
> > > > ```
> > > > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> > > > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> > > > ..
> > > > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
> > > >     .and(subscriptionObjectTag, subscriptionStream)
> > > >     .apply(CoGroupByKey.create())
> > > >     // logic to join streams
> > > >     .apply(ParDo.of(new JoinDoFn()))
> > > >     // ... etc.
> > > > ```
> > > >
> > > > and in JoinDoFn:
> > > >
> > > > ```
> > > > Iterable<MetricObjectType> metricStreamMessages =
> > > >     context.element().getValue().getAll(metricObjectTag);
> > > > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
> > > >     context.element().getValue().getAll(subscriptionObjectTag);
> > > >
> > > > for ( MetricObjectType message : metricStreamMessages ) {
> > > >     processMetricStreamMessages(message); // like flatMap1 in Flink
> > > > }
> > > >
> > > > for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
> > > >     processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> > > > }
> > > > ```
> > > >
> > > > However, in my case, I have a wrapper around MetricObjectType and
> > > > SubscriptionObjectType that I use to store additional information about
> > > > the event. So it looks like this:
> > > >
> > > > MyEvent: {
> > > >     header: {
> > > >         timestamp: 123456789,
> > > >         orgId: 234
> > > >     },
> > > >     body: | can be either MetricObjectType or SubscriptionObjectType |
> > > > }
> > > >
> > > > Since I no longer have a way to select messages of one or another stream
> > > > by TupleTags, I'm joining them with the same TupleTag:
> > > >
> > > > ```
> > > > TupleTag<MyEvent> myEventTag = new TupleTag<>();
> > > >
> > > > KeyedPCollectionTuple.of(myEventTag, metricStream)
> > > >     .and(myEventTag, subscriptionStream)
> > > >     .apply(CoGroupByKey.create())
> > > >     // logic to join streams
> > > >     .apply(ParDo.of(new JoinDoFn()))
> > > >     // ... etc.
> > > > ```
> > > >
> > > > Then I'm separating streams by checking the instanceof body of the
> > > > message:
> > > >
> > > > ```
> > > > // get all messages from both streams
> > > > Iterable<MyEvent> streamMessages =
> > > >     context.element().getValue().getAll(myEventTag);
> > > >
> > > > for ( MyEvent message : streamMessages ) {
> > > >     if ( message.getBody() instanceof MetricObjectType ) {
> > > >         processMetricStreamMessages(message);
> > > >     } else if ( message.getBody() instanceof SubscriptionObjectType ) {
> > > >         processSubscriptionStreamMessages(message);
> > > >     } else {
> > > >         // unknown type of message received
> > > >     }
> > > > }
> > > > ```
> > > >
> > > > Question: is there a better way of joining the streams that have the
> > > > same type of messages?
> > > >
> > > > Thanks,
> > > > Dmitry
> > > >
> > > > --
> > > > Dmitry
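
For reference, the logic discussed in the thread (flatten both inputs, group by key, then dispatch on the body type with instanceof) can be sketched in plain Java without Beam. This is only an illustration under assumptions: the field names, the use of orgId as the grouping key, and the helpers flattenAndGroup and join are hypothetical and do not come from the thread. In a Beam pipeline the corresponding transforms would be Flatten.pCollections() and GroupByKey.create().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlattenAndDispatch {
    // Hypothetical stand-ins for the payload types named in the thread.
    static final class MetricObjectType {
        final String name;
        final double value;
        MetricObjectType(String name, double value) { this.name = name; this.value = value; }
    }

    static final class SubscriptionObjectType {
        final String metricName;
        SubscriptionObjectType(String metricName) { this.metricName = metricName; }
    }

    // Wrapper with the header fields from the thread; body holds either payload type.
    static final class MyEvent {
        final long timestamp;
        final int orgId;
        final Object body;
        MyEvent(long timestamp, int orgId, Object body) {
            this.timestamp = timestamp;
            this.orgId = orgId;
            this.body = body;
        }
    }

    // Stand-in for Flatten followed by GBK: concatenate both inputs, then
    // group by key (assumed here to be orgId).
    static Map<Integer, List<MyEvent>> flattenAndGroup(List<MyEvent> first, List<MyEvent> second) {
        List<MyEvent> merged = new ArrayList<>(first);
        merged.addAll(second);
        Map<Integer, List<MyEvent>> grouped = new LinkedHashMap<>();
        for (MyEvent event : merged) {
            grouped.computeIfAbsent(event.orgId, k -> new ArrayList<>()).add(event);
        }
        return grouped;
    }

    // Per-key join: separate the two logical streams with instanceof, then
    // keep only the metrics that have a matching subscription.
    static List<String> join(List<MyEvent> events) {
        Set<String> subscribed = new HashSet<>();
        List<MetricObjectType> metrics = new ArrayList<>();
        for (MyEvent event : events) {
            if (event.body instanceof SubscriptionObjectType) {
                subscribed.add(((SubscriptionObjectType) event.body).metricName);
            } else if (event.body instanceof MetricObjectType) {
                metrics.add((MetricObjectType) event.body);
            }
            // Any other body type is ignored in this sketch.
        }
        List<String> delivered = new ArrayList<>();
        for (MetricObjectType metric : metrics) {
            if (subscribed.contains(metric.name)) {
                delivered.add(metric.name + "=" + metric.value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<MyEvent> metricStream = new ArrayList<>();
        metricStream.add(new MyEvent(1L, 234, new MetricObjectType("cpu", 0.5)));
        metricStream.add(new MyEvent(2L, 234, new MetricObjectType("disk", 0.9)));

        List<MyEvent> subscriptionStream = new ArrayList<>();
        subscriptionStream.add(new MyEvent(3L, 234, new SubscriptionObjectType("cpu")));

        Map<Integer, List<MyEvent>> grouped = flattenAndGroup(metricStream, subscriptionStream);
        for (Map.Entry<Integer, List<MyEvent>> entry : grouped.entrySet()) {
            System.out.println(entry.getKey() + " -> " + join(entry.getValue()));
        }
    }
}
```

Because both inputs share one element type after flattening, the type check on the body is what tells the two logical streams apart, which matches the "Yes" at the top of the thread.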
