Hi Eugene,

Thank you very much for this long and detailed answer; it helps me a lot.
I understand what is happening now: what I'm trying to do is not possible, and it makes perfect sense to me.

Is there a way to window by a number of files, so that it's easier to detect that a window is complete and can be processed?

As a reminder, my current process is:

Read 500 elements -> transform them into 50,000,000 records -> BigQuery

Because of this high fan-out, I have to group the 500 elements by key to make sure the work scales correctly. This part works like a charm, but the pipeline first scans the entire input and performs the group-by, and only then starts handling the 500 files that produce the 50,000,000 records processed by the rest of the pipeline.

So basically I would like the two parts of my pipeline to run a little more simultaneously. Any ideas?

Regards,

*Sébastien MORAND*
Team Lead Solution Architect
Technology & Operations / Digital Factory
Veolia - Group Information Systems & Technology (IS&T)
Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
Bureau 0144C (Ouest)
30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
www.veolia.com

On 27 June 2017 at 07:57, Eugene Kirpichov <[email protected]> wrote:

> Hi Sebastien,
>
> Suppose you have 10,000 files containing timestamped user activity events,
> keyed by user id. Suppose that within every file events are NOT ordered by
> timestamp, and that there's no ordering by timestamp between files either:
> there was just a large dataset of events, and they've been written more or
> less randomly to 10,000 files.
>
> You write a pipeline that reads these files, windows the events into
> 5-minute fixed windows, groups them by user id, and computes the count in
> each group.
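To make this setup concrete, here is a minimal plain-Python sketch of what such a pipeline computes. It models the semantics only and does not use the Beam API. Each event is assigned to the 5-minute fixed window containing its timestamp, with windows aligned to the Unix epoch, and events are then counted per (user id, window). The function names and sample events are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)  # naive datetimes, treated as UTC


def window_start(ts):
    """Start of the 5-minute fixed window containing ts (aligned to the epoch)."""
    return ts - (ts - EPOCH) % WINDOW


def count_per_user_and_window(events):
    """Count events per (user_id, window_start); events are (user_id, timestamp)."""
    counts = Counter()
    for user_id, ts in events:
        counts[(user_id, window_start(ts))] += 1
    return counts


# Hypothetical sample events, deliberately not in time order.
events = [
    (42, datetime(2017, 6, 25, 15, 44, 59)),
    (7, datetime(2017, 6, 25, 15, 41)),
    (42, datetime(2017, 6, 25, 15, 45)),
    (42, datetime(2017, 6, 25, 15, 42)),
]
counts = count_per_user_and_window(events)
```

Windows are half-open, so an event at exactly 3:45PM starts the next window. Because the input is unordered, no count is final until the last event has been seen.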
>
> 1) If you were an Apache Beam runner, how would you go about finding the
> "first window" in the entire dataset and when it "arrives"? Since the data
> is not ordered by time in any way, there is no way to find the "first
> window" without scanning the entire dataset; and all the data has already
> "arrived": the 10,000 files are already sitting there.
>
> 2) Suppose one of the user ids is 42, and one of this user's events
> happened on 2017-06-25 3:42PM, which falls into the window
> [2017-06-25 3:40PM .. 2017-06-25 3:45PM]. If you were an Apache Beam
> runner, how would you decide when you've seen all the activity for this
> user in this window and can process the group? Again, since files are not
> ordered by time or by user id, the only answer is "after you've scanned the
> entire dataset".
>
> Typical Apache Beam runners in batch mode will scan all the events in the
> entire dataset, assign windows to each event, and then perform a huge,
> unordered GroupByKey by (key, window), ending up with a lot of
> ((key, window): [events]) groups. Each group can only be declared complete
> and processed after the entire dataset has been scanned, because any part
> of the dataset that hasn't been scanned yet might contain data that falls
> into this group.
>
> Let me know if any of this mismatches your understanding of what windowing
> means in Beam, or of how pipelines execute.
>
> On Mon, Jun 26, 2017 at 10:38 PM Morand, Sebastien <[email protected]> wrote:
>
>> OK, that's what I've done, but I thought the processing after the group
>> could start when the first window arrives, rather than only once all the
>> data has gone through the group-by-key. So there is no way to start the
>> second part of the pipeline (after the group-by-key) at the same time as
>> the first part?
>>
>> *Sébastien MORAND*
>>
>> On 24 June 2017 at 17:55, Robert Bradshaw <[email protected]> wrote:
>>
>>> Yes, you can use windowing in batch mode by applying your own
>>> timestamps. For example:
>>>
>>>     import apache_beam as beam
>>>     from apache_beam.transforms.window import FixedWindows, TimestampedValue
>>>
>>>     # p is your Pipeline; extract_timestamp, key and value stand in for your own functions.
>>>     grouped_by_window = (p
>>>         | beam.io.Read(...)
>>>         # Attach each element's event timestamp.
>>>         | beam.Map(lambda x: TimestampedValue(x, timestamp=extract_timestamp(x)))
>>>         # Assign each element to a 5-second fixed window.
>>>         | beam.WindowInto(FixedWindows(5))
>>>         | beam.Map(lambda x: (key(x), value(x)))  # If the data wasn't already keyed...
>>>         # Group by (key, window).
>>>         | beam.GroupByKey())
>>>
>>> Now grouped_by_window will contain a (k, vs) tuple for each key and
>>> five-second window. When running in batch mode, everything up to the
>>> group-by-key will be executed in its entirety before anything after,
>>> but the data will still be appropriately partitioned.
>>>
>>> - Robert
>>>
>>>
>>> On Sat, Jun 24, 2017 at 5:08 AM, Morand, Sebastien
>>> <[email protected]> wrote:
>>> > Hi Ahmet,
>>> >
>>> > Yes, I know; it's exactly what I said in my message: streaming is NOT
>>> > supported in Python yet.
>>> >
>>> > My question is: is there any way to timestamp the bounded sources and
>>> > use windowing in BATCH mode?
>>> >
>>> > Regards,
>>> >
>>> > Sébastien MORAND
>>> >
>>> > On 23 June 2017 at 21:32, Ahmet Altay <[email protected]> wrote:
>>> >>
>>> >> Hi Sebastien,
>>> >>
>>> >> Streaming is not supported in the Python SDK yet. There is steady
>>> >> progress on that, and we will soon be able to offer it. It is already
>>> >> supported in Java.
>>> >>
>>> >> Thank you,
>>> >> Ahmet
>>> >>
>>> >> On Fri, Jun 23, 2017 at 12:26 PM, Morand, Sebastien
>>> >> <[email protected]> wrote:
>>> >>>
>>> >>> OK, I think I understand these concepts. So, as streaming is not
>>> >>> supported in Python yet, there is no way to timestamp the bounded
>>> >>> sources, is there?
>>> >>>
>>> >>> Is it possible in Java?
>>> >>>
>>> >>> Sébastien MORAND
>>> >>>
>>> >>> On 23 June 2017 at 17:04, Eugene Kirpichov <[email protected]>
>>> >>> wrote:
>>> >>>>
>>> >>>> To elaborate on what Kenn said: the main difference between the batch
>>> >>>> and streaming execution modes of a typical Beam runner is how they
>>> >>>> handle GroupByKey.
>>> >>>>
>>> >>>> The next stage can start processing the KV<K, Iterable<V>> in a
>>> >>>> particular window (assuming Java) when the GBK is sure it has seen all
>>> >>>> elements with this key in this window.
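When the data has no time ordering, the only point at which the GBK can be sure of this is after it has read everything. The following minimal plain-Python model sketches that batch behaviour. It is not how any real runner is implemented. The grouping step has to drain its entire input before it can yield the first (key, window) group, so nothing downstream can start early. The `source` helper, the `log` list, and the sample data are hypothetical. They only record the order of reads and emits.

```python
from collections import defaultdict

log = []  # records the order of "read" and "emit" steps


def source(elements):
    """Hypothetical source that records each read."""
    for element in elements:
        log.append("read")
        yield element


def batch_group_by_key_and_window(elements):
    """Model of a batch GroupByKey over (key, window, value) triples.

    With no time ordering, no group is known to be complete until the
    whole input has been consumed, so the first yield happens only after
    the loop over the input has finished.
    """
    groups = defaultdict(list)
    for key, window, value in elements:
        groups[(key, window)].append(value)
    for group_key, values in groups.items():
        yield group_key, values


data = [("a", 0, 1), ("b", 0, 2), ("a", 0, 3)]
results = []
for group_key, values in batch_group_by_key_and_window(source(data)):
    log.append("emit")
    results.append((group_key, values))
```

After running, `log` shows every read happening before the first emit, which matches the "scan everything, then start the next stage" behaviour described in this thread.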
>>> >>>>
>>> >>>> A typical runner in streaming mode assumes that data within a key is
>>> >>>> more or less ordered by time, with the watermark giving a bound on how
>>> >>>> out of order it's likely to be. Because of that, runners can, e.g.,
>>> >>>> declare a window of a key "complete" when the watermark reaches the end
>>> >>>> of the window. More precisely, this is controlled by triggers.
>>> >>>>
>>> >>>> However, typical data sources used in batch mode (e.g. files, key-value
>>> >>>> tables, etc.) are not ordered by time at all, so the data is assumed to
>>> >>>> be completely out of order and no useful watermark can be provided.
>>> >>>> Runners therefore have only one option: process all the data, declare
>>> >>>> all keys and all windows complete, and then start the next stage.
>>> >>>>
>>> >>>> Note that this is not a fundamental limitation of the Beam model: it's
>>> >>>> conceivable to have a runner that reads a bounded dataset (e.g. a set of
>>> >>>> log files marked with a date in the filename) with some knowledge of
>>> >>>> their time ordering, and that could therefore pipeline the stages. It's
>>> >>>> just that current runners treat all bounded datasets as completely out
>>> >>>> of order, and the current API for custom bounded sources doesn't even
>>> >>>> have a function for reporting a watermark.
>>> >>>>
>>> >>>> Hope this helps.
>>> >>>>
>>> >>>>
>>> >>>> On Fri, Jun 23, 2017, 7:02 AM Kenneth Knowles <[email protected]> wrote:
>>> >>>>>
>>> >>>>> Hello!
>>> >>>>>
>>> >>>>> The behavior you are seeing is what makes something batch mode
>>> >>>>> processing. The essential definition of streaming mode processing is
>>> >>>>> that you get output before you have processed all the data.
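For contrast, here is a similarly simplified plain-Python model of the streaming behaviour described by Eugene and Kenn. It is a sketch under stated assumptions, not a runner's actual logic. Here each element carries a watermark, which is a hypothetical per-element field; real runners track the watermark themselves. A (key, window) group is emitted as soon as the watermark reaches the end of its window, so output appears before all input has been read. Late data, allowed lateness, and custom triggers are deliberately not modelled. `source`, `log`, and the sample events are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE = 5  # seconds, like the 5-second FixedWindows in this thread
log = []  # records the order of "read" and "emit" steps


def source(events):
    """Hypothetical source that records each read."""
    for event in events:
        log.append("read")
        yield event


def streaming_group_by_key_and_window(events):
    """Model of GroupByKey with a watermark-driven, end-of-window trigger.

    events are (key, timestamp, value, watermark) in arrival order; the
    watermark is the promise that no earlier timestamps are still to come.
    """
    pending = defaultdict(list)
    for key, ts, value, watermark in events:
        pending[(key, ts - ts % WINDOW_SIZE)].append(value)
        # Emit every group whose window end the watermark has reached.
        for group_key in sorted(k for k in pending if k[1] + WINDOW_SIZE <= watermark):
            yield group_key, pending.pop(group_key)
    # End of input: the watermark jumps to infinity, so everything left is complete.
    for group_key in sorted(pending):
        yield group_key, pending.pop(group_key)


events = [("a", 1, "x", 0), ("a", 3, "y", 2), ("a", 7, "z", 6), ("a", 12, "w", 12)]
results = []
for group_key, values in streaming_group_by_key_and_window(source(events)):
    log.append("emit")
    results.append((group_key, values))
```

Here the first group is emitted after the third read, before the input is exhausted. Batch sources cannot provide this, because they offer no useful watermark.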
>>> >>>>>
>>> >>>>> Event-time windowing does not control when computations occur. When
>>> >>>>> you window into FixedWindows of five seconds, your data will be
>>> >>>>> grouped according to the window that contains the timestamp on the
>>> >>>>> event. In streaming mode, this grouping will generally be output soon
>>> >>>>> after the watermark exceeds the end of the window, but this can be
>>> >>>>> customized using triggers.
>>> >>>>>
>>> >>>>> Kenn
>>> >>>>>
>>> >>>>> On Thu, Jun 22, 2017 at 10:13 AM, Morand, Sebastien
>>> >>>>> <[email protected]> wrote:
>>> >>>>>>
>>> >>>>>> Hi,
>>> >>>>>>
>>> >>>>>> I'm trying to window a batch, but whatever I try, the timestamp is
>>> >>>>>> not working: everything is blocked by the group-by.
>>> >>>>>>
>>> >>>>>> I want transform-combine (the last step in my screenshot) to start
>>> >>>>>> before group_source has read all the input files, and it never
>>> >>>>>> happens. First it reads all my files in the group-by, and only when
>>> >>>>>> that's over does it start transform-combine...
>>> >>>>>>
>>> >>>>>> I set a FixedWindow of 5 seconds, but it doesn't change anything.
>>> >>>>>> Is there any way to do this?
>>> >>>>>>
>>> >>>>>> NB: job ID 2017-06-22_10_03_47-4479358064592021427
>>> >>>>>>
>>> >>>>>> Thanks in advance,
>>> >>>>>>
>>> >>>>>> Sébastien MORAND
>>> >>>>>>
>>> >>>>>> ------------------------------------------------------------------
>>> >>>>>> This e-mail transmission (message and any attached files) may contain
>>> >>>>>> information that is proprietary, privileged and/or confidential to
>>> >>>>>> Veolia Environnement and/or its affiliates and is intended exclusively
>>> >>>>>> for the person(s) to whom it is addressed. If you are not the intended
>>> >>>>>> recipient, please notify the sender by return e-mail and delete all
>>> >>>>>> copies of this e-mail, including all attachments. Unless expressly
>>> >>>>>> authorized, any use, disclosure, publication, retransmission or
>>> >>>>>> dissemination of this e-mail and/or of its attachments is strictly
>>> >>>>>> prohibited.
>>> >>>>>> ------------------------------------------------------------------
