Hi Prabeesh,

You may check out the Java join library's Join class for reference:
https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
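For a Python pipeline, something close to that library's left outer join can be built from CoGroupByKey plus a FlatMap. This is only a sketch under my own assumptions, not a port of Join.java. It differs from the quoted code in three ways. It keeps the key, which beam.Values() drops. It emits one pair per matching right-side value instead of using only right[0]. It pairs unmatched left values with None, where the Java leftOuterJoin takes an explicit nullValue. The names left_join and run, and the sample data in run(), are hypothetical and only for illustration.

```python
def left_join(element):
    """Left outer join for one CoGroupByKey result (hypothetical helper).

    element is (key, {'left': [...], 'right': [...]}), as emitted by
    beam.CoGroupByKey() applied to {'left': pcoll1, 'right': pcoll2}.
    Yields (key, (left_value, right_value)) for every left/right pair,
    and (key, (left_value, None)) when the key has no right-side match.
    Keys that appear only on the right side produce no output.
    """
    key, grouped = element
    left_values = list(grouped['left'])
    right_values = list(grouped['right'])
    for left_value in left_values:
        if right_values:
            # Emit every matching right value, not just the first one.
            for right_value in right_values:
                yield key, (left_value, right_value)
        else:
            # No match: None plays the role of Java's nullValue argument.
            yield key, (left_value, None)


def run():
    """Hypothetical example pipeline; needs apache_beam installed."""
    import apache_beam as beam

    with beam.Pipeline() as p:
        # Made-up sample data shaped like pcoll1 / pcoll2 in the thread.
        pcoll1 = p | 'Left' >> beam.Create([('key1', [1, 2]), ('key2', [3])])
        pcoll2 = p | 'Right' >> beam.Create([('key1', 'a'), ('key3', 'b')])
        (
            {'left': pcoll1, 'right': pcoll2}
            | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
            | 'LeftJoiner: Join' >> beam.FlatMap(left_join)
            | 'Print' >> beam.Map(print)
        )
```

Calling run() in an environment with apache_beam installed should print ('key1', ([1, 2], 'a')) and ('key2', ([3], None)) in no guaranteed order; key3 is dropped because it exists only on the right side. Keeping the join logic in a plain function also makes it easy to unit test without starting a pipeline.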

Thanks,
Manu

On Mon, Jul 31, 2017 at 3:40 PM Prabeesh K. <[email protected]> wrote:

> pcoll1 = [('key1', [values]), ('key2', [values])]
>
> pcoll2 = [('key1', value), ('key3', value)]
>
>
> On 31 July 2017 at 11:21, Prabeesh K. <[email protected]> wrote:
>
>> Hi,
>>
>> Please help me improve the left joiner. Is this the right way to join
>> PCollections in Beam?
>>
>>
>> import apache_beam as beam
>>
>>
>> class LeftJoinerFn(beam.DoFn):
>>
>>     def process(self, row):
>>         # row is the {'left': [...], 'right': [...]} dict that
>>         # CoGroupByKey produced for one key; beam.Values() has
>>         # already dropped the key itself.
>>         left = row['left']
>>         right = row['right']
>>
>>         if left and right:
>>             # Only the first right-side value is used.
>>             for each in left:
>>                 yield each + right[0]
>>
>>         elif left:
>>             # No right-side match: emit the left value unchanged.
>>             for each in left:
>>                 yield each
>>
>>
>> pcoll1 = ..........
>> pcoll2 = ..........
>>
>> left_joined = (
>>     {'left': pcoll1, 'right': pcoll2}
>>     | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
>>     | 'LeftJoiner: ExtractValues' >> beam.Values()
>>     | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
>> )
>>
>>
>
