Use Flatten to merge the two PCollections into one, and then GroupByKey (GBK)
to group all the records by key.
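
In Beam terms that is PCollectionList.of(metricStream).and(subscriptionStream)
.apply(Flatten.pCollections()) followed by GroupByKey.create(); on unbounded
input the PCollections need a non-global window or a trigger before the GBK.
To show what those two steps produce without needing a runner, here is a
plain-Java model of the semantics (not Beam code). The class and method names
are hypothetical, MyEvent is a simplified stand-in for the wrapper in the
question, and keying by orgId is an assumption:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of Flatten followed by GroupByKey; not Beam code.
class FlattenThenGroup {
    // Hypothetical stand-ins for the payload types from the question.
    static final class MetricObjectType {}
    static final class SubscriptionObjectType {}

    // Hypothetical wrapper: orgId is assumed to be the join key.
    static final class MyEvent {
        final int orgId;
        final Object body;

        MyEvent(int orgId, Object body) {
            this.orgId = orgId;
            this.body = body;
        }
    }

    // Models Flatten: one collection holding the elements of both inputs.
    static List<MyEvent> flatten(List<MyEvent> first, List<MyEvent> second) {
        List<MyEvent> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    // Models GroupByKey: all events that share an orgId end up together.
    static Map<Integer, List<MyEvent>> groupByKey(List<MyEvent> events) {
        Map<Integer, List<MyEvent>> grouped = new LinkedHashMap<>();
        for (MyEvent event : events) {
            grouped.computeIfAbsent(event.orgId, k -> new ArrayList<>()).add(event);
        }
        return grouped;
    }

    // Models the per-key DoFn: dispatch on the body type, as in the question.
    static String summarize(int orgId, Iterable<MyEvent> events) {
        int metrics = 0;
        int subscriptions = 0;
        for (MyEvent event : events) {
            if (event.body instanceof MetricObjectType) {
                metrics++;
            } else if (event.body instanceof SubscriptionObjectType) {
                subscriptions++;
            }
        }
        return "org " + orgId + ": metrics=" + metrics + ", subscriptions=" + subscriptions;
    }

    public static void main(String[] args) {
        List<MyEvent> metricStream = List.of(
            new MyEvent(234, new MetricObjectType()),
            new MyEvent(234, new MetricObjectType()),
            new MyEvent(567, new MetricObjectType()));
        List<MyEvent> subscriptionStream = List.of(
            new MyEvent(234, new SubscriptionObjectType()));

        Map<Integer, List<MyEvent>> grouped =
            groupByKey(flatten(metricStream, subscriptionStream));
        grouped.forEach((orgId, events) -> System.out.println(summarize(orgId, events)));
    }
}
```

Since both inputs already have the same element type, no TupleTag is involved
at all; the instanceof check on the body does the separation after grouping.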

On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:

> I realized I can't even follow that approach when the merged messages have
> the same type, since I'm getting an exception: Multiple entries with same
> key.
>
> I appreciate any advice.
>
> On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
> > Hi there!
> >
> > I need to merge 2 streams: the first stream contains metric updates and
> > the second contains customer subscriptions to metrics, so that each
> > customer receives only the metrics she subscribed to.
> > I previously used Flink, which has a CoFlatMapFunction with flatMap1 and
> > flatMap2 methods that process each stream correspondingly.
> > In Beam I've learned that I can merge collection values into a CoGbkResult
> > collection and then use TupleTags to select the messages of a particular
> > stream, e.g.:
> >
> > ```
> > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> > ..
> > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
> >                 .and(subscriptionObjectTag, subscriptionStream)
> >                 .apply(CoGroupByKey.create())
> >                 // logic to join streams
> >                 .apply(ParDo.of(new JoinDoFn()))
> >                 // ... etc.
> > ```
> >
> > and in JoinDoFn:
> >
> > ```
> > Iterable<MetricObjectType> metricStreamMessages =
> > context.element().getValue().getAll(metricObjectTag);
> > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
> > context.element().getValue().getAll(subscriptionObjectTag);
> >
> > for ( MetricObjectType message : metricStreamMessages ) {
> >         processMetricStreamMessages(message); // like flatMap1 in Flink
> > }
> >
> > for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
> >         processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> > }
> > ```
> >
> > However, in my case, I have a wrapper around MetricObjectType and
> > SubscriptionObjectType that I use to store additional information about
> > the event. So it looks like this:
> >
> > MyEvent: {
> >         header: {
> >                 timestamp: 123456789,
> >                 orgId: 234
> >         },
> >         body: | can be either MetricObjectType or SubscriptionObjectType |
> > }
> >
> > Since I no longer have a way to select messages of one or another stream
> > by TupleTags, I'm joining them with the same TupleTag:
> >
> > ```
> > TupleTag<MyEvent> myEventTag = new TupleTag<>();
> >
> > KeyedPCollectionTuple.of(myEventTag, metricStream)
> >                 .and(myEventTag, subscriptionStream)
> >                 .apply(CoGroupByKey.create())
> >                 // logic to join streams
> >                 .apply(ParDo.of(new JoinDoFn()))
> >                 // ... etc.
> > ```
> >
> > Then I'm separating the streams with an instanceof check on the body of
> > the message:
> >
> > ```
> > // get all messages from both streams
> > Iterable<MyEvent> streamMessages =
> > context.element().getValue().getAll(myEventTag);
> >
> > for ( MyEvent message : streamMessages ) {
> >         if ( message.getBody() instanceof MetricObjectType ) {
> >                 processMetricStreamMessages(message);
> >         } else if ( message.getBody() instanceof SubscriptionObjectType ) {
> >                 processSubscriptionStreamMessages(message);
> >         } else {
> >                 // unknown type of message received
> >         }
> > }
> > ```
> >
> > Question: is there a better way of joining the streams that have the same
> > type of messages?
> >
> > Thanks,
> > Dmitry
> >
>
