Also, as for expense: it shouldn't be expensive. On almost every runner (Flink included), such a ParDo will usually run on the same machine as the upstream transform, without any serialization.
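For example, the extraction step can be as small as this (a sketch, assuming Avro GenericRecords and hypothetical key fields "pk1" and "pk2" concatenated into a String key):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<GenericRecord> data = ...;
PCollection<KV<String, GenericRecord>> keyedData =
    data.apply(WithKeys.of(new SerializableFunction<GenericRecord, String>() {
      @Override
      public String apply(GenericRecord record) {
        // Hypothetical composite key: concatenate the PK fields.
        // Adjust the field names and separator to your schema.
        return record.get("pk1") + "|" + record.get("pk2");
      }
    }));

keyedData can then feed straight into the KeyedPCollectionTuple/CoGroupByKey steps Thomas described below.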
On Wed, Mar 22, 2017 at 12:25 PM, Aljoscha Krettek <[email protected]> wrote:
> You can use WithKeys for that:
> https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/WithKeys.html
>
> Best,
> Aljoscha
>
> On 22 Mar 2017, at 19:06, Newport, Billy <[email protected]> wrote:
> >
> > If I’m reading a Parquet or Avro file, though, I don’t have a KV<K, Data1>; I have a Data. Do I need to run a ParDo just to extract the keys for this to work?
> >
> > PCollection<GenericRecord> data;
> > PCollection<KV<String, GenericRecord>> keyedData = “data, ParDo’ed to create a KV for each GenericRecord, extracting possibly multiple field PKs encoded as a string”
> >
> > Then do the stuff below. This seems pretty expensive (serialization-wise) compared with the Flink KeyExtractor, for example, or is it similar in practice?
> >
> > Thanks Thomas.
> >
> > From: Thomas Groh [mailto:[email protected]]
> > Sent: Wednesday, March 22, 2017 1:53 PM
> > To: [email protected]
> > Subject: Re: Apache Beam cogroup help
> >
> > This would be implemented via a CoGroupByKey (https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html).
> >
> > Your transform logic will be mostly the same; after applying the extraction (the right side of k1 and k2 in your example), you should have two PCollections of KVs:
> >
> > PCollection<KV<K, Data1>> k1;
> > PCollection<KV<K, Data2>> k2;
> >
> > You can construct a KeyedPCollectionTuple containing the two PCollections:
> >
> > final TupleTag<Data1> data1Tag = new TupleTag<Data1>() {};
> > final TupleTag<Data2> data2Tag = new TupleTag<Data2>() {};
> > KeyedPCollectionTuple<K> coGroupTuple =
> >     KeyedPCollectionTuple.of(data1Tag, k1).and(data2Tag, k2);
> >
> > Then apply the CoGroupByKey:
> >
> > PCollection<KV<K, CoGbkResult>> coGrouped =
> >     coGroupTuple.apply(CoGroupByKey.<K>create());
> >
> > Then you can run an arbitrary ParDo to combine the elements as appropriate. You'll need to reuse the TupleTags created above to extract the per-PCollection outputs. As a simple example where the elements have a shared supertype CombinedData, and you'd like to add them to a single output list:
> >
> > PCollection<KV<K, List<CombinedData>>> combined =
> >     coGrouped.apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<K, List<CombinedData>>>() {
> >       @ProcessElement
> >       public void process(ProcessContext context) {
> >         List<CombinedData> all = new ArrayList<>();
> >         for (Data1 d1 : context.element().getValue().getAll(data1Tag)) {
> >           all.add(d1);
> >         }
> >         for (Data2 d2 : context.element().getValue().getAll(data2Tag)) {
> >           all.add(d2);
> >         }
> >         // Emit the key along with the combined list to match the declared output type.
> >         context.output(KV.of(context.element().getKey(), all));
> >       }
> >     }));
> >
> > On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy <[email protected]> wrote:
> > Trying to port Flink code to Apache Beam, but I’m having trouble decoding the documentation.
> >
> > I have Flink code which looks like:
> >
> > DataSet<GenericRecord> d1 = Read Parquet
> > DataSet<GenericRecord> d2 = Read Avro
> > KeyExtractor<GenericRecord> k1 = … (extracts an object containing the key fields from d1 records)
> > KeyExtractor<GenericRecord> k2 = … (extracts an object containing the key fields from d2 records)
> >
> > CoGroup<GenericRecord, GenericRecord, GenericRecord> grouper = … (combines values for equal keys into a combined list for that key)
> >
> > DataSet<GenericRecord> combined = d1.coGroup(d2).where(k1).equalTo(k2).with(grouper)
> >
> > What’s the Beam equivalent?
> >
> > Thanks
