I realized I can't even follow that approach of merging messages of the same type, since I'm getting an exception: "Multiple entries with same key".
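One plausible reading of this exception is an assumption, not a confirmed diagnosis. The join may key each input by its TupleTag in an immutable map, and "Multiple entries with same key" is the wording Guava's ImmutableMap uses when the same key is added twice. The plain-Java model below illustrates that mechanism with `java.util.Map.of`, which likewise rejects duplicate keys. `Tag`, `schema` and `buildsWithTags` are hypothetical names, not Beam API.

```java
import java.util.Map;

public class DuplicateTagDemo {
    // Hypothetical stand-in for a typed tag. It does not override equals(),
    // so two tags are equal only if they are the same instance (assumed to
    // mirror how TupleTags are told apart; the type parameter plays no role).
    public static final class Tag<T> {
        private final String name;

        public Tag(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Builds an immutable tag -> input mapping, roughly like a join schema.
    // Map.of throws IllegalArgumentException when the same key appears twice.
    static Map<Tag<?>, String> schema(Tag<?> first, String in1, Tag<?> second, String in2) {
        return Map.of(first, in1, second, in2);
    }

    // Returns true if the two tags can be registered side by side.
    public static boolean buildsWithTags(Tag<?> first, Tag<?> second) {
        try {
            schema(first, "metricStream", second, "subscriptionStream");
            return true;
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        Tag<String> myEventTag = new Tag<>("myEventTag");
        Tag<String> metricTag = new Tag<>("metricTag");
        Tag<String> subscriptionTag = new Tag<>("subscriptionTag");

        System.out.println(buildsWithTags(myEventTag, myEventTag));     // same tag twice: false
        System.out.println(buildsWithTags(metricTag, subscriptionTag)); // distinct tags: true
    }
}
```

Under this model, reusing one tag for both inputs fails. Two separate tag instances succeed, even though they share a type parameter.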
I appreciate any advice.

On 2018/10/16 19:33:03, Dmitry Minaev wrote:
> Hi there!
>
> I need to merge 2 streams: the first stream contains metric updates and
> the second contains customer subscriptions to metrics, so that a customer
> receives only the metrics she subscribed to.
> I previously used Flink, where a CoFlatMapFunction has flatMap1 and
> flatMap2 methods that process each stream respectively.
> In Beam I've learned that I can merge collection values into a CoGbkResult
> collection, and then use TupleTags to select the messages of a particular
> stream, e.g.:
>
> ```
> TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> // ...
> KeyedPCollectionTuple.of(metricObjectTag, metricStream)
>     .and(subscriptionObjectTag, subscriptionStream)
>     .apply(CoGroupByKey.create())
>     // logic to join streams
>     .apply(ParDo.of(new JoinDoFn()))
>     // ... etc.
> ```
>
> and in JoinDoFn:
>
> ```
> Iterable<MetricObjectType> metricStreamMessages =
>     context.element().getValue().getAll(metricObjectTag);
> Iterable<SubscriptionObjectType> subscriptionStreamMessages =
>     context.element().getValue().getAll(subscriptionObjectTag);
>
> for (MetricObjectType message : metricStreamMessages) {
>     processMetricStreamMessages(message); // like flatMap1 in Flink
> }
>
> for (SubscriptionObjectType message : subscriptionStreamMessages) {
>     processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> }
> ```
>
> However, in my case, I have a wrapper around MetricObjectType and
> SubscriptionObjectType that I use to store additional information about the
> event. So it looks like this:
>
> MyEvent: {
>   header: {
>     timestamp: 123456789,
>     orgId: 234
>   },
>   body: | can be either MetricObjectType or SubscriptionObjectType |
> }
>
> Since I no longer have a way to select the messages of one stream or the
> other by TupleTag, I'm joining them with the same TupleTag:
>
> ```
> TupleTag<MyEvent> myEventTag = new TupleTag<>();
>
> KeyedPCollectionTuple.of(myEventTag, metricStream)
>     .and(myEventTag, subscriptionStream)
>     .apply(CoGroupByKey.create())
>     // logic to join streams
>     .apply(ParDo.of(new JoinDoFn()))
>     // ... etc.
> ```
>
> Then I'm separating the streams by checking the type of the message body
> with instanceof:
>
> ```
> // get all messages from both streams
> Iterable<MyEvent> streamMessages =
>     context.element().getValue().getAll(myEventTag);
>
> for (MyEvent message : streamMessages) {
>     if (message.getBody() instanceof MetricObjectType) {
>         processMetricStreamMessages(message);
>     } else if (message.getBody() instanceof SubscriptionObjectType) {
>         processSubscriptionStreamMessages(message);
>     } else {
>         // unknown type of message received
>     }
> }
> ```
>
> Question: is there a better way of joining streams that carry the same
> type of messages?
>
> Thanks,
> Dmitry
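This sketch shows one possible alternative to the instanceof checks. It rests on an assumption worth verifying in the Beam Javadoc: a TupleTag is identified by its instance (its generated id), not by its type parameter. If so, two separate `TupleTag<MyEvent>` instances, one per stream, should keep the inputs apart the same way `metricObjectTag` and `subscriptionObjectTag` do. The plain-Java model below illustrates that idea. `MyEvent`, `Tag`, `Grouped` and `coGroup` are hypothetical stand-ins, and the model does not call Beam's KeyedPCollectionTuple or CoGroupByKey. It also treats the body as a plain String and uses orgId as the join key, both of which are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SameTypeCoGroupSketch {
    // Hypothetical stand-in for MyEvent: a header field plus a body of either kind.
    public static final class MyEvent {
        public final long orgId;
        public final String body;

        public MyEvent(long orgId, String body) {
            this.orgId = orgId;
            this.body = body;
        }
    }

    // Hypothetical typed tag. No equals() override, so every instance is a
    // distinct map key, even when two tags share the same type parameter.
    public static final class Tag<T> {}

    // Per-key result holding one list per tag (loosely modeled on CoGbkResult.getAll).
    public static final class Grouped {
        private final Map<Tag<?>, List<Object>> byTag = new HashMap<>();

        <T> void add(Tag<T> tag, T value) {
            byTag.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        public <T> List<T> getAll(Tag<T> tag) {
            return (List<T>) (List<?>) byTag.getOrDefault(tag, List.of());
        }
    }

    // Groups two keyed inputs by key, recording which tag each element arrived under.
    public static <K, V> Map<K, Grouped> coGroup(
            Tag<V> leftTag, List<Map.Entry<K, V>> left,
            Tag<V> rightTag, List<Map.Entry<K, V>> right) {
        Map<K, Grouped> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : left) {
            result.computeIfAbsent(e.getKey(), k -> new Grouped()).add(leftTag, e.getValue());
        }
        for (Map.Entry<K, V> e : right) {
            result.computeIfAbsent(e.getKey(), k -> new Grouped()).add(rightTag, e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Two distinct tags with the same element type, one per input stream.
        Tag<MyEvent> metricTag = new Tag<>();
        Tag<MyEvent> subscriptionTag = new Tag<>();

        List<Map.Entry<Long, MyEvent>> metrics = List.of(
                Map.entry(234L, new MyEvent(234, "cpu=0.7")),
                Map.entry(234L, new MyEvent(234, "mem=0.4")));
        List<Map.Entry<Long, MyEvent>> subscriptions = List.of(
                Map.entry(234L, new MyEvent(234, "subscribe:cpu")));

        Grouped g = coGroup(metricTag, metrics, subscriptionTag, subscriptions).get(234L);
        // Each side is read through its own tag, like flatMap1 / flatMap2, without instanceof.
        for (MyEvent m : g.getAll(metricTag)) System.out.println("metric: " + m.body);
        for (MyEvent s : g.getAll(subscriptionTag)) System.out.println("subscription: " + s.body);
    }
}
```

In Beam terms, the analogue would presumably be `KeyedPCollectionTuple.of(metricTag, metricStream).and(subscriptionTag, subscriptionStream)`, with `getAll(metricTag)` and `getAll(subscriptionTag)` in JoinDoFn. This mirrors the first quoted snippet but is untested here.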
