davisp commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r110437978
 
 

 ##########
 File path: src/couch_replicator/src/couch_replicator_clustering.erl
 ##########
 @@ -0,0 +1,218 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Maintain cluster membership and stability notifications for replications.
+% On changes to cluster membership, broadcast events to the `replication`
+% gen_event. Listeners will get `{cluster, stable}` or `{cluster, unstable}`
+% events.
+%
+% Cluster stability is defined as "there have been no nodes added or removed
+% in the last `QuietPeriod` seconds". The QuietPeriod value is configurable.
+% To ensure a speedier startup, during initialization there is a shorter
+% StartupQuietPeriod in effect (also configurable).
+%
+% This module is also in charge of calculating ownership of replications
+% based on where the shards of their _replicator db documents live.
+
+
+-module(couch_replicator_clustering).
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% public API
+-export([start_link/0, owner/2, is_stable/0]).
+-export([link_cluster_event_listener/1]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_info/2, handle_cast/2,
+         code_change/3, terminate/2]).
+
+% config_listener callbacks
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+-define(DEFAULT_QUIET_PERIOD, 60). % seconds
+-define(DEFAULT_START_PERIOD, 5). % seconds
+-define(RELISTEN_DELAY, 5000).
+
+-record(state, {
+    % Timestamp taken once in init/1.
+    start_time :: erlang:timestamp(),
+    % Timestamp of the last nodeup/nodedown message (initially set in init/1).
+    last_change :: erlang:timestamp(),
+    % Quiet period in seconds; read from config in init/1 and replaceable
+    % through the `set_period` cast.
+    period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
+    % Startup quiet period in seconds; read from config in init/1.
+    start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
+    % Reference of the most recently armed `stability_check` timer.
+    timer :: timer:tref()
+}).
+
+
+% Start the clustering gen_server via gen_server:start_link/4, registering
+% it locally under the module name with an empty init argument list.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+% Compute which node owns the replication identified by a {DbName, DocId}
+% pair. For database names starting with `shards/` the result is `unstable`
+% while the cluster is not considered stable (i.e. its membership changed
+% recently); otherwise it is the node chosen by owner_int/2. Any other
+% database name is owned by the local node.
+%
+-spec owner(Dbname :: binary(), DocId :: binary()) -> node() | unstable.
+owner(<<"shards/", _/binary>> = ShardDbName, DocId) ->
+    case is_stable() of
+        true -> owner_int(ShardDbName, DocId);
+        false -> unstable
+    end;
+owner(_DbName, _DocId) ->
+    node().
+
+
+% Ask the locally registered clustering gen_server whether the cluster is
+% currently stable. This is a synchronous gen_server:call/2.
+-spec is_stable() -> true | false.
+is_stable() ->
+    gen_server:call(?MODULE, is_stable).
+
+
+% Convenience helper for gen_servers that want cluster stability events.
+% Starts a couch_replicator_notifier (via its start_link/1) whose callback
+% forwards every `{cluster, _}` event to GenServer as a gen_server cast and
+% ignores all other events. Returns the pid of the notifier.
+-spec link_cluster_event_listener(pid()) -> pid().
+link_cluster_event_listener(GenServer) when is_pid(GenServer) ->
+    Forward = fun
+        ({cluster, _} = ClusterEvent) ->
+            gen_server:cast(GenServer, ClusterEvent);
+        (_Other) ->
+            ok
+    end,
+    {ok, NotifierPid} = couch_replicator_notifier:start_link(Forward),
+    NotifierPid.
+
+
+% gen_server callbacks
+
+% Subscribe to node up/down messages and to config changes, read both quiet
+% periods from the "replicator" config section (falling back to the module
+% defaults), set the `cluster_is_stable` gauge to 0 and arm the first
+% stability_check timer using the startup period.
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    ok = config:listen_for_changes(?MODULE, nil),
+    QuietPeriod = abs(config:get_integer(
+        "replicator", "cluster_quiet_period", ?DEFAULT_QUIET_PERIOD)),
+    StartupPeriod = abs(config:get_integer(
+        "replicator", "cluster_start_period", ?DEFAULT_START_PERIOD)),
+    couch_log:debug("Initialized clustering gen_server ~w", [self()]),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    StartTime = os:timestamp(),
+    LastChange = os:timestamp(),
+    FirstTimer = new_timer(StartupPeriod),
+    InitialState = #state{
+        start_time = StartTime,
+        last_change = LastChange,
+        period = QuietPeriod,
+        start_period = StartupPeriod,
+        timer = FirstTimer
+    },
+    {ok, InitialState}.
+
+
+% gen_server terminate callback; performs no cleanup and returns `ok`.
+terminate(_Reason, _State) ->
+    ok.
+
+
+% Reply to `is_stable` with the result of is_stable/1 on the current state,
+% leaving the state unchanged. This is the only request with a clause here.
+handle_call(is_stable, _From, State) ->
+    {reply, is_stable(State), State}.
+
+
+% Replace the `period` field of the state with QuietPeriod. The guard only
+% admits positive integers (presumably seconds, like ?DEFAULT_QUIET_PERIOD);
+% this is the only cast with a clause here.
+handle_cast({set_period, QuietPeriod}, State) when
+    is_integer(QuietPeriod), QuietPeriod > 0 ->
+    {noreply, State#state{period = QuietPeriod}}.
+
+
+% Handle raw messages delivered to the clustering gen_server:
+%
+%  * `{nodeup, Node}` / `{nodedown, Node}` - cluster membership changed. The
+%    cluster is marked unstable (see mark_cluster_unstable/1) and the change
+%    is logged.
+%  * `stability_check` - delivered by the timer armed in new_timer/1. When
+%    is_stable/1 holds, the `{cluster, stable}` event is published and the
+%    gauge is set to 1; otherwise the timer is re-armed.
+%  * `restart_config_listener` - re-subscribes to config changes.
+%
+handle_info({nodeup, Node}, State) ->
+    NewState = mark_cluster_unstable(State),
+    couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]),
+    {noreply, NewState};
+
+handle_info({nodedown, Node}, State) ->
+    NewState = mark_cluster_unstable(State),
+    couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]),
+    {noreply, NewState};
+
+handle_info(stability_check, State) ->
+    % The result is ignored on purpose: the timer may already have fired.
+    _ = timer:cancel(State#state.timer),
+    case is_stable(State) of
+        true ->
+            couch_replicator_notifier:notify({cluster, stable}),
+            couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+            couch_log:notice("~s : publishing cluster `stable` event",
+                [?MODULE]),
+            {noreply, State};
+        false ->
+            Timer = new_timer(interval(State)),
+            {noreply, State#state{timer = Timer}}
+    end;
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State}.
+
+
+% Shared by the nodeup and nodedown clauses of handle_info/2. Cancels the
+% previously armed stability_check timer before arming a new one, so that a
+% stale timer cannot fire early, cancel the fresh timer and push the
+% `stable` notification further out than intended. Then publishes
+% `{cluster, unstable}`, sets the gauge to 0 and records the change time.
+mark_cluster_unstable(#state{timer = OldTimer} = State) ->
+    % The result is ignored on purpose: the old timer may already have fired.
+    _ = timer:cancel(OldTimer),
+    Timer = new_timer(interval(State)),
+    couch_replicator_notifier:notify({cluster, unstable}),
+    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+    State#state{last_change = os:timestamp(), timer = Timer}.
+
+
+% gen_server code_change callback; the state is returned unchanged.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+%% Internal functions
+
+% Arm a one-shot timer that delivers the `stability_check` message to the
+% calling process after IntervalSec seconds, and return the timer reference.
+-spec new_timer(non_neg_integer()) -> timer:tref().
+new_timer(IntervalSec) ->
+    DelayMsec = IntervalSec * 1000,
+    {ok, TRef} = timer:send_after(DelayMsec, stability_check),
+    TRef.
+
+
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = Period0, start_time = T0}) ->
+    case now_diff_sec(T0) > Period of
 
 Review comment:
   Another bug I spotted and forgot about: I'm pretty sure Period and Period0 
are backwards here. Also, I really think we should probably rename the variable 
bindings, because obviously either I or the code is wrong and it's hard to know 
which (though I'm pretty sure it's the code :).
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to