This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 632f303a47bd89a97c831fd0532cb7541b80355d Author: Nick Vatamaniuc <vatam...@apache.org> AuthorDate: Thu Dec 20 12:19:01 2018 -0500 Clean rexi stream workers when coordinator process is killed Sometimes fabric coordinators end up getting brutally terminated [1], and in that case they might never process their `after` clause where their remote rexi workers are killed. Those workers are left lingering around keeping databases active for up to 5 minutes at a time. To prevent that from happening, let coordinators which use streams spawn an auxiliary cleaner process. This process will monitor the main coordinator and if it dies will ensure remote workers are killed, freeing resources immediately. In order not to send 2x the number of kill messages during the normal exit, fabric_util:cleanup() will stop the auxiliary process before continuing. [1] One instance is when the ddoc cache is refreshed: https://github.com/apache/couchdb/blob/master/src/ddoc_cache/src/ddoc_cache_entry.erl#L236 --- src/fabric/src/fabric_streams.erl | 132 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 32217c3..ae0c2be 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -22,6 +22,9 @@ -include_lib("mem3/include/mem3.hrl"). +-define(WORKER_CLEANER, fabric_worker_cleaner). + + start(Workers, Keypos) -> start(Workers, Keypos, undefined, undefined). 
@@ -32,6 +35,7 @@ start(Workers0, Keypos, StartFun, Replacements) -> start_fun = StartFun, replacements = Replacements }, + spawn_worker_cleaner(self(), Workers0), Timeout = fabric_util:request_timeout(), case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of {ok, #stream_acc{workers=Workers}} -> @@ -47,6 +51,16 @@ start(Workers0, Keypos, StartFun, Replacements) -> cleanup(Workers) -> + % Stop the auxiliary cleaner process as we got to the point where cleanup + % happens in the regular fashion so we don't want to send 2x the number of kill + % messages + case get(?WORKER_CLEANER) of + CleanerPid when is_pid(CleanerPid) -> + erase(?WORKER_CLEANER), + exit(CleanerPid, kill); + _ -> + ok + end, fabric_util:cleanup(Workers). @@ -72,6 +86,7 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) -> {value, {_Range, WorkerReplacements}, NewReplacements} -> FinalWorkers = lists:foldl(fun(Repl, NewWorkers) -> NewWorker = (St#stream_acc.start_fun)(Repl), + add_worker_to_cleaner(self(), NewWorker), fabric_dict:store(NewWorker, waiting, NewWorkers) end, Workers, WorkerReplacements), % Assert that our replaced worker provides us @@ -117,3 +132,120 @@ handle_stream_start({ok, ddoc_updated}, _, St) -> handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). + + +% Spawn an auxiliary rexi worker cleaner. This will be used in cases +% when the coordinator (request) process is forcibly killed and doesn't +% get a chance to process its `after` fabric:clean/1 clause. +spawn_worker_cleaner(Coordinator, Workers) -> + case get(?WORKER_CLEANER) of + undefined -> + Pid = spawn(fun() -> + erlang:monitor(process, Coordinator), + cleaner_loop(Coordinator, Workers) + end), + put(?WORKER_CLEANER, Pid), + Pid; + ExistingCleaner -> + ExistingCleaner + end. + + +cleaner_loop(Pid, Workers) -> + receive + {add_worker, Pid, Worker} -> + cleaner_loop(Pid, [Worker | Workers]); + {'DOWN', _, _, Pid, _} -> + fabric_util:cleanup(Workers) + end.
+ + +add_worker_to_cleaner(CoordinatorPid, Worker) -> + case get(?WORKER_CLEANER) of + CleanerPid when is_pid(CleanerPid) -> + CleanerPid ! {add_worker, CoordinatorPid, Worker}; + _ -> + ok + end. + + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +worker_cleaner_test_() -> + { + "Fabric spawn_worker_cleaner test", { + setup, fun setup/0, fun teardown/1, + fun(_) -> [ + should_clean_workers(), + does_not_fire_if_cleanup_called(), + should_clean_additional_worker_too() + ] end + } + }. + + +should_clean_workers() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + Ref = erlang:monitor(process, Cleaner), + Coord ! die, + receive {'DOWN', Ref, _, Cleaner, _} -> ok end, + ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +does_not_fire_if_cleanup_called() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()}, + #shard{node = 'n2', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + Ref = erlang:monitor(process, Cleaner), + cleanup(Workers), + Coord ! die, + receive {'DOWN', Ref, _, _, _} -> ok end, + % 2 calls would be from cleanup/1 function. If cleanup process fired + % too it would have been 4 calls total. + ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +should_clean_additional_worker_too() -> + ?_test(begin + meck:reset(rexi), + erase(?WORKER_CLEANER), + Workers = [ + #shard{node = 'n1', ref = make_ref()} + ], + {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end), + Cleaner = spawn_worker_cleaner(Coord, Workers), + add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}), + Ref = erlang:monitor(process, Cleaner), + Coord ! 
die, + receive {'DOWN', Ref, _, Cleaner, _} -> ok end, + ?assertEqual(2, meck:num_calls(rexi, kill, 2)) + end). + + +setup() -> + ok = meck:expect(rexi, kill, fun(_, _) -> ok end). + + +teardown(_) -> + meck:unload(). + +-endif.