You can use WithKeys for that: 
https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/WithKeys.html

Best,
Aljoscha

> On 22 Mar 2017, at 19:06, Newport, Billy <[email protected]> wrote:
> 
> If I’m reading a parquet or avro file though, I don’t have a KV<K,Data1>, I 
> have a Data. Do I need to run a pardo just to extract the keys for this to 
> work?
>  
> PCollection<GenericRecord> data;
> PCollection<KV<String,GenericRecord>> keyedData = “data par do’ed to create 
> KV for each GenericRecord, extracting possibly multiple field PKs encoded as 
> a string”
>  
> Then do the stuff below. This seems pretty expensive (serialization wise) 
> compared with the flink Keyextractor for example or is it similar in practice?
>  
> Thanks Thomas.
>  
> From: Thomas Groh [mailto:[email protected]] 
> Sent: Wednesday, March 22, 2017 1:53 PM
> To: [email protected]
> Subject: Re: Apache Beam cogroup help
>  
> This would be implemented via a CoGroupByKey 
> (https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html)
>  
> Your transform logic will be mostly the same; after applying the extraction 
> (the right side of k1 and k2 in your example), you should have two 
> PCollections of KVs - 
>  
> PCollection<KV<K, Data1>> k1;
> PCollection<KV<K, Data2>> k2;
>  
> You can construct a KeyedPCollectionTuple containing the two PCollections:
>  
> final TupleTag<Data1> data1Tag = new TupleTag<>();
> final TupleTag<Data2> data2Tag = new TupleTag<>();
> KeyedPCollectionTuple<K> coGroupTuple = KeyedPCollectionTuple.of(data1Tag, 
> k1).and(data2Tag, k2);
>  
> Then apply the CoGroupByKey:
>  
> PColection<KV<K, CoGroupByKeyResult>> coGrouped = 
> coGroupTuple.apply(CoGroupByKey.<K>create());
>  
> Then you can run an arbitrary ParDo to combine the elements as appropriate. 
> You'll need to reuse the TupleTags created above to extract out the 
> per-PCollection outputs. As a simple example where the elements have a shared 
> supertype CombinedData, and you'd like to add them to a single output list:
>  
> PCollection<KV<K, List<CombinedData>> combined = coGrouped.apply(ParDo.of(new 
> DoFn<KV<K, CoGroupByKeyResult>, KV<K, List<CombinedData>>>() {
>   @ProcessElement
>   public void process(ProcessContext context) {
>     List<CombinedData> all = new ArrayList<>();
>     for (Data1 d1 : context.element().value().getAll(data1Tag)) {
>       all.add(d1);
>     }
>     for (Data2 d2 : context.element().value().getAll(data2Tag)) {
>       all.add(d2);
>     }
>     context.output(all);
>   }
> }));
>  
> On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy <[email protected]> wrote:
> Trying to port flink code to Apache Beam but I’m having trouble decoding the 
> documentation.
>  
> I have flink code which looks like:
>  
> DataSet<GenericRecord> d1 = Read parquet
> DataSet<GenericRecord> d2 = Read Avro
> KeyExtractor<GenericRecord> k1 = … (extracts an object containing the key 
> fields from d1 records)
> KeyExtractor<GenericRecord> k2 = … (extracts an object containing the key 
> fields from d2 records)
>  
> CoGroup<GenericRecord,GenericRecord,GenericRecord> grouper = (combines values 
> for equal keys in to a combined list for that key)
>  
> DataSet<GenericRecord> combined = 
> d1.coGroup(d2).where(k1).equalTo(k2).with(grouper)
>  
> Whats the beam equivalent?
>  
> Thanks

Reply via email to