I have put a lot of efforts on this issue and try to resolve it:
1. let me describe current timers' snapshot path first:
    a) for each keygroup, invoke
InternalTimeServiceManager.snapshotStateForKeyGroup
    b) InternalTimeServiceManager create a
InternalTimerServiceSerializationProxy to write snapshot
    c) InternalTimerServiceSerializationProxy iterat <String,//which is
service name,
    HeapInternalTimerService> tuple and write service name and
snapshotTimersForKeyGroup, then get InternalTimersSnapshotWriter to
writeTimersSnapshot
    d) in method writeTimersSnapshot of InternalTimersSnapshotWriter, first
write keyserializer and namespaceserializer, then get eventTimers and
processingTimers of InternalTimersSnapshot, which is Set of InternalTimer
and serializer them.

2. my first try is shallow copy the <Strin, HeapInternalTimerService> tuples
and then shallow copy the eventTimers and processingTimers, then use another
thread to snapshot them without blocking the event processing thread. but it
turns out that shallow copy of the eventTimers and processingTimers are time
consumed and this solution failed

3. then I try to borrow the idea of data structure CopyOnWriteStateTable and
try to manage timers with it. but after digging more, I found out that there
is a more easy way to achieve asynchronous snapshot timers due to one fact:
InternalTimer is immutable. we can achieve asynchronous with a more easy way
based on this fact: 
    a)maintain a stateTableVersion, which is exactly the same thing as
CopyOnWriteStateTable and snapshotVersions which is exactly the same thing
as CopyOnWriteStateTable in InternalTimeServiceManager. one more thing: a
readwrite lock, which is used to protect snapshotVersions and
stateTableVersion
    b)for each InternalTimer, add 2 more properties: create version and
delete version beside 3 existing properties: timestamp, key and namespace.
each time a Timer is registered in timerservice, it is created with
stateTableVersion as its create version while delete version is -1. each
time when timer is deleted in timerservice, it is marked delete for giving
it a delete verison equals to stateTableVersion without physically delete it
from timerservice. 
    c)each time when try to snapshot timers, InternalTimeServiceManager
increase its stateTableVersion and add this stateTableVersion in
snapshotVersions. these 2 operators are protected by write lock of
InternalTimeServiceManager. that current stateTableVersion take as snapshot
version of this snapshot
    d)shallow copy <String,HeapInternalTimerService> tuples 
    e)then use a another thread asynchronous snapshot whole things:
keyserialized, namespaceserializer and timers. for timers which is not
deleted(delete version is -1) and create version less than snapshot version,
serialized it. for timers whose delete version is not -1 and is bigger than
or equals snapshot version, serialized it. otherwise, it will not be
serialized by this snapshot. 
    f)when everything is serialized, remove snapshot version in
snapshotVersions, which is still in another thread and this action is
guarded by write lock.
    g)last thing: timer physical deletion. 2 places to physically delete
timers: each time when timer is deleted in timerservice, it is marked delete
for giving it a delete verison equals to stateTableVersion without
physically delete it from timerservice. after this, check if timer's delete
version is less than min value of snapshotVersions with read lock
guarded(which means there is no active timer snapshot running) and if that
is true, physically delete it. the other place to delete is in snapshot
timer's iterat: when timer's delete version is less than min value of
snapshotVersions, which means the timer is deleted and no running snapshot
should keep it.
    h) some more additions: processingTimeTimers and eventTimeTimers for
each group used to be hashset and now it is changed to concurrenthashmap
with key+namesapce+timestamp as its hash key.

the code is done and test is still runnng. I post this comments not only try
to hear u guys voice, but also try to figure out some more questios related
to currently timer snapshot code path. my questions are below:
1. in method onProcessingTime of HeapInternalTimerService, it is invoked by
another thread of ProcessingTimeService, and in this thread, it will remove
timer in HeapInternalTimerService. while in current timer snapshot path, I
haven't found there is any shallow copy of processingTimeTimers and
eventTimeTimers. how could this won't cause concurrent modification
exception?
2. since onProcessingTime is trigged in another thread, when timers are
snapshot in working thread, what if then a timer is fired and triggerTarget
is processed, which could cause state changed, then asynchronous
keyedstatsnapshot is trigged. won't this cause state inconsistent? let's
image this case: all kedyed state is only chaned by timer. so Add timer1,
timer2, timer3, timer4 and timer5 and since no timer is processed, keyed
state is nothing. then timer1 and timer2 is processed, keyed state is k2.
and left timer3, timer4 and timer5 in timer servcie. then snapshot timer3,
timer4 and timer5 in synchronous way. then try to snapshot keyed state
asynchronous while timer3 is processed and keyed state is k3. the eventually
snapshot is timer3, timer4, timer5 and k3. as far as I understand, it should
be timer3, timer4, timer5 and k2. please help me out this. 

thanks very much
by the way, if u guys won't mind, can anyone of u open a jira issue to track
this and when time is ok, I'll make contribution on this issue.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to