Hello everybody

I think this will do the job:
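import apache_beam as beam

# pipeline_options, user_options, trasnf1, trasnf2, something and out3
# are assumed to be defined elsewhere in your code.
p = beam.Pipeline(options=pipeline_options)

data = (
p
| 'Get data' >> beam.io.ReadFromText(user_options.input_file)
)

output1 = (
data
| 'Transform 1' >> beam.ParDo(trasnf1())
)

output2 = (
data
| 'Transform 2' >> beam.ParDo(trasnf2())
| 'Another one' >> beam.FlatMap(something, user_options.parameter)
)

# Flatten merges several PCollections into a single PCollection.
# Note: WriteToText may split the output into several shards; pass
# num_shards=1 (or shard_name_template='') if you need exactly one file.
aggregate = (
(output1, output2)
| 'Aggregate both pipes into one' >> beam.Flatten()
| 'Write to file' >> beam.io.WriteToText(out3)
)

p.run()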

Let me know if it worked.
Best

On Sat, May 2, 2020 at 7:04 PM Marco Mistroni wrote:

> Hi,
> Thanks. In the example above, what if I want to combine output1 and
> output2 into a single data structure that I can then write to the same
> file in a bucket?
> Is there some sort of aggregator in Beam?
> Thanks
>
> On Wed, Apr 29, 2020 at 5:56 PM André Rocha Silva wrote:
>
>> Hey
>>
>> You can simply feed the same output PCollection into as many branches as
>> you want, e.g.:
>> p = beam.Pipeline(options=pipeline_options)
>>
>> data = (
>> p
>> | 'Get data' >> beam.io.ReadFromText(user_options.input_file)
>> )
>>
>> output1 = (
>> data
>> | 'Transform 1' >> beam.ParDo(trasnf1())
>> | 'Write transform 1 results' >> beam.io.WriteToText(out1)
>> )
>>
>> output2 = (
>> data
>> | 'Transform 2' >> beam.ParDo(trasnf2())
>> | 'Another one' >> beam.FlatMap(something, user_options.parameter)
>> | 'Write transform 2 results' >> beam.io.WriteToText(out2)
>> )
>>
>> p.run()
>>
>>
>
>> On Wed, Apr 29, 2020 at 1:19 PM Marco Mistroni wrote:
>>
>>> Hi all,
>>> Is it possible in Beam to create a pipeline where two tasks run in
>>> parallel rather than sequentially?
>>> A simple use case: step 3 generates some data, from which I generate,
>>> e.g., 3 completely different outcomes (e.g. 3 different files stored
>>> in a bucket).
>>> Thanks,
>>> Marco
>>>
>>
>>
>> --
>>
>>    *ANDRÉ ROCHA SILVA*
>>   * DATA ENGINEER*
>>   (48) 3181-0611
>>
>>   <https://www.linkedin.com/in/andre-rocha-silva/> /andre-rocha-silva/
>> <http://portaltelemedicina.com.br/>
>> <https://www.youtube.com/channel/UC0KH36-OXHFIKjlRY2GyAtQ>
>> <https://pt-br.facebook.com/PortalTelemedicina/>
>> <https://www.linkedin.com/company/9426084/>
>>
>>

