nickva commented on code in PR #4729:
URL: https://github.com/apache/couchdb/pull/4729#discussion_r1302491408


##########
src/fabric/src/fabric_streams.erl:
##########
@@ -152,45 +168,101 @@ handle_stream_start({ok, Error}, _, St) when Error == 
ddoc_updated; Error == ins
 handle_stream_start(Else, _, _) ->
     exit({invalid_stream_start, Else}).
 
-% Spawn an auxiliary rexi worker cleaner. This will be used in cases
-% when the coordinator (request) process is forceably killed and doesn't
+% Spawn an auxiliary rexi worker watchdog which triggers cleanup if;
+% * nothing has been streamed for $timeout duration
+% * the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
-    case get(?WORKER_CLEANER) of
+
+% The watchdog is initially disabled. Clients can enable it by calling
+% enable_watchdog/0 before starting a fabric_stream. That client is responsible
+% for calling kick_watchdog/0 on activity to prevent the watchdog from acting.
+% If kick_watchdog/0 is not called at least once in each watchdog interval,
+% the stream coordinator is killed and the workers are cleaned up.
+spawn_worker_watchdog(Coordinator, Workers) ->
+    case get(?WORKER_WATCHDOG) of
         undefined ->
+            State0 = #watchdog_state{
+                coordinator = Coordinator,
+                workers = Workers
+            },
+            Enabled = get(?WATCHDOG_ENABLE),
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers)
+                State1 =
+                    case Enabled of
+                        true ->
+                            reset_watchdog(State0);
+                        undefined ->
+                            State0
+                    end,
+                watchdog_loop(State1)
             end),
-            put(?WORKER_CLEANER, Pid),
+            put(?WORKER_WATCHDOG, Pid),
             Pid;
-        ExistingCleaner ->
-            ExistingCleaner
+        ExistingWatchdog ->
+            ExistingWatchdog
     end.
 
-cleaner_loop(Pid, Workers) ->
+watchdog_loop(#watchdog_state{} = St) ->
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers]);
-        {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+        {?ADD_WORKER, Pid, Worker} when Pid == St#watchdog_state.coordinator ->
+            Workers = St#watchdog_state.workers,
+            watchdog_loop(St#watchdog_state{workers = [Worker | Workers]});
+        ?WATCHDOG_KICK when ?WATCHDOG_IS_ENABLED ->
+            watchdog_loop(St#watchdog_state{idle = false});
+        ?WATCHDOG_KICK ->
+            watchdog_loop(St);
+        ?WATCHDOG_TIMEOUT when ?WATCHDOG_IS_ENABLED, ?WATCHDOG_IS_IDLE ->
+            couch_log:warning(
+                "watchdog ~p detected idle interval, killing ~p",
+                [self(), St#watchdog_state.coordinator]
+            ),
+            exit(St#watchdog_state.coordinator, kill),

Review Comment:
   minor nit: I wonder if there is a useful difference between `kill` and other 
exits (say `shutdown`), as we do some handler try...catch logging/cleanup in 
[chttpd](https://github.com/apache/couchdb/blob/main/src/chttpd/src/chttpd.erl#L397)
 and others.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to