Hey Amir,

One thing you might want to do is use something like
`CountingInput.unbounded().withRate`, which will let you generate Longs
at a predictable rate. Then you can use a follow-on ParDo to sleep for some
random period and convert each Long to, for example, a simulated car.
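A minimal sketch of Dan's suggestion, assuming the Apache Beam 2.x Java SDK with the DirectRunner on the classpath. `CountingInput` is the name from the 2016 SDK; in Beam 2.x the comparable rate-limited source is `GenerateSequence.from(0).withRate(...)`. The class name, the `toCarRecord` mapping, and the 100 to 500 ms pause range are hypothetical stand-ins for the real simulation logic.

```java
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Duration;

public class SimulatedCarSource {

  // Hypothetical conversion: map a sequence number onto one of 100 simulated cars.
  static String toCarRecord(long seq) {
    return "car=" + (seq % 100) + ",seq=" + seq;
  }

  // Random pause, uniform in [minMs, maxMs).
  static long randomPauseMillis(long minMs, long maxMs) {
    return ThreadLocalRandom.current().nextLong(minMs, maxMs);
  }

  // Sleeps a random period for each element, then emits a simulated car record.
  static class SleepThenConvertFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws InterruptedException {
      Thread.sleep(randomPauseMillis(100, 500));
      c.output(toCarRecord(c.element()));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(GenerateSequence.from(0)
            .to(50)                                      // stop after 50 elements so the demo ends
            .withRate(10, Duration.standardSeconds(1)))  // at most 10 Longs per second
     .apply("SleepThenConvert", ParDo.of(new SleepThenConvertFn()));
    p.run().waitUntilFinish();
  }
}
```

Sleeping inside a DoFn blocks the worker thread, so it is only reasonable for a simulation like this one. The `.to(50)` bound just makes the demo terminate; dropping it gives an endless stream.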

Would that help?
Dan

On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <[email protected]>
wrote:

> Thanks for your response Ben.
> The sleep+read is part of the problem's solution requirements. I know what
> you mean by asking why not process them immediately.
> The problem solution intentionally slows down processing to simulate the
> traffic in expressway(s).
> The assumption is that each car emits a "record" every 30 seconds.
> To make a long story short, at runtime the behavior I described below is
> expected to be implemented to provide an accurate simulated solution.
> So let's say I want to inject a Sleep(random-seconds) into the pipeline
> artificially, before the ParDo actually gets into action.
> What are the options to do that?
> And using TextIO(), how can I buffer the records read by TextIO() while
> Sleep() is in progress?
> Thanks for your valuable time.
>
>
> ------------------------------
> *From:* Ben Chambers <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 3:24 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> I think the most important question is why you want to slow down the
> reads like that. If this is for testing purposes, there may be other
> options, such as test-specific sources.
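For Ben's "test specific sources", the Beam 2.x Java SDK provides `TestStream`, which releases elements in explicit batches and advances the runner's processing-time clock between them, so a test can model "read 100, then pause" without real sleeping. A sketch, assuming Beam 2.x and the DirectRunner (which supports `TestStream`); the class and method names are hypothetical.

```java
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.joda.time.Duration;

public class BatchedReplay {

  // Builds a TestStream that emits `lines` in batches of `batchSize`,
  // advancing simulated processing time by `pause` after each batch.
  static TestStream<String> batched(List<String> lines, int batchSize, Duration pause) {
    TestStream.Builder<String> builder = TestStream.create(StringUtf8Coder.of());
    for (int start = 0; start < lines.size(); start += batchSize) {
      List<String> batch = lines.subList(start, Math.min(start + batchSize, lines.size()));
      builder = builder
          // one element event holding the whole batch
          .addElements(batch.get(0), batch.subList(1, batch.size()).toArray(new String[0]))
          // simulated pause; no thread actually sleeps
          .advanceProcessingTime(pause);
    }
    // Close the stream so downstream windows can fire.
    return builder.advanceWatermarkToInfinity();
  }
}
```

It is applied like any source, e.g. `p.apply(BatchedReplay.batched(lines, 100, Duration.millis(150)))`, where `lines` holds the contents of the input file.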
>
> At a high level, the process you describe sounds somewhat like an
> Unbounded Source, or perhaps an application of the not-yet-built Splittable
> DoFn (https://s.apache.org/splittable-do-fn).
>
> Even in those cases, "reading 100 records and then sleeping" is normally
> undesirable because it limits the throughput of the pipeline. If there were
> 1000 records waiting to be processed, why not process them?
>
> In general, a given step doesn't "submit elements to the next step". It
> just outputs the elements. This is important since there may be two steps
> that read from that PCollection, meaning that there isn't a single ParDo
> to submit the elements to.
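To illustrate Ben's point, a minimal sketch assuming the Beam 2.x Java SDK and the DirectRunner: one `PCollection` is read by two independent transforms, so the producing step has no single ParDo to "submit" its output to. The sample records, step names, and `carId` helper are hypothetical.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class TwoConsumers {

  // Hypothetical record format "carId,speed": returns the carId field.
  static String carId(String record) {
    return record.split(",", 2)[0];
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The producing step only outputs elements into this PCollection.
    PCollection<String> records = p.apply(Create.of("car1,55", "car2,61", "car1,58"));

    // Two independent consumers of the same PCollection; the runner decides
    // how and when each of them receives the elements.
    records.apply("ExtractCarIds",
        MapElements.into(TypeDescriptors.strings()).via(TwoConsumers::carId));
    records.apply("RecordLengths",
        MapElements.into(TypeDescriptors.integers()).via(String::length));

    p.run().waitUntilFinish();
  }
}
```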
>
> -- Ben
>
> On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <[email protected]>
> wrote:
>
> Hi Dan,
> Thanks so much for your response.
> Let's focus on your "The other side" section below.
> I provided the target process I am trying to implement in my first email
> below.
> According to your point that "runners do not expose hooks to control how
> often they read records," it looks like I am out of luck achieving that on
> a random basis.
> So I am trying to articulate an equivalent read/process as close as
> possible to what I want.
> From the "- Wake-up" step in my algorithm, I should be able to read
> records, but no more than 100.
> Let's say I sleep for 150 milliseconds, wake up, read 100 records all
> at once, and submit them to a ParDo DoFn to process one by one.
> What would that pipeline implementation look like?
> Is there an example that shows how to "sleep 150 ms" in a pipeline,
> then read n records (e.g. 100) at once, and then submit them to a
> ParDo to process one by one, please?
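One approximation of "sleep 150 ms, take 100 records, process them one by one", assuming the Beam 2.x Java SDK: batch after reading with `GroupIntoBatches.ofSize(100)`, pause once per batch, then emit the batch's records individually. This does not delay the read itself (as Dan explains further down the thread, the runner may already have read the whole file). The single constant key, class names, and path argument are assumptions for illustration.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BatchThenPause {

  static final long PAUSE_MS = 150;
  static final long BATCH_SIZE = 100;

  // Pauses once per batch, then emits the batch's records one at a time.
  static class PauseThenUnbatchFn extends DoFn<KV<Integer, Iterable<String>>, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws InterruptedException {
      Thread.sleep(PAUSE_MS);
      for (String record : c.element().getValue()) {
        c.output(record);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    p.apply(TextIO.read().from(args[0]))  // e.g. /tmp/LRData.dat
     // GroupIntoBatches works per key; one constant key keeps the sketch simple.
     .apply(MapElements
         .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
         .via((String line) -> KV.of(0, line)))
     .apply(GroupIntoBatches.<Integer, String>ofSize(BATCH_SIZE))
     .apply("PauseThenUnbatch", ParDo.of(new PauseThenUnbatchFn()))
     .apply("ProcessOneByOne", ParDo.of(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element());  // per-record logic would go here
       }
     }));
    p.run().waitUntilFinish();
  }
}
```

The last, partial batch is emitted when the window closes at the end of the bounded input. With a single key all batching happens on one key; spreading records across several keys would let batches form in parallel.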
> I have tried so many ways to implement it but keep getting weird
> compilation errors...
> I appreciate your help.
> Amir-
>
> ------------------------------
> *From:* Dan Halperin <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 1:42 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Hi Amir,
>
> It is very hard to respond without sufficient details to reproduce. Can
> you please send a full pipeline that we can test with test data (e.g., the
> LICENSE file), including pipeline options (which runner, etc.)?
>
> The other side -- in general, runners do not expose hooks to control how
> often they read records. If you have something like TextIO.Read |
> ParDo.of(sleep for 1s) you will get 1s sleep per record, but you cannot
> control how this is interleaved with reading. A runner is free to read all
> the records before sleeping, read one record and sleep in a loop, and
> everything in between.
>
> Thanks,
> Dan
> On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]>
> wrote:
>
> So here is what happened as a result of inserting a Window of random length
> (in seconds) to buffer records between my TextIO().Read and DoFn<>:
> the number of records processed doubled :-((
> Why is that? Could someone shed light on this, please? I would appreciate it
> very much.
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 4:40 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Would this implementation work?
> I am thinking of buffering records within a window of a random number of
> seconds, processing them one record at a time in the DoFn, and then
> repeating with another window of random length:
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>  .apply(Window.<String>into(FixedWindows.of(
>      Duration.standardSeconds((int) (((15 - 5) * r.nextDouble()) + 5)))))
>  .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
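The window-size expression in the snippet is evaluated once, when the pipeline is constructed, so every window gets the same size; it is not redrawn per window. `FixedWindows` also only groups elements by their timestamps and does not pause reading. The expression's range can be checked in plain Java, assuming `r` is a `java.util.Random`: the cast truncates a value in [5.0, 15.0), so the result is an integer from 5 to 14, never 15.

```java
import java.util.Random;

public class WindowSizeRange {

  // The expression from the snippet: truncates a double in [5.0, 15.0) to an int.
  static int randomSeconds(Random r) {
    return (int) (((15 - 5) * r.nextDouble()) + 5);
  }

  public static void main(String[] args) {
    Random r = new Random(42);
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;
    for (int i = 0; i < 100_000; i++) {
      int s = randomSeconds(r);
      min = Math.min(min, s);
      max = Math.max(max, s);
    }
    System.out.println("min=" + min + " max=" + max);  // prints min=5 max=14
  }
}
```

If 15 seconds should be reachable, `5 + r.nextInt(11)` yields an integer from 5 to 15 inclusive.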
>
> Thanks for your help.
> Amir-
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 3:51 PM
> *Subject:* TextIO().Read pipeline implementation question
>
> Hi Colleagues,
> I have no problem reading through TextIO() and processing, all with the
> default behavior.
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>  .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> I want to change this logic as follows:
>
> - Start executing TextIO.Read, but before reading anything:
> - Sleep for a random number of seconds between 5 and 15
> - Wake up
> - Read the records from the file (by their timestamps) that arrived while
> TextIO.Read was asleep
> - Process the records
> - Put TextIO back to sleep for a random number of seconds between 5 and 15,
> and continue until the end of the file is reached
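The steps above describe a single reader that alternates between sleeping and draining the records that "arrived" in the meantime. A plain-Java sketch of that loop, outside Beam, makes the intended behavior concrete; the line format (a leading timestamp in seconds, sorted ascending) and all names are assumptions. Dan's reply in this thread explains why Beam runners do not expose a hook to do this inside TextIO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.LongConsumer;

public class SleepThenReadLoop {

  // Assumption: each line starts with a timestamp in seconds, e.g. "12,car7",
  // and lines are sorted by that timestamp.
  static long timestampOf(String line) {
    return Long.parseLong(line.split(",", 2)[0].trim());
  }

  // Repeats: sleep 5..15 s, then take every unread line whose timestamp falls
  // within the simulated time elapsed so far. Returns the batches in order.
  static List<List<String>> replay(List<String> lines, Random r, LongConsumer sleepMillis) {
    List<List<String>> batches = new ArrayList<>();
    long elapsedSeconds = 0;
    int next = 0;
    while (next < lines.size()) {
      long pause = 5 + r.nextInt(11);         // uniform integer in [5, 15]
      sleepMillis.accept(pause * 1000);       // sleep
      elapsedSeconds += pause;                // wake up
      List<String> batch = new ArrayList<>(); // read what "arrived" while asleep
      while (next < lines.size() && timestampOf(lines.get(next)) < elapsedSeconds) {
        batch.add(lines.get(next++));
      }
      batches.add(batch);                     // the "process records" step would go here
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("0,car1", "3,car2", "9,car1", "30,car3");
    LongConsumer noSleep = ms -> { };         // no-op so the demo runs instantly
    System.out.println(replay(lines, new Random(1), noSleep));
  }
}
```

To run in real time, pass a consumer that actually sleeps, e.g. `ms -> { try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }`.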
>
> I appreciate your suggestions and/or if you can point me to an example.
> Cheers+thanks
> Amir-
>
>
>
>
