Hi Prabeesh,

You may check out the Java SDK's join library for reference:
https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
Thanks,
Manu

On Mon, Jul 31, 2017 at 3:40 PM Prabeesh K. <[email protected]> wrote:

> pcoll1 = [('key1', [values]), ('key2', [values])]
>
> pcoll2 = [('key1', value), ('key3', value)]
>
>
> On 31 July 2017 at 11:21, Prabeesh K. <[email protected]> wrote:
>
>> Hi,
>>
>> help me to improve the left Joiner. Is this the right way to join the
>> Pcollection in the beam ?
>>
>>
>> pcoll1 = ..........
>> pcoll2 = ..........
>>
>> left_joined = (
>>     {'left': pcoll1, 'right': pcoll2}
>>     | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
>>     | 'LeftJoiner: ExtractValues' >> beam.Values()
>>     | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
>> )
>>
>> class LeftJoinerFn(beam.DoFn):
>>
>>     def __init__(self):
>>         super(LeftJoinerFn, self).__init__()
>>
>>     def process(self, row, **kwargs):
>>
>>         left = row['left']
>>         right = row['right']
>>
>>         if left and right:
>>             for each in left:
>>                 yield each + right[0]
>>
>>         elif left:
>>             for each in left:
>>                 yield each
>>
>>
>
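For comparison with the quoted LeftJoinerFn, here is a minimal Python sketch of a left outer join that pairs every left value with every matching right value and pads unmatched left values with a null placeholder, which is roughly what the referenced Java Join library does. It is not taken from the thread. The names `left_join_rows`, `left_join`, and `null_value` are hypothetical, and the sketch assumes each element coming out of `CoGroupByKey` has the shape `(key, {'left': iterable, 'right': iterable})`. Unlike the quoted code, it keeps the key rather than dropping it with `beam.Values()`, and it does not stop at `right[0]`.

```python
def left_join_rows(element, null_value=None):
    """Expand one CoGroupByKey result into left-outer-join rows.

    `element` is assumed to be (key, {'left': iterable, 'right': iterable}),
    the shape CoGroupByKey produces for a dict of keyed PCollections.
    """
    key, grouped = element
    # Materialize the right side because it is iterated once per left value.
    right_values = list(grouped['right'])
    for left_value in grouped['left']:
        if right_values:
            for right_value in right_values:
                yield key, (left_value, right_value)
        else:
            # No match on the right: keep the left row, pad with null_value.
            yield key, (left_value, null_value)


def left_join(pcoll1, pcoll2):
    """Hypothetical helper wiring left_join_rows into a Beam pipeline."""
    import apache_beam as beam  # imported lazily; requires apache-beam

    return (
        {'left': pcoll1, 'right': pcoll2}
        | 'LeftJoin: Group' >> beam.CoGroupByKey()
        | 'LeftJoin: Expand' >> beam.FlatMap(left_join_rows)
    )
```

With the sample data from the thread, 'key1' would produce one row pairing its left value with the right value, 'key2' would produce a row padded with None, and 'key3' would be dropped because it has no left value.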
