davisp commented on a change in pull request #3364:
URL: https://github.com/apache/couchdb/pull/3364#discussion_r581400024



##########
File path: rel/overlay/etc/default.ini
##########
@@ -482,6 +482,33 @@ ssl_certificate_max_depth = 3
 ; or 403 response this setting is not needed.
 ;session_refresh_interval_sec = 550
 
+; Usage coefficient decays historic fair share usage every scheduling
+; cycle. The value must be between 0.0 and 1.0. Lower values make
+; historic usage decay more quickly and higher values mean it will be
+; remembered longer.
+;usage_coeff = 0.5
+
+; Priority coefficient decays all the fair share job priorities such
+; that they uniformly drift towards the front of the run queue. At the
+; default value of 0.98 it will take about 430 scheduler cycles (7
+; hours) for a single job which ran for 1 minute to drift towards the
+; front of the queue (get assigned priority 0). 7 hours was picked as
+; it is close to the maximum error backoff interval of about 8 hours.
+; The value must be between 0.0 and 1.0. Too low a value, coupled with
+; a low max jobs or churn parameter, could end up making the majority
+; of job priorities 0 too quickly and canceling the effect of the fair
+; share algorithm.
+;priority_coeff = 0.98
+
+
+[replicator.shares]
+; Fair share configuration section. More shares result in a higher
+; chance that jobs from that db get to run. The default value is 100,
+; minimum is 1 and maximum is 1000. The configuration may be set even
+; if the database wasn't created yet.

Review comment:
       Minor language nit, I'd simplify to "if the database does not exist."

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of

Review comment:
       This 7 hour comment is slightly worrisome. My guess is that you're
talking about some extreme case where a cluster has significantly more
replication jobs than bandwidth to process them; however, folks reading
that their replications may pause for 7 hours at a time is likely not
good. I'd wager we just need to reword to clarify in what situations this
7 hour figure comes into effect.
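
For what it's worth, the arithmetic itself checks out; a quick shell
sanity check of the 430-cycle figure, starting from the 6000 priority
used in the comment:

    %% smallest N at which trunc(6000 * 0.98^N) hits 0 is 431,
    %% i.e. 430 full cycles with a non-zero priority
    length(lists:takewhile(fun(N) ->
        trunc(6000 * math:pow(0.98, N)) > 0
    end, lists:seq(1, 1000))).
    %% => 430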

##########
File path: src/couch_replicator/src/couch_replicator_scheduler.erl
##########
@@ -734,6 +775,24 @@ reset_job_process(#job{} = Job) ->
 
 -spec reschedule(#state{}) -> ok.
 reschedule(State) ->
+    % Charge all running jobs for the current interval
+    RunningJobs = running_jobs(),
+    Now = os:timestamp(),
+    lists:foreach(fun(Job) ->
+        couch_replicator_share:charge(Job, State#state.interval, Now)
+    end, RunningJobs),
+
+    % Update usage table
+    couch_replicator_share:update_usage(),
+
+    % Decay all the process priorities
+    couch_replicator_share:decay_priorities(),
+
+    % Adjust running process priorities
+    lists:foreach(fun(Job) ->
+        couch_replicator_share:update_priority(Job)
+    end, RunningJobs),

Review comment:
       This whole hunk feels as though it should be a single call to 
`couch_replicator_share` and that the multiple calls are leaking implementation 
internals outside of that module.
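
Maybe something along these lines (just a sketch; the `update/3` name is
made up, pick whatever fits):

    %% hypothetical single entry point in couch_replicator_share
    -spec update([#job{}], pos_integer(), erlang:timestamp()) -> ok.
    update(RunningJobs, Interval, Now) ->
        lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs),
        update_usage(),
        decay_priorities(),
        lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs),
        ok.

so the scheduler side reduces to a single call like
`couch_replicator_share:update(running_jobs(), State#state.interval, os:timestamp())`.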

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).

Review comment:
       How does this behave if I delete the shares key when jobs exist? Granted,
I may be about to read what happens.
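
Answering myself after reading further: it looks like `shares/1` falls
back to `?DEFAULT_SHARES` when the key is missing, so existing jobs from
that db would just be weighted at the default 100 shares on the next
cycle:

    shares(Key) ->
        case ets:lookup(?SHARES, Key) of
            [{_, Shares}] -> Shares;
            [] -> ?DEFAULT_SHARES
        end.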

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))

Review comment:
       Probably want to guard on user input here.
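
e.g. something along these lines (sketch only; logging and skipping bad
values is one option):

    lists:foreach(fun({K, V}) ->
        try
            update_shares(list_to_binary(K), list_to_integer(V))
        catch error:badarg ->
            couch_log:warning("invalid replicator.shares entry ~p = ~p", [K, V])
        end
    end, config:get("replicator.shares")).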

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+    Key = key(Job),
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    % Update the job's priority as if it ran during one scheduler cycle. This
+    % is so new jobs don't start at priority 0 (highest).
+    update_priority(Job).
+
+
+job_removed(#job{} = Job) ->
+    Key = key(Job),
+    ets:delete(?PRIORITIES, Job#job.id),
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            ets:delete(?NUM_JOBS, Key);
+        N when is_integer(N), N > 0 ->
+            ok
+    end,
+    ok.
+
+
+priority(JobId) ->
+    % Not found means it was removed because its value was 0
+    case ets:lookup(?PRIORITIES, JobId) of
+        [{_, Priority}] -> Priority;
+        [] -> 0
+    end.
+
+
+usage(Key) ->
+    case ets:lookup(?USAGE, Key) of
+        [{_, Usage}] -> Usage;
+        [] -> 0
+    end.
+
+
+num_jobs(Key) ->
+    case ets:lookup(?NUM_JOBS, Key) of
+        [{_, NumJobs}] -> NumJobs;
+        [] -> 0
+    end.
+
+
+shares(Key) ->
+    case ets:lookup(?SHARES, Key) of
+        [{_, Shares}] -> Shares;
+        [] -> ?DEFAULT_SHARES
+    end.
+
+
+charge(#job{pid = undefined}, _, _) ->
+    0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+    Key = key(Job),
+    Charges = job_charges(Job, Interval, Now),
+    ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
+
+
+% In [1] this is described in the "Decay of Process Priorities" section
+%
+decay_priorities() ->
+    decay(?PRIORITIES, priority_coeff()),
+    % If priority becomes 0, it's removed. When looking it up, if it
+    % is missing we assume it is 0
+    clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+update_priority(#job{} = Job) ->
+    Id = Job#job.id,
+    Key = key(Job),
+    Shares = shares(Key),
+    Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
+    ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+update_usage() ->
+    decay(?USAGE, usage_coeff()),
+    clear_zero(?USAGE),
+    ets:foldl(fun({Key, Charges}, _) ->
+        ets:update_counter(?USAGE, Key, Charges, {Key, 0})
+    end, 0, ?CHARGES),
+    % Start each interval with a fresh charges table
+    ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+decay(Ets, Coeff) when is_atom(Ets) ->
+    Head = {'$1', '$2'},
+    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
+    ets:select_replace(Ets, [{Head, [], [Result]}]).
+
+
+clear_zero(Ets) when is_atom(Ets) ->
+    ets:select_delete(Ets, [{{'_', 0}, [], [true]}]).

Review comment:
       Reckon it makes sense to do a `=<` comparison instead?
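
Something like (sketch):

    clear_zero(Ets) when is_atom(Ets) ->
        ets:select_delete(Ets, [{{'_', '$1'}, [{'=<', '$1', 0}], [true]}]).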

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+    Key = key(Job),
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    % Update the job's priority as if it ran during one scheduler cycle. This
+    % is so new jobs don't start at priority 0 (highest).
+    update_priority(Job).
+
+
+job_removed(#job{} = Job) ->
+    Key = key(Job),
+    ets:delete(?PRIORITIES, Job#job.id),
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            ets:delete(?NUM_JOBS, Key);
+        N when is_integer(N), N > 0 ->
+            ok
+    end,
+    ok.
+
+
+priority(JobId) ->
+    % Not found means it was removed because its value was 0
+    case ets:lookup(?PRIORITIES, JobId) of
+        [{_, Priority}] -> Priority;
+        [] -> 0
+    end.
+
+
+usage(Key) ->
+    case ets:lookup(?USAGE, Key) of
+        [{_, Usage}] -> Usage;
+        [] -> 0
+    end.
+
+
+num_jobs(Key) ->
+    case ets:lookup(?NUM_JOBS, Key) of
+        [{_, NumJobs}] -> NumJobs;
+        [] -> 0
+    end.
+
+
+shares(Key) ->
+    case ets:lookup(?SHARES, Key) of
+        [{_, Shares}] -> Shares;
+        [] -> ?DEFAULT_SHARES
+    end.
+
+
+charge(#job{pid = undefined}, _, _) ->
+    0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+    Key = key(Job),
+    Charges = job_charges(Job, Interval, Now),
+    ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
+
+
+% In [1] this is described in the "Decay of Process Priorities" section
+%
+decay_priorities() ->
+    decay(?PRIORITIES, priority_coeff()),
+    % If priority becomes 0, it's removed. When looking it up, if it
+    % is missing we assume it is 0
+    clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+update_priority(#job{} = Job) ->
+    Id = Job#job.id,
+    Key = key(Job),
+    Shares = shares(Key),
+    Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
+    ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+update_usage() ->
+    decay(?USAGE, usage_coeff()),
+    clear_zero(?USAGE),
+    ets:foldl(fun({Key, Charges}, _) ->
+        ets:update_counter(?USAGE, Key, Charges, {Key, 0})
+    end, 0, ?CHARGES),
+    % Start each interval with a fresh charges table
+    ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+decay(Ets, Coeff) when is_atom(Ets) ->
+    Head = {'$1', '$2'},
+    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},

Review comment:
       Sneaky sneaky sneaky.
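
For anyone else reading along: the match spec atomically rewrites every
`{Key, Value}` row to `{Key, trunc(Value * Coeff)}` in one
`ets:select_replace/2` pass. Roughly the same as the following, minus
the atomicity:

    [ets:insert(Ets, {K, trunc(V * Coeff)}) || {K, V} <- ets:tab2list(Ets)]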

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+    Key = key(Job),
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    % Update the job's priority as if it ran during one scheduler cycle. This
+    % is so new jobs don't start at priority 0 (highest).
+    update_priority(Job).
+
+
+job_removed(#job{} = Job) ->
+    Key = key(Job),
+    ets:delete(?PRIORITIES, Job#job.id),
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            ets:delete(?NUM_JOBS, Key);
+        N when is_integer(N), N > 0 ->
+            ok
+    end,
+    ok.
+
+
+priority(JobId) ->
+    % Not found means it was removed because its value was 0
+    case ets:lookup(?PRIORITIES, JobId) of
+        [{_, Priority}] -> Priority;
+        [] -> 0
+    end.
+
+
+usage(Key) ->
+    case ets:lookup(?USAGE, Key) of
+        [{_, Usage}] -> Usage;
+        [] -> 0
+    end.
+
+
+num_jobs(Key) ->
+    case ets:lookup(?NUM_JOBS, Key) of
+        [{_, NumJobs}] -> NumJobs;
+        [] -> 0
+    end.
+
+
+shares(Key) ->
+    case ets:lookup(?SHARES, Key) of
+        [{_, Shares}] -> Shares;
+        [] -> ?DEFAULT_SHARES
+    end.
+
+
+charge(#job{pid = undefined}, _, _) ->
+    0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+    Key = key(Job),
+    Charges = job_charges(Job, Interval, Now),
+    ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
+
+
+% In [1] this is described in the "Decay of Process Priorities" section
+%
+decay_priorities() ->
+    decay(?PRIORITIES, priority_coeff()),
+    % If priority becomes 0, it's removed. When looking it up, if it
+    % is missing we assume it is 0
+    clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+update_priority(#job{} = Job) ->
+    Id = Job#job.id,
+    Key = key(Job),
+    Shares = shares(Key),
+    Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
+    ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+update_usage() ->
+    decay(?USAGE, usage_coeff()),
+    clear_zero(?USAGE),
+    ets:foldl(fun({Key, Charges}, _) ->
+        ets:update_counter(?USAGE, Key, Charges, {Key, 0})
+    end, 0, ?CHARGES),
+    % Start each interval with a fresh charges table
+    ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+decay(Ets, Coeff) when is_atom(Ets) ->
+    Head = {'$1', '$2'},
+    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
+    ets:select_replace(Ets, [{Head, [], [Result]}]).
+
+
+clear_zero(Ets) when is_atom(Ets) ->
+    ets:select_delete(Ets, [{{'_', 0}, [], [true]}]).
+
+
+key(#job{} = Job) ->
+    Rep = Job#job.rep,
+    case is_binary(Rep#rep.db_name) of
+        true -> mem3:dbname(Rep#rep.db_name);
+        false -> (Rep#rep.user_ctx)#user_ctx.name
+    end.
+
+
+% Jobs are charged based on the amount of time the job was running during the
+% last scheduling interval. The time units used are microseconds in order to
+% have large enough usage values so that the calculated priority won't be
+% rounded off to 0 easily. The formula for the priority
+% calculation is:
+%
+%    Priority = (Usage * NumJobs) / Shares^2
+%
+% Then in the worst case of a single job in the db, running for only one
+% second, with 1000 (max) shares, the priority would be:
+%
+%    1000000 * 1 / (1000^2) = 1
+%
+job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
+    TimeRunning = timer:now_diff(Now, last_started(Job)),
+    IntervalUSec = IntervalMSec * 1000,

Review comment:
       The unit conversion here strikes me as not awesome since the conversion 
logic is implicitly shared between modules rather than being an internal detail 
of this module.
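
One small option (a sketch; the `msec()` type name is just illustrative)
would be to make the unit part of this module's contract so callers
can't silently pass the wrong unit:

    -type msec() :: non_neg_integer().

    -spec charge(#job{}, msec(), erlang:timestamp()) -> term().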

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+    Key = key(Job),
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    % Update the job's priority as if it ran during one scheduler cycle. This
+    % is so new jobs don't start at priority 0 (highest).

Review comment:
       You might leave a note that update_counter applies the increment even if 
the default `{Key, 0}` is needed. I had to double check the docs to make sure 
my memory was correct.
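
i.e., for a fresh key the default object is inserted first and then the
increment is applied on top of it:

    1> ets:new(t, [named_table]).
    t
    2> ets:update_counter(t, k, 1, {k, 0}).
    1
    3> ets:lookup(t, k).
    [{k,1}]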

##########
File path: src/couch_replicator/src/couch_replicator_share.erl
##########
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy
+% Kay and Piers Lauder [1].
+%
+% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
+%
+
+-module(couch_replicator_share).
+
+-export([
+    init/0,
+    clear/0,
+
+    update_shares/2,
+    reset_shares/1,
+
+    job_added/1,
+    job_removed/1,
+
+    priority/1,
+    usage/1,
+    num_jobs/1,
+    shares/1,
+
+    charge/3,
+
+    decay_priorities/0,
+    update_priority/1,
+    update_usage/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+
+% Usage coefficient decays historic usage every scheduling cycle. For example,
+% the usage value for a job running 1 minute is 60000000 (i.e. microseconds /
+% minute). If the job stops running it will take about 26 cycles (minutes) for
+% it to decay to 0 and the system to "forget" about it completely:
+%
+%  trunc(60000000 * math:pow(0.5, 26)) = 0
+%
+-define(DEFAULT_USAGE_COEFF, 0.5).
+
+% Priority coefficient decays all the job priorities such that they slowly
+% drift towards the front of the run queue. The priority value for a single job
+% which ran for one 1-minute scheduler cycle and has the default number of 100
+% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will take
+% about 430 cycles i.e. about 7 hours for the job to drift towards the front of
+% the queue:
+%
+%   trunc(6000 * math:pow(0.98, 431)) = 0
+%   430 / 60 = 7.2 hrs
+%
+-define(DEFAULT_PRIORITY_COEFF, 0.98).
+
+
+-define(MIN_SHARES, 1).
+-define(MAX_SHARES, 1000).
+-define(DEFAULT_SHARES, 100).
+
+-define(SHARES, couch_replicator_shares).
+-define(PRIORITIES, couch_replicator_priorities).
+-define(USAGE, couch_replicator_usage).
+-define(CHARGES, couch_replicator_stopped_usage).
+-define(NUM_JOBS, couch_replicator_num_jobs).
+
+
+init() ->
+    EtsOpts = [named_table, public],
+    ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares}
+    ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority}
+    ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage}
+    ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges}
+    ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs}
+    lists:foreach(fun({K, V}) ->
+        update_shares(list_to_binary(K), list_to_integer(V))
+    end, config:get("replicator.shares")).
+
+
+clear() ->
+    Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
+    lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
+
+
+% This should be called when user updates the replicator.shares config section
+%
+update_shares(Key, Shares) when is_integer(Shares) ->
+    ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}).
+
+
+% Called when the config value is deleted and shares are reset to the default
+% value.
+reset_shares(Key) ->
+    ets:delete(?SHARES, Key).
+
+
+job_added(#job{} = Job) ->
+    Key = key(Job),
+    ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
+    % Update the job's priority as if it ran during one scheduler cycle. This
+    % is so new jobs don't start at priority 0 (highest).
+    update_priority(Job).
+
+
+job_removed(#job{} = Job) ->
+    Key = key(Job),
+    ets:delete(?PRIORITIES, Job#job.id),
+    case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
+        N when is_integer(N), N =< 0 ->
+            ets:delete(?NUM_JOBS, Key);
+        N when is_integer(N), N > 0 ->
+            ok
+    end,
+    ok.
+
+
+priority(JobId) ->
+    % Not found means it was removed because its value was 0
+    case ets:lookup(?PRIORITIES, JobId) of
+        [{_, Priority}] -> Priority;
+        [] -> 0
+    end.
+
+
+usage(Key) ->
+    case ets:lookup(?USAGE, Key) of
+        [{_, Usage}] -> Usage;
+        [] -> 0
+    end.
+
+
+num_jobs(Key) ->
+    case ets:lookup(?NUM_JOBS, Key) of
+        [{_, NumJobs}] -> NumJobs;
+        [] -> 0
+    end.
+
+
+shares(Key) ->
+    case ets:lookup(?SHARES, Key) of
+        [{_, Shares}] -> Shares;
+        [] -> ?DEFAULT_SHARES
+    end.
+
+
+charge(#job{pid = undefined}, _, _) ->
+    0;
+
+charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
+    Key = key(Job),
+    Charges = job_charges(Job, Interval, Now),
+    ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
+
+
+% In [1] this is described in the "Decay of Process Priorities" section
+%
+decay_priorities() ->
+    decay(?PRIORITIES, priority_coeff()),
+    % If priority becomes 0, it's removed. When looking it up, if it
+    % is missing we assume it is 0
+    clear_zero(?PRIORITIES).
+
+
+% This is the main part of the algorithm. In [1] it is described in the
+% "Priority Adjustment" section.
+%
+update_priority(#job{} = Job) ->
+    Id = Job#job.id,
+    Key = key(Job),
+    Shares = shares(Key),
+    Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
+    ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
+
+
+% This is the "User-Level Scheduling" part from [1]
+%
+update_usage() ->
+    decay(?USAGE, usage_coeff()),
+    clear_zero(?USAGE),
+    ets:foldl(fun({Key, Charges}, _) ->
+        ets:update_counter(?USAGE, Key, Charges, {Key, 0})
+    end, 0, ?CHARGES),
+    % Start each interval with a fresh charges table
+    ets:delete_all_objects(?CHARGES).
+
+
+% Private helper functions
+
+decay(Ets, Coeff) when is_atom(Ets) ->
+    Head = {'$1', '$2'},
+    Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
+    ets:select_replace(Ets, [{Head, [], [Result]}]).
+
+
+clear_zero(Ets) when is_atom(Ets) ->
+    ets:select_delete(Ets, [{{'_', 0}, [], [true]}]).
+
+
+key(#job{} = Job) ->
+    Rep = Job#job.rep,
+    case is_binary(Rep#rep.db_name) of
+        true -> mem3:dbname(Rep#rep.db_name);
+        false -> (Rep#rep.user_ctx)#user_ctx.name
+    end.
+
+
+% Jobs are charged based on the amount of time the job was running during the
+% last scheduling interval. The time units used are microseconds in order to
+% have large enough usage values so that the calculated priority won't be
+% rounded off to 0 easily. The formula for the priority
+% calculation is:
+%
+%    Priority = (Usage * NumJobs) / Shares^2
+%
+% Then in the worst case of a single job in the db, running for only one
+% second, with 1000 (max) shares, the priority would be:
+%
+%    1000000 * 1 / (1000^2) = 1
+%
+job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
+    TimeRunning = timer:now_diff(Now, last_started(Job)),
+    IntervalUSec = IntervalMSec * 1000,
+    min(IntervalUSec, max(0, TimeRunning)).
+
+
+last_started(#job{} = Job) ->
+    case lists:keyfind(started, 1, Job#job.history) of
+        false -> {0, 0, 0};  % In case the user set too low a max history
+        {started, When} -> When
+    end.
+
+
+% Config helper functions
+
+priority_coeff() ->
+    % This is the K2 coefficient from [1]
+    Default = ?DEFAULT_PRIORITY_COEFF,
+    Val = float_val(config:get("replicator", "priority_coeff"), Default),
+    max(0.0, min(1.0, Val)).

Review comment:
       Minor nit, but a helper of `bounded(0.0, 1.0, Val)` or something of that 
nature might be slightly easier to read since this min/max snippet is repeated 
a number of times.
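
e.g. (sketch):

    bounded(Min, Max, Val) ->
        max(Min, min(Max, Val)).

which could also replace the same min/max pattern in `update_shares/2`.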




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

