Hi Manu,

That was helpful.

Thanks,
Prabeesh K.

On 16 August 2017 at 08:25, Manu Zhang wrote:

> Hi Prabeesh,
>
> You may check out
> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
> for reference.
>
> Thanks,
> Manu
>
> On Mon, Jul 31, 2017 at 3:40 PM Prabeesh K. wrote:
>
>> pcoll1 = [('key1', [values]), ('key2', [values])]
>>
>> pcoll2 = [('key1', value), ('key3', value)]
>>
>>
>> On 31 July 2017 at 11:21, Prabeesh K. wrote:
>>
>>> Hi,
>>>
>>> Please help me improve the left joiner. Is this the right way to join
>>> PCollections in Beam?
>>>
>>>
>>> pcoll1 = ..........
>>> pcoll2 = ..........
>>>
>>> left_joined = (
>>>     {'left': pcoll1, 'right': pcoll2}
>>>     | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
>>>     | 'LeftJoiner: ExtractValues' >> beam.Values()
>>>     | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
>>> )
>>>
>>> class LeftJoinerFn(beam.DoFn):
>>>
>>>     def __init__(self):
>>>         super(LeftJoinerFn, self).__init__()
>>>
>>>     def process(self, row, **kwargs):
>>>
>>>         left = row['left']
>>>         right = row['right']
>>>
>>>         if left and right:
>>>             for each in left:
>>>                 yield each + right[0]
>>>
>>>         elif left:
>>>             for each in left:
>>>                 yield each
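For comparison, a left join over the output of CoGroupByKey could be sketched as below. This is a minimal sketch under stated assumptions, not the method from the thread or from the Java join library; the names `left_join` and `run` and the sample data are hypothetical. Unlike LeftJoinerFn, it keeps the key instead of discarding it with beam.Values(), pairs each left value with every matching right value rather than only right[0], and pads unmatched left values with None instead of computing `each + right[0]`, which raises a TypeError when a list value from pcoll1 meets a scalar value from pcoll2. (In a script, LeftJoinerFn would also have to be defined before the `left_joined = (...)` expression runs, or Python raises a NameError.)

```python
def left_join(element):
    """Hypothetical helper: emit left-join rows for one CoGroupByKey output element.

    element is (key, {'left': iterable_of_left_values,
                      'right': iterable_of_right_values}).
    """
    key, grouped = element
    rights = list(grouped['right'])  # materialize so it can be reused per left value
    for left_value in grouped['left']:
        if rights:
            # One output row per (left, right) pair sharing this key.
            for right_value in rights:
                yield key, (left_value, right_value)
        else:
            # No match on the right: keep the left row, pad with None.
            yield key, (left_value, None)
    # Keys present only on the right have an empty 'left' list, so nothing is emitted.


def run():
    """Hypothetical pipeline wiring; requires the apache_beam package."""
    import apache_beam as beam  # imported here so left_join can be used without Beam

    with beam.Pipeline() as p:  # default runner (DirectRunner)
        pcoll1 = p | 'Left' >> beam.Create([('key1', [1, 2]), ('key2', [3])])
        pcoll2 = p | 'Right' >> beam.Create([('key1', 'a'), ('key3', 'b')])
        (
            {'left': pcoll1, 'right': pcoll2}
            | 'CoGroup' >> beam.CoGroupByKey()
            | 'LeftJoin' >> beam.FlatMap(left_join)
            | 'Print' >> beam.Map(print)
        )
```

With apache_beam installed, calling run() should print ('key1', ([1, 2], 'a')) and ('key2', ([3], None)) in no guaranteed order; key3 appears only on the right and is dropped, as a left join requires. Using None as the filler plays roughly the role of the null value that the Java join library's left outer join asks the caller to supply.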
