Amir, it seems like you're attempting to build a network simulation (https://en.wikipedia.org/wiki/Network_simulation). Are you sure Apache Beam is the right tool for this?
On Wed, Aug 24, 2016 at 3:54 PM, Thomas Groh <tg...@google.com> wrote:

> The Beam model is generally agnostic to the rate at which elements are produced and consumed. Instead, it uses the concept of a watermark to provide a completion metric, and element timestamps to record when an event happened (which is independent of when the event was processed). Your pipeline should be correct regardless of the input rate if you use the (data-based) timestamp of arriving elements instead of the time they arrived in the pipeline. This lets you describe the output of your pipeline in terms of the input records (which have associated timestamps) rather than the rate at which input arrived. You can assign timestamps to an existing PCollection using the 'WithTimestamps' PTransform, or create a new PCollection where elements have associated timestamps using the 'Create.timestamped()' PTransform. Some sources will also output elements with a timestamp already associated with the element (e.g. KafkaIO or PubSubIO).
>
> If the sole desire is to rate-limit your input, using CountingInput.unbounded().withRate(Duration) will output elements at a continuous rate to your downstream PCollection. This will output elements over time in such a way that the desired rate is reached.
>
> On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <amirto...@yahoo.com> wrote:
>
>> Thanks for your response Ben. The sleep+read is part of the problem-solution requirements. I know what you mean by "why not process them immediately." The problem solution intentionally slows down processing to simulate the traffic in expressway(s). The assumption is that each car emits a "record" every 30 seconds. Making the story short: at runtime, the behavior I described below is expected to be implemented to accurately provide a simulated solution.
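Thomas's point about data-based timestamps can be illustrated outside Beam: the event time comes from the record itself, so it stays correct no matter how fast or slowly records are read. A minimal plain-Java sketch, assuming a hypothetical `carId,epochMillis` record format (the format and the `extractEventTime` helper are illustrative, not from the thread):

```java
import java.time.Instant;

public class EventTimeSketch {
    // Hypothetical record format: "carId,epochMillis".
    // Mirrors what WithTimestamps does in Beam: the timestamp is taken
    // from the data, not from the moment the record arrives.
    static Instant extractEventTime(String record) {
        String[] parts = record.split(",");
        return Instant.ofEpochMilli(Long.parseLong(parts[1]));
    }

    public static void main(String[] args) {
        // Two records that *arrive* together but *happened* 30s apart,
        // matching the "each car emits a record every 30 seconds" assumption.
        String r1 = "car42,1472078040000";
        String r2 = "car42,1472078070000";
        long gapMillis = extractEventTime(r2).toEpochMilli()
                       - extractEventTime(r1).toEpochMilli();
        System.out.println(gapMillis); // 30000, regardless of read rate
    }
}
```

Because the 30-second spacing is recoverable from the data, the pipeline does not need artificial sleeps to preserve it.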
>> So let's say I want to inject a Sleep(random-seconds) into the pipeline superficially before ParDo actually gets into the action. What are the options for doing that? And using TextIO(), how can I buffer the records read by TextIO() while Sleep() is in progress? Thanks for your valuable time.
>>
>> ------------------------------
>> From: Ben Chambers <bchamb...@apache.org>
>> To: user@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
>> Sent: Wednesday, August 24, 2016 3:24 PM
>> Subject: Re: TextIO().Read pipeline implementation question
>>
>> I think the most important question is why you want to slow down the reads like that. If this is for testing purposes, there may be other options, such as test-specific sources.
>>
>> At a high level, the process you describe sounds somewhat like an Unbounded Source, or perhaps an application of the not-yet-built Splittable DoFn (https://s.apache.org/splittable-do-fn).
>>
>> Even in those cases, "reading 100 records and then sleeping" is normally undesirable because it limits the throughput of the pipeline. If there were 1000 records waiting to be processed, why not process them?
>>
>> In general, a given step doesn't "submit elements to the next step". It just outputs the elements. This is important since there may be two steps that read from that PCollection, meaning that there isn't a single ParDo to submit the elements to.
>>
>> -- Ben
>>
>> On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <amirto...@yahoo.com> wrote:
>>
>> Hi Dan,
>> Thanks so much for your response. Let's focus on your "The other side" section below. I described the target process I am trying to implement in my first email below. According to your "runners do not expose hooks to control how often they read records," it looks like I am out of luck to achieve that on a random basis.
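Ben's remark that a step "just outputs" elements, with no single downstream step to submit them to, can be mimicked with plain Java collections: one produced list can feed two independent consumers, and the producer never targets either one. A sketch (class and method names here are illustrative, not Beam APIs):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FanOutSketch {
    // Two independent "downstream steps" reading the same "PCollection".
    static List<String> upperCase(List<String> input) {
        return input.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static List<Integer> lengths(List<String> input) {
        return input.stream().map(String::length).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> records = List.of("car1", "car22");
        // Both consumers observe every element; the producing step does not
        // know (or care) how many consumers exist.
        System.out.println(upperCase(records)); // [CAR1, CAR22]
        System.out.println(lengths(records));   // [4, 5]
    }
}
```

This is why "buffer, then submit to the ParDo" is not a primitive in the model: output elements simply become available to every consumer of the PCollection.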
>> So I am trying to articulate an equivalent read/process as close as possible to what I want. From the "- Wake-up" step in my algorithm, I should be able to read records, but no more than 100. Let's say I sleep for 150 milliseconds, wake up, read 100 records all at once, and submit them to a ParDo DoFn to process one by one. How would that pipeline implementation look? Is there an example that shows how to "sleep 150 ms" in the pipeline, then read n records (i.e. 100) at once, and then submit them to a ParDo to process one by one, please? I have tried so many ways to implement it but keep getting weird compilation errors... I appreciate your help.
>> Amir-
>>
>> ------------------------------
>> From: Dan Halperin <dhalp...@google.com>
>> To: user@beam.incubator.apache.org; amir bahmanyari <amirto...@yahoo.com>
>> Sent: Wednesday, August 24, 2016 1:42 PM
>> Subject: Re: TextIO().Read pipeline implementation question
>>
>> Hi Amir,
>>
>> It is very hard to respond without sufficient details to reproduce. Can you please send a full pipeline that we can test with test data (e.g., the LICENSE file), including pipeline options (which runner, etc.)?
>>
>> The other side -- in general, runners do not expose hooks to control how often they read records. If you have something like TextIO.Read | ParDo.of(sleep for 1s), you will get a 1s sleep per record, but you cannot control how this is interleaved with reading. A runner is free to read all the records before sleeping, read one record and sleep in a loop, or anything in between.
>>
>> Thanks,
>> Dan
>>
>> On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <amirto...@yahoo.com> wrote:
>>
>> So here is what happened as a result of inserting a Window of random-seconds buffering into my TextIO().Read & DoFn<>: the number of records processed got doubled :-(( Why is that?
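Outside of Beam, the "sleep, then read at most 100 records, then process them one by one" loop described above can at least be sketched in plain Java to pin down the intended behavior. The batch size, sleep length, and processor callback are parameters of this sketch, not Beam APIs, and as Dan notes a runner gives no such hook:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchedReadSketch {
    // Hand records to the processor in batches of at most batchSize,
    // sleeping before each batch; returns the batches for inspection.
    static List<List<String>> runBatches(List<String> records, int batchSize,
                                         long sleepMillis, Consumer<String> processor) {
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            try {
                Thread.sleep(sleepMillis); // the "sleep, then wake up" step
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            int end = Math.min(start + batchSize, records.size());
            List<String> batch = new ArrayList<>(records.subList(start, end));
            batch.forEach(processor); // process the batch one record at a time
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        for (int i = 0; i < 250; i++) input.add("record" + i);
        List<List<String>> batches = runBatches(input, 100, 1, s -> {});
        System.out.println(batches.size()); // 3: batches of 100, 100, 50
    }
}
```

In Beam terms this whole loop would have to live inside a single DoFn or a custom source, since the runner controls the interleaving of reads and processing.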
>> Could someone shed light on this please? I appreciate it very much.
>> Thanks.
>> Amir-
>>
>> ------------------------------
>> From: amir bahmanyari <amirto...@yahoo.com>
>> To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
>> Sent: Tuesday, August 23, 2016 4:40 PM
>> Subject: Re: TextIO().Read pipeline implementation question
>>
>> Would this implementation work? I am thinking to buffer records within a window of a random number of seconds, process each record with a DoFn, and repeat with another window of random length:
>>
>> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>>  .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds((int)(((15-5) * r.nextDouble()) + 5)))))
>>  .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>>
>> Thanks for your help.
>> Amir-
>>
>> ------------------------------
>> From: amir bahmanyari <amirto...@yahoo.com>
>> To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
>> Sent: Tuesday, August 23, 2016 3:51 PM
>> Subject: TextIO().Read pipeline implementation question
>>
>> Hi Colleagues,
>> I have no problem reading through TextIO() & processing, all with the default behavior:
>>
>> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>>  .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>>
>> I want to change this logic to the following:
>>
>> - Start executing TextIO().Read, but before reading anything yet
>> - Sleep for a random number of seconds between 5 & 15
>> - Wake up
>> - Read the records (for the time-stamps) that arrived in the file while TextIO().Read was asleep
>> - Process the records
>> - Go back to putting TextIO() to sleep for a random number of seconds between 5 & 15, and continue until the end of the file is reached
>>
>> I appreciate your suggestions and/or a pointer to an example.
>> Cheers+thanks
>> Amir-
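For reference, the random 5-to-15-second duration expression used in the Window snippet earlier in the thread can be checked in isolation. A plain-Java sketch (the class and method names are illustrative; note the `(int)` cast truncates, so the result actually lands in [5, 14] and 15 itself is never produced):

```java
import java.util.Random;

public class RandomSleepSketch {
    // Same expression as in the Window snippet:
    // (int)(((15-5) * r.nextDouble()) + 5), i.e. a value in [5, 15),
    // truncated to an int in [5, 14].
    static int randomSeconds(Random r) {
        return (int) (((15 - 5) * r.nextDouble()) + 5);
    }

    public static void main(String[] args) {
        Random r = new Random();
        for (int i = 0; i < 1000; i++) {
            int s = randomSeconds(r);
            if (s < 5 || s > 14) throw new AssertionError("out of range: " + s);
        }
        System.out.println("randomSeconds stays in [5, 14]");
    }
}
```

Also note that this expression is evaluated once, at pipeline-construction time, so FixedWindows gets a single random-but-fixed size; it does not produce a freshly random window length per window at runtime.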