On Sat, Jul 9, 2022 at 9:28 AM Reuven Lax via user <[email protected]>
wrote:

>
>
> On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra <[email protected]>
> wrote:
>
>> Maybe the previous post was too verbose, so I will try to summarize my
>> question:
>> If one instance of a DoFn tries to set a timer for a time that is behind
>> the pipeline's watermark, can this cause the pipeline to stall for other
>> keys as well?
>> By "stall" I mean that other keys' timers will start lagging behind.
>>
>
> It depends on the runner, but in general timers should be independent.
> However, as a practical matter, every worker has only so many processing
> threads and timers are processed in order, so if a large number of these
> "old" timers are set and they take a long time to process, this could
> cause some delays.
>
>
>> Say there are 1 million DoFns running in a steady state (behaving as
>> expected), where timers are firing at 5-minute boundaries.
>>
>
> Do you mean 1 million keys? What do you mean by 1 million DoFns?
>
Yes, I meant one million keys here.

>
>
>> One bad key comes in and sets its timer to a time which is 1 hour older
>> than the current watermark. What happens here? My understanding is this:
>> the looping timer will fire back to back in quick succession for this bad
>> key 12 times, and after that this key also joins the group of 1 million
>> keys which were firing regularly at 5-minute boundaries.
>>
>
> Where does the number 12 come from?
>
Assuming the code is trying to generate heartbeats every 5 minutes, so 60/5 = 12.
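(A rough sketch of that arithmetic, with the lag and interval written out; the
variable names are only illustrative:)

    long lagSeconds = 60 * 60;        // first firing is set 1 hour behind the watermark
    long heartbeatSeconds = 5 * 60;   // heartbeat interval of 5 minutes
    long catchUpFirings = lagSeconds / heartbeatSeconds;  // 12 back-to-back firings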

>
>
>> PS - The above DoFn is using the default GlobalWindows windowing and the
>> default trigger.
>>
>>
>> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <
>> [email protected]> wrote:
>>
>>> Hello,
>>> I have a pipeline which generates heartbeats using looping timers in a
>>> stateful DoFn. The following is pseudocode for the processElement and
>>> onTimer methods:
>>>
>>> StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
>>> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>> processElement(input) {
>>>   // read the event time embedded in the message
>>>   Instant currentEventTime = input.getEventTimeEpoc();
>>>   if (input.state == ONLINE) {
>>>     lastSeenMsg.write(input);
>>>     // calculate the start of the looping timer,
>>>     // which will be the next 5-minute boundary
>>>     long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>>>     long offset = currentEventTimeEpocSeconds % 300;
>>>     long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>>>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>>>   } else {
>>>     // stop heartbeats when the entity goes offline
>>>     loopingTimer.clear();
>>>   }
>>> }
>>>
>>> onTimer() {
>>>   // emit the last seen message as a heartbeat
>>>   output(lastSeenMsg.read());
>>>   // re-arm the timer for 5 minutes after this firing
>>>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>>> }
>>>
>>>
>>> The above pipeline works well in low-load scenarios, but on one of my
>>> heavy-traffic deployments the pipeline does not seem able to keep up with
>>> the load. Input messages from Pub/Sub are state-change events for an
>>> entity - Entity Online or Entity Offline messages. Once an entity comes
>>> Online we start generating a heartbeat every 5 minutes as long as we do
>>> not encounter an Offline message for that entity. The number of online
>>> entities can be fairly large; more than 10 million entities can be Online
>>> at a given time.
>>>
>>> I am seeing that this particular DoFn starts lagging behind as soon as it
>>> gets started. The timers are firing pretty late; the lag went up to 48
>>> hours before I restarted the pipeline. Is there something wrong in what I
>>> am doing?
>>> Note - I am reading the eventTime embedded in the message. The intent is
>>> to fire a bunch of timers in quick succession if needed and fill up the
>>> DB with heartbeats up to the current time.
>>> So say a message comes with state = Online and time = 10:02 AM, and the
>>> current watermark is at 10:13 AM. I set the loopingTimer to start at
>>> 10:05, which I expect to fire immediately since the watermark is already
>>> ahead of this time (or is that understanding wrong?). Similarly, the
>>> subsequent call to the onTimer method will set the next timer to fire at
>>> 10:10, which I also expect to fire immediately. After this point this
>>> DoFn should start emitting at the same time as all other instances of
>>> this DoFn. Is there a mistake in this implementation?
>>> Another thing I am noticing is that this pipeline is running on a single
>>> Dataflow worker and not scaling up automatically. For such a large key
>>> space (10 million keys and their timers) I expected the pipeline to use a
>>> lot of CPU near the 5-minute boundaries and scale up, but that is also
>>> not happening.
>>>
>>
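To make the catch-up reasoning in the quoted message concrete, here is the
5-minute boundary arithmetic from the pseudocode worked through for the 10:02
example (the concrete timestamps are illustrative; Joda-Time, as used by the
Beam Java SDK):

    // An Online message stamped 10:02, while the watermark is already at 10:13.
    long eventSeconds = Instant.parse("2022-07-07T10:02:00Z").getMillis() / 1000;
    long offset = eventSeconds % 300;                        // 120 s past the last boundary
    long nextFireTimeSeconds = eventSeconds - offset + 300;  // 10:05:00
    // The timers at 10:05 and 10:10 are already behind the 10:13 watermark, so
    // both should be eligible to fire as soon as the runner processes them;
    // from 10:15 onward this key fires on the same 5-minute boundaries as
    // every other key.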

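For reference, below is a minimal, self-contained sketch of the looping-timer
heartbeat DoFn discussed in this thread, written against the Beam Java SDK
state and timer APIs. The StatusEvent type and its isOnline() and
getEventTimeEpoc() accessors are stand-ins for the poster's own message class,
and the value-state coder is left to coder inference; treat this as a sketch
of the pattern, not the poster's actual implementation.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Emits the last seen StatusEvent for each key every 5 minutes of event time,
// starting at the first 5-minute boundary after the event time in the message.
public class HeartbeatFn extends DoFn<KV<String, StatusEvent>, StatusEvent> {

  private static final Duration INTERVAL = Duration.standardMinutes(5);

  @StateId("lastSeenMsg")
  private final StateSpec<ValueState<StatusEvent>> lastSeenMsgSpec = StateSpecs.value();

  @TimerId("loopingTimer")
  private final TimerSpec loopingTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("lastSeenMsg") ValueState<StatusEvent> lastSeenMsg,
      @TimerId("loopingTimer") Timer loopingTimer) {
    StatusEvent event = c.element().getValue();
    if (event.isOnline()) {
      lastSeenMsg.write(event);
      // Align the first firing to the next 5-minute boundary after the event
      // time embedded in the message; if that boundary is already behind the
      // watermark, the timer is eligible to fire as soon as it is processed.
      long eventSeconds = event.getEventTimeEpoc().getMillis() / 1000;
      long nextBoundarySeconds = eventSeconds - (eventSeconds % 300) + 300;
      loopingTimer.set(Instant.ofEpochSecond(nextBoundarySeconds));
    } else {
      // Entity went offline: stop the heartbeat loop for this key.
      loopingTimer.clear();
    }
  }

  @OnTimer("loopingTimer")
  public void onTimer(
      OnTimerContext c,
      @StateId("lastSeenMsg") ValueState<StatusEvent> lastSeenMsg,
      @TimerId("loopingTimer") Timer loopingTimer) {
    StatusEvent last = lastSeenMsg.read();
    if (last != null) {
      c.output(last);  // emit the heartbeat
    }
    // Re-arm the timer 5 minutes after this firing time. When the timer is far
    // behind the watermark this re-arms in the past, producing the back-to-back
    // catch-up firings discussed above.
    loopingTimer.set(c.timestamp().plus(INTERVAL));
  }
}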