This would be implemented via CoGroupByKey
(https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html).
Your transform logic will be mostly the same; after applying the extraction
(the right side of k1 and k2 in your example), you should have two
PCollections of KVs:
PCollection<KV<K, Data1>> k1;
PCollection<KV<K, Data2>> k2;
You can construct a KeyedPCollectionTuple containing the two PCollections:
final TupleTag<Data1> data1Tag = new TupleTag<>();
final TupleTag<Data2> data2Tag = new TupleTag<>();
KeyedPCollectionTuple<K> coGroupTuple =
    KeyedPCollectionTuple.of(data1Tag, k1).and(data2Tag, k2);
Then apply the CoGroupByKey:
PCollection<KV<K, CoGbkResult>> coGrouped =
    coGroupTuple.apply(CoGroupByKey.<K>create());
Then you can run an arbitrary ParDo to combine the elements as appropriate.
You'll need to reuse the TupleTags created above to extract each input
PCollection's values from the result. As a simple example, where the
elements share a supertype CombinedData and you'd like to add them to a
single output list:
PCollection<KV<K, List<CombinedData>>> combined =
    coGrouped.apply(ParDo.of(
        new DoFn<KV<K, CoGbkResult>, KV<K, List<CombinedData>>>() {
          @ProcessElement
          public void process(ProcessContext context) {
            CoGbkResult grouped = context.element().getValue();
            List<CombinedData> all = new ArrayList<>();
            for (Data1 d1 : grouped.getAll(data1Tag)) {
              all.add(d1);
            }
            for (Data2 d2 : grouped.getAll(data2Tag)) {
              all.add(d2);
            }
            // Emit the key together with the combined list.
            context.output(KV.of(context.element().getKey(), all));
          }
        }));
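For intuition about what the grouping step produces per key, here is a
minimal plain-Java sketch of the same co-group semantics using in-memory
maps. It deliberately does not use the Beam API: the class CoGroupSketch,
the helpers coGroup and entry, and the sample data are all hypothetical, and
it assumes both inputs fit in memory. The left-then-right ordering of values
is an artifact of this sketch, not something to rely on in a pipeline.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoGroupSketch {

    /** Hypothetical helper: builds one key/value pair. */
    public static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new SimpleEntry<>(key, value);
    }

    /**
     * Groups two keyed inputs by key and collects the values from both
     * sides into one list per key. A key that appears on only one side
     * still shows up in the result.
     */
    public static <K, V> Map<K, List<V>> coGroup(
            List<Map.Entry<K, V>> left, List<Map.Entry<K, V>> right) {
        Map<K, List<V>> combined = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : left) {
            combined.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
        }
        for (Map.Entry<K, V> e : right) {
            combined.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
        }
        return combined;
    }

    public static void main(String[] args) {
        // Stand-ins for the keyed records read from the two sources.
        List<Map.Entry<String, String>> k1 = Arrays.asList(
                entry("a", "parquet-1"), entry("b", "parquet-2"));
        List<Map.Entry<String, String>> k2 = Arrays.asList(
                entry("a", "avro-1"), entry("c", "avro-2"));

        // One combined list per key, including keys seen on one side only.
        System.out.println(coGroup(k1, k2));
    }
}
```

Key "a" ends up with one value from each input, while "b" and "c" each keep
the single value from the side they appeared on, which is the behaviour the
Flink coGroup in your example relies on.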
On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy <[email protected]>
wrote:
> Trying to port flink code to Apache Beam but I’m having trouble decoding
> the documentation.
>
>
>
> I have flink code which looks like:
>
>
>
> DataSet<GenericRecord> d1 = Read parquet
>
> DataSet<GenericRecord> d2 = Read Avro
>
> KeyExtractor<GenericRecord> k1 = … (extracts an object containing the key
> fields from d1 records)
>
> KeyExtractor<GenericRecord> k2 = … (extracts an object containing the key
> fields from d2 records)
>
>
>
> CoGroup<GenericRecord,GenericRecord,GenericRecord> grouper = (combines
> values for equal keys in to a combined list for that key)
>
>
>
> DataSet<GenericRecord> combined = d1.coGroup(d2).where(k1).
> equalTo(k2).with(grouper)
>
>
>
> Whats the beam equivalent?
>
>
>
> Thanks