I will suggest a simple trick:
Create an empty object with the proper keys... You will need to run a step
to get you all the keys if you don't know it (if it is generated randomly
in the run time)
Then merge your data with the empty object with the proper keys...
implement the merger inside a function...

merger = (joined_data
                       | 'Object Mapper' >> beam.ParDo(mapper_function))

def mapper_function(element):
    # Create an object containing all the keys:
    temp_object = {'a': None, 'b': None, 'c': None, 'd': None, 'e': None}
    # then update it with the new object
    temp_object.update(element)
    # That will replace the values and keep the keys
    return [temp_object]

Other solutions may be using Pandas... or just simple StringIO process...

csv_like_data = StringIO(columns.strip() + '\n' + element.values())

There are many solutions actually, you can play with the data as you wish...

If there are some built-in functionality, I would like to know it!

On Wed, Feb 12, 2020 at 3:48 AM Douglas Martins <[email protected]>
wrote:

> Hello, thanks for the response.
>
> Not quite. The PCollections hold Python Dicts, so they look like this:
>
> Suppose the final BQ table must have the columns a, b, c, d, e
>
> PCollection1 {'a': 1, 'b': 2, 'c':3}
>                      {'a': 1, 'b': nan, 'c':3}
>
> PCollection2 {'b': 3, 'd': 10, 'e': nan, 'c': 6}
>                      {'b': nan, 'd': 10, 'e':4, 'c': 6}
>
> This happens because I'm simultaneously applying ParDo to a PCol generated
> by BigQuerySource, which creates these keys based on data from a table. So
> each of these ParDo transforms will create a different number of
> keys(future columns of a BQ table), and potentially the same keys like
> shown in the example above. Now, the question is: how can I create a PCol
> derived from those, which can be written to BQ? Something like:
>
> PCollection_final {'a': 1, 'b': 2, 'c':3, 'd': nan, 'e': nan}
>                             {'a': 1, 'b': nan, 'c':3, 'd': nan, 'e': nan}
>                             {'b': 3, 'd': 10, 'e': nan, 'c': 6, 'a': nan}
>                             {'b': nan, 'd': 10, 'e':4, 'c': 6, 'a': nan}
>
> Is it possible to do something like this without explicitly creating keys
> with no values on the transforms that don't have rules for the creation of
> them, for example by assigning NaN to columns 'd' and 'e' in PCollection1?
>
> Em ter., 11 de fev. de 2020 às 16:23, Heejong Lee <[email protected]>
> escreveu:
>
>> What do you mean by "PCollection of dicts, each having different key
>> values"? What's the type of the PCollections? I assume that you want to
>> merge two PCollections of KV such as
>> PCollection[("a", 1), ("b", 2), ("c", 3)] + PCollection[("a", 4), ("d",
>> 5), ("e", 6)]. Is that correct?
>>
>> On Tue, Feb 11, 2020 at 9:19 AM Douglas Martins <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am developing a Pipeline thats reads from and writes to BigQuery. At a
>>> certain point, I have two or more PCollections of dicts, each having
>>> different key values. How can I create a single PCollection from those,
>>> that can be written to a BigQuery table? The Flatten transform doesn't work
>>> because each element of the PCol ends up having different keys. Thanks!
>>>
>>

-- 
Soliman ElSaber
Data Engineer
www.mindvalley.com

Reply via email to