jaydoane commented on code in PR #5152:
URL: https://github.com/apache/couchdb/pull/5152#discussion_r1694026229
##########
src/fabric/src/fabric_streams.erl:
##########
@@ -158,39 +176,49 @@ handle_stream_start(Else, _, _) ->
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forcibly killed and doesn't
% get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers, ClientReq) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientReq) when
+ is_pid(Coordinator), is_list(Workers)
+->
case get(?WORKER_CLEANER) of
undefined ->
Pid = spawn(fun() ->
erlang:monitor(process, Coordinator),
- cleaner_loop(Coordinator, Workers, ClientReq)
+ NodeRefSet = set_from_list(shards_to_node_refs(Workers)),
+ cleaner_loop(Coordinator, NodeRefSet, ClientReq)
end),
put(?WORKER_CLEANER, Pid),
Pid;
- ExistingCleaner ->
+ ExistingCleaner when is_pid(ExistingCleaner) ->
ExistingCleaner
end.
-cleaner_loop(Pid, Workers, ClientReq) ->
+cleaner_loop(Pid, NodeRefSet, ClientReq) ->
CheckMSec = chttpd_util:mochiweb_client_req_check_msec(),
receive
- {add_worker, Pid, Worker} ->
- cleaner_loop(Pid, [Worker | Workers], ClientReq);
+ {add_node_ref, Pid, {_, _} = NodeRef} ->
+ cleaner_loop(Pid, sets:add_element(NodeRef, NodeRefSet),
ClientReq);
{'DOWN', _, _, Pid, _} ->
- fabric_util:cleanup(Workers)
+ rexi:kill_all(sets:to_list(NodeRefSet))
after CheckMSec ->
chttpd_util:stop_client_process_if_disconnected(Pid, ClientReq),
- cleaner_loop(Pid, Workers, ClientReq)
+ cleaner_loop(Pid, NodeRefSet, ClientReq)
end.
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
+add_worker_to_cleaner(CoordinatorPid, #shard{node = Node, ref = Ref}) ->
Review Comment:
Maybe rename to `add_shard_to_cleaner`?
##########
src/rexi/src/rexi.erl:
##########
@@ -93,36 +124,63 @@ sync_reply(Reply, Timeout) ->
timeout
end.
-%% @doc Start a worker stream
+%% Start a worker stream. Coordinator sends this message to the worker to tell
+%% it to start streaming, after the worker sent a rexi_STREAM_INIT message.
+%%
+%% The `From` should be the value provided by the worker in the
+%% rexi_STREAM_INIT message.
%%
-%% If a coordinator wants to continue using a streaming worker it
-%% should use this function to inform the worker to continue
-%% sending messages. The `From` should be the value provided by
-%% the worker in the rexi_STREAM_INIT message.
-spec stream_start({pid(), any()}) -> ok.
stream_start({Pid, _Tag} = From) when is_pid(Pid) ->
gen_server:reply(From, rexi_STREAM_START).
-%% @doc Cancel a worker stream
+%% Cancel a worker stream
+%%
+%% If a coordinator decides that a worker is not going to be part of the
+%% response, it should use this function to cancel the worker. The `From`
+%% should be the value provided by the worker in the rexi_STREAM_INIT message.
%%
-%% If a coordinator decideds that a worker is not going to be part
-%% of the response it should use this function to cancel the worker.
-%% The `From` should be the value provided by the worker in the
-%% rexi_STREAM_INIT message.
-spec stream_cancel({pid(), any()}) -> ok.
stream_cancel({Pid, _Tag} = From) when is_pid(Pid) ->
gen_server:reply(From, rexi_STREAM_CANCEL).
-%% @equiv stream2(Msg, 5, 300000)
+%% Like stream2(Msg, 5, 300000)
stream2(Msg) ->
Limit = config:get_integer("rexi", "stream_limit", 5),
stream2(Msg, Limit, 300000).
-%% @doc Stream a message back to the coordinator. It limits the
-%% number of unacked messsages to Limit and throws a timeout error
-%% if it doesn't receive an ack in Timeout milliseconds. This
-%% is a combination of the old stream_start and stream functions
-%% which automatically does the stream initialization logic.
+%% Stream messages back to the coordinator. Initializes on first use. Limit
+%% the number of unacked messages to Limit, and throw a timeout error if it
+%% doesn't receive an ack in Timeout milliseconds.
+%%
+%% The general protocol looks like this:
+%%
+%% Coordinator Worker (one of Q*N usually)
+%% ---------- --------------------------
+%% cast/2,3,4 -> {doit, ...} -> rexi_server:
+%% spawn_monitor worker process.
+%% First time stream2/1 is called it
+%% runs init_stream/1.
+%%
+%% init_stream/1:
+%% <- rexi_STREAM_INIT <- sync send, wait for reply
+%%
+%% Some workers are told to
+%% continue with rexi_STREAM_START.
+%% Others are told to stop with
+%% rexi_STREAM_CANCEL
+%%
+%% -> rexi_STREAM_START ->
+%% Keep calling rexi:stream2/3
+%% to stream data to coordinator...
+%%
+%% <- Caller ! {Ref, self(), Msg} <-
+%% ...
+%% Coordinator must acknowledge.
+%% -> {rexi_ack, 1} ->
+%% Send last message. No need for ack.
+%% <- Caller ! Msg <-
+%%
Review Comment:
This is awesome!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]