Yes
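
To make that concrete: after Flatten and GBK, each key's values arrive as one mixed iterable, so the DoFn has to work out the origin of every element itself. Below is a rough sketch of that shape in plain Java. It is only a model of the pattern, not Beam API code: the flatten and groupByOrgId helpers, the route method, the field names and the choice of orgId as the key are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenThenGroupSketch {
    // Hypothetical stand-ins for the two body types named in the thread.
    static final class MetricObjectType {
        final String metric;
        MetricObjectType(String metric) { this.metric = metric; }
    }

    static final class SubscriptionObjectType {
        final String customer;
        SubscriptionObjectType(String customer) { this.customer = customer; }
    }

    // Reduced version of the MyEvent wrapper: a grouping key plus an untyped body.
    static final class MyEvent {
        final int orgId;
        final Object body;
        MyEvent(int orgId, Object body) { this.orgId = orgId; this.body = body; }
    }

    // Models Flatten: both inputs end up in one collection, origin is not recorded.
    static List<MyEvent> flatten(List<MyEvent> first, List<MyEvent> second) {
        List<MyEvent> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    // Models GBK: collect all events that share a key (orgId here, by assumption).
    static Map<Integer, List<MyEvent>> groupByOrgId(List<MyEvent> events) {
        Map<Integer, List<MyEvent>> grouped = new LinkedHashMap<>();
        for (MyEvent event : events) {
            grouped.computeIfAbsent(event.orgId, k -> new ArrayList<>()).add(event);
        }
        return grouped;
    }

    // The manual step: recover which stream an event came from via its body type.
    static String route(MyEvent event) {
        if (event.body instanceof MetricObjectType) {
            return "metric";
        } else if (event.body instanceof SubscriptionObjectType) {
            return "subscription";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        List<MyEvent> metrics =
                Arrays.asList(new MyEvent(234, new MetricObjectType("cpu")));
        List<MyEvent> subscriptions =
                Arrays.asList(new MyEvent(234, new SubscriptionObjectType("alice")));
        Map<Integer, List<MyEvent>> grouped = groupByOrgId(flatten(metrics, subscriptions));
        for (Map.Entry<Integer, List<MyEvent>> entry : grouped.entrySet()) {
            for (MyEvent event : entry.getValue()) {
                System.out.println(entry.getKey() + " -> " + route(event));
            }
        }
    }
}
```

The point of the sketch is that nothing in the merged collection records which input an element came from, so a type check on the body (or an explicit marker field in the wrapper) is what carries that information.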

On Tue, Oct 16, 2018 at 4:34 PM [email protected] <[email protected]> wrote:

> Ok, thanks, but I still have to manually specify how to extract the messages
> of a specific stream (previously flattened), e.g. via "instanceof". Right?
>
> On 2018/10/16 22:41:36, Lukasz Cwik <[email protected]> wrote:
> > Use Flatten to "merge" two PCollections and then GBK to group all the
> > records by key.
> >
> > On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:
> >
> > > I realized I can't even follow that approach when the merged messages are
> > > of the same type, since I'm getting an exception: Multiple entries with
> > > same key.
> > >
> > > I appreciate any advice.
> > >
> > > On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
> > > > Hi there!
> > > >
> > > > I need to merge 2 streams: the first stream contains metric updates and
> > > > the second contains customer subscriptions to metrics, so that a customer
> > > > receives only those metrics she subscribed to.
> > > > I previously used Flink, where a CoFlatMapFunction has flatMap1 and
> > > > flatMap2 methods that process each stream correspondingly.
> > > > In Beam I've learned that I can merge collection values into a CoGbkResult
> > > > collection and then use TupleTags to select messages of a particular
> > > > stream, e.g.:
> > > >
> > > > ```
> > > > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> > > > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> > > > ..
> > > > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
> > > >                 .and(subscriptionObjectTag, subscriptionStream)
> > > >                 .apply(CoGroupByKey.create())
> > > >                 // logic to join streams
> > > >                 .apply(ParDo.of(new JoinDoFn()))
> > > >                 // ... etc.
> > > > ```
> > > >
> > > > and in JoinDoFn:
> > > >
> > > > ```
> > > > Iterable<MetricObjectType> metricStreamMessages =
> > > > context.element().getValue().getAll(metricObjectTag);
> > > > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
> > > > context.element().getValue().getAll(subscriptionObjectTag);
> > > >
> > > > for ( MetricObjectType message : metricStreamMessages ) {
> > > >         processMetricStreamMessages(message); // like flatMap1 in Flink
> > > > }
> > > >
> > > > for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
> > > >         processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> > > > }
> > > > ```
> > > >
> > > > However, in my case, I have a wrapper around MetricObjectType and
> > > > SubscriptionObjectType that I use to store additional information about
> > > > the event. So it looks like this:
> > > >
> > > > MyEvent: {
> > > >         header: {
> > > >                 timestamp: 123456789,
> > > >                 orgId: 234
> > > >         },
> > > >         body: | can be either MetricObjectType or SubscriptionObjectType |
> > > > }
> > > >
> > > > Since I no longer have a way to select messages of one or another stream
> > > > by TupleTags, I'm joining them with the same TupleTag:
> > > >
> > > > ```
> > > > TupleTag<MyEvent> myEventTag = new TupleTag<>();
> > > >
> > > > KeyedPCollectionTuple.of(myEventTag, metricStream)
> > > >                 .and(myEventTag, subscriptionStream)
> > > >                 .apply(CoGroupByKey.create())
> > > >                 // logic to join streams
> > > >                 .apply(ParDo.of(new JoinDoFn()))
> > > >                 // ... etc.
> > > > ```
> > > >
> > > > Then I'm separating the streams by checking the type of the message body
> > > > with instanceof:
> > > >
> > > > ```
> > > > // get all messages from both streams
> > > > Iterable<MyEvent> streamMessages =
> > > > context.element().getValue().getAll(myEventTag);
> > > >
> > > > for ( MyEvent message : streamMessages ) {
> > > >         if ( message.getBody() instanceof MetricObjectType ) {
> > > >                 processMetricStreamMessages(message);
> > > >         } else if ( message.getBody() instanceof SubscriptionObjectType ) {
> > > >                 processSubscriptionStreamMessages(message);
> > > >         } else {
> > > >                 // unknown type of message received
> > > >         }
> > > > }
> > > > ```
> > > >
> > > > Question: is there a better way of joining the streams that have the same
> > > > type of messages?
> > > >
> > > > Thanks,
> > > > Dmitry
> > > >
> > > >
> > >
> >
>
