davisp commented on a change in pull request #470: Scheduling Replicator URL: https://github.com/apache/couchdb/pull/470#discussion_r110428590
########## File path: src/couch_replicator/src/couch_replicator_scheduler.erl ########## @@ -0,0 +1,1380 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_scheduler). +-behaviour(gen_server). +-behaviour(config_listener). +-vsn(1). + +-include("couch_replicator_scheduler.hrl"). +-include("couch_replicator.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +%% public api +-export([start_link/0, add_job/1, remove_job/1, reschedule/0]). +-export([rep_state/1, find_jobs_by_dbname/1, find_jobs_by_doc/2]). +-export([job_summary/2, health_threshold/0]). +-export([jobs/0, job/1]). + +%% gen_server callbacks +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). +-export([format_status/2]). + +%% config_listener callback +-export([handle_config_change/5, handle_config_terminate/3]). + +%% types +-type event_type() :: added | started | stopped | {crashed, any()}. +-type event() :: {Type:: event_type(), When :: erlang:timestamp()}. +-type history() :: nonempty_list(event()). + +%% definitions +-define(MAX_BACKOFF_EXPONENT, 10). +-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000). +-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60). +-define(RELISTEN_DELAY, 5000). + +-define(DEFAULT_MAX_JOBS, 500). +-define(DEFAULT_MAX_CHURN, 20). +-define(DEFAULT_MAX_HISTORY, 20). +-define(DEFAULT_SCHEDULER_INTERVAL, 60000). 
%% Scheduler server state: reschedule interval (msec), the active timer
%% reference, and the configurable job/churn/history limits.
-record(state, {interval, timer, max_jobs, max_churn, max_history}).

%% One scheduled replication job. The '$1' / '_' alternatives in the field
%% types exist so a #job{} literal can double as an ets match pattern.
-record(job, {
    id :: job_id() | '$1' | '_',
    rep :: #rep{} | '_',
    pid :: undefined | pid() | '$1' | '_',
    monitor :: undefined | reference() | '_',
    history :: history() | '_'}).

%% Accumulator used when folding over all jobs to compute aggregate
%% pending/running/crashed counts (_n) and total times (_t).
-record(stats_acc, {
    now :: erlang:timestamp(),
    pending_t = 0 :: non_neg_integer(),
    running_t = 0 :: non_neg_integer(),
    crashed_t = 0 :: non_neg_integer(),
    pending_n = 0 :: non_neg_integer(),
    running_n = 0 :: non_neg_integer(),
    crashed_n = 0 :: non_neg_integer()}).


%% public functions

-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


%% Add a replication job to the scheduler. The #rep{} must already carry a
%% computed replication id; the job's history starts with an `added` event.
-spec add_job(#rep{}) -> ok.
add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
    Job = #job{
        id = Rep#rep.id,
        rep = Rep,
        history = [{added, os:timestamp()}]},
    gen_server:call(?MODULE, {add_job, Job}, infinity).


-spec remove_job(job_id()) -> ok.
remove_job(Id) ->
    gen_server:call(?MODULE, {remove_job, Id}, infinity).


-spec reschedule() -> ok.
% Trigger a manual reschedule. Used for testing and/or ops.
reschedule() ->
    gen_server:call(?MODULE, reschedule, infinity).


%% Look up the #rep{} for a job id straight from the ets table (no server
%% round-trip). Returns nil when the job is unknown or the table is absent.
%% Uses a scoped try/catch on error:badarg instead of the old-style
%% `catch Expr`, which would swallow every exception class and lose the
%% distinction between a missing key and a genuine error.
-spec rep_state(rep_id()) -> #rep{} | nil.
rep_state(RepId) ->
    try
        ets:lookup_element(?MODULE, RepId, #job.rep)
    catch
        error:badarg ->
            nil
    end.


-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
%% Build a proplist summary of a job's current state for API responses.
%% A job with no pid and no recent crashes is `pending`; no pid with recent
%% crashes is `crashing` (with the last error); a live pid is `running`.
%% Returns nil when the job is not in the scheduler table.
job_summary(JobId, HealthThreshold) ->
    case job_by_id(JobId) of
        {ok, #job{pid = Pid, history = History, rep = Rep}} ->
            ErrorCount = consecutive_crashes(History, HealthThreshold),
            {State, Info} = case {Pid, ErrorCount} of
                {undefined, 0}  ->
                    {pending, null};
                {undefined, ErrorCount} when ErrorCount > 0 ->
                    % History is most-recent-first, so the head holds the
                    % latest crash event and its error term.
                    [{{crashed, Error}, _When} | _] = History,
                    ErrMsg = couch_replicator_utils:rep_error_to_binary(Error),
                    {crashing, ErrMsg};
                {Pid, ErrorCount} when is_pid(Pid) ->
                    {running, null}
            end,
            [
                {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                {target, iolist_to_binary(ejson_url(Rep#rep.target))},
                {state, State},
                {info, Info},
                {error_count, ErrorCount},
                {last_updated, last_updated(History)},
                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
                {proxy, job_proxy_url(Rep#rep.source)}
            ];
        {error, not_found} ->
            nil  % Job might have just completed
    end.


%% Return the proxy URL (password stripped) for an http(s) endpoint, or
%% null for local endpoints or endpoints with no proxy configured.
job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
    list_to_binary(couch_util:url_strip_password(ProxyUrl));

job_proxy_url(_Endpoint) ->
    null.


%% Seconds a job must run without crashing before it is considered healthy
%% again (its consecutive-crash count stops growing).
-spec health_threshold() -> non_neg_integer().
health_threshold() ->
    config:get_integer("replicator", "health_threshold",
        ?DEFAULT_HEALTH_THRESHOLD_SEC).


%% Find ids of all jobs which originated from the given replicator db.
%% SPEC FIX: the ets:match pattern binds the job's id field ('$1'), so this
%% returns replication ids, not #rep{} records as the old spec claimed.
-spec find_jobs_by_dbname(binary()) -> list(rep_id()).
find_jobs_by_dbname(DbName) ->
    Rep = #rep{db_name = DbName, _ = '_'},
    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].


%% Find ids of all jobs created from a particular replicator document.
%% SPEC FIX: returns replication ids, not #rep{} records (see above).
-spec find_jobs_by_doc(binary(), binary()) -> list(rep_id()).
find_jobs_by_doc(DbName, DocId) ->
    Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
    MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
    [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
%% gen_server functions

%% Create the job table, subscribe to config changes, read the initial
%% settings and arm the first reschedule timer.
init(_) ->
    %% The scheduler owns a named ets table of #job{} records keyed on the
    %% job id, tuned for concurrent reads from the public API functions.
    TableOpts = [named_table, {read_concurrency, true}, {keypos, #job.id}],
    ?MODULE = ets:new(?MODULE, TableOpts),
    ok = config:listen_for_changes(?MODULE, nil),
    %% Later changes to these settings arrive as casts from the config
    %% listener callbacks; these are just the startup values.
    SchedInterval = config:get_integer("replicator", "interval", ?DEFAULT_SCHEDULER_INTERVAL),
    JobLimit = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
    ChurnLimit = config:get_integer("replicator", "max_churn", ?DEFAULT_MAX_CHURN),
    HistoryLimit = config:get_integer("replicator", "max_history", ?DEFAULT_MAX_HISTORY),
    {ok, TRef} = timer:send_after(SchedInterval, reschedule),
    {ok, #state{
        interval = SchedInterval,
        max_jobs = JobLimit,
        max_churn = ChurnLimit,
        max_history = HistoryLimit,
        timer = TRef
    }}.


%% Insert (or replace) a job, opportunistically start it if there is spare
%% capacity, and update the adds counter plus the total-jobs gauge.
handle_call({add_job, Job}, _From, St) ->
    ok = maybe_remove_job_int(Job#job.id, St),
    true = add_job_int(Job),
    ok = maybe_start_newly_added_job(Job, St),
    couch_stats:increment_counter([couch_replicator, jobs, adds]),
    JobCount = ets:info(?MODULE, size),
    couch_stats:update_gauge([couch_replicator, jobs, total], JobCount),
    {reply, ok, St};

handle_call({remove_job, Id}, _From, St) ->
    ok = maybe_remove_job_int(Id, St),
    {reply, ok, St};

%% Manual reschedule trigger, used by tests and operators.
handle_call(reschedule, _From, St) ->
    ok = reschedule(St),
    {reply, ok, St};

%% Unknown calls are dropped without a reply (caller will time out),
%% matching the original behavior.
handle_call(_, _From, St) ->
    {noreply, St}.
%% Runtime settings updates, cast from the config listener callbacks.
handle_cast({set_max_jobs, MaxJobs}, St) when is_integer(MaxJobs), MaxJobs >= 0 ->
    couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
    {noreply, St#state{max_jobs = MaxJobs}};

handle_cast({set_max_churn, MaxChurn}, St) when is_integer(MaxChurn), MaxChurn > 0 ->
    couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
    {noreply, St#state{max_churn = MaxChurn}};

handle_cast({set_max_history, MaxHistory}, St) when is_integer(MaxHistory), MaxHistory > 0 ->
    couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
    {noreply, St#state{max_history = MaxHistory}};

handle_cast({set_interval, Interval}, St) when is_integer(Interval), Interval > 0 ->
    couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
    {noreply, St#state{interval = Interval}};

%% Unknown casts are ignored.
handle_cast(_, St) ->
    {noreply, St}.


%% Periodic reschedule tick: run a scheduling pass, then re-arm the timer
%% (cancelling the old one in case the interval changed meanwhile).
handle_info(reschedule, St) ->
    ok = reschedule(St),
    {ok, cancel} = timer:cancel(St#state.timer),
    {ok, NewTimer} = timer:send_after(St#state.interval, reschedule),
    {noreply, St#state{timer = NewTimer}};

%% A monitored job exited cleanly (e.g. a one-shot replication finished):
%% remove it from the table. Clause order matters — this must come before
%% the generic 'DOWN' clause below.
handle_info({'DOWN', _Ref, process, Pid, normal}, St) ->
    {ok, Job} = job_by_pid(Pid),
    couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
    remove_job_int(Job),
    update_running_jobs_stats(),
    {noreply, St};

%% Any non-normal exit is treated as a crash.
handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
    {ok, Job} = job_by_pid(Pid),
    ok = handle_crashed_job(Job, Reason, St),
    {noreply, St};

%% Delayed retry after the config listener terminated unexpectedly.
handle_info(restart_config_listener, St) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, St};

%% Drain any other stray messages.
handle_info(_, St) ->
    {noreply, St}.


code_change(_OldVsn, St, _Extra) ->
    {ok, St}.


terminate(_Reason, _St) ->
    ok.


%% Keep sys/crash reports compact: expose limits and counts rather than
%% the full state record and job table.
format_status(_Opt, [_PDict, St]) ->
    [{max_jobs, St#state.max_jobs},
     {running_jobs, running_job_count()},
     {pending_jobs, pending_job_count()}].
%% config listener functions

handle_config_change("replicator", "max_jobs", V, _, S) ->
    ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
    {ok, S};

handle_config_change("replicator", "max_churn", V, _, S) ->
    ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
    {ok, S};

handle_config_change("replicator", "interval", V, _, S) ->
    ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
    {ok, S};

handle_config_change("replicator", "max_history", V, _, S) ->
    % BUG FIX: this previously cast {set_history, ...}, which no
    % handle_cast clause matches, so max_history config changes were
    % silently swallowed by the catch-all cast clause. The message must be
    % {set_max_history, ...} to match the corresponding handle_cast.
    ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
    {ok, S};

handle_config_change(_, _, _, _, S) ->
    {ok, S}.


handle_config_terminate(_, stop, _) ->
    ok;

handle_config_terminate(_, _, _) ->
    % Config server went away unexpectedly; retry listening after a delay.
    Pid = whereis(?MODULE),
    erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).


%% private functions


% Handle crashed jobs. Handling differs between transient and permanent jobs.
% Transient jobs are those posted to the _replicate endpoint. They don't have a
% db associated with them. When those jobs crash, they are not restarted. That
% is also consistent with behavior when the node they run on, crashed and they
% do not migrate to other nodes. Permanent jobs are those created from
% replicator documents. Those jobs, once they pass basic validation and end up
% in the scheduler will be retried indefinitely (with appropriate exponential
% backoffs).
-spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, _State) ->
    Msg = "~p : Transient job ~p failed, removing. Error: ~p",
    ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
    couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
    remove_job_int(Job),
    update_running_jobs_stats(),
    ok;

handle_crashed_job(Job, Reason, State) ->
    ok = update_state_crashed(Job, Reason, State),
    % Optionally record the error in the replicator doc itself.
    case couch_replicator_doc_processor:update_docs() of
        true ->
            couch_replicator_docs:update_error(Job#job.rep, Reason);
        false ->
            ok
    end,
    case ets:info(?MODULE, size) < State#state.max_jobs of
        true ->
            % Starting pending jobs is an O(TotalJobsCount) operation. Only do
            % it if there is a relatively small number of jobs. Otherwise
            % scheduler could be blocked if there is a cascade of lots failing
            % jobs in a row.
            start_pending_jobs(State),
            update_running_jobs_stats(),
            ok;
        false ->
            ok
    end.


% Attempt to start a newly added job. First quickly check if total jobs
% already exceed max jobs, then do a more expensive check which runs a
% select (an O(n) operation) to check pending jobs specifically.
-spec maybe_start_newly_added_job(#job{}, #state{}) -> ok.
maybe_start_newly_added_job(Job, State) ->
    MaxJobs = State#state.max_jobs,
    TotalJobs = ets:info(?MODULE, size),
    case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
        true ->
            start_job_int(Job, State),
            update_running_jobs_stats(),
            ok;
        false ->
            ok
    end.

% Return up to a given number of oldest, not recently crashed jobs. Try to be
% memory efficient and use ets:foldl to accumulate jobs.
-spec pending_jobs(non_neg_integer()) -> [#job{}].
pending_jobs(0) ->
    % Handle this case as user could set max_churn to 0. If this is passed to
    % other function clause it will crash as gb_sets:largest assumes set is not
    % empty.
    [];

pending_jobs(Count) when is_integer(Count), Count > 0 ->
    Set0 = gb_sets:new(), % [{LastStart, Job},...]
+ Now = os:timestamp(), + Acc0 = {Set0, Now, Count, health_threshold()}, + {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), + [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. + + +pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> + Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), + gb_sets:size(Set) >= Count} of + {true, true} -> + % Job is healthy but already reached accumulated limit, so might + % have to replace one of the accumulated jobs + pending_maybe_replace(Job, Set); + {true, false} -> + % Job is healthy and we haven't reached the limit, so add job + % to accumulator + gb_sets:add_element({last_started(Job), Job}, Set); + {false, _} -> + % This job is not healthy (has crashed too recently), so skip it. + Set + end, + {Set1, Now, Count, HealthThreshold}. + + +% Replace Job in the accumulator if it is older than youngest job there. +pending_maybe_replace(Job, Set) -> + Started = last_started(Job), + {Youngest, YoungestJob} = gb_sets:largest(Set), + case Started < Youngest of + true -> + Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), + gb_sets:add_element({Started, Job}, Set1); + false -> + Set + end. + + +start_jobs(Count, State) -> + [start_job_int(Job, State) || Job <- pending_jobs(Count)], + ok. + + +-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer(). +stop_jobs(Count, IsContinuous, State) -> + Running0 = running_jobs(), + ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, + Running1 = lists:filter(ContinuousPred, Running0), + Running2 = lists:sort(fun oldest_job_first/2, Running1), Review comment: I think this is just confusion, but it seems odd that we're preferring to insert old jobs in pending_maybe_replace/2, but preferring to remove the oldest job when we stop a subset. I'm going to run on the assumption that the use of "oldest" means two different things here.
In pending_maybe_replace I'm going to assume it's time since last activity, and when here it's "been active the longest". Assuming that's the case we may want to rename things. Reading it as is, it sounds like we're stopping and starting the same set of jobs repeatedly and never actually progressing through the queue. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
