Hi Amir,

It is very hard to respond without enough detail to reproduce the problem. Can you please send a full pipeline that we can run against test data (e.g., the LICENSE file), including the pipeline options (which runner, etc.)?

On the other side: in general, runners do not expose hooks to control how often they read records. If you have something like TextIO.Read | ParDo.of(sleep for 1s), you will get a 1s sleep per record, but you cannot control how this is interleaved with reading. A runner is free to read all the records before sleeping, read one record and sleep in a loop, or anything in between.

Thanks,
Dan

On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:

> So here is what happened as a result of inserting Window of random seconds
> buffering in my TextIO().Read & DoFn<>:
> the number of records processed got doubled :-((
> Why is that? Could someone shed light on this pls, I appreciate it very
> much.
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 4:40 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Would this implementation work?
> I am thinking to buffer records within a window of random seconds, process
> DoFn
> them as per each record, and repeat another random window seconds length:
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat")).apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds((int)(((15-5)
> * r.nextDouble()) + 5)))))
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> Thanks for your help.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 3:51 PM
> *Subject:* TextIO().Read pipeline implementation question
>
> Hi Colleagues,
> I have no problem reading through TextIO() & processing, all by default
> behavior.
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> I want to change this logic like the following:
>
>    - Start executing TextIo().Read but before reading anything yet
>    - Sleep for a random no of seconds between 5 & 15
>    - Wake-up
>    - Read the records from the file (for the time-stamps) while TextIo().Read
>    was sleep
>    - Process records
>    - Back to putting TextIo() to sleep for a random no of seconds between 5
>    & 15 and continue til end of the file is reached
>
> I appreciate your suggestions and/or if you can point me to an example.
> Cheers+thanks
> Amir-
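
To make the interleaving point in the reply concrete, here is a toy sketch in plain Java. It is not Beam code and does not model any real runner: the class name and both methods are hypothetical, and the sleep is only recorded as an event rather than performed. It shows two orderings that are both consistent with "one sleep per record".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java, not Beam. Both "runner" strategies are hypothetical.
class InterleavingSketch {

    // Strategy A: read every record first, then run the per-record step.
    static List<String> readAllThenProcess(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) {
            events.add("read " + r);
        }
        for (String r : records) {
            events.add("sleep+process " + r);
        }
        return events;
    }

    // Strategy B: read one record, run the per-record step, repeat.
    static List<String> readOneProcessOne(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) {
            events.add("read " + r);
            events.add("sleep+process " + r);
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(readAllThenProcess(records));
        System.out.println(readOneProcessOne(records));
    }
}
```

Both strategies perform the same reads and the same per-record steps; only the order of events differs, and the pipeline author does not get to choose between them.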
