On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra <gauravmishra.it...@gmail.com> wrote:
> Maybe the previous post was too verbose, so I will try to summarize my
> question:
> If one instance of a DoFn tries to set a timer for a time which is behind
> the pipeline's watermark, can this cause the pipeline to stall for other
> keys as well? "Stall" meaning here: other keys' timers will start lagging
> behind.

It depends on the runner, but in general timers should be independent. Practically, however, every worker has only so many threads to process with, and timers are processed in order, so if a large number of these "old" timers are set and they take a long time to process, this could cause some delays.

> Say there are 1 million DoFns running in a steady state (behaving as
> expected), where timers are firing at 5-minute boundaries.

Do you mean 1 million keys? What do you mean by 1 million DoFns?

> One bad key comes in which sets its timer to a time which is 1 hour older
> than the current watermark. What happens here? My understanding is this:
> the looping timer will fire back to back in quick succession for this bad
> key 12 times, and after that this key also joins the group of 1 million
> keys which were firing regularly at 5-minute boundaries.

Where does the number 12 come from?

> PS - The above DoFn is using the default GlobalWindows and default trigger.
>
> On Thu, Jul 7, 2022 at 11:09 PM gaurav mishra <gauravmishra.it...@gmail.com> wrote:
>
>> Hello,
>> I have a pipeline which is generating heartbeats using looping timers in
>> a stateful DoFn.
>> Following is pseudo code for the processElement and onTimer methods:
>>
>> StateSpec<ValueState<Input>> lastSeenMsg = StateSpecs.value(...);
>> TimerSpec loopingTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> processElement(Input input) {
>>   // read the event time embedded in the message
>>   Instant currentEventTime = input.getEventTimeEpoch();
>>   if (input.state == ONLINE) {
>>     lastSeenMsg.write(input);
>>     // calculate the start of the looping timer,
>>     // which will be the next 5-minute boundary
>>     long currentEventTimeEpochSeconds = currentEventTime.getMillis() / 1000;
>>     long offset = currentEventTimeEpochSeconds % 300;
>>     long nextFireTimeSeconds = currentEventTimeEpochSeconds - offset + 300;
>>     loopingTimer.set(Instant.ofEpochSecond(nextFireTimeSeconds));
>>   } else {
>>     // stop heartbeats when the entity goes offline
>>     loopingTimer.clear();
>>   }
>> }
>>
>> onTimer() {
>>   // emit the last seen message
>>   output(lastSeenMsg.read());
>>   // re-arm the timer 5 minutes later
>>   loopingTimer.set(timerContext.timestamp().plus(Duration.standardSeconds(300)));
>> }
>>
>> The above pipeline works well in low-load scenarios, but in one of my
>> heavy-traffic deployments the pipeline seems unable to keep up with the
>> load. Input messages from Pub/Sub are state-change events for an entity:
>> Entity Online or Entity Offline. Once an entity comes Online, we start
>> generating a heartbeat every 5 minutes as long as we do not encounter an
>> Offline message for that entity. The number of online entities can be
>> fairly large; more than 10 million entities can be Online at a given time.
>>
>> I am seeing this particular DoFn start lagging behind as soon as it gets
>> started. The timers are firing pretty late; the lag went up to 48 hours
>> before I restarted the pipeline. Is there something wrong in what I am
>> doing?
>> Note - I am reading the event time embedded in the message. The intent is
>> to fire a bunch of timers in quick succession if needed and fill up the DB
>> with heartbeats up to the current time.
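The 5-minute boundary arithmetic in the pseudo code above can be exercised on its own. This is a minimal, hypothetical sketch (the class and method names are illustrative, not Beam APIs):

```java
// Hypothetical sketch of the boundary math in processElement: round an
// event timestamp up to the next 5-minute boundary. Note that a timestamp
// already sitting on a boundary is pushed to the *next* boundary, which
// mirrors the original pseudo code's "- offset + 300".
public class NextBoundary {
    static final long INTERVAL_SECONDS = 300; // 5 minutes

    static long nextFireTimeSeconds(long eventEpochSeconds) {
        long offset = eventEpochSeconds % INTERVAL_SECONDS;
        return eventEpochSeconds - offset + INTERVAL_SECONDS;
    }

    public static void main(String[] args) {
        // 10:02:17 (36137 seconds into the day) rounds up to 10:05:00 (36300).
        System.out.println(nextFireTimeSeconds(36137L)); // prints 36300
    }
}
```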
>> So say a msg comes with state = Online and time = 10:02 AM, and the
>> current watermark is at 10:13 AM. I set the loopingTimer to start at
>> 10:05, which I expect to fire immediately since the watermark is already
>> ahead of this time (or is this understanding wrong?). Similarly, the
>> subsequent call to the onTimer method will set the next timer to fire at
>> 10:10, which I also expect to fire immediately. After this point, this
>> DoFn should start emitting at the same time as all other instances of
>> this DoFn. Is there a mistake in this implementation?
>> Another thing I am noticing is that this pipeline is running on a single
>> Dataflow worker and not scaling up automatically. For such a large key
>> space (10 million DoFns and their timers) I expected the pipeline to use
>> a lot of CPU near the 5-minute boundaries and scale up, but that is also
>> not happening.
>
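The catch-up behaviour described above (and the "12 times" estimate earlier in the thread) can be sketched outside Beam as a small simulation. This is a hypothetical, self-contained sketch, assuming the runner fires every event-time timer whose timestamp the watermark has already passed; the class and method names are mine, not a real API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how an event-time looping timer "catches up" once
// the watermark is ahead of it. Times are plain epoch seconds.
public class CatchUpSimulation {
    static final long INTERVAL_SECONDS = 300; // 5-minute looping interval

    // All firings that would happen in quick succession: 5-minute boundaries
    // after the event time that the watermark has already passed. Each firing
    // re-arms the timer 5 minutes later, mirroring the onTimer pseudo code.
    static List<Long> immediateFirings(long eventSec, long watermarkSec) {
        List<Long> fires = new ArrayList<>();
        long next = (eventSec / INTERVAL_SECONDS + 1) * INTERVAL_SECONDS;
        while (next <= watermarkSec) { // watermark already past this time
            fires.add(next);
            next += INTERVAL_SECONDS;  // onTimer re-arms 5 minutes later
        }
        return fires;
    }

    public static void main(String[] args) {
        // Event at 10:02, watermark at 10:13 -> fires at 10:05 and 10:10.
        long tenOhTwo = 10 * 3600 + 2 * 60;
        long tenThirteen = 10 * 3600 + 13 * 60;
        System.out.println(immediateFirings(tenOhTwo, tenThirteen).size()); // prints 2

        // Event one hour behind the watermark -> 12 catch-up firings,
        // which matches the "fire back to back ... 12 times" estimate.
        long watermark = 11 * 3600;
        System.out.println(immediateFirings(watermark - 3600, watermark).size()); // prints 12
    }
}
```

This only models the intended semantics; as noted above, with limited worker threads and in-order timer processing, a large backlog of such catch-up firings is exactly what can delay timers for other keys.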