Thank you, Lukasz!
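For reference, here is a minimal plain-Java sketch of the approach confirmed below. It flattens the two streams, groups the merged events by key, and then splits each group by body type with instanceof. Plain Java lists and a LinkedHashMap stand in for Beam's Flatten and GroupByKey, so the sketch runs without a Beam dependency. Keying by orgId, the hypothetical metricName field, and the join rule (deliver a metric only if a subscription with the same name exists under the same key) are assumptions for illustration. They are not part of the original pipeline.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the message types in this thread; metricName is
// an assumed field used only to match metrics against subscriptions.
class MetricObjectType {
    final String metricName;
    MetricObjectType(String metricName) { this.metricName = metricName; }
}

class SubscriptionObjectType {
    final String metricName;
    SubscriptionObjectType(String metricName) { this.metricName = metricName; }
}

// Wrapper with header fields (timestamp, orgId) and a body of either type.
class MyEvent {
    final long timestamp;
    final int orgId;
    final Object body; // MetricObjectType or SubscriptionObjectType
    MyEvent(long timestamp, int orgId, Object body) {
        this.timestamp = timestamp;
        this.orgId = orgId;
        this.body = body;
    }
    Object getBody() { return body; }
}

class FlattenGroupDispatch {
    // Stand-in for Flatten + GroupByKey: concatenate both streams, then
    // collect the events for each orgId (the assumed key) in arrival order.
    static Map<Integer, List<MyEvent>> flattenAndGroup(
            List<MyEvent> metricStream, List<MyEvent> subscriptionStream) {
        List<MyEvent> flattened = new ArrayList<>(metricStream);
        flattened.addAll(subscriptionStream);
        Map<Integer, List<MyEvent>> grouped = new LinkedHashMap<>();
        for (MyEvent event : flattened) {
            grouped.computeIfAbsent(event.orgId, k -> new ArrayList<>()).add(event);
        }
        return grouped;
    }

    // Roughly what a JoinDoFn would do with one grouped value: separate the
    // events by body type, then keep only the metrics subscribed to under this key.
    static List<String> join(Iterable<MyEvent> events) {
        List<MetricObjectType> metrics = new ArrayList<>();
        Set<String> subscribed = new HashSet<>();
        for (MyEvent event : events) {
            Object body = event.getBody();
            if (body instanceof MetricObjectType) {
                metrics.add((MetricObjectType) body);
            } else if (body instanceof SubscriptionObjectType) {
                subscribed.add(((SubscriptionObjectType) body).metricName);
            } else {
                throw new IllegalArgumentException("Unknown body type: " + body);
            }
        }
        List<String> delivered = new ArrayList<>();
        for (MetricObjectType metric : metrics) {
            if (subscribed.contains(metric.metricName)) {
                delivered.add(metric.metricName);
            }
        }
        return delivered;
    }
}
```

In an actual Beam pipeline, the grouping step would presumably be Flatten followed by GroupByKey, and the body of join() would live in the DoFn. This sketch has not been checked against a real Beam runner.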

On Wed, Oct 17, 2018 at 9:22 AM Lukasz Cwik <[email protected]> wrote:

> Yes
>
> On Tue, Oct 16, 2018 at 4:34 PM [email protected] <[email protected]> wrote:
>
>> OK, thanks, but I still have to manually specify how to extract the messages
>> of a specific stream (previously flattened), e.g. via "instanceof". Right?
>>
>> On 2018/10/16 22:41:36, Lukasz Cwik <[email protected]> wrote:
>> > Use Flatten to "merge" two PCollections and then GBK to group all the
>> > records by key.
>> >
>> > On Tue, Oct 16, 2018 at 3:27 PM [email protected] <[email protected]> wrote:
>> >
>> > > I realized I can't even follow that approach with messages of the same
>> > > type merged, since I'm getting an exception: Multiple entries with same
>> > > key.
>> > >
>> > > I appreciate any advice.
>> > >
>> > > On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote:
>> > > > Hi there!
>> > > >
>> > > > I need to merge two streams: the first contains metric updates and the
>> > > > second contains customer subscriptions to metrics, so that each
>> > > > customer receives only the metrics she subscribed to.
>> > > > I previously used Flink, which has a CoFlatMapFunction with flatMap1
>> > > > and flatMap2 methods that process each stream respectively.
>> > > > In Beam, I've learned that I can merge collection values into a
>> > > > CoGbkResult collection and then use TupleTags to select the messages
>> > > > of a particular stream, e.g.:
>> > > >
>> > > > ```
>> > > > TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
>> > > > TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
>> > > > ..
>> > > > KeyedPCollectionTuple.of(metricObjectTag, metricStream)
>> > > >                 .and(subscriptionObjectTag, subscriptionStream)
>> > > >                 .apply(CoGroupByKey.create())
>> > > >                 // logic to join streams
>> > > >                 .apply(ParDo.of(new JoinDoFn()))
>> > > >                 // ... etc.
>> > > > ```
>> > > >
>> > > > and in JoinDoFn:
>> > > >
>> > > > ```
>> > > > Iterable<MetricObjectType> metricStreamMessages =
>> > > > context.element().getValue().getAll(metricObjectTag);
>> > > > Iterable<SubscriptionObjectType> subscriptionStreamMessages =
>> > > > context.element().getValue().getAll(subscriptionObjectTag);
>> > > >
>> > > > for ( MetricObjectType message : metricStreamMessages ) {
>> > > >         processMetricStreamMessages(message); // like flatMap1 in Flink
>> > > > }
>> > > >
>> > > > for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
>> > > >         processSubscriptionStreamMessages(message); // like flatMap2 in Flink
>> > > > }
>> > > > ```
>> > > >
>> > > > However, in my case, I have a wrapper around MetricObjectType and
>> > > > SubscriptionObjectType that I use to store additional information about
>> > > > the event, so it looks like this:
>> > > >
>> > > > MyEvent: {
>> > > >         header: {
>> > > >                 timestamp: 123456789,
>> > > >                 orgId: 234
>> > > >         },
>> > > >         body: | can be either MetricObjectType or SubscriptionObjectType |
>> > > > }
>> > > >
>> > > > Since I no longer have a way to select the messages of one stream or the
>> > > > other by TupleTag, I'm joining them with the same TupleTag:
>> > > >
>> > > > ```
>> > > > TupleTag<MyEvent> myEventTag = new TupleTag<>();
>> > > >
>> > > > KeyedPCollectionTuple.of(myEventTag, metricStream)
>> > > >                 .and(myEventTag, subscriptionStream)
>> > > >                 .apply(CoGroupByKey.create())
>> > > >                 // logic to join streams
>> > > >                 .apply(ParDo.of(new JoinDoFn()))
>> > > >                 // ... etc.
>> > > > ```
>> > > >
>> > > > Then I separate the streams by checking the type of the message body
>> > > > with instanceof:
>> > > >
>> > > > ```
>> > > > // get all messages from both streams
>> > > > Iterable<MyEvent> streamMessages =
>> > > > context.element().getValue().getAll(myEventTag);
>> > > >
>> > > > for ( MyEvent message : streamMessages ) {
>> > > >         if ( message.getBody() instanceof MetricObjectType ) {
>> > > >                 processMetricStreamMessages(message);
>> > > >         } else if ( message.getBody() instanceof SubscriptionObjectType ) {
>> > > >                 processSubscriptionStreamMessages(message);
>> > > >         } else {
>> > > >                 // unknown type of message received
>> > > >         }
>> > > > }
>> > > > ```
>> > > >
>> > > > Question: is there a better way of joining streams that carry the same
>> > > > type of messages?
>> > > >
>> > > > Thanks,
>> > > > Dmitry
>> > > >
>> > > > --
>> > > > Dmitry
>> > > >
>> > >
>> >
>>
> --

--
Dmitry
