This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 632f303a47bd89a97c831fd0532cb7541b80355d
Author: Nick Vatamaniuc <vatam...@apache.org>
AuthorDate: Thu Dec 20 12:19:01 2018 -0500

    Clean rexi stream workers when coordinator process is killed
    
    Sometimes fabric coordinators end up getting brutally terminated [1], and 
in that
    case they might never process their `after` clause where their remote rexi
    workers are killed. Those workers are left lingering around keeping 
databases
    active for up to 5 minutes at a time.
    
    To prevent that from happening, let coordinators which use streams spawn an
    auxiliary cleaner process. This process will monitor the main coordinator 
and
    if it dies will ensure remote workers are killed, freeing resources
    immediately. In order not to send 2x the number of kill messages during the
    normal exit, fabric_util:cleanup() will stop the auxiliary process before
    continuing.
    
    [1] One instance is when the ddoc cache is refreshed:
     
https://github.com/apache/couchdb/blob/master/src/ddoc_cache/src/ddoc_cache_entry.erl#L236
---
 src/fabric/src/fabric_streams.erl | 132 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 132 insertions(+)

diff --git a/src/fabric/src/fabric_streams.erl 
b/src/fabric/src/fabric_streams.erl
index 32217c3..ae0c2be 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -22,6 +22,9 @@
 -include_lib("mem3/include/mem3.hrl").
 
 
+-define(WORKER_CLEANER, fabric_worker_cleaner).
+
+
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
 
@@ -32,6 +35,7 @@ start(Workers0, Keypos, StartFun, Replacements) ->
         start_fun = StartFun,
         replacements = Replacements
     },
+    spawn_worker_cleaner(self(), Workers0),
     Timeout = fabric_util:request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{workers=Workers}} ->
@@ -47,6 +51,16 @@ start(Workers0, Keypos, StartFun, Replacements) ->
 
 
 cleanup(Workers) ->
+    % Stop the auxiliary cleaner process as we got to the point where cleanup
+    % happesn in the regular fashion so we don't want to send 2x the number 
kill
+    % messages
+    case get(?WORKER_CLEANER) of
+        CleanerPid when is_pid(CleanerPid) ->
+            erase(?WORKER_CLEANER),
+            exit(CleanerPid, kill);
+        _ ->
+            ok
+    end,
     fabric_util:cleanup(Workers).
 
 
@@ -72,6 +86,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
             {value, {_Range, WorkerReplacements}, NewReplacements} ->
                 FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
                     NewWorker = (St#stream_acc.start_fun)(Repl),
+                    add_worker_to_cleaner(self(), NewWorker),
                     fabric_dict:store(NewWorker, waiting, NewWorkers)
                 end, Workers, WorkerReplacements),
                 % Assert that our replaced worker provides us
@@ -117,3 +132,120 @@ handle_stream_start({ok, ddoc_updated}, _, St) ->
 
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
+
+
+% Spawn an auxiliary rexi worker cleaner. This will be used in cases
+% when the coordinator (request) process is forceably killed and doesn't
+% get a chance to process its `after` fabric:clean/1 clause.
+spawn_worker_cleaner(Coordinator, Workers) ->
+    case get(?WORKER_CLEANER) of
+        undefined ->
+            Pid = spawn(fun() ->
+                erlang:monitor(process, Coordinator),
+                cleaner_loop(Coordinator, Workers)
+            end),
+            put(?WORKER_CLEANER, Pid),
+            Pid;
+         ExistingCleaner ->
+            ExistingCleaner
+   end.
+
+
+cleaner_loop(Pid, Workers) ->
+    receive
+        {add_worker, Pid, Worker} ->
+            cleaner_loop(Pid, [Worker | Workers]);
+        {'DOWN', _, _, Pid, _} ->
+            fabric_util:cleanup(Workers)
+    end.
+
+
+add_worker_to_cleaner(CoordinatorPid, Worker) ->
+    case get(?WORKER_CLEANER) of
+        CleanerPid when is_pid(CleanerPid) ->
+            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+        _ ->
+            ok
+    end.
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+worker_cleaner_test_() ->
+    {
+        "Fabric spawn_worker_cleaner test", {
+            setup, fun setup/0, fun teardown/1,
+            fun(_) -> [
+                should_clean_workers(),
+                does_not_fire_if_cleanup_called(),
+                should_clean_additional_worker_too()
+            ] end
+        }
+    }.
+
+
+should_clean_workers() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()},
+            #shard{node = 'n2', ref = make_ref()}
+        ],
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        Ref = erlang:monitor(process, Cleaner),
+        Coord ! die,
+        receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+does_not_fire_if_cleanup_called() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()},
+            #shard{node = 'n2', ref = make_ref()}
+        ],
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        Ref = erlang:monitor(process, Cleaner),
+        cleanup(Workers),
+        Coord ! die,
+        receive {'DOWN', Ref, _, _, _} -> ok end,
+        % 2 calls would be from cleanup/1 function. If cleanup process fired
+        % too it would have been 4 calls total.
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+should_clean_additional_worker_too() ->
+    ?_test(begin
+        meck:reset(rexi),
+        erase(?WORKER_CLEANER),
+        Workers = [
+            #shard{node = 'n1', ref = make_ref()}
+        ],
+        {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+        Cleaner = spawn_worker_cleaner(Coord, Workers),
+        add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
+        Ref = erlang:monitor(process, Cleaner),
+        Coord ! die,
+        receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+        ?assertEqual(2, meck:num_calls(rexi, kill, 2))
+    end).
+
+
+setup() ->
+    ok = meck:expect(rexi, kill, fun(_, _) -> ok end).
+
+
+teardown(_) ->
+    meck:unload().
+
+-endif.

Reply via email to