On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra <gauravmishra.it...@gmail.com>
wrote:

> Maybe the previous post was too verbose, so I will try to summarize my
> question:
> If one instance of a DoFn tries to set a timer for a time that is behind
> the pipeline's watermark, can this cause the pipeline to stall for other
> keys as well?
> By "stall" I mean that other keys' timers will start lagging behind.
>
>

It depends on the runner, but in general timers should be independent across
keys. However, in practice every worker has only so many processing threads,
and timers are processed in order, so if a large number of these "old" timers
are set and they take a long time to process, this could cause some delays.
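
For concreteness, here is what that catch-up looks like from inside the @OnTimer
method. This is only a sketch built on the looping-timer pattern quoted below,
with explanatory comments; it is not a change to your pipeline:

@OnTimer("loopingTimer")
public void onTimer(OnTimerContext context,
                    @StateId("lastSeenMsg") ValueState<Input> lastSeenMsg,
                    @TimerId("loopingTimer") Timer loopingTimer) {
  // context.timestamp() is the instant the timer was set FOR, not when it fired.
  // If that instant is an hour behind the watermark, the timer re-armed below is
  // also already eligible, so the runner delivers these firings back to back for
  // the same key until the timer timestamp catches up with the watermark. Other
  // keys are logically unaffected, but they share the worker's threads.
  context.output(lastSeenMsg.read());
  loopingTimer.set(context.timestamp().plus(Duration.standardMinutes(5)));
}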


> Say there are 1 million DoFns running in a steady state (behaving as
> expected), where timers are firing at 5-minute boundaries.
>

Do you mean 1 million keys? What do you mean by 1 million DoFns?


> Then 1 bad key comes in and sets its timer to a time that is 1 hour older
> than the current watermark. What happens here? My understanding is this:
> the looping timer will fire back to back in quick succession for this bad
> key 12 times, and after that this key also joins the group of 1 million keys
> that were firing regularly at 5-minute boundaries.
>

Where does the number 12 come from?


> PS - The above DoFn is using the default global window and the default trigger.
>
>
> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> Hello,
>> I have a pipeline which generates heartbeats using looping timers in a
>> stateful DoFn. Following is pseudocode for the processElement and onTimer
>> methods:
>>
>> @StateId("lastSeenMsg")
>> private final StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
>>
>> @TimerId("loopingTimer")
>> private final TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> @ProcessElement
>> public void processElement(
>>     @Element Input input,
>>     @StateId("lastSeenMsg") ValueState<Input> lastSeenMsg,
>>     @TimerId("loopingTimer") Timer loopingTimer) {
>>   // read the event time embedded in the message
>>   Instant currentEventTime = input.getEventTimeEpoc();
>>   if (input.state == ONLINE) {
>>     lastSeenMsg.write(input);
>>     // start the looping timer at the next 5-minute boundary
>>     long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>>     long offset = currentEventTimeEpocSeconds % 300;
>>     long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>>   } else {
>>     // stop heartbeats when the entity goes offline
>>     loopingTimer.clear();
>>   }
>> }
>>
>> @OnTimer("loopingTimer")
>> public void onTimer(
>>     OnTimerContext context,
>>     @StateId("lastSeenMsg") ValueState<Input> lastSeenMsg,
>>     @TimerId("loopingTimer") Timer loopingTimer) {
>>   // emit the lastSeenMsg as the heartbeat
>>   context.output(lastSeenMsg.read());
>>   // re-arm the timer 5 minutes after the timestamp this firing was set for
>>   loopingTimer.set(context.timestamp().plus(Duration.standardSeconds(300)));
>> }
>>
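>> For reference, the DoFn above is applied to a keyed PCollection, so the state
>> and timer exist once per key. Names like subscription, ParseMessage,
>> getEntityId() and HeartbeatFn below are placeholders for this sketch, not the
>> real pipeline code:
>>
>> PCollection<KV<String, Input>> keyed =
>>     pipeline
>>         .apply("ReadPubsub", PubsubIO.readMessages().fromSubscription(subscription))
>>         .apply("Parse", ParDo.of(new ParseMessage()))   // PubsubMessage -> Input
>>         .apply("KeyByEntity", WithKeys.of((Input msg) -> msg.getEntityId())
>>                                       .withKeyType(TypeDescriptors.strings()));
>>
>> keyed.apply("Heartbeats", ParDo.of(new HeartbeatFn()));  // the stateful DoFn above
>>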
>>
>> The above pipeline works well in low-load scenarios, but in one of my
>> heavy-traffic deployments it does not seem to be able to keep up with the
>> load. The input messages from Pub/Sub are state-change events for an entity:
>> Entity Online or Entity Offline. Once an entity comes Online, we start
>> generating a heartbeat every 5 minutes as long as we do not encounter an
>> Offline message for that entity. The number of online entities can be fairly
>> large; more than 10 million entities can be Online at a given time.
>>
>> I am seeing that this particular DoFn starts lagging behind as soon as it is
>> started. The timers are firing pretty late; the lag went up to 48 hours
>> before I restarted the pipeline. Is there something wrong in what I am
>> doing?
>> Note - I am reading the eventTime embedded in the message. The intent is to
>> fire a bunch of timers in quick succession if needed and fill up the DB with
>> heartbeats until the current time.
>> So say a message comes with state = Online and time = 10:02 AM, and the
>> current watermark is at 10:13 AM. I set the loopingTimer to start at 10:05,
>> which I expect to fire immediately since the watermark is already ahead of
>> this time (or is that understanding wrong?). Similarly, the subsequent call
>> to the onTimer method will set the next timer to fire at 10:10, which I also
>> expect to fire immediately. After this point, this DoFn should start emitting
>> at the same time as all the other instances of this DoFn. Is there a mistake
>> in this implementation?
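>> (For concreteness, here is the same boundary math as in processElement,
>> applied to the 10:02 example; eventTime1002 below is just a placeholder
>> Instant for 10:02:00 on some day:)
>>
>> Instant eventTime1002 = Instant.parse("2022-07-07T10:02:00Z");          // placeholder date
>> long currentEventTimeEpocSeconds = eventTime1002.getMillis() / 1000;
>> long offset = currentEventTimeEpocSeconds % 300;                        // 120 s past the 10:00 boundary
>> long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;  // 10:05:00
>> // With the watermark already at 10:13, the 10:05 timer is eligible immediately;
>> // the re-set inside onTimer then makes 10:10 eligible immediately as well, and
>> // the firing after that (10:15) waits for the watermark to advance.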
>> Another thing I am noticing is that this pipeline is running on a single
>> Dataflow worker and not scaling up automatically. For such a large key space
>> (10 million DoFns and their timers) I expected the pipeline to use a lot of
>> CPU near the 5-minute boundaries and scale up, but that is also not
>> happening.
>>
>
