Hi Stefan, Fabian,

Keyang is an engineer on our team, and he has put a lot of effort into making timer snapshots asynchronous. What do you think of his idea?
Best,
Deqiang
TIG.JD.COM <http://tig.jd.com/>

> On April 1, 2018, at 7:21 PM, makeyang <riverbuild...@hotmail.com> wrote:
>
> I have put a lot of effort into this issue and tried to resolve it:
>
> 1. Let me first describe the current timer snapshot path:
>    a) For each key group, InternalTimeServiceManager.snapshotStateForKeyGroup
>       is invoked.
>    b) InternalTimeServiceManager creates an
>       InternalTimerServiceSerializationProxy to write the snapshot.
>    c) InternalTimerServiceSerializationProxy iterates over the
>       <String (the service name), HeapInternalTimerService> tuples, writes
>       each service name and calls snapshotTimersForKeyGroup, then gets an
>       InternalTimersSnapshotWriter and calls writeTimersSnapshot.
>    d) InternalTimersSnapshotWriter.writeTimersSnapshot first writes the key
>       serializer and the namespace serializer, then takes the eventTimers
>       and processingTimers of the InternalTimersSnapshot, which are Sets of
>       InternalTimer, and serializes them.
>
> 2. My first try was to shallow-copy the <String, HeapInternalTimerService>
>    tuples, then shallow-copy the eventTimers and processingTimers, and use
>    another thread to snapshot them without blocking the event-processing
>    thread. But it turned out that shallow-copying eventTimers and
>    processingTimers is too time-consuming, so this solution failed.
>
> 3. Then I tried to borrow the idea of the CopyOnWriteStateTable data
>    structure and manage timers with it. But after digging more, I found an
>    easier way to snapshot timers asynchronously, based on one fact:
>    InternalTimer is immutable. The approach:
>    a) Maintain a stateTableVersion and a snapshotVersions set in
>       InternalTimeServiceManager, exactly as CopyOnWriteStateTable does.
>       One more thing: a read-write lock protects snapshotVersions and
>       stateTableVersion.
>    b) Add 2 more properties to each InternalTimer, a create version and a
>       delete version, besides the 3 existing ones: timestamp, key and
>       namespace. Each time a timer is registered in the timer service, it
>       is created with stateTableVersion as its create version and -1 as its
>       delete version. Each time a timer is deleted in the timer service, it
>       is marked deleted by setting its delete version to stateTableVersion,
>       without physically removing it from the timer service.
>    c) Each time timers are snapshotted, InternalTimeServiceManager
>       increments its stateTableVersion and adds the new stateTableVersion
>       to snapshotVersions. These 2 operations are protected by the write
>       lock of InternalTimeServiceManager. The new stateTableVersion is used
>       as the snapshot version of this snapshot.
>    d) Shallow-copy the <String, HeapInternalTimerService> tuples.
>    e) Then use another thread to snapshot everything asynchronously: key
>       serializer, namespace serializer and timers. A timer that is not
>       deleted (delete version is -1) and whose create version is less than
>       the snapshot version is serialized. A timer whose delete version is
>       not -1 and is greater than or equal to the snapshot version is also
>       serialized. Otherwise, the timer is not serialized by this snapshot.
>    f) When everything is serialized, remove the snapshot version from
>       snapshotVersions. This still happens in the other thread and is
>       guarded by the write lock.
>    g) Last thing: physical deletion of timers. There are 2 places where
>       timers are physically deleted. The first is when a timer is deleted
>       in the timer service: after it is marked deleted as described in b),
>       check, guarded by the read lock, whether its delete version is less
>       than the minimum of snapshotVersions (which means no running snapshot
>       still needs it), and if so, physically delete it.
>       The other place is the snapshot's iteration over the timers: a timer
>       whose delete version is less than the minimum of snapshotVersions has
>       been deleted and no running snapshot needs to keep it, so it is
>       removed there.
>    h) Some more changes: processingTimeTimers and eventTimeTimers for each
>       key group used to be HashSets; they are now ConcurrentHashMaps with
>       key+namespace+timestamp as the hash key.
>
> The code is done and tests are still running. I am posting this not only
> to hear your opinions, but also to clear up some questions about the
> current timer snapshot code path. My questions are below:
>
> 1. onProcessingTime of HeapInternalTimerService is invoked by another
>    thread of the ProcessingTimeService, and in that thread it removes
>    timers from HeapInternalTimerService. But in the current timer snapshot
>    path, I haven't found any shallow copy of processingTimeTimers and
>    eventTimeTimers. Why doesn't this cause a
>    ConcurrentModificationException?
>
> 2. Since onProcessingTime is triggered in another thread, what happens if,
>    while timers are being snapshotted in the working thread, a timer fires
>    and its triggerTarget is processed, changing state, and then the
>    asynchronous keyed-state snapshot is triggered? Won't this make the
>    state inconsistent? Imagine this case: all keyed state is changed only
>    by timers. Add timer1, timer2, timer3, timer4 and timer5; since no timer
>    has been processed yet, the keyed state is empty. Then timer1 and timer2
>    are processed and the keyed state is k2, leaving timer3, timer4 and
>    timer5 in the timer service. Then timer3, timer4 and timer5 are
>    snapshotted synchronously. Then the keyed state is snapshotted
>    asynchronously while timer3 is processed, so the keyed state becomes k3.
>    The resulting snapshot is timer3, timer4, timer5 and k3. As far as I
>    understand, it should be timer3, timer4, timer5 and k2. Please help me
>    out with this.
>
> Thanks very much.
>
> By the way, if you don't mind, could one of you open a JIRA issue to track
> this? When the time is right, I'll contribute to this issue.
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
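The versioning scheme in point 3 of the quoted proposal can be sketched in Java roughly as follows. This is a minimal sketch under stated assumptions, not Keyang's code and not Flink's API: VersionedTimerSketch, TimerId, register, delete, startSnapshot, collect, releaseSnapshot and physicalSize are hypothetical names, the sketch tracks only timer identities (no serializers, key groups or InternalTimeServiceManager), and it needs Java 16+ for the record. It shows the create/delete versions (b), the version bump under the write lock (c), the visibility rule (e), releasing the snapshot version (f), both physical-deletion paths (g) and the ConcurrentHashMap keyed by key+namespace+timestamp (h).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of versioned timers; these are not Flink classes.
public class VersionedTimerSketch {
    // Point h: a timer is identified by key + namespace + timestamp.
    public record TimerId(String key, String namespace, long timestamp) {}

    // Point b: immutable identity plus a create version and a delete version.
    static final class VersionedTimer {
        final TimerId id;
        final long createVersion;
        volatile long deleteVersion = -1; // -1 means "not deleted"
        VersionedTimer(TimerId id, long createVersion) { this.id = id; this.createVersion = createVersion; }
    }

    // Point a: stateTableVersion and snapshotVersions are guarded by this lock.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long stateTableVersion = 0;
    private final TreeSet<Long> snapshotVersions = new TreeSet<>();
    private final ConcurrentHashMap<TimerId, VersionedTimer> timers = new ConcurrentHashMap<>();

    public void register(TimerId id) {
        lock.readLock().lock();
        try {
            long version = stateTableVersion;
            // Keep an existing live timer (set semantics); otherwise insert a new one.
            timers.compute(id, (k, old) ->
                    old != null && old.deleteVersion == -1 ? old : new VersionedTimer(k, version));
        } finally {
            lock.readLock().unlock();
        }
    }

    public void delete(TimerId id) {
        lock.readLock().lock();
        try {
            VersionedTimer t = timers.get(id);
            if (t == null || t.deleteVersion != -1) return;
            t.deleteVersion = stateTableVersion; // logical delete (point b)
            // Point g, first place: drop it now if no running snapshot needs it.
            if (snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first()) {
                timers.remove(id, t);
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    // Point c: synchronous part, run on the processing thread.
    public long startSnapshot() {
        lock.writeLock().lock();
        try {
            long snapshotVersion = ++stateTableVersion;
            snapshotVersions.add(snapshotVersion);
            return snapshotVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Point e: asynchronous part, may run on another thread. ConcurrentHashMap
    // iteration is weakly consistent and never throws ConcurrentModificationException.
    public List<TimerId> collect(long snapshotVersion) {
        long minActive;
        lock.readLock().lock();
        try {
            minActive = snapshotVersions.first(); // this snapshot is still registered
        } finally {
            lock.readLock().unlock();
        }
        List<TimerId> visible = new ArrayList<>();
        for (VersionedTimer t : timers.values()) {
            long del = t.deleteVersion;
            if (t.createVersion < snapshotVersion && (del == -1 || del >= snapshotVersion)) {
                visible.add(t.id);
            } else if (del != -1 && del < minActive) {
                timers.remove(t.id, t); // point g, second place: no snapshot needs it
            }
        }
        return visible;
    }

    // Point f: release the snapshot version under the write lock.
    public void releaseSnapshot(long snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int physicalSize() { return timers.size(); }
}
```

One deliberate difference from the wording of point e: the sketch also requires createVersion < snapshotVersion for deleted timers, so a timer that is both created and deleted after a snapshot starts is skipped by that snapshot. One case it does not handle: re-registering a timer with the same key, namespace and timestamp while its tombstone is still needed by a running snapshot replaces the tombstone, so that snapshot would miss the timer.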