Hi Sebastien,

Can you tell me more about how your "step 1" works? I looked at the logs of your job, and it's taking suspiciously long (~20 minutes) to produce the ~400 elements, and it does so sequentially. Is it possible to parallelize step 1?
On Sat, Jun 10, 2017 at 5:53 PM Lukasz Cwik <lc...@google.com> wrote:
> The Dataflow implementation, when executing a batch pipeline, does not parallelize dependent fused segments, irrespective of the windowing function, so #1 will fully execute before #2 starts.
>
> On Sat, Jun 10, 2017 at 3:48 PM, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>> Hi again,
>>
>> So it scales. Now my pipeline is running in two parts:
>>
>> 1. Reading file contents (~400) and then GroupByKey
>> 2. From the GroupByKey, transforming and writing to BigQuery (~50M)
>>
>> Part 2 scales as expected. Part 1 takes about 25 minutes on my files, and part 2 about 35 minutes with scaling. But what if I want to use windowing so that the second part starts sooner and the process is more parallel?
>>
>> I tried adding a 60-second FixedWindow, but it's not working (Job ID: 2017-06-06_04_36_01-9894155361321571250).
>>
>> Regards,
>>
>> *Sébastien MORAND*
>> Team Lead Solution Architect
>> Technology & Operations / Digital Factory
>> Veolia - Group Information Systems & Technology (IS&T)
>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>> Bureau 0144C (Ouest)
>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>> *www.veolia.com <http://www.veolia.com>*
>> <https://www.facebook.com/veoliaenvironment/>
>> <https://www.youtube.com/user/veoliaenvironnement>
>> <https://www.linkedin.com/company/veolia-environnement>
>> <https://twitter.com/veolia>
>>
>> On 6 June 2017 at 01:24, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>> Fine, it scales... Thank you very much.
>>>
>>> On 6 June 2017 at 00:31, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>> Thank you Eugene,
>>>>
>>>> I'm trying Sourabh's approach (and yours, since it looks like the same idea) and will let you know if it's better.
>>>>
>>>> Regards,
>>>>
>>>> On 5 June 2017 at 23:31, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> I looked at the job ID you quoted, and yes, it suffers from excessive fusion. I wish we had tooling to automatically detect that and emit a warning, but we don't have that yet.
>>>>>
>>>>> Here's an example of how you can break fusion: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
>>>>>
>>>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <sourabhba...@google.com> wrote:
>>>>>> Yes, you're correct.
>>>>>>
>>>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>>> By the number in parentheses after each step, I meant the number of output records.
>>>>>>>
>>>>>>> When I ungroup, do I send the 200 records again, not the 20M?
>>>>>>>
>>>>>>> Shouldn't I do this instead:
>>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (20M) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>
>>>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <sourabhba...@google.com> wrote:
>>>>>>>> I think you need to do something like:
>>>>>>>>
>>>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (200) [Note: this can be on 200 different workers] -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>>>>> Yes, fusion looks like my problem.
>>>>>>>>> A job ID to look at: 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>>>
>>>>>>>>> The point is in your link:
>>>>>>>>> << For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo >>
>>>>>>>>>
>>>>>>>>> This is exactly what I'm doing in the step transform-combine-7d5ad942 in the above job ID.
>>>>>>>>>
>>>>>>>>> As far as I understand, I should add a GroupByKey on a unique field after transform-combine-7d5ad942 and then ungroup the data (meaning I add two operations to the pipeline to help the workers)?
>>>>>>>>>
>>>>>>>>> Right now:
>>>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>>
>>>>>>>>> Will become:
>>>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>>
>>>>>>>>> Is this the right way?
>>>>>>>>>
>>>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>>>> It might be due to fusion: https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <prabsma...@gmail.com> wrote:
>>>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or n1-standard-8
>>>>>>>>>>>
>>>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>>>>>>>> I have a problem with my attempts to test scaling on Dataflow.
>>>>>>>>>>>>
>>>>>>>>>>>> My pipeline is pretty simple: I get a list of files from Pub/Sub, so the number of files I'm going to feed into the flow is well known at the beginning. Here are my steps, assuming I have 200 files containing about 20,000,000 records:
>>>>>>>>>>>>
>>>>>>>>>>>> - *First step:* Read the file contents from storage. The files are .tar.gz archives, each containing 4 CSV files. I return the whole content as one record.
>>>>>>>>>>>>   *OUT:* 200 records (one per archive, containing the data of all 4 files). Basically it's a dict: {file1: content_of_file1, file2: content_of_file2, etc...}
>>>>>>>>>>>>
>>>>>>>>>>>> - *Second step:* Join the data of the 4 files into one record (the main file contains foreign keys to get information from the other files).
>>>>>>>>>>>>   *OUT:* 20,000,000 records, one for each line in the files. Each record is a list of strings.
>>>>>>>>>>>>
>>>>>>>>>>>> - *Third step:* Clean the data (convert it to prepare the BigQuery load) and turn each record into a dict whose keys are BigQuery column names.
>>>>>>>>>>>>   *OUT:* 20,000,000 records, each a dict
>>>>>>>>>>>>
>>>>>>>>>>>> - *Fourth step:* Insert into BigQuery
>>>>>>>>>>>>
>>>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000 records to insert. This takes about an hour and a half and always uses 1 worker...
>>>>>>>>>>>>
>>>>>>>>>>>> If I manually set the number of workers, it's not really faster. So for an unknown reason it doesn't scale. Any ideas how to fix that?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for any help.
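Steps 1 and 2 as described could look roughly like the plain-Python helpers below, which a ParDo or FlatMap would call. This is a sketch under assumptions: the archive is already read into bytes, members are UTF-8 CSV, and the join layout (the first column of the main file is a foreign key into the first column of a lookup file) is hypothetical. Only one lookup file is shown instead of three. `join_rows` is a generator, which makes the high fan-out explicit: one archive record in, one output per CSV line.

```python
import csv
import io
import tarfile


def read_archive_bytes(data):
    """Step 1 sketch: return {member_name: text} for every file in a .tar.gz."""
    contents = {}
    with tarfile.open(fileobj=io.BytesIO(data), mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():
                raw = tar.extractfile(member).read()
                contents[member.name] = raw.decode("utf-8")
    return contents


def join_rows(contents, main_name, lookup_name):
    """Step 2 sketch with a hypothetical layout.

    The first column of each row in `main_name` is a foreign key into the
    first column of `lookup_name`. Yields one list of strings per main row,
    so a single archive record fans out into many output records.
    """
    lookup = {
        row[0]: row[1:]
        for row in csv.reader(io.StringIO(contents[lookup_name]))
        if row
    }
    for row in csv.reader(io.StringIO(contents[main_name])):
        if row:
            # Unknown keys get no extra columns rather than raising.
            yield row + lookup.get(row[0], [])


def make_archive(files):
    """Helper for trying the sketch locally: build a .tar.gz in memory."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in files.items():
            payload = text.encode("utf-8")
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


if __name__ == "__main__":
    archive = make_archive({"main.csv": "k1,a\nk2,b\n", "lookup.csv": "k1,x\nk2,y\n"})
    for record in join_rows(read_archive_bytes(archive), "main.csv", "lookup.csv"):
        print(record)
```

In the pipeline, `join_rows` would sit inside a `beam.FlatMap`, which is exactly the kind of "high fan-out" ParDo that the Dataflow fusion documentation describes.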
>>>>>>>>>>>>
>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>> This e-mail transmission (message and any attached files) may contain information that is proprietary, privileged and/or confidential to Veolia Environnement and/or its affiliates and is intended exclusively for the person(s) to whom it is addressed. If you are not the intended recipient, please notify the sender by return e-mail and delete all copies of this e-mail, including all attachments. Unless expressly authorized, any use, disclosure, publication, retransmission or dissemination of this e-mail and/or of its attachments is strictly prohibited.
>>>>>>>>>>>> --------------------------------------------------------------------------------------------