davisp commented on a change in pull request #470: Scheduling Replicator URL: https://github.com/apache/couchdb/pull/470#discussion_r110438366
########## File path: src/couch_replicator/src/couch_replicator_clustering.erl ########## @@ -0,0 +1,218 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% Maintains cluster membership and stability notifications for replications. +% On changes to cluster membership, broadcast events to `replication` gen_event. +% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events. +% +% Cluster stability is defined as "there have been no nodes added or removed in +% the last `QuietPeriod` seconds". The QuietPeriod value is configurable. To +% ensure a speedier startup, a shorter StartupQuietPeriod (also configurable) is +% in effect during initialization. +% +% This module is also in charge of calculating ownership of replications based on +% where the shards of their _replicator db documents live. + + +-module(couch_replicator_clustering). +-behaviour(gen_server). +-behaviour(config_listener). + +% public API +-export([start_link/0, owner/2, is_stable/0]). +-export([link_cluster_event_listener/1]). + +% gen_server callbacks +-export([init/1, handle_call/3, handle_info/2, handle_cast/2, + code_change/3, terminate/2]). + +% config_listener callbacks +-export([handle_config_change/5, handle_config_terminate/3]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +-define(DEFAULT_QUIET_PERIOD, 60). % seconds +-define(DEFAULT_START_PERIOD, 5). % seconds +-define(RELISTEN_DELAY, 5000). 
+ +% Server state. start_time: when init/1 ran; last_change: time of the most recent +% nodeup/nodedown (initially the start time); period / start_period: normal and +% startup quiet periods, in seconds; timer: the most recently scheduled +% stability_check timer. +-record(state, { + start_time :: erlang:timestamp(), + last_change :: erlang:timestamp(), + period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(), + start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(), + timer :: timer:tref() +}). + + +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +% owner/2 computes ownership for a {DbName, DocId} pair. For shard db names +% (prefixed with "shards/") it returns `unstable` if the cluster is considered +% unstable, i.e. its membership changed recently; otherwise it returns the owner +% node as computed by owner_int/2. Any other db name is owned by the local node(). +% +-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable. +owner(<<"shards/", _/binary>> = DbName, DocId) -> + case is_stable() of + false -> + unstable; + true -> + owner_int(DbName, DocId) + end; +owner(_DbName, _DocId) -> + node(). + + +% Synchronous call to the registered clustering gen_server; returns true if the +% cluster is currently considered stable. +-spec is_stable() -> true | false. +is_stable() -> + gen_server:call(?MODULE, is_stable). + + +% Convenience function for gen_servers to subscribe to {cluster, stable} and +% {cluster, unstable} events from the couch_replicator clustering module. +% Only `{cluster, _}` events are forwarded to GenServer (via gen_server:cast); +% all other notifier events are ignored. Returns the pid of the notifier process +% started by couch_replicator_notifier:start_link/1. +-spec link_cluster_event_listener(pid()) -> pid(). +link_cluster_event_listener(GenServer) when is_pid(GenServer) -> + CallbackFun = + fun(Event = {cluster, _}) -> gen_server:cast(GenServer, Event); + (_) -> ok + end, + {ok, Pid} = couch_replicator_notifier:start_link(CallbackFun), + Pid. 
+ + +% gen_server callbacks + +% Subscribes to nodeup/nodedown messages and to config changes, reads both quiet +% periods (seconds) from the "replicator" config section (abs/1 turns negative +% values positive), sets the cluster_is_stable gauge to 0 and schedules the first +% stability_check after the startup period. +init([]) -> + net_kernel:monitor_nodes(true), + ok = config:listen_for_changes(?MODULE, nil), + Period = abs(config:get_integer("replicator", "cluster_quiet_period", + ?DEFAULT_QUIET_PERIOD)), + StartPeriod = abs(config:get_integer("replicator", "cluster_start_period", + ?DEFAULT_START_PERIOD)), + couch_log:debug("Initialized clustering gen_server ~w", [self()]), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), + {ok, #state{ + start_time = os:timestamp(), + last_change = os:timestamp(), + period = Period, + start_period = StartPeriod, + timer = new_timer(StartPeriod) + }}. + + +terminate(_Reason, _State) -> + ok. + + +% NOTE(review): is_stable is the only request handled; any other call crashes the +% server with function_clause. Confirm that is intended. +handle_call(is_stable, _From, State) -> + {reply, is_stable(State), State}. + + +% Updates only the normal quiet period; start_period and the scheduled timer are +% left unchanged. The guard accepts positive integers only; anything else crashes +% the server with function_clause. +handle_cast({set_period, QuietPeriod}, State) when + is_integer(QuietPeriod), QuietPeriod > 0 -> + {noreply, State#state{period = QuietPeriod}}. + + +% nodeup / nodedown: a membership change marks the cluster unstable, records the +% change time and schedules a new stability_check. NOTE(review): the previous timer +% is replaced without being cancelled, so several stability_check messages can be +% in flight and `{cluster, stable}` may be published more than once; verify this +% is harmless for listeners. +handle_info({nodeup, Node}, State) -> + Timer = new_timer(interval(State)), + couch_replicator_notifier:notify({cluster, unstable}), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), + couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]), + {noreply, State#state{last_change = os:timestamp(), timer = Timer}}; + +handle_info({nodedown, Node}, State) -> + Timer = new_timer(interval(State)), + couch_replicator_notifier:notify({cluster, unstable}), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), + couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]), + {noreply, State#state{last_change = os:timestamp(), timer = Timer}}; + +% Timer expiry: publish `stable` if no membership change happened during the +% current interval, otherwise check again after another interval. Note that +% timer:cancel/1 cancels the timer stored in the state, which may be a newer one +% than the timer that just fired. +handle_info(stability_check, State) -> + timer:cancel(State#state.timer), + case is_stable(State) of + true -> + couch_replicator_notifier:notify({cluster, stable}), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1), + couch_log:notice("~s : publishing cluster `stable` event", [?MODULE]), + {noreply, State}; + false -> + Timer = new_timer(interval(State)), 
+ {noreply, State#state{timer = Timer}} + end; + +% Re-registers this module as a config listener (presumably scheduled by +% handle_config_terminate/3, which is not shown here). NOTE(review): there is no +% catch-all handle_info clause, so any unexpected message crashes the server. +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% Internal functions + +% Sends stability_check to the calling process after IntervalSec seconds and +% returns the timer reference. +-spec new_timer(non_neg_integer()) -> timer:tref(). +new_timer(IntervalSec) -> + {ok, Timer} = timer:send_after(IntervalSec * 1000, stability_check), + Timer. + + +% Quiet period (seconds) currently in effect: start_period until more than +% `period` seconds have elapsed since start_time, `period` afterwards. Assumes +% now_diff_sec/1 (not shown here) returns the seconds elapsed since a timestamp. +-spec interval(#state{}) -> non_neg_integer(). +interval(#state{period = Period, start_period = Period0, start_time = T0}) -> + case now_diff_sec(T0) > Period of + true -> + % Normal operation + Period; + false -> + % During startup + Period0 + end. + + +% The cluster is stable once strictly more than interval(State) seconds have +% elapsed since last_change (the last nodeup/nodedown, or init). +-spec is_stable(#state{}) -> boolean(). +is_stable(#state{last_change = TS} = State) -> + now_diff_sec(TS) > interval(State). Review comment: Given that this is in seconds should we go with `>=`? Or not? I can't convince myself either way if equal to period is correct or not or harmless that its interval+1 in reality. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
