Also, as for expense: it shouldn't be expensive. On almost every runner
(Flink included), such a ParDo will usually be fused and run on the same
machine without any serialization.
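
For reference, here is a rough sketch of the WithKeys approach Aljoscha
suggests below; the field names "pk1" and "pk2" are placeholders for
whatever primary-key fields your GenericRecords actually carry:

PCollection<GenericRecord> data = ...;
PCollection<KV<String, GenericRecord>> keyedData = data.apply(
    WithKeys.of(new SerializableFunction<GenericRecord, String>() {
      @Override
      public String apply(GenericRecord record) {
        // Encode the (possibly composite) primary key as a single string;
        // "pk1" and "pk2" are placeholder field names.
        return record.get("pk1") + "|" + record.get("pk2");
      }
    }));

Since WithKeys is just a ParDo under the hood, the runner can fuse it with
the surrounding stages, so the extraction itself adds essentially no
serialization cost.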

On Wed, Mar 22, 2017 at 12:25 PM, Aljoscha Krettek <[email protected]>
wrote:

> You can use WithKeys for that: https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/WithKeys.html
>
> Best,
> Aljoscha
>
> > On 22 Mar 2017, at 19:06, Newport, Billy <[email protected]> wrote:
> >
> > If I’m reading a Parquet or Avro file though, I don’t have a
> > KV<K, Data1>, I have a Data. Do I need to run a ParDo just to extract the
> > keys for this to work?
> >
> > PCollection<GenericRecord> data;
> > PCollection<KV<String, GenericRecord>> keyedData = “data ParDo’ed to
> > create a KV for each GenericRecord, extracting possibly multiple PK fields
> > encoded as a string”
> >
> > Then do the stuff below. This seems pretty expensive (serialization-wise)
> > compared with the Flink KeyExtractor, for example, or is it similar in
> > practice?
> >
> > Thanks, Thomas.
> >
> > From: Thomas Groh [mailto:[email protected]]
> > Sent: Wednesday, March 22, 2017 1:53 PM
> > To: [email protected]
> > Subject: Re: Apache Beam cogroup help
> >
> > This would be implemented via a CoGroupByKey (https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html)
> >
> > Your transform logic will be mostly the same; after applying the
> > extraction (the right side of k1 and k2 in your example), you should have
> > two PCollections of KVs:
> >
> > PCollection<KV<K, Data1>> k1;
> > PCollection<KV<K, Data2>> k2;
> >
> > You can construct a KeyedPCollectionTuple containing the two
> > PCollections:
> >
> > final TupleTag<Data1> data1Tag = new TupleTag<>();
> > final TupleTag<Data2> data2Tag = new TupleTag<>();
> > KeyedPCollectionTuple<K> coGroupTuple =
> >     KeyedPCollectionTuple.of(data1Tag, k1).and(data2Tag, k2);
> >
> > Then apply the CoGroupByKey:
> >
> > PCollection<KV<K, CoGbkResult>> coGrouped =
> >     coGroupTuple.apply(CoGroupByKey.<K>create());
> >
> > Then you can run an arbitrary ParDo to combine the elements as
> > appropriate. You'll need to reuse the TupleTags created above to extract
> > the per-PCollection outputs. As a simple example where the elements
> > have a shared supertype CombinedData, and you'd like to add them all to a
> > single output list:
> >
> > PCollection<KV<K, List<CombinedData>>> combined = coGrouped.apply(
> >     ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<K, List<CombinedData>>>() {
> >       @ProcessElement
> >       public void process(ProcessContext context) {
> >         List<CombinedData> all = new ArrayList<>();
> >         for (Data1 d1 : context.element().getValue().getAll(data1Tag)) {
> >           all.add(d1);
> >         }
> >         for (Data2 d2 : context.element().getValue().getAll(data2Tag)) {
> >           all.add(d2);
> >         }
> >         // The output must be keyed, so re-attach the element's key.
> >         context.output(KV.of(context.element().getKey(), all));
> >       }
> >     }));
> >
> > On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy <[email protected]>
> wrote:
> > Trying to port Flink code to Apache Beam, but I’m having trouble
> > decoding the documentation.
> >
> > I have flink code which looks like:
> >
> > DataSet<GenericRecord> d1 = Read Parquet
> > DataSet<GenericRecord> d2 = Read Avro
> > KeyExtractor<GenericRecord> k1 = … (extracts an object containing the
> > key fields from d1 records)
> > KeyExtractor<GenericRecord> k2 = … (extracts an object containing the
> > key fields from d2 records)
> >
> > CoGroup<GenericRecord, GenericRecord, GenericRecord> grouper = (combines
> > values for equal keys into a combined list for that key)
> >
> > DataSet<GenericRecord> combined =
> >     d1.coGroup(d2).where(k1).equalTo(k2).with(grouper)
> >
> > What’s the Beam equivalent?
> >
> > Thanks
>
>
