I realized I can't even follow that approach with messages of the same type
merged, since I'm getting an exception: Multiple entries with same key.
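
The exception most likely comes from registering the same tag instance for both inputs: a tag identifies an input, not an element type, so two inputs with the same element type still need two distinct tags. Below is a minimal sketch of that idea in plain Java. It is a model only, not Beam code: `Tag`, `GroupedResult`, `DistinctTagsDemo` and the simplified `MyEvent` are hypothetical stand-ins, and the error message is copied from the exception above purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the MyEvent wrapper in the quoted message.
final class MyEvent {
    final long orgId;
    final Object body;

    MyEvent(long orgId, Object body) {
        this.orgId = orgId;
        this.body = body;
    }
}

// Hypothetical tag: equality is by identity, so two instances with the
// same element type T are still two different keys.
final class Tag<T> {
    final String name;

    Tag(String name) {
        this.name = name;
    }
}

// Toy model of the grouped values for one key: one list per registered tag.
final class GroupedResult {
    private final Map<Tag<?>, List<Object>> valuesByTag = new HashMap<>();

    <T> void register(Tag<T> tag) {
        if (valuesByTag.containsKey(tag)) {
            // Models the failure seen when one tag is used for both inputs.
            throw new IllegalArgumentException("Multiple entries with same key: " + tag.name);
        }
        valuesByTag.put(tag, new ArrayList<>());
    }

    <T> void add(Tag<T> tag, T value) {
        valuesByTag.get(tag).add(value);
    }

    @SuppressWarnings("unchecked")
    <T> List<T> getAll(Tag<T> tag) {
        return (List<T>) (List<?>) valuesByTag.get(tag);
    }
}

final class DistinctTagsDemo {
    // Two tags of the same element type keep the two inputs apart.
    static String run() {
        Tag<MyEvent> metricTag = new Tag<>("metrics");
        Tag<MyEvent> subscriptionTag = new Tag<>("subscriptions");

        GroupedResult grouped = new GroupedResult();
        grouped.register(metricTag);
        grouped.register(subscriptionTag);

        grouped.add(metricTag, new MyEvent(234L, "cpu=0.7"));
        grouped.add(metricTag, new MyEvent(234L, "mem=0.4"));
        grouped.add(subscriptionTag, new MyEvent(234L, "subscribe:cpu"));

        return grouped.getAll(metricTag).size() + " metric(s), "
                + grouped.getAll(subscriptionTag).size() + " subscription(s)";
    }

    // Registering one tag for both inputs fails, as in the report above.
    static boolean registeringSameTagTwiceFails() {
        Tag<MyEvent> tag = new Tag<>("events");
        GroupedResult grouped = new GroupedResult();
        grouped.register(tag);
        try {
            grouped.register(tag);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

In Beam terms this would presumably correspond to creating two separate `TupleTag<MyEvent>` instances (one per stream) and calling `getAll` once per tag, which should also make the `instanceof` check on the body unnecessary.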

I appreciate any advice.

On 2018/10/16 19:33:03, Dmitry Minaev <[email protected]> wrote: 
> Hi there!
> 
> I need to merge 2 streams: the first stream contains metric updates and
> the second contains customer subscriptions to metrics, so that each
> customer receives only the metrics she subscribed to.
> I previously used Flink, where there is a CoFlatMapFunction with flatMap1
> and flatMap2 methods that process the first and second stream respectively.
> In Beam I've learned that I can merge collection values into a CoGbkResult
> collection. And then use TupleTags to select messages of a particular
> stream, e.g.:
> 
> ```
> TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
> TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
> ..
> KeyedPCollectionTuple.of(metricObjectTag, metricStream)
>                 .and(subscriptionObjectTag, subscriptionStream)
>                 .apply(CoGroupByKey.create())
>                 // logic to join streams
>                 .apply(ParDo.of(new JoinDoFn()))
>                 // ... etc.
> ```
> 
> and in JoinDoFn:
> 
> ```
> Iterable<MetricObjectType> metricStreamMessages =
> context.element().getValue().getAll(metricObjectTag);
> Iterable<SubscriptionObjectType> subscriptionStreamMessages =
> context.element().getValue().getAll(subscriptionObjectTag);
> 
> for ( MetricObjectType message : metricStreamMessages ) {
>         processMetricStreamMessages(message); // like flatMap1 in Flink
> }
> 
> for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
>         processSubscriptionStreamMessages(message); // like flatMap2 in Flink
> }
> ```
> 
> However, in my case, I have a wrapper around MetricObjectType and
> SubscriptionObjectType that I use to store additional information about the
> event. So it looks like this:
> 
> MyEvent: {
>         header: {
>                 timestamp: 123456789,
>                 orgId: 234
>         },
>         body: | can be either MetricObjectType or SubscriptionObjectType |
> }
> 
> Since I no longer have a way to select messages of one or another stream by
> TupleTags, I'm joining them with the same TupleTag:
> 
> ```
> TupleTag<MyEvent> myEventTag = new TupleTag<>();
> 
> KeyedPCollectionTuple.of(myEventTag, metricStream)
>                 .and(myEventTag, subscriptionStream)
>                 .apply(CoGroupByKey.create())
>                 // logic to join streams
>                 .apply(ParDo.of(new JoinDoFn()))
>                 // ... etc.
> ```
> 
> Then I'm separating the streams by checking the type of the message body
> with instanceof:
> 
> ```
> // get all messages from both streams
> Iterable<MyEvent> streamMessages =
> context.element().getValue().getAll(myEventTag);
> 
> for ( MyEvent message : streamMessages ) {
>         if ( message.getBody() instanceof MetricObjectType ) {
>                 processMetricStreamMessages(message);
>         } else if ( message.getBody() instanceof SubscriptionObjectType ) {
>                 processSubscriptionStreamMessages(message);
>         } else {
>                 // unknown type of message received
>         }
> }
> ```
> 
> Question: is there a better way of joining the streams that have the same
> type of messages?
> 
> Thanks,
> Dmitry
> 
> 
