On Sat, Jul 9, 2022 at 9:28 AM Reuven Lax via user <[email protected]> wrote:

>
>
> On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra <[email protected]>
> wrote:
>
>> Maybe the previous post was too verbose, so I will try to summarize my
>> question -
>> If one instance of a DoFn tries to set a timer for a time which is behind
>> the pipeline's watermark, can this cause the pipeline to stall for other
>> keys as well?
>> "Stall" here means that other keys' timers will start lagging behind.
>>
>
> It depends on the runner, but in general timers should be independent.
> However, in practice every worker has only so many threads to process with,
> and timers are processed in order, so if a large number of these "old"
> timers are set and they take a long time to process, this could cause some
> delays.
>
>
>> Say there are 1 million DoFns running in a steady state (behaving as
>> expected), where timers are firing at 5 min boundaries.
>>
>
> Do you mean 1 million keys? What do you mean by 1 million DoFns?
>
Yes, I meant one million keys here.

>
>
>> 1 bad key comes which sets its timer to a time which is 1 hour older than
>> the current watermark. What happens here? My understanding is this -
>> the looping timer will fire back to back in quick succession for this bad
>> key 12 times, and after that this key also joins the group of 1 million keys
>> which were firing regularly at 5 min boundaries.
>>
>
> Where does the number 12 come from?
>
Assuming the code is trying to generate heartbeats every 5 minutes, one hour
of backlog gives 60/5 = 12.

>
>
>> PS - The above DoFn is using the default Global Windows and default trigger.
>>
>>
>> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <[email protected]> wrote:
>>
>>> Hello,
>>> I have a pipeline which is generating heartbeats using looping timers in
>>> a stateful DoFn. Following is pseudo code for the process element and
>>> onTimer methods:
>>>
>>> StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
>>> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>> processElement(input) {
>>>   // read event time from the message
>>>   Instant currentEventTime = input.getEventTimeEpoc();
>>>   if (input.state == ONLINE) {
>>>     lastSeenMsg.write(input);
>>>     // calculate start of looping timer,
>>>     // which will be the next 5 min boundary
>>>     long currentEventTimeEpocSeconds = currentEventTime.getMillis() / 1000;
>>>     long offset = currentEventTimeEpocSeconds % 300;
>>>     long nextFireTimeSeconds = currentEventTimeEpocSeconds - offset + 300;
>>>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>>>   } else {
>>>     // stop heartbeats when the entity goes offline
>>>     loopingTimer.clear();
>>>   }
>>> }
>>>
>>> onTimer() {
>>>   // emit the lastSeenMsg
>>>   output(lastSeenMsg.read());
>>>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>>> }
>>>
>>> The above pipeline works well in low load scenarios. But on one of my
>>> heavy-traffic deployments the pipeline does not seem able to keep up with
>>> the load. Input messages from Pub/Sub are state change events for an
>>> entity - Entity Online or Entity Offline messages. Once an entity comes
>>> Online we start generating a heartbeat every 5 min as long as we do not
>>> encounter an Offline message for that entity. The number of online
>>> entities can be fairly large; more than 10 million entities can be Online
>>> at a given time.
>>>
>>> I am seeing this particular DoFn start lagging behind as soon as it
>>> gets started. The timers are firing pretty late. The lag went up to 48
>>> hours before I restarted the pipeline. Is there something wrong in what I
>>> am doing?
>>> Note - I am reading the eventTime embedded in the msg. The intent is to
>>> fire a bunch of timers in quick succession if needed and fill up the DB
>>> with heartbeats up to the current time.
>>> So say a msg comes with state = Online and time = 10:02 AM, and the
>>> current watermark is at 10:13 AM. I set the loopingTimer to start at
>>> 10:05, which I expect to fire immediately since the watermark is already
>>> ahead of this time. (Or is my understanding wrong?) Similarly, the
>>> subsequent call to the onTimer method will set the next timer to fire at
>>> 10:10, and I also expect that to fire immediately. After this point this
>>> DoFn should start emitting at the same time as all other instances of
>>> this DoFn. Is there a mistake in this implementation?
>>> Another thing I am noticing is that this pipeline is running on a single
>>> Dataflow worker and not scaling up automatically. For such a large key
>>> space (10 million DoFns and their timers) I expected the pipeline to use a
>>> lot of CPU near the 5 minute boundaries and scale up, but that is also not
>>> happening.
>>>
>>
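
For reference, the boundary and catch-up arithmetic discussed in this thread can be checked outside the pipeline. The following is only a rough sketch in plain Java using `java.time`, not Beam code: the class `HeartbeatMath` and both method names are made up for illustration, and it assumes a timer counts as already eligible when its timestamp is at or before the watermark (the exact firing condition may differ by runner).

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: plain-Java version of the looping-timer arithmetic. */
final class HeartbeatMath {
    private HeartbeatMath() {}

    /**
     * Next period boundary strictly after eventTime. As in the pseudo code,
     * an event exactly on a boundary is moved to the following boundary.
     */
    static Instant nextBoundary(Instant eventTime, Duration period) {
        long periodSeconds = period.getSeconds();
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("period must be at least one second");
        }
        long seconds = eventTime.getEpochSecond();
        long offset = Math.floorMod(seconds, periodSeconds);
        return Instant.ofEpochSecond(seconds - offset + periodSeconds);
    }

    /**
     * Number of looping-timer firings whose timestamps are at or before the
     * watermark, i.e. the firings expected to happen back to back.
     * Assumption: "at or before the watermark" means immediately eligible.
     */
    static long catchUpFirings(Instant firstFire, Instant watermark, Duration period) {
        long periodSeconds = period.getSeconds();
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("period must be at least one second");
        }
        if (firstFire.isAfter(watermark)) {
            return 0;
        }
        long behindSeconds = Duration.between(firstFire, watermark).getSeconds();
        return behindSeconds / periodSeconds + 1;
    }

    public static void main(String[] args) {
        Duration fiveMinutes = Duration.ofMinutes(5);

        // Example from the thread: event at 10:02, watermark at 10:13.
        Instant first = nextBoundary(Instant.parse("2022-07-08T10:02:00Z"), fiveMinutes);
        System.out.println(first); // 2022-07-08T10:05:00Z
        System.out.println(
            catchUpFirings(first, Instant.parse("2022-07-08T10:13:00Z"), fiveMinutes)); // 2

        // "Bad key" example: event time one hour behind the watermark.
        System.out.println(
            catchUpFirings(first, Instant.parse("2022-07-08T11:02:00Z"), fiveMinutes)); // 12
    }
}
```

Under these assumptions the 10:02 message catches up with two immediate firings (10:05 and 10:10) before waiting for 10:15, and a key that is one hour behind catches up with 12 firings, which matches the 60/5 estimate above.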
