GitHub user shanthoosh opened a pull request:

    https://github.com/apache/samza/pull/526

    Schedule ZkBarrierChangeHandler into debounce thread for execution. 

    In the existing implementation, `ZkBarrierChangeHandler` is executed from the 
`ZkEventThread`, which has the following drawbacks:
    * `ZkWatch` events are buffered in an in-memory queue (maintained by 
ZkClient) and delivered one at a time to the ZkClient listener implementations. 
While the execution of a delivered `ZkWatch` event is in progress, no other 
`ZkWatch` event is delivered to the listeners. If `ZkBarrierChangeHandler` 
is executed from the `ZkEventThread`, any increase in its processing latency 
delays the delivery of the other `ZkWatch` events buffered in ZkClient's 
in-memory queue.
    * During session expiration (a zkConnection error scenario), buffering all 
events into `ScheduleAfterDebounceTime` lets us discard older-generation 
`ZkWatch` events, so that stale events are never executed and correctness is 
preserved. Events handled directly on the `ZkEventThread` do not get this check.
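
The handoff described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this pull request: the class `DebounceHandoffSketch`, its methods, and the generation counter are hypothetical stand-ins for `ZkBarrierChangeHandler`, `ScheduleAfterDebounceTime`, and the ZkUtils generationId. The callback returns immediately on the event thread, and the work runs later on a single debounce thread, which skips it if the session generation has changed in the meantime.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch; these are not Samza's actual classes. */
class DebounceHandoffSketch {
    // Single worker thread standing in for the debounce thread.
    private final ScheduledExecutorService debounceThread =
        Executors.newSingleThreadScheduledExecutor();
    // Incremented on every session reconnect (assumed generation scheme).
    private final AtomicInteger currentGeneration = new AtomicInteger(0);
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final long debounceMs;

    DebounceHandoffSketch(long debounceMs) {
        this.debounceMs = debounceMs;
    }

    /** Called on the event thread: only enqueues, so it returns quickly. */
    void onBarrierChange(String event) {
        final int generationAtEnqueue = currentGeneration.get();
        debounceThread.schedule(() -> {
            // Skip work that was captured under an older session generation.
            if (generationAtEnqueue != currentGeneration.get()) {
                return;
            }
            handled.add(event);
        }, debounceMs, TimeUnit.MILLISECONDS);
    }

    /** Called when the session is re-established. */
    void onSessionReconnect() {
        currentGeneration.incrementAndGet();
    }

    /** Lets already-scheduled tasks run, then returns what was handled. */
    List<String> drainAndStop() {
        debounceThread.shutdown();
        try {
            debounceThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

In this sketch, an event enqueued before `onSessionReconnect()` is dropped when its task finally runs, while an event enqueued afterwards is handled.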

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shanthoosh/samza schedule_barrier_change_in_debounce_thread

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/526.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #526
    
----
commit 7c2540dcb2a9327b342679c02babc7fe7475c0ac
Author: Shanthoosh Venkataraman <svenkataraman@...>
Date:   2018-04-30T04:28:14Z

    SAMZA-1692: Standalone stability fixes.
    
    - Currently, on session expiration, a processorListener with an incorrect 
generationId is registered with ZooKeeper (the ZkUtils generationId is 
incremented on reconnect, but the generationId in processorListener is always 
zero). When a session reconnect happens on the processor that is the successor 
to the leader, the leader expiration event is skipped. This prevents leader 
re-election when the current leader dies and stalls the processor group. The 
fix is to reinstantiate and then register processorChangeListener on session 
expiration.
    - Add processorId to the debounce thread name (this aids debugging when 
multiple processors are running within a JVM).
    - After the ScheduleAfterDebounceTime queue is shut down, don't accept new 
schedule requests. The current ZkJobCoordinator shutdown sequence comprises the 
following steps:
         - Shut down the ScheduleAfterDebounceTime queue.
         - Stop the zkClient and relinquish its resources.
    
    After we shut down ScheduleAfterDebounceTime and before the zkClient is 
stopped, new operations can still be scheduled in the ScheduleAfterDebounceTime 
queue. This results in a RejectedExecutionException, since the executorService 
is already stopped.
    
    ```
    Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@23f962a8 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43408be8
    ```
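
One way to avoid this exception is to guard the scheduling call with a shutdown flag. The sketch below is an assumption about the general shape of such a guard, not the code in this commit: `GuardedSchedulerSketch` and its method signatures are hypothetical. The check and the `schedule` call are made atomic with respect to `stop()` by synchronizing both methods, so a request arriving after shutdown is dropped instead of reaching the stopped executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the actual ScheduleAfterDebounceTime class. */
class GuardedSchedulerSketch {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    // Guarded by "this"; set once the queue has been stopped.
    private boolean isShuttingDown = false;

    /**
     * Returns true if the action was accepted, or false if it was dropped
     * because the queue has already been stopped.
     */
    synchronized boolean scheduleAfterDebounceTime(Runnable action, long delayMs) {
        if (isShuttingDown) {
            // Drop the request rather than hit RejectedExecutionException.
            return false;
        }
        executor.schedule(action, delayMs, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Stops the queue; later schedule requests are ignored. */
    synchronized void stop() {
        isShuttingDown = true;
        executor.shutdownNow();
    }
}
```

Returning a boolean is a choice made for this illustration; a caller could equally log and ignore dropped requests.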

commit ee27c18502b1e6398b2d3f739033b29c1dfb3cf1
Author: Shanthoosh Venkataraman <santhoshvenkat1988@...>
Date:   2018-05-07T20:23:58Z

    Review comments.

commit 0f9a5a810c39f690cfb40ee5f90896e23a6b5ad9
Author: Shanthoosh Venkataraman <santhoshvenkat1988@...>
Date:   2018-05-17T23:25:22Z

    Changes:
    
    In the current implementation, the barrier completion logic is executed 
from the ZkClient EventThread.
    Schedule this logic to be executed from the debounce thread instead.

----

