Hi there!
I need to merge two streams: the first contains metric updates and the
second contains customer subscriptions to metrics, so that each customer
receives only the metrics she has subscribed to.
I previously used Flink, which has a CoFlatMapFunction whose flatMap1 and
flatMap2 methods process the elements of the first and second stream,
respectively.
In Beam I've learned that I can co-group the two keyed collections with
CoGroupByKey, which produces a CoGbkResult per key, and then use TupleTags
to select the messages of a particular stream, e.g.:
```
TupleTag<MetricObjectType> metricObjectTag = new TupleTag<>();
TupleTag<SubscriptionObjectType> subscriptionObjectTag = new TupleTag<>();
..
KeyedPCollectionTuple.of(metricObjectTag, metricStream)
    .and(subscriptionObjectTag, subscriptionStream)
    .apply(CoGroupByKey.create())
    // logic to join streams
    .apply(ParDo.of(new JoinDoFn()))
    // ... etc.
```
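For intuition, here is a plain-Java stand-in (no Beam dependency) for the shape CoGroupByKey hands to the next step: for each key, one list of values per input, which is what getAll(tag) returns from the CoGbkResult. The names CoGroupSketch, Grouped and coGroup are hypothetical, and windowing, triggers and distribution are ignored entirely, so this is only a sketch of the grouping semantics, not of how Beam implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in (no Beam) for the per-key shape CoGroupByKey produces.
// Windowing, triggers and distribution are deliberately ignored.
final class CoGroupSketch {

    /** All values sharing one key, one list per input, like CoGbkResult.getAll(tag). */
    static final class Grouped<A, B> {
        final List<A> first = new ArrayList<>();  // e.g. values under metricObjectTag
        final List<B> second = new ArrayList<>(); // e.g. values under subscriptionObjectTag
    }

    /**
     * Groups two keyed inputs by key. A key present in only one input
     * gets an empty list for the other input.
     */
    static <K extends Comparable<K>, A, B> Map<K, Grouped<A, B>> coGroup(
            List<Map.Entry<K, A>> firstInput, List<Map.Entry<K, B>> secondInput) {
        Map<K, Grouped<A, B>> result = new TreeMap<>(); // sorted keys for deterministic output
        for (Map.Entry<K, A> e : firstInput) {
            result.computeIfAbsent(e.getKey(), k -> new Grouped<>()).first.add(e.getValue());
        }
        for (Map.Entry<K, B> e : secondInput) {
            result.computeIfAbsent(e.getKey(), k -> new Grouped<>()).second.add(e.getValue());
        }
        return result;
    }
}
```

A helper like this can be handy for unit-testing join logic outside a pipeline; inside Beam, CoGroupByKey does the grouping itself.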
and in JoinDoFn:
```
Iterable<MetricObjectType> metricStreamMessages =
    context.element().getValue().getAll(metricObjectTag);
Iterable<SubscriptionObjectType> subscriptionStreamMessages =
    context.element().getValue().getAll(subscriptionObjectTag);
for (MetricObjectType message : metricStreamMessages) {
    processMetricStreamMessages(message); // like flatMap1 in Flink
}
for (SubscriptionObjectType message : subscriptionStreamMessages) {
    processSubscriptionStreamMessages(message); // like flatMap2 in Flink
}
```
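The join body for a single key can be sketched in plain Java as follows. This assumes both streams are keyed by metric name; MetricUpdate, CustomerSubscription and JoinLogic.deliveries are hypothetical stand-ins for MetricObjectType, SubscriptionObjectType and the logic inside JoinDoFn. Unlike flatMap1/flatMap2, which see one element at a time, it works on both sides of the key at once, which is exactly what the CoGbkResult provides.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MetricObjectType: a metric name and a value.
final class MetricUpdate {
    final String name;
    final double value;
    MetricUpdate(String name, double value) { this.name = name; this.value = value; }
}

// Hypothetical stand-in for SubscriptionObjectType: who subscribed to the key.
final class CustomerSubscription {
    final String customerId;
    CustomerSubscription(String customerId) { this.customerId = customerId; }
}

final class JoinLogic {
    /**
     * Join body for ONE key (assumed to be the metric name): every metric update
     * is delivered to every customer subscribed to that key.
     */
    static List<String> deliveries(Iterable<MetricUpdate> metrics,
                                   Iterable<CustomerSubscription> subscriptions) {
        // Buffer the subscriptions once so each input Iterable is traversed exactly once.
        List<CustomerSubscription> subscribers = new ArrayList<>();
        for (CustomerSubscription s : subscriptions) {
            subscribers.add(s);
        }
        List<String> out = new ArrayList<>();
        for (MetricUpdate m : metrics) {
            for (CustomerSubscription s : subscribers) {
                out.add(s.customerId + " <- " + m.name + "=" + m.value);
            }
        }
        return out;
    }
}
```

In a streaming pipeline CoGroupByKey groups per key and window, so a sketch like this only pairs metrics with subscriptions that fall into the same window; subscriptions that should keep applying to metrics in later windows are not covered here.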
However, in my case both MetricObjectType and SubscriptionObjectType are
wrapped in a common MyEvent type that stores additional information about
the event. It looks like this:
```
MyEvent: {
  header: {
    timestamp: 123456789,
    orgId: 234
  },
  body: | can be either MetricObjectType or SubscriptionObjectType |
}
```
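One possible variation, an assumption rather than anything Beam requires, is to make the wrapper generic in its body type. A metric event would then be a MyEvent<MetricObjectType> and a subscription event a MyEvent<SubscriptionObjectType>, so getBody() returns the concrete type without a cast. The plain-Java sketch below uses hypothetical Header and MyEvent<B> classes that mirror the fields above.

```java
// Hypothetical header holding the fields shown in the MyEvent description.
final class Header {
    final long timestamp;
    final long orgId;

    Header(long timestamp, long orgId) {
        this.timestamp = timestamp;
        this.orgId = orgId;
    }
}

// Hypothetical generic wrapper: B is the body type, e.g. MetricObjectType
// for the metric stream and SubscriptionObjectType for the subscription stream.
final class MyEvent<B> {
    private final Header header;
    private final B body;

    MyEvent(Header header, B body) {
        this.header = header;
        this.body = body;
    }

    Header getHeader() { return header; }

    // Returns the body with its concrete type; no cast or instanceof needed.
    B getBody() { return body; }
}
```

In a pipeline, such a generic type would also need a coder that can handle the type parameter, which this sketch does not show.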
Since both streams now carry MyEvent, I no longer have a way to select one
stream's messages by TupleTag, so I'm joining them with the same TupleTag:
```
TupleTag<MyEvent> myEventTag = new TupleTag<>();
KeyedPCollectionTuple.of(myEventTag, metricStream)
    .and(myEventTag, subscriptionStream)
    .apply(CoGroupByKey.create())
    // logic to join streams
    .apply(ParDo.of(new JoinDoFn()))
    // ... etc.
```
Then I separate the streams by checking the type of the message body with
instanceof:
```
// get all messages from both streams
Iterable<MyEvent> streamMessages =
    context.element().getValue().getAll(myEventTag);
for (MyEvent message : streamMessages) {
    if (message.getBody() instanceof MetricObjectType) {
        processMetricStreamMessages(message);
    } else if (message.getBody() instanceof SubscriptionObjectType) {
        processSubscriptionStreamMessages(message);
    } else {
        // unknown type of message received
    }
}
```
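Two hedged observations. First, as far as I know, TupleTags are compared by their id rather than by their type parameter, so two separate TupleTag<MyEvent> instances, one per stream, should still let getAll() tell the streams apart. Second, if a single tag is kept, a visitor over the body types is one way to replace the instanceof chain: each body type calls its own method, much like flatMap1/flatMap2, and adding a new body type fails at compile time instead of falling into the "unknown type" branch. This assumes you control the body classes so they can implement a common interface. In the plain-Java sketch below, Body, BodyVisitor, MetricBody, SubscriptionBody and Dispatch.split are hypothetical names; both sides are buffered first, since the join needs all of them for the key.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical visitor: one method per body type, similar to flatMap1/flatMap2.
interface BodyVisitor<R> {
    R visitMetric(MetricBody metric);
    R visitSubscription(SubscriptionBody subscription);
}

// Hypothetical common body type; each implementation dispatches to its own method.
interface Body {
    <R> R accept(BodyVisitor<R> visitor);
}

final class MetricBody implements Body {
    final String name;
    final double value;
    MetricBody(String name, double value) { this.name = name; this.value = value; }
    @Override public <R> R accept(BodyVisitor<R> visitor) { return visitor.visitMetric(this); }
}

final class SubscriptionBody implements Body {
    final String customerId;
    SubscriptionBody(String customerId) { this.customerId = customerId; }
    @Override public <R> R accept(BodyVisitor<R> visitor) { return visitor.visitSubscription(this); }
}

final class Dispatch {
    /** The bodies grouped under one key, separated by type. */
    static final class Split {
        final List<MetricBody> metrics = new ArrayList<>();
        final List<SubscriptionBody> subscriptions = new ArrayList<>();
    }

    /** Sorts mixed bodies by type without instanceof, preserving their order. */
    static Split split(Iterable<? extends Body> bodies) {
        Split result = new Split();
        BodyVisitor<Void> sorter = new BodyVisitor<Void>() {
            @Override public Void visitMetric(MetricBody metric) {
                result.metrics.add(metric);
                return null;
            }
            @Override public Void visitSubscription(SubscriptionBody subscription) {
                result.subscriptions.add(subscription);
                return null;
            }
        };
        for (Body body : bodies) {
            body.accept(sorter);
        }
        return result;
    }
}
```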
Question: is there a better way to join streams whose messages have the
same type?
Thanks,
Dmitry