Hi Dan,
Thanks so much for your response. Let's focus on your "The other side"
section below. I described the target process I am trying to implement in my
first email below. According to your "runners do not expose hooks to control how
often they read records," it looks like I am out of luck achieving that on a
random basis. So I am trying to articulate an equivalent read/process that is as
close as possible to what I want. From the "- Wake-up" step in my algorithm, I
should be able to read records, but no more than 100. Let's say I sleep for 150
milliseconds, wake up, read 100 records all at once, and submit them to a
ParDo DoFn to process one by one. What would that pipeline implementation look
like? Is there an example that shows how to "sleep 150 ms" in a pipeline, then
read n records (e.g. 100) at once, and then submit them to a ParDo to process
one by one, please? I have tried so many ways to implement it but keep getting
weird compilation errors... I appreciate your help.
Amir-
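Because, per Dan's reply, the runner decides when TextIO.Read pulls records, the "sleep 150 ms, read at most 100, process one by one" loop can only be expressed exactly outside Beam. Below is a minimal plain-Java sketch of that loop; it is not a Beam pipeline and uses no Beam API. The class name `BatchedReader` and its methods are hypothetical, and it assumes one record per line read through `java.io.BufferedReader`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (NOT Beam; class and method names are hypothetical):
// sleep, wake up, read at most maxRecords lines, process them one by one,
// and repeat until the end of the input.
final class BatchedReader {

    /** Reads up to maxRecords lines; returns an empty list at end of input. */
    static List<String> readBatch(BufferedReader in, int maxRecords) {
        List<String> batch = new ArrayList<>();
        try {
            String line;
            // Check the size first so no extra line is consumed once the batch is full.
            while (batch.size() < maxRecords && (line = in.readLine()) != null) {
                batch.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batch;
    }

    /** Runs the sleep/read/process loop and returns the size of each non-empty batch. */
    static List<Integer> run(BufferedReader in, long sleepMillis, int maxRecords,
                             Consumer<String> process) {
        List<Integer> batchSizes = new ArrayList<>();
        while (true) {
            try {
                Thread.sleep(sleepMillis);                  // "sleep for 150 ms"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();         // stop early if interrupted
                break;
            }
            List<String> batch = readBatch(in, maxRecords); // "wake up, read <= 100"
            if (batch.isEmpty()) {
                break;                                      // end of file reached
            }
            batchSizes.add(batch.size());
            for (String record : batch) {
                process.accept(record);                     // "process one by one"
            }
        }
        return batchSizes;
    }
}
```

For example, `BatchedReader.run(reader, 150, 100, line -> ...)` sleeps 150 ms, reads up to 100 lines, processes them, and repeats, stopping after the first wake-up that finds no more input. Putting the same `Thread.sleep` inside a Beam DoFn would only delay processing; it would not control when the runner reads.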
From: Dan Halperin <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Wednesday, August 24, 2016 1:42 PM
Subject: Re: TextIO().Read pipeline implementation question
Hi Amir,
It is very hard to respond without sufficient details to reproduce. Can you
please send a full pipeline that we can test with test data (e.g., the LICENSE
file), including pipeline options (which runner, etc.)?
The other side -- in general, runners do not expose hooks to control how often
they read records. If you have something like TextIO.Read | ParDo.of(sleep for
1s) you will get 1s sleep per record, but you cannot control how this is
interleaved with reading. A runner is free to read all the records before
sleeping, read one record and sleep in a loop, and everything in between.
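To make the point above concrete, here is a toy plain-Java model (not Beam code; `ScheduleDemo` and its event strings are hypothetical) of two schedules a runner could legally pick for `TextIO.Read | ParDo.of(sleep for 1s)`. Both perform exactly one sleep per record; only the interleaving of reads and sleeps differs, and nothing in the pipeline definition selects one over the other.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Beam; names are hypothetical): two orderings a runner may
// choose for "read records, then ParDo that sleeps once per record".
final class ScheduleDemo {

    /** Reads every record first, then sleeps and processes per record. */
    static List<String> readAllThenProcess(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) events.add("read " + r);
        for (String r : records) events.add("sleep+process " + r);
        return events;
    }

    /** Reads one record, sleeps and processes it, then moves to the next. */
    static List<String> interleaved(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) {
            events.add("read " + r);
            events.add("sleep+process " + r);
        }
        return events;
    }

    /** Counts the per-record sleeps recorded in an event log. */
    static long sleeps(List<String> events) {
        return events.stream().filter(e -> e.startsWith("sleep+process")).count();
    }
}
```

Both logs contain the same number of sleeps (one per record), which is all the ParDo guarantees; when the reads happen relative to those sleeps is up to the runner.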
Thanks,
Dan
On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:
So here is what happened as a result of inserting a Window of random seconds of
buffering in my TextIO().Read & DoFn<>: the number of records processed
doubled :-(( Why is that? Could someone shed light on this, please? I appreciate
it very much.
Thanks.
Amir-
From: amir bahmanyari <[email protected]>
To: "[email protected]. org" <[email protected]. org>
Sent: Tuesday, August 23, 2016 4:40 PM
Subject: Re: TextIO().Read pipeline implementation question
Would this implementation work? I am thinking of buffering records within a
window of random seconds, processing them in the DoFn one record at a time, and
then repeating with another window of random length in seconds:
p.apply(TextIO.Read.from("/tmp/LRData.dat"))
 .apply(Window.<String>into(
     FixedWindows.of(Duration.standardSeconds((int) (((15 - 5) * r.nextDouble()) + 5)))))
 .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
Thanks for your help.
Amir-
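Two properties of the window-length expression in the snippet above can be checked in plain Java. First, it is an ordinary Java argument, so it is evaluated once when the pipeline is constructed, and every `FixedWindows` window then has that same length; it does not draw a new random length per window. Second, `nextDouble()` is strictly below 1.0 and the `(int)` cast truncates, so the result is a whole number from 5 to 14, never 15. The sketch below (hypothetical class `WindowLength`, assuming `r` is a `java.util.Random`) reproduces the expression and shows an inclusive 5-to-15 alternative using `nextInt`.

```java
import java.util.Random;

// Plain-Java check of the window-length expression (class name is hypothetical).
final class WindowLength {

    /** Same expression as in the email: truncation yields an int in [5, 14]. */
    static int randomSeconds(Random r) {
        return (int) (((15 - 5) * r.nextDouble()) + 5);
    }

    /** nextInt(11) returns 0..10, so this yields an int in [5, 15] inclusive. */
    static int randomSecondsInclusive(Random r) {
        return 5 + r.nextInt(15 - 5 + 1);
    }
}
```

Calling either method once and passing the result to `Duration.standardSeconds(...)` still fixes a single window size for the whole pipeline; `FixedWindows` has no notion of per-window random sizes.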
From: amir bahmanyari <[email protected]>
To: "[email protected]. org" <[email protected]. org>
Sent: Tuesday, August 23, 2016 3:51 PM
Subject: TextIO().Read pipeline implementation question
Hi Colleagues,
I have no problem reading through TextIO() and processing, all with the default
behavior:

p.apply(TextIO.Read.from("/tmp/LRData.dat"))
 .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
I want to change this logic as follows:
- Start executing TextIO.Read, but before reading anything yet:
- Sleep for a random number of seconds between 5 and 15.
- Wake up.
- Read the records from the file whose timestamps fall within the period while
  TextIO.Read was asleep.
- Process the records.
- Put TextIO.Read back to sleep for a random number of seconds between 5 and 15,
  and continue until the end of the file is reached.
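The loop in the list above can be simulated outside Beam. This is a minimal plain-Java sketch, not a Beam pipeline: it assumes the records are already sorted by a timestamp in whole seconds measured from the start of the run, and `timestampOf` is a hypothetical extractor because the layout of `LRData.dat` is not given in the thread. Each iteration sleeps a random 5 to 15 seconds, then processes, one by one, every record whose timestamp falls before the simulated wake-up time, and it stops once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

// Plain-Java sketch (NOT Beam; TimedReplay and timestampOf are hypothetical).
// Assumes records arrive sorted by a timestamp in seconds from the start of the run.
final class TimedReplay {

    /** Runs the sleep/wake/read/process loop; returns how many records each wake-up processed. */
    static List<Integer> run(Iterator<String> records,
                             ToLongFunction<String> timestampOf,
                             Random rnd,
                             LongConsumer sleepSeconds,
                             Consumer<String> process) {
        List<Integer> batchSizes = new ArrayList<>();
        long clock = 0;          // simulated seconds elapsed since the start
        String pending = null;   // record already read but not yet due
        while (pending != null || records.hasNext()) {
            long nap = 5 + rnd.nextInt(11);   // random whole seconds in [5, 15]
            sleepSeconds.accept(nap);         // "sleep"
            clock += nap;                     // "wake up"
            int n = 0;
            // Read every record whose timestamp falls before the wake-up time.
            while (true) {
                if (pending == null) {
                    if (!records.hasNext()) break;   // end of file reached
                    pending = records.next();
                }
                if (timestampOf.applyAsLong(pending) >= clock) break; // not due yet
                process.accept(pending);             // process one by one
                pending = null;
                n++;
            }
            batchSizes.add(n);
        }
        return batchSizes;
    }
}
```

For a real run, the sleeper could be `s -> { try { Thread.sleep(s * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }`; a test can pass a no-op and rely on the simulated clock alone. Keeping the clock separate from the actual sleep is a design choice that makes the replay deterministic for a given `Random`.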
I would appreciate your suggestions and/or a pointer to an example.
Cheers + thanks,
Amir-