Hi Stefan, Fabian,
Keyang is an engineer on our team, and he has put a lot of effort into
making the timers' snapshot asynchronous. What do you think of his idea?

TIG.JD.COM <http://tig.jd.com/>

> On April 1, 2018, at 7:21 PM, makeyang <riverbuild...@hotmail.com> wrote:
> I have put a lot of effort into this issue and have tried to resolve it:
> 1. Let me first describe the current timers' snapshot path:
>    a) For each key group,
> InternalTimeServiceManager.snapshotStateForKeyGroup is invoked.
>    b) InternalTimeServiceManager creates an
> InternalTimerServiceSerializationProxy to write the snapshot.
>    c) InternalTimerServiceSerializationProxy iterates over the
> <String, HeapInternalTimerService> tuples (the String is the service
> name), writes the service name, calls snapshotTimersForKeyGroup, and then
> gets an InternalTimersSnapshotWriter to call writeTimersSnapshot.
>    d) In the writeTimersSnapshot method of InternalTimersSnapshotWriter,
> the key serializer and namespace serializer are written first; then the
> eventTimers and processingTimers of the InternalTimersSnapshot, each a Set
> of InternalTimer, are fetched and serialized.
> 2. My first try was to shallow-copy the <String, HeapInternalTimerService>
> tuples, then shallow-copy the eventTimers and processingTimers, and use
> another thread to snapshot them without blocking the event-processing
> thread. It turned out that shallow-copying the eventTimers and
> processingTimers is too time-consuming, so this approach failed.
> 3. Then I tried to borrow the idea behind the CopyOnWriteStateTable data
> structure and manage timers with it. After digging further, I found that
> there is an easier way to snapshot timers asynchronously, thanks to one
> fact: InternalTimer is immutable. Based on this fact:
>    a) In InternalTimeServiceManager, maintain a stateTableVersion and a
> snapshotVersions set, both playing exactly the same role as their
> counterparts in CopyOnWriteStateTable. One more thing: a read-write lock,
> which protects snapshotVersions and stateTableVersion.
>    b) To each InternalTimer, add two more properties, create version and
> delete version, besides the three existing ones: timestamp, key and
> namespace. Each time a timer is registered in the timer service, it is
> created with the current stateTableVersion as its create version and -1 as
> its delete version. Each time a timer is deleted in the timer service, it
> is only marked as deleted by setting its delete version to the current
> stateTableVersion; it is not physically removed from the timer service.
>    c) Each time a timer snapshot is taken, InternalTimeServiceManager
> increments its stateTableVersion and adds the new value to
> snapshotVersions. These two operations are protected by the write lock of
> InternalTimeServiceManager. The new stateTableVersion serves as the
> snapshot version of this snapshot.
>    d) Shallow-copy the <String, HeapInternalTimerService> tuples.
>    e) Then use another thread to snapshot everything asynchronously: key
> serializer, namespace serializer and timers. A timer that is not deleted
> (delete version is -1) and whose create version is less than the snapshot
> version is serialized. A timer whose delete version is not -1 and is
> greater than or equal to the snapshot version is serialized as well. Any
> other timer is not serialized by this snapshot.
>    f) When everything is serialized, remove the snapshot version from
> snapshotVersions. This still happens in the other thread and is guarded by
> the write lock.
>    g) Last thing: physical deletion of timers. There are two places where
> timers are physically deleted. First, as described in b), deleting a timer
> in the timer service only marks it with a delete version equal to
> stateTableVersion. Right after that, under the read lock, check whether the
> timer's delete version is less than the minimum of snapshotVersions (which
> means there is no active timer snapshot that needs it); if so, physically
> delete it. The second place is the snapshot's iteration over the timers:
> when a timer's delete version is less than the minimum of
> snapshotVersions, the timer is deleted and no running snapshot needs to
> keep it, so it is physically removed.
>    h) One more change: the processingTimeTimers and eventTimeTimers of each
> key group used to be HashSets; they are now ConcurrentHashMaps with
> key+namespace+timestamp as the hash key.
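
The versioning scheme in steps a) to h) can be sketched roughly as follows.
This is a standalone toy, not Flink code: VersionedTimerStore, VersionedTimer
and all method names are made up for illustration. Two points are assumptions
that go beyond the message: physical cleanup is done once in endSnapshot
rather than during the snapshot's iteration, and a deleted timer is serialized
only if its create version is also below the snapshot version (otherwise a
timer both created and deleted after the snapshot started would be written).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the versioned timer store; not a Flink class. */
class VersionedTimerStore {
    static final int NOT_DELETED = -1;

    /** Key, namespace and timestamp never change; deleteVersion is set at most once. */
    static final class VersionedTimer {
        final String key, namespace;
        final long timestamp;
        final int createVersion;
        volatile int deleteVersion = NOT_DELETED;

        VersionedTimer(String key, String namespace, long timestamp, int createVersion) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }

        String id() { return key + "|" + namespace + "|" + timestamp; }
    }

    // Step h): concurrent map keyed by key+namespace+timestamp.
    private final Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();
    // Step a): version counter, running snapshot versions, and the lock guarding both.
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int stateTableVersion = 0;

    /** Step b): a new timer is stamped with the current version. */
    void register(String key, String ns, long ts) {
        lock.readLock().lock();
        try {
            VersionedTimer t = new VersionedTimer(key, ns, ts, stateTableVersion);
            // Limitation of this sketch: re-registering a timer that is only
            // logically deleted is ignored until it has been physically removed.
            timers.putIfAbsent(t.id(), t);
        } finally { lock.readLock().unlock(); }
    }

    /** Steps b) and g): mark as deleted, remove physically only if no snapshot needs it. */
    void delete(String key, String ns, long ts) {
        lock.readLock().lock();
        try {
            VersionedTimer t = timers.get(key + "|" + ns + "|" + ts);
            if (t == null || t.deleteVersion != NOT_DELETED) return;
            t.deleteVersion = stateTableVersion;
            if (removable(t)) timers.remove(t.id(), t);
        } finally { lock.readLock().unlock(); }
    }

    /** Caller must hold the read or write lock. */
    private boolean removable(VersionedTimer t) {
        return t.deleteVersion != NOT_DELETED
            && (snapshotVersions.isEmpty() || t.deleteVersion < snapshotVersions.first());
    }

    /** Step c): bump the version and record it as a running snapshot. */
    int beginSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            snapshotVersions.add(stateTableVersion);
            return stateTableVersion;
        } finally { lock.writeLock().unlock(); }
    }

    /** Step e): may run in another thread; returns the ids visible to this snapshot. */
    List<String> collect(int snapshotVersion) {
        List<String> out = new ArrayList<>();
        for (VersionedTimer t : timers.values()) {
            int del = t.deleteVersion;
            boolean existed = t.createVersion < snapshotVersion;
            boolean alive = del == NOT_DELETED || del >= snapshotVersion;
            if (existed && alive) out.add(t.id());
        }
        return out;
    }

    /** Step f), plus the deferred cleanup from step g). */
    void endSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
            timers.values().removeIf(this::removable);
        } finally { lock.writeLock().unlock(); }
    }

    int size() { return timers.size(); }

    public static void main(String[] args) {
        VersionedTimerStore store = new VersionedTimerStore();
        store.register("k1", "ns", 10L);
        store.register("k2", "ns", 20L);
        int v = store.beginSnapshot();
        store.delete("k1", "ns", 10L);   // deleted after the snapshot began: still included
        store.register("k3", "ns", 30L); // created after the snapshot began: excluded
        List<String> snap = store.collect(v);
        Collections.sort(snap);
        System.out.println(snap);        // prints [k1|ns|10, k2|ns|20]
        store.endSnapshot(v);
        System.out.println(store.size()); // prints 2 (k2 and k3 remain)
    }
}
```

In this sketch a timer deleted while a snapshot is running stays in the map
until endSnapshot, so the snapshot thread never needs a copy of the timer set.
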
> The code is done and the tests are still running. I am posting this not
> only to hear your opinions, but also to clear up some questions about the
> current timer snapshot code path. My questions are below:
> 1. The onProcessingTime method of HeapInternalTimerService is invoked by
> another thread, that of the ProcessingTimeService, and in this thread it
> removes timers from HeapInternalTimerService. Yet in the current timer
> snapshot path I haven't found any shallow copy of processingTimeTimers and
> eventTimeTimers. Why doesn't this cause a ConcurrentModificationException?
> 2. Since onProcessingTime is triggered in another thread, what happens if,
> after the timers are snapshotted in the working thread, a timer fires and
> its triggerTarget is processed, which changes state, and only then the
> asynchronous keyed state snapshot is triggered? Won't this make the state
> inconsistent? Imagine this case: all keyed state is changed only by
> timers. Add timer1, timer2, timer3, timer4 and timer5; since no timer has
> been processed, the keyed state is empty. Then timer1 and timer2 are
> processed and the keyed state is k2, leaving timer3, timer4 and timer5 in
> the timer service. Then timer3, timer4 and timer5 are snapshotted
> synchronously. Then the keyed state is snapshotted asynchronously, while
> timer3 is processed and the keyed state becomes k3. The eventual snapshot
> is timer3, timer4, timer5 and k3. As far as I understand, it should be
> timer3, timer4, timer5 and k2. Please help me out with this.
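
The interleaving in question 2 can be replayed with a toy model. This only
restates the scenario under the assumption that the keyed state is read after
timer3 has fired; it says nothing about whether Flink actually permits that
ordering. SnapshotInterleaving and replay are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy replay of the ordering described in question 2; not Flink code. */
class SnapshotInterleaving {

    /** Returns {timer snapshot, keyed state snapshot} for the given capture point. */
    static String[] replay(boolean captureStateBeforeTimerFires) {
        List<String> timers = new ArrayList<>(Arrays.asList("timer3", "timer4", "timer5"));
        String keyedState = "k2"; // timer1 and timer2 have already fired

        // Synchronous part: the remaining timers are written out.
        String timerSnapshot = String.join(",", timers);
        String stateSnapshot = captureStateBeforeTimerFires ? keyedState : null;

        // timer3 fires before the asynchronous part runs and changes the state.
        timers.remove("timer3");
        keyedState = "k3";

        // Asynchronous part: if the state was not captured earlier, it is read now.
        if (stateSnapshot == null) {
            stateSnapshot = keyedState;
        }
        return new String[] {timerSnapshot, stateSnapshot};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" + ", replay(false))); // timer3,timer4,timer5 + k3
        System.out.println(String.join(" + ", replay(true)));  // timer3,timer4,timer5 + k2
    }
}
```

The second call shows the expected result: timers and keyed state captured at
the same logical point.
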
> Thanks very much.
> By the way, if you don't mind, could one of you open a JIRA issue to track
> this? When the time is right, I'll contribute to this issue.
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
