nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r110466537
 
 

 ##########
 File path: src/couch_replicator/src/couch_replicator_clustering.erl
 ##########
 @@ -0,0 +1,218 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Maintain cluster membership and stability notifications for replications.
+% On changes to cluster membership, broadcast events to `replication` gen_event.
+% Listeners will get `{cluster, stable}` or `{cluster, unstable}` events.
+%
+% Cluster stability is defined as "there have been no nodes added or removed in
+% last `QuietPeriod` seconds". QuietPeriod value is configurable. To ensure a
+% speedier startup, during initialization there is a shorter StartupQuietPeriod in
+% effect (also configurable).
+%
+% This module is also in charge of calculating ownership of replications based on
+% where their _replicator db documents shards live.
+
+
+-module(couch_replicator_clustering).
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% public API
+-export([start_link/0, owner/2, is_stable/0]).
+-export([link_cluster_event_listener/1]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2,
+         code_change/3, terminate/2]).
+
+% config_listener callbacks
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(DEFAULT_QUIET_PERIOD, 60). % seconds
+-define(DEFAULT_START_PERIOD, 5). % seconds
+-define(RELISTEN_DELAY, 5000).
+
+-record(state, {
+    start_time :: erlang:timestamp(),
+    last_change :: erlang:timestamp(),
+    period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
+    timer :: timer:tref()
+}).
+
+
+% Start the clustering gen_server via gen_server:start_link/4, registered
+% locally under the module name.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+
+% owner/2 computes the owner of a replication identified by a
+% {DbName, DocId} pair. For database names starting with "shards/" it
+% returns `unstable` while is_stable/0 reports false, and otherwise the
+% result of owner_int/2. For any other database name the local node() is
+% returned.
+%
+-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
+owner(<<"shards/", _/binary>> = DbName, DocId) ->
+    case is_stable() of
+        true -> owner_int(DbName, DocId);
+        false -> unstable
+    end;
+owner(_DbName, _DocId) ->
+    node().
+
+
+% Ask the locally registered clustering server whether the cluster is
+% currently stable. This is a synchronous gen_server call.
+-spec is_stable() -> true | false.
+is_stable() ->
+    Server = ?MODULE,
+    gen_server:call(Server, is_stable).
+
+
+% Helper for gen_servers that want {cluster, stable} and {cluster, unstable}
+% events from this module. Starts a couch_replicator_notifier (via
+% start_link/1) whose callback casts every {cluster, _} event to GenServer
+% and ignores anything else. Returns the notifier pid.
+-spec link_cluster_event_listener(pid()) -> pid().
+link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
+    Forward = fun
+        ({cluster, _} = ClusterEvent) ->
+            gen_server:cast(GenServer, ClusterEvent);
+        (_Other) ->
+            ok
+    end,
+    {ok, NotifierPid} = couch_replicator_notifier:start_link(Forward),
+    NotifierPid.
+
+
+% gen_server callbacks
+
+% Subscribe to node up/down notifications and to config changes, read both
+% quiet periods from the "replicator" config section (negative values are
+% made positive with abs/1), set the cluster_is_stable gauge to 0 and arm
+% the first stability check after the start period.
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    ok = config:listen_for_changes(?MODULE, nil),
+    QuietSecs = abs(config:get_integer(
+        "replicator", "cluster_quiet_period", ?DEFAULT_QUIET_PERIOD)),
+    StartupSecs = abs(config:get_integer(
+        "replicator", "cluster_start_period", ?DEFAULT_START_PERIOD)),
+    couch_log:debug("Initialized clustering gen_server ~w", [self()]),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    StartedAt = os:timestamp(),
+    LastChange = os:timestamp(),
+    CheckTimer = new_timer(StartupSecs),
+    InitialState = #state{
+        start_time = StartedAt,
+        last_change = LastChange,
+        period = QuietSecs,
+        start_period = StartupSecs,
+        timer = CheckTimer
+    },
+    {ok, InitialState}.
+
+
+% Shutdown hook: performs no cleanup here and always returns ok.
+terminate(_Why, _FinalState) ->
+    ok.
+
+
+% Synchronous `is_stable` request: reply with the result of is_stable/1 for
+% the current state; the state itself is left unchanged.
+handle_call(is_stable, _From, State) ->
+    Stable = is_stable(State),
+    {reply, Stable, State}.
+
+
+% Replace the quiet period stored in the state. Only positive integers are
+% accepted by the guard; no other cast is handled by this clause.
+handle_cast({set_period, NewPeriod}, State)
+        when is_integer(NewPeriod), NewPeriod > 0 ->
+    NewState = State#state{period = NewPeriod},
+    {noreply, NewState}.
+
+
+% Node membership changed (node joined): the cluster becomes unstable again.
+handle_info({nodeup, Node}, State) ->
+    NewState = membership_changed(
+        "~s : nodeup ~s, cluster unstable", Node, State),
+    {noreply, NewState};
+
+% Node membership changed (node left): the cluster becomes unstable again.
+handle_info({nodedown, Node}, State) ->
+    NewState = membership_changed(
+        "~s : nodedown ~s, cluster unstable", Node, State),
+    {noreply, NewState};
+
+% Periodic check armed by new_timer/1. If the cluster is stable, publish the
+% `stable` event and set the gauge to 1; otherwise re-arm the check.
+handle_info(stability_check, State) ->
+    timer:cancel(State#state.timer),
+    case is_stable(State) of
+        true ->
+            couch_replicator_notifier:notify({cluster, stable}),
+            couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+            couch_log:notice("~s : publishing cluster `stable` event",
+                [?MODULE]),
+            {noreply, State};
+        false ->
+            Timer = new_timer(interval(State)),
+            {noreply, State#state{timer = Timer}}
+    end;
+
+% Re-subscribe to config change notifications.
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State}.
+
+
+% Shared handling of nodeup/nodedown: notify listeners that the cluster is
+% unstable, set the gauge to 0, log using LogFormat, record the time of the
+% change and re-arm the stability check. Returns the updated state.
+membership_changed(LogFormat, Node, State) ->
+    % Cancel the pending check first. Otherwise the superseded timer would
+    % still fire `stability_check`, whose handler cancels whatever timer is
+    % stored in the state, i.e. the one armed just below.
+    timer:cancel(State#state.timer),
+    Timer = new_timer(interval(State)),
+    couch_replicator_notifier:notify({cluster, unstable}),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    couch_log:notice(LogFormat, [?MODULE, Node]),
+    State#state{last_change = os:timestamp(), timer = Timer}.
+
+
+% Code upgrade hook: returns the state unchanged.
+code_change(_FromVsn, State, _Opts) ->
+    {ok, State}.
+
+
+%% Internal functions
+
+% Arm a one-shot timer that sends `stability_check` to the calling process
+% after IntervalSec seconds, and return the timer reference.
+-spec new_timer(non_neg_integer()) -> timer:tref().
+new_timer(IntervalSec) ->
+    DelayMsec = timer:seconds(IntervalSec),
+    {ok, TRef} = timer:send_after(DelayMsec, stability_check),
+    TRef.
+
+
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = Period0, start_time = T0}) ->
+    case now_diff_sec(T0) > Period of
 
 Review comment:
   Naming the start period Period0 was silly. The logic also assumes that the
start period is shorter than the normal period.
   
   So at startup we want to wait a bit for nodes to connect and the cluster to
stabilize, but not for a whole minute. But if the cluster has been up for a
while and we notice membership changes, chances are that restarts are
happening, so the wait is longer.
   
   The logic there says: if the time since startup is greater than a full
period (minutes), then use the normal period to wait. But if the time since
startup is short (seconds), assume we are in the start phase and just wait a
few seconds before the rescan.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to