Hi there!

I need to merge 2 streams: the first contains metric updates and the
second contains customer subscriptions to metrics, so that each customer
receives only the metrics she subscribed to.
I previously used Flink, which has a CoFlatMapFunction whose flatMap1 and
flatMap2 methods process the first and second stream, respectively.
In Beam, I've learned that I can group the values of several keyed
collections with CoGroupByKey, which produces a CoGbkResult per key, and
then use TupleTags to select the messages of a particular stream, e.g.:

```
TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
// ...
KeyedPCollectionTuple.of(metricObjectTag, metricStream)
                .and(subscriptionObjectTag, subscriptionStream)
                .apply(CoGroupByKey.create())
                // logic to join streams
                .apply(ParDo.of(new JoinDoFn()))
                // ... etc.
```

and in JoinDoFn:

```
Iterable<MetricObjectType> metricStreamMessages =
        context.element().getValue().getAll(metricObjectTag);
Iterable<SubscriptionObjectType> subscriptionStreamMessages =
        context.element().getValue().getAll(subscriptionObjectTag);

for ( MetricObjectType message : metricStreamMessages ) {
        processMetricStreamMessages(message); // like flatMap1 in Flink
}

for ( SubscriptionObjectType message : subscriptionStreamMessages ) {
        processSubscriptionStreamMessages(message); // like flatMap2 in Flink
}
```

However, in my case, I have a wrapper around MetricObjectType and
SubscriptionObjectType that I use to store additional information about the
event. So it looks like this:

MyEvent: {
        header: {
                timestamp: 123456789,
                orgId: 234
        },
        body: | can be either MetricObjectType or SubscriptionObjectType |
}
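In Java, the wrapper above could be sketched roughly as follows. Only MyEvent, MetricObjectType and SubscriptionObjectType come from the description; Header, the accessors and the empty payload classes are hypothetical stand-ins. Because body has to be typed as Object to hold either payload, the compiler no longer knows which stream an event came from, which is what leads to the instanceof check below.

```java
// Hypothetical empty stand-ins for the two payload types.
class MetricObjectType {}
class SubscriptionObjectType {}

// Header with the two fields shown in the example event.
class Header {
  private final long timestamp;
  private final int orgId;

  Header(long timestamp, int orgId) {
    this.timestamp = timestamp;
    this.orgId = orgId;
  }

  long getTimestamp() { return timestamp; }
  int getOrgId() { return orgId; }
}

// Wrapper whose body can hold either payload type, hence the Object field.
class MyEvent {
  private final Header header;
  private final Object body;

  MyEvent(Header header, Object body) {
    this.header = header;
    this.body = body;
  }

  Header getHeader() { return header; }
  Object getBody() { return body; }
}
```

For use in a Beam pipeline, these classes would additionally need a coder (for example by implementing java.io.Serializable or by registering a custom coder).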

Since both streams now carry MyEvent, I no longer have a way to select the
messages of one stream or the other by TupleTag, so I'm joining them with the
same TupleTag:

```
TupleTag<MyEvent> myEventTag = new TupleTag<>();

KeyedPCollectionTuple.of(myEventTag, metricStream)
                .and(myEventTag, subscriptionStream)
                .apply(CoGroupByKey.create())
                // logic to join streams
                .apply(ParDo.of(new JoinDoFn()))
                // ... etc.
```

Then I separate the streams by checking the type of the message body with
instanceof:

```
// get all messages from both streams
Iterable<MyEvent> streamMessages =
        context.element().getValue().getAll(myEventTag);

for ( MyEvent message : streamMessages ) {
        if ( message.getBody() instanceof MetricObjectType ) {
                processMetricStreamMessages(message);
        } else if ( message.getBody() instanceof SubscriptionObjectType ) {
                processSubscriptionStreamMessages(message);
        } else {
                // unknown type of message received
        }
}
```

Question: is there a better way of joining streams whose messages have the
same type?
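One possible alternative, sketched with the Beam Java SDK under the assumption that both streams are keyed by a String (hypothetical, e.g. a metric id): a TupleTag identifies an input collection rather than an element type, so each stream can keep its own TupleTag<MyEvent>. CoGbkResult.getAll(tag) then returns only the elements that came from that stream, and no instanceof check is needed. MyEvent is a placeholder here, and JoinSketch, join and the pairing logic in JoinDoFn are illustrative names, not a tested implementation.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Placeholder for the real wrapper (header + body); fields omitted.
class MyEvent implements Serializable {}

class JoinSketch {
  // One tag per input collection. Both carry MyEvent, but CoGbkResult
  // stores values per tag, so each tag selects exactly one stream.
  static final TupleTag<MyEvent> METRIC_TAG = new TupleTag<MyEvent>() {};
  static final TupleTag<MyEvent> SUBSCRIPTION_TAG = new TupleTag<MyEvent>() {};

  // Groups both streams by the shared (assumed String) key and pairs every
  // subscription with every metric update seen for that key.
  static PCollection<KV<MyEvent, MyEvent>> join(
      PCollection<KV<String, MyEvent>> metricStream,
      PCollection<KV<String, MyEvent>> subscriptionStream) {
    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(METRIC_TAG, metricStream)
            .and(SUBSCRIPTION_TAG, subscriptionStream)
            .apply(CoGroupByKey.create());
    return grouped.apply(ParDo.of(new JoinDoFn()));
  }

  // Emits (subscription, metric) pairs for each key.
  static class JoinDoFn extends DoFn<KV<String, CoGbkResult>, KV<MyEvent, MyEvent>> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      CoGbkResult result = context.element().getValue();

      // Everything under METRIC_TAG came from metricStream (the flatMap1 side).
      List<MyEvent> metrics = new ArrayList<>();
      for (MyEvent metric : result.getAll(METRIC_TAG)) {
        metrics.add(metric);
      }

      // Everything under SUBSCRIPTION_TAG came from subscriptionStream (the flatMap2 side).
      for (MyEvent subscription : result.getAll(SUBSCRIPTION_TAG)) {
        for (MyEvent metric : metrics) {
          context.output(KV.of(subscription, metric));
        }
      }
    }
  }
}
```

CoGroupByKey requires all of its inputs to share the same windowing strategy, and on unbounded streams it needs a non-global window or a trigger, so both streams would have to be windowed (for example with Window.into and FixedWindows) before this join.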

Thanks,
Dmitry
