Amir, it seems like you're attempting to build a network simulation
(https://en.wikipedia.org/wiki/Network_simulation). Are you sure Apache Beam
is the right tool for this?

On Wed, Aug 24, 2016 at 3:54 PM, Thomas Groh <tg...@google.com> wrote:

> The Beam model generally is agnostic to the rate at which elements are
> produced and consumed. Instead, it uses the concept of a watermark to
> provide a completion metric, and element timestamps to record when an event
> happened (which is independent of when the event was processed). Your
> pipeline should be correct regardless of the input rate by using the
> (data-based) timestamp of arriving elements instead of the time they
> arrived in the Pipeline. This allows you to describe the output of your
> Pipeline in terms of the input records (which have associated timestamps)
> rather than the rate at which input arrived. You can assign timestamps to
> an existing PCollection using the 'WithTimestamps' PTransform, or create a
> new PCollection where elements have associated timestamps using the
> 'Create.timestamped()' PTransform. Some sources will also output elements
> with a Timestamp already associated with the element (e.g. KafkaIO or
> PubSubIO).
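
To make the event-time point concrete, here is a minimal plain-Java sketch. It is deliberately not Beam API code: the Report class, windowStart, and countPerWindow are hypothetical names made up for this illustration, and the 30-second window only mirrors the reporting interval mentioned elsewhere in this thread. It groups records by the timestamp carried in the data, so the counts should come out the same whatever order the records arrive in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeWindows {
    // Hypothetical record: a car id plus the event time (ms) carried in the data itself.
    static final class Report {
        final String carId;
        final long eventTimeMillis;

        Report(String carId, long eventTimeMillis) {
            this.carId = carId;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // Start of the fixed window containing the timestamp (assumes non-negative timestamps).
    static long windowStart(long eventTimeMillis, long windowSizeMillis) {
        return eventTimeMillis - (eventTimeMillis % windowSizeMillis);
    }

    // Counts reports per window, keyed by window start. Arrival order is never consulted.
    static Map<Long, Integer> countPerWindow(List<Report> reports, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Report r : reports) {
            counts.merge(windowStart(r.eventTimeMillis, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Report> reports = new ArrayList<>();
        reports.add(new Report("car-1", 0L));
        reports.add(new Report("car-2", 12_000L));
        reports.add(new Report("car-1", 30_000L));
        reports.add(new Report("car-2", 42_000L));
        reports.add(new Report("car-1", 60_000L));

        Map<Long, Integer> inOrder = countPerWindow(reports, 30_000L);
        Collections.reverse(reports); // simulate a different arrival order
        Map<Long, Integer> reversed = countPerWindow(reports, 30_000L);

        System.out.println(inOrder);                  // counts keyed by window start
        System.out.println(inOrder.equals(reversed)); // same result for both orders
    }
}
```

In an actual pipeline the windowing would be done by the runner; the sketch only shows why the result can be described in terms of the records' own timestamps.
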
>
> If the sole desire is to rate limit your input, using
> CountingInput.unbounded().withRate(Duration) will output elements at a
> continuous rate to your downstream PCollection. This will output elements
> over time in such a way that the desired rate is reached.
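
As a rough illustration of what "a continuous rate" means, here is a small plain-Java sketch of pacing arithmetic. It is not how CountingInput is implemented and uses no Beam classes; RatePacer, dueAtMillis, and millisToWait are hypothetical names invented for this example. The idea is that element i becomes due at a fixed offset from the start, and an emitter waits only if it is running ahead of that schedule.

```java
public class RatePacer {
    // Offset (ms since start) at which the element with this 0-based index becomes due,
    // for a target rate of elementsPerPeriod elements every periodMillis.
    static long dueAtMillis(long index, long elementsPerPeriod, long periodMillis) {
        return (index * periodMillis) / elementsPerPeriod;
    }

    // How long an emitter would still have to wait before emitting the element,
    // given how much time has already elapsed. Never negative: a late emitter does not wait.
    static long millisToWait(long index, long elapsedMillis,
                             long elementsPerPeriod, long periodMillis) {
        return Math.max(0L, dueAtMillis(index, elementsPerPeriod, periodMillis) - elapsedMillis);
    }

    public static void main(String[] args) {
        // One record every 30 seconds, as in the car example from this thread.
        for (long i = 0; i < 4; i++) {
            System.out.println("element " + i + " due at " + dueAtMillis(i, 1, 30_000) + " ms");
        }
        // 75 s in, element 3 (due at 90 s) still has 15 s to wait.
        System.out.println(millisToWait(3, 75_000, 1, 30_000));
    }
}
```

Note that this paces when elements are emitted, which is separate from the event timestamps attached to them.
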
>
> On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <amirto...@yahoo.com>
> wrote:
>
>> Thanks for your response Ben.
>> The sleep+read is part of the problem's solution requirements. I know
>> what you mean by "why not process them immediately".
>> The solution intentionally slows down processing to simulate the
>> traffic on expressway(s).
>> The assumption is that each car emits a "record" every 30 seconds.
>> To make a long story short, at runtime, the behavior I described below is
>> expected to be implemented to provide an accurate simulation.
>> So let's say I want to inject a Sleep(random-seconds) into the pipeline
>> artificially, before the ParDo actually gets into the action.
>> What are the options to do that?
>> And using TextIO(), how can I buffer the records read by TextIO() while
>> Sleep() is in progress?
>> Thanks for your valuable time.
>>
>>
>> ------------------------------
>> *From:* Ben Chambers <bchamb...@apache.org>
>> *To:* user@beam.incubator.apache.org; amir bahmanyari <
>> amirto...@yahoo.com>
>> *Sent:* Wednesday, August 24, 2016 3:24 PM
>>
>> *Subject:* Re: TextIO().Read pipeline implementation question
>>
>> I think the most important question is why do you want to slow down the
>> reads like that? If this is for testing purposes, there may be other
>> options, such as test specific sources.
>>
>> At a high level, the process you describe sounds somewhat like an
>> Unbounded Source, or perhaps an application of the not-yet-built Splittable
>> DoFn (https://s.apache.org/splittable-do-fn).
>>
>> Even in those cases, "reading 100 records and then sleeping" is normally
>> undesirable because it limits the throughput of the pipeline. If there were
>> 1000 records waiting to be processed, why not process them?
>>
>> In general, a given step doesn't "submit elements to the next step". It
>> just outputs the elements. This is important since there may be two steps
>> that read from that PCollection, meaning that there isn't a single ParDo
>> to submit the elements to.
>>
>> -- Ben
>>
>> On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <amirto...@yahoo.com>
>> wrote:
>>
>> Hi Dan,
>> Thanks so much for your response.
>> Let's focus on your "The other side" section below.
>> I provided the target process I am trying to implement in my first email
>> below.
>> According to your "runners do not expose hooks to control how often they
>> read records", it looks like I am out of luck achieving that on a random
>> basis.
>> So I am trying to articulate an equivalent read/process flow as close as
>> possible to what I want.
>> From the "- Wake-up" step in my algorithm, I should be able to read
>> records, but no more than 100.
>> Let's say I sleep for 150 milliseconds, wake up, read 100 records
>> all at once, and submit them to a ParDo DoFn to process one by one.
>> What would that pipeline implementation look like?
>> Is there an example that shows how to "sleep 150 ms" in the
>> pipeline, then read n records (e.g. 100) at once, and then submit
>> them to ParDo to process one by one, please?
>> I have tried so many ways to implement it but keep getting weird
>> compilation errors...
>> I appreciate your help.
>> Amir-
>>
>> ------------------------------
>> *From:* Dan Halperin <dhalp...@google.com>
>> *To:* user@beam.incubator.apache.org; amir bahmanyari <
>> amirto...@yahoo.com>
>> *Sent:* Wednesday, August 24, 2016 1:42 PM
>>
>> *Subject:* Re: TextIO().Read pipeline implementation question
>> Hi Amir,
>>
>> It is very hard to respond without sufficient details to reproduce. Can
>> you please send a full pipeline that we can test with test data (e.g., the
>> LICENSE file), including pipeline options (which runner, etc.)?
>>
>> The other side -- in general, runners do not expose hooks to control how
>> often they read records. If you have something like TextIO.Read |
>> ParDo.of(sleep for 1s) you will get 1s sleep per record, but you cannot
>> control how this is interleaved with reading. A runner is free to read all
>> the records before sleeping, read one record and sleep in a loop, and
>> everything in between.
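
The interleaving point can be sketched in plain Java without any Beam classes. Interleaving, readAllThenProcess, readOneProcessOne, and processed are hypothetical names for this illustration, and the "sleep" is only recorded in a trace rather than actually performed. Both schedules below are ones a runner might choose; they differ in ordering of reads and sleeps but process the same records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Interleaving {
    private static final String STEP = "sleep+process ";

    // Schedule 1: read every record first, then run the sleeping step on each.
    static List<String> readAllThenProcess(List<String> records) {
        List<String> trace = new ArrayList<>();
        for (String r : records) {
            trace.add("read " + r);
        }
        for (String r : records) {
            trace.add(STEP + r);
        }
        return trace;
    }

    // Schedule 2: read one record, run the sleeping step on it, and repeat.
    static List<String> readOneProcessOne(List<String> records) {
        List<String> trace = new ArrayList<>();
        for (String r : records) {
            trace.add("read " + r);
            trace.add(STEP + r);
        }
        return trace;
    }

    // Extracts the records that went through the sleeping step, in trace order.
    static List<String> processed(List<String> trace) {
        List<String> out = new ArrayList<>();
        for (String entry : trace) {
            if (entry.startsWith(STEP)) {
                out.add(entry.substring(STEP.length()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        List<String> first = readAllThenProcess(records);
        List<String> second = readOneProcessOne(records);

        System.out.println(first);
        System.out.println(second);
        // The traces differ, but the processed records are the same.
        System.out.println(processed(first).equals(processed(second)));
    }
}
```
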
>>
>> Thanks,
>> Dan
>> On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <amirto...@yahoo.com>
>> wrote:
>>
>> So here is what happened as a result of inserting a Window of random
>> seconds for buffering between my TextIO().Read & DoFn<>:
>> the number of records processed doubled :-((
>> Why is that? Could someone shed light on this, please? I would appreciate
>> it very much.
>> Thanks.
>> Amir-
>>
>> ------------------------------
>> *From:* amir bahmanyari <amirto...@yahoo.com>
>> *To:* "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
>> *Sent:* Tuesday, August 23, 2016 4:40 PM
>> *Subject:* Re: TextIO().Read pipeline implementation question
>>
>> Would this implementation work?
>> I am thinking of buffering records within a window of a random number of
>> seconds, processing each record in the DoFn, and then repeating with
>> another window of random length:
>>
>> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>> .apply(Window.<String>into(FixedWindows.of(
>>     Duration.standardSeconds((int) (((15 - 5) * r.nextDouble()) + 5)))))
>>
>> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>>
>> Thanks for your help.
>> Amir-
>>
>>
>> ------------------------------
>> *From:* amir bahmanyari <amirto...@yahoo.com>
>> *To:* "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
>> *Sent:* Tuesday, August 23, 2016 3:51 PM
>> *Subject:* TextIO().Read pipeline implementation question
>>
>> Hi Colleagues,
>> I have no problem reading through TextIO() & processing, all by default
>> behavior.
>>
>> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>>
>> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>>
>> I want to change this logic like the following:
>>
>> - Start executing TextIO().Read, but before reading anything
>> - Sleep for a random number of seconds between 5 & 15
>> - Wake up
>> - Read the records from the file (for the time-stamps) while
>> TextIO().Read was asleep
>> - Process records
>> - Go back to putting TextIO() to sleep for a random number of seconds
>> between 5 & 15, and continue until the end of the file is reached
>>
>> I appreciate your suggestions and/or if you can point me to an example.
>> Cheers+thanks
>> Amir-
>>
>>
>>
>>
>
