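Use Flatten to merge the two PCollections into one, and then GroupByKey (GBK) to group all the records by key. This works because both streams carry the same element type (MyEvent), which is what Flatten requires.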
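A minimal sketch of what that pipeline computes, modeled in plain Java so it runs without the Beam SDK. In Beam itself the two steps would be `PCollectionList.of(metricStream).and(subscriptionStream).apply(Flatten.pCollections())` followed by `GroupByKey.create()`, applied to the already-keyed `PCollection<KV<K, MyEvent>>` streams from the question. Everything below is hypothetical: the stand-in classes, using `orgId` as the key, matching subscriptions by a `metric` name field, and the helper names `flatten`, `groupByKey`, and `deliver`. The per-key routing is the same `instanceof` check as in the question, so no second TupleTag is needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the question's types, reduced to the fields used here.
class MetricObjectType {
    final String metric;
    MetricObjectType(String metric) { this.metric = metric; }
}

class SubscriptionObjectType {
    final String metric; // name of the metric the customer subscribed to (assumed field)
    SubscriptionObjectType(String metric) { this.metric = metric; }
}

class MyEvent {
    final long orgId;  // header.orgId, assumed to be the join key
    final Object body; // either MetricObjectType or SubscriptionObjectType
    MyEvent(long orgId, Object body) { this.orgId = orgId; this.body = body; }
}

class FlattenGbkModel {
    // Models Flatten: one collection containing the elements of both inputs.
    static List<MyEvent> flatten(List<MyEvent> metrics, List<MyEvent> subscriptions) {
        List<MyEvent> out = new ArrayList<>(metrics);
        out.addAll(subscriptions);
        return out;
    }

    // Models GroupByKey on orgId: all events for one key end up in one group.
    static Map<Long, List<MyEvent>> groupByKey(List<MyEvent> events) {
        Map<Long, List<MyEvent>> groups = new LinkedHashMap<>();
        for (MyEvent e : events) {
            groups.computeIfAbsent(e.orgId, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    // Models the per-key DoFn. GroupByKey does not guarantee ordering inside a
    // group, so both kinds of events are collected first, then metrics are filtered.
    static List<String> deliver(List<MyEvent> group) {
        List<String> subscribed = new ArrayList<>();
        List<String> metrics = new ArrayList<>();
        for (MyEvent e : group) {
            if (e.body instanceof SubscriptionObjectType) {
                subscribed.add(((SubscriptionObjectType) e.body).metric);
            } else if (e.body instanceof MetricObjectType) {
                metrics.add(((MetricObjectType) e.body).metric);
            }
            // any other body type is ignored, like the question's final else branch
        }
        List<String> delivered = new ArrayList<>();
        for (String m : metrics) {
            if (subscribed.contains(m)) {
                delivered.add(m);
            }
        }
        return delivered;
    }
}
```

With metrics "cpu" and "mem" for org 234 and a single subscription to "cpu" for that org, `deliver` on the org 234 group keeps only "cpu". On unbounded streams, GroupByKey (like CoGroupByKey) also needs a non-global window or a trigger before it emits anything.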
On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:

> I realized I can't even follow that approach with messages of the same type
> merged, since I'm getting an exception: Multiple entries with same key.
>
> I appreciate any advice.
>
> On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
> > Hi there!
> >
> > I need to merge two streams: the first stream contains metric updates and the
> > second contains customer subscriptions to metrics, so that each customer
> > receives only the metrics she subscribed to.
> > I previously used Flink, where a CoFlatMapFunction has flatMap1 and
> > flatMap2 methods that process each stream respectively.
> > In Beam I've learned that I can merge collection values into a CoGbkResult
> > collection and then use TupleTags to select the messages of a particular
> > stream, e.g.:
> >
> > ```
> > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> > // ...
> > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
> >     .and(subscriptionObjectTag, subscriptionStream)
> >     .apply(CoGroupByKey.create())
> >     // logic to join streams
> >     .apply(ParDo.of(new JoinDoFn()))
> >     // ... etc.
> > ```
> >
> > and in JoinDoFn:
> >
> > ```
> > Iterable<MetricObjectType> metricStreamMessages =
> >     context.element().getValue().getAll(metricObjectTag);
> > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
> >     context.element().getValue().getAll(subscriptionObjectTag);
> >
> > for ( MetricObjectType message : metricStreamMessages ) {
> >     processMetricStreamMessages(message); // like flatMap1 in Flink
> > }
> >
> > for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
> >     processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> > }
> > ```
> >
> > However, in my case I have a wrapper around MetricObjectType and
> > SubscriptionObjectType that I use to store additional information about the
> > event. So it looks like this:
> >
> > MyEvent: {
> >   header: {
> >     timestamp: 123456789,
> >     orgId: 234
> >   },
> >   body: | can be either MetricObjectType or SubscriptionObjectType |
> > }
> >
> > Since I no longer have a way to select the messages of one stream or the
> > other by TupleTag, I'm joining them with the same TupleTag:
> >
> > ```
> > TupleTag<MyEvent> myEventTag = new TupleTag<>();
> >
> > KeyedPCollectionTuple.of(myEventTag, metricStream)
> >     .and(myEventTag, subscriptionStream)
> >     .apply(CoGroupByKey.create())
> >     // logic to join streams
> >     .apply(ParDo.of(new JoinDoFn()))
> >     // ... etc.
> > ```
> >
> > Then I'm separating the streams by checking the type of the message body
> > with instanceof:
> >
> > ```
> > // get all messages from both streams
> > Iterable<MyEvent> streamMessages =
> >     context.element().getValue().getAll(myEventTag);
> >
> > for ( MyEvent message : streamMessages ) {
> >     if ( message.getBody() instanceof MetricObjectType ) {
> >         processMetricStreamMessages(message);
> >     } else if ( message.getBody() instanceof SubscriptionObjectType ) {
> >         processSubscriptionStreamMessages(message);
> >     } else {
> >         // unknown type of message received
> >     }
> > }
> > ```
> >
> > Question: is there a better way of joining streams whose messages have the
> > same type?
> >
> > Thanks,
> > Dmitry
