nickva commented on code in PR #5152:
URL: https://github.com/apache/couchdb/pull/5152#discussion_r1694054379


##########
src/fabric/src/fabric_streams.erl:
##########
@@ -158,39 +176,49 @@ handle_stream_start(Else, _, _) ->
 % Spawn an auxiliary rexi worker cleaner. This will be used in cases
 % when the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers, ClientReq) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientReq) when
+    is_pid(Coordinator), is_list(Workers)
+->
     case get(?WORKER_CLEANER) of
         undefined ->
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers, ClientReq)
+                NodeRefSet = set_from_list(shards_to_node_refs(Workers)),
+                cleaner_loop(Coordinator, NodeRefSet, ClientReq)
             end),
             put(?WORKER_CLEANER, Pid),
             Pid;
-        ExistingCleaner ->
+        ExistingCleaner when is_pid(ExistingCleaner) ->
             ExistingCleaner
     end.
 
-cleaner_loop(Pid, Workers, ClientReq) ->
+cleaner_loop(Pid, NodeRefSet, ClientReq) ->
     CheckMSec = chttpd_util:mochiweb_client_req_check_msec(),
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers], ClientReq);
+        {add_node_ref, Pid, {_, _} = NodeRef} ->
+            cleaner_loop(Pid, sets:add_element(NodeRef, NodeRefSet), 
ClientReq);
         {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+            rexi:kill_all(sets:to_list(NodeRefSet))
     after CheckMSec ->
         chttpd_util:stop_client_process_if_disconnected(Pid, ClientReq),
-        cleaner_loop(Pid, Workers, ClientReq)
+        cleaner_loop(Pid, NodeRefSet, ClientReq)
     end.
 
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
+add_worker_to_cleaner(CoordinatorPid, #shard{node = Node, ref = Ref}) ->

Review Comment:
   This one is a bit in between. It is a `#shard` record, but since it has a 
ref set and it's tracking a remote job, we call it  a worker by convention. But 
it is kind of confusing using a `#shard` record for both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to