Hi,

Please help me improve my left joiner. Is this the right way to left-join
two PCollections in Apache Beam?


# Input PCollections; their construction is elided in the original message.
# Presumably both hold (key, value) pairs, since they are fed to
# CoGroupByKey below -- TODO confirm against the real pipeline.
pcoll1 = ..........
pcoll2 = ..........

# Left-join pipeline: tag the two inputs as 'left' and 'right', group them
# with CoGroupByKey, drop the key, and let LeftJoinerFn turn each group into
# joined rows.
# NOTE(review): LeftJoinerFn is defined *below* this statement. If this is
# executed top-to-bottom as one script, the class must be defined first.
left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    # Groups the tagged inputs; LeftJoinerFn reads the result via
    # row['left'] and row['right'].
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    # Keeps only the grouped values; the join key is not passed downstream.
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)

class LeftJoinerFn(beam.DoFn):
    """Emit left-join rows for one group of co-grouped values.

    Each input element is expected to be a mapping with the keys ``'left'``
    and ``'right'``, each holding the values that share one join key.

    Join semantics:
      * a left value with one or more right matches yields one combined row
        per (left, right) pair;
      * a left value with no right match is yielded unchanged;
      * a group with no left values yields nothing.

    Rows are combined with ``+``, so left and right values must support it
    (presumably lists or tuples of fields -- confirm against the pipeline).
    """

    def process(self, row, **kwargs):
        """Yield the left-joined rows for a single co-grouped element.

        Args:
            row: Mapping with ``'left'`` and ``'right'`` iterables of values.
            **kwargs: Accepted for signature compatibility; unused here.

        Yields:
            ``left_value + right_value`` for every matching pair, or
            ``left_value`` alone when the right side is empty.
        """
        left_values = row['left']
        # Materialize the right side: it is tested for emptiness and then
        # re-iterated once per left value, which a one-shot iterable would
        # not survive.
        right_values = list(row['right'])

        for left_value in left_values:
            if not right_values:
                # Unmatched left row: keep it, as a left join requires.
                yield left_value
                continue
            # Pair with every right match. Using only the first one would
            # silently drop data and depend on the order of grouped values.
            for right_value in right_values:
                yield left_value + right_value

Reply via email to