[jira] [Commented] (COUCHDB-3324) Scheduling Replicator

2017-04-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/COUCHDB-3324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968502#comment-15968502
 ] 

ASF GitHub Bot commented on COUCHDB-3324:
-

Github user nickva closed the pull request at:

https://github.com/apache/couchdb-couch-replicator/pull/64


> Scheduling Replicator
> -
>
> Key: COUCHDB-3324
> URL: https://issues.apache.org/jira/browse/COUCHDB-3324
> Project: CouchDB
>  Issue Type: New Feature
>Reporter: Nick Vatamaniuc
>
> Improve CouchDB replicator
>  * Allow running a large number of replication jobs
>  * Improve API with a focus on ease of use and performance. Avoid updating 
> replication document with transient state updates. Instead create a proper 
> API for querying replication states. At the same time provide a compatibility 
> mode to let users keep existing behavior (of getting updates in documents).
>  * Improve network resource usage and performance. Multiple connections to the 
> same cluster could share socket connections.
>  * Handle rate limiting on target and source HTTP endpoints. Let replication 
> requests auto-discover rate limit capacity based on a proven algorithm such as 
> an Additive Increase / Multiplicative Decrease feedback control loop.
>  * Improve performance by avoiding repeatedly retrying failing replication 
> jobs. Instead use exponential backoff. 
>  * Improve recovery from long (but temporary) network failures. Currently if 
> replication jobs fail to start 10 times in a row they will not be retried 
> anymore. This is not always desirable. In case of a long enough DNS (or other 
> network) failure, replication jobs will effectively stop until they are 
> manually restarted.
>  * Better handling of filtered replications: Failing to fetch filters could 
> block the couch replicator manager and lead to message queue backups and memory 
> exhaustion. Also, when replication filter code changes, update the replication 
> accordingly (the replication job ID should change in that case).
>  * Provide better metrics to introspect replicator behavior.
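
The Additive Increase / Multiplicative Decrease loop named in the description can be sketched roughly as follows. This is only an illustration of the general technique in Python, not the replicator's code; the class name, the default rates, the step size and the decrease factor are all assumptions made up for the example.

```python
class AimdRateLimiter:
    """Additive Increase / Multiplicative Decrease estimate of a request rate.

    Every name and default value here is an illustrative assumption,
    not a CouchDB setting.
    """

    def __init__(self, initial_rate=10.0, min_rate=1.0, max_rate=1000.0,
                 increase_step=1.0, decrease_factor=0.5):
        self.rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.increase_step = increase_step
        self.decrease_factor = decrease_factor

    def on_success(self):
        """Probe for more capacity by adding a fixed step."""
        self.rate = min(self.max_rate, self.rate + self.increase_step)
        return self.rate

    def on_rate_limited(self):
        """Back off sharply, for example after an HTTP 429 response."""
        self.rate = max(self.min_rate, self.rate * self.decrease_factor)
        return self.rate
```

Successful requests raise the estimate slowly and a rate-limit response cuts it by a constant factor, so the sender settles near the capacity the endpoint will accept without being told that capacity in advance.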



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] couchdb-couch-replicator pull request #64: 63012 scheduler

2017-04-13 Thread nickva
Github user nickva closed the pull request at:

https://github.com/apache/couchdb-couch-replicator/pull/64


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (COUCHDB-3374) 500 instead of 401 when trying to create an index without authorization

2017-04-13 Thread Jonathan Hall (JIRA)
Jonathan Hall created COUCHDB-3374:
--

 Summary: 500 instead of 401 when trying to create an index without 
authorization
 Key: COUCHDB-3374
 URL: https://issues.apache.org/jira/browse/COUCHDB-3374
 Project: CouchDB
  Issue Type: Bug
  Components: HTTP Interface
Reporter: Jonathan Hall


To reproduce, try to create an index without authorization:

curl -v -X POST http://localhost:6001/new/_index -H "Content-Type: application/json" -d '{"index":{"fields":["foo"]}}'

result:

< HTTP/1.1 500 Internal Server Error
< Cache-Control: must-revalidate
< Content-Length: 102
< Content-Type: application/json
< Date: Thu, 13 Apr 2017 23:56:09 GMT
< Server: CouchDB/2.0.0 (Erlang OTP/17)
< X-Couch-Request-ID: c404e0504a
< X-CouchDB-Body-Time: 0
< 
{"error":"error_saving_ddoc","reason":"Unknown error while saving the design document: unauthorized"}
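
The mismatch here is between the error reason (`unauthorized`) and the status code (500). As a rough illustration of the behavior the report asks for, the mapping could look like the Python sketch below. The function name and the matching rule are hypothetical; this is not CouchDB's actual error handling.

```python
def status_for_index_error(error, reason):
    """Return the HTTP status a client would reasonably expect.

    Hypothetical rule: an authorization failure while saving the design
    document should surface as 401 rather than a generic 500.
    """
    if error == "error_saving_ddoc" and reason.rstrip().endswith("unauthorized"):
        return 401
    return 500
```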






[GitHub] wohali commented on issue #2: Add snap packaging that was previously in the main repo

2017-04-13 Thread git
wohali commented on issue #2: Add snap packaging that was previously in the 
main repo
URL: https://github.com/apache/couchdb-pkg/pull/2#issuecomment-294004625
 
 
   Thanks @mhall119 . We'll want to review some of the build dependencies in 
the near future, and standardize the package short and long descriptions with 
our other packages, but this is a good starting point. Please add the header I 
mentioned and I'll merge this straight away.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wohali commented on a change in pull request #2: Add snap packaging that was previously in the main repo

2017-04-13 Thread git
wohali commented on a change in pull request #2: Add snap packaging that was 
previously in the main repo
URL: https://github.com/apache/couchdb-pkg/pull/2#discussion_r111472499
 
 

 ##
 File path: snap/snap_run
 ##
 @@ -0,0 +1,9 @@
+#!/bin/sh
+
 
 Review comment:
   Please add the Apache license header to this file, as in 
https://github.com/apache/couchdb/blob/master/src/couch/priv/spawnkillable/couchspawnkillable.sh#L3-L17
 



[GitHub] wohali commented on issue #43: Removing _rev

2017-04-13 Thread git
wohali commented on issue #43: Removing _rev
URL: 
https://github.com/apache/couchdb-documentation/pull/43#issuecomment-294002052
 
 
   @timmillwood I'm going to close out this PR for now. If you'd like to 
revisit just submit a new PR and we'll be happy to get it into the official 
docs.
 



[GitHub] wohali closed pull request #43: Removing _rev

2017-04-13 Thread git
wohali closed pull request #43: Removing _rev
URL: https://github.com/apache/couchdb-documentation/pull/43
 
 
   
 



[GitHub] wohali closed pull request #122: Additions to _find and _index from original Mango docs PR

2017-04-13 Thread git
wohali closed pull request #122: Additions to _find and _index from original 
Mango docs PR
URL: https://github.com/apache/couchdb-documentation/pull/122
 
 
   
 



[GitHub] wohali commented on issue #122: Additions to _find and _index from original Mango docs PR

2017-04-13 Thread git
wohali commented on issue #122: Additions to _find and _index from original 
Mango docs PR
URL: 
https://github.com/apache/couchdb-documentation/pull/122#issuecomment-294001803
 
 
   +1, thanks!
 



[GitHub] wohali closed issue #111: Requesting a CONTRIBUTING.md document

2017-04-13 Thread git
wohali closed issue #111: Requesting a CONTRIBUTING.md document
URL: https://github.com/apache/couchdb-documentation/issues/111
 
 
   
 



[GitHub] wohali closed issue #126: Update published version of docs

2017-04-13 Thread git
wohali closed issue #126: Update published version of docs
URL: https://github.com/apache/couchdb-documentation/issues/126
 
 
   
 



[GitHub] wohali commented on issue #126: Update published version of docs

2017-04-13 Thread git
wohali commented on issue #126: Update published version of docs
URL: 
https://github.com/apache/couchdb-documentation/issues/126#issuecomment-294001020
 
 
   Hi @lornajane,
   
   The docs for 2.0.0 are at this point frozen. You can see the correct 
instructions here:
   
   http://docs.couchdb.org/en/latest/contributing.html
   
   When we release 2.1.0 the instructions will update.
 



[GitHub] nickva commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111464234
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##
 @@ -0,0 +1,1430 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_info/2,
+    handle_cast/2,
+    code_change/3,
+    format_status/2
+]).
+
+-export([
+    add_job/1,
+    remove_job/1,
+    reschedule/0,
+    rep_state/1,
+    find_jobs_by_dbname/1,
+    find_jobs_by_doc/2,
+    job_summary/2,
+    health_threshold/0,
+    jobs/0,
+    job/1
+]).
+
+%% config_listener callbacks
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+%% for status updater process to allow hot code loading
+-export([
+    stats_updater_loop/1
+]).
+
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% types
+-type event_type() :: added | started | stopped | {crashed, any()}.
+-type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
+-type history() :: nonempty_list(event()).
+
+%% definitions
+-define(MAX_BACKOFF_EXPONENT, 10).
+-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(RELISTEN_DELAY, 5000).
+-define(STATS_UPDATE_WAIT, 5000).
+
+-define(DEFAULT_MAX_JOBS, 500).
+-define(DEFAULT_MAX_CHURN, 20).
+-define(DEFAULT_MAX_HISTORY, 20).
+-define(DEFAULT_SCHEDULER_INTERVAL, 6).
+
+
+-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
+-record(job, {
+    id :: job_id() | '$1' | '_',
+    rep :: #rep{} | '_',
+    pid :: undefined | pid() | '$1' | '_',
+    monitor :: undefined | reference() | '_',
+    history :: history() | '_'
+}).
+
+-record(stats_acc, {
+    pending_n = 0 :: non_neg_integer(),
+    running_n = 0 :: non_neg_integer(),
+    crashed_n = 0 :: non_neg_integer()
+}).
+
+
+%% public functions
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec add_job(#rep{}) -> ok.
+add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
+    Job = #job{
+        id = Rep#rep.id,
+        rep = Rep,
+        history = [{added, os:timestamp()}]
+    },
+    gen_server:call(?MODULE, {add_job, Job}, infinity).
+
+
+-spec remove_job(job_id()) -> ok.
+remove_job(Id) ->
+    gen_server:call(?MODULE, {remove_job, Id}, infinity).
+
+
+-spec reschedule() -> ok.
+% Trigger a manual reschedule. Used for testing and/or ops.
+reschedule() ->
+    gen_server:call(?MODULE, reschedule, infinity).
+
+
+-spec rep_state(rep_id()) -> #rep{} | nil.
+rep_state(RepId) ->
+    case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
+        {'EXIT',{badarg, _}} ->
+            nil;
+        Rep ->
+            Rep
+    end.
+
+
+-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
+job_summary(JobId, HealthThreshold) ->
+    case job_by_id(JobId) of
+        {ok, #job{pid = Pid, history = History, rep = Rep}} ->
+            ErrorCount = consecutive_crashes(History, HealthThreshold),
+            {State, Info} = case {Pid, ErrorCount} of
+                {undefined, 0}  ->
+                    {pending, null};
+                {undefined, ErrorCount} when ErrorCount > 0 ->
+                    [{{crashed, Error}, _When} | _] = History,
+                    ErrMsg = couch_replicator_utils:rep_error_to_binary(Error),
+                    {crashing, ErrMsg};
+                {Pid, ErrorCount} when is_pid(Pid) ->
+                    {running, null}
+            end,
+            [
+                {source, iolist_to_binary(ejson_url(Rep#rep.source))},
+                {target, iolist_to_binary(ejson_url(Rep#rep.target))},
+                {state, State},
+                {info, Info},
+                {error_count, ErrorCount},
+                {last_updated, last_updated(History)},
+                {start_time,
+                    couch_replicator_utils:iso8601(Rep#rep.start_time)},
+                {proxy, job_proxy_url(Rep#rep.source)}
+            ];
+        {error, not_found} ->
+            nil  % Job might have just completed
+    end.
+
+
+job_proxy_url(#httpdb{
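
For readers less familiar with Erlang, the state selection in `job_summary/2` and the role of the two backoff constants can be sketched in Python. The `job_state` function mirrors the `case` expression quoted above. The `backoff_micros` formula is an assumption, since the quoted part of the diff defines the constants but does not show how they are combined. Both function names are illustrative only.

```python
MAX_BACKOFF_EXPONENT = 10                    # value taken from the quoted diff
BACKOFF_INTERVAL_MICROS = 30 * 1000 * 1000   # value taken from the quoted diff


def job_state(pid, error_count, last_error=None):
    """Mirror the case expression in job_summary/2.

    pid is None when the job has no running process.
    """
    if pid is None and error_count == 0:
        return ("pending", None)
    if pid is None and error_count > 0:
        return ("crashing", last_error)
    return ("running", None)


def backoff_micros(consecutive_crashes):
    """Assumed formula: double the base interval per crash, up to a cap."""
    exponent = min(max(consecutive_crashes - 1, 0), MAX_BACKOFF_EXPONENT)
    return BACKOFF_INTERVAL_MICROS * (2 ** exponent)
```

Under this assumed formula the first crash waits 30 seconds and the delay stops growing once the exponent reaches 10.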

[GitHub] nickva commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111456689
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##
 @@ -0,0 +1,1430 @@

[GitHub] nickva commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111456624
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##
 @@ -0,0 +1,1430 @@

[GitHub] nickva commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111456453
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_doc_processor.erl
 ##
 @@ -0,0 +1,957 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_processor).
+
+-behaviour(gen_server).
+-behaviour(couch_multidb_changes).
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_info/2,
+    handle_cast/2,
+    code_change/3
+]).
+
+-export([
+    db_created/2,
+    db_deleted/2,
+    db_found/2,
+    db_change/3
+]).
+
+-export([
+    docs/1,
+    doc/2,
+    update_docs/0,
+    get_worker_ref/1,
+    notify_cluster_event/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+]).
+
+-define(DEFAULT_UPDATE_DOCS, false).
+-define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
+-define(TS_DAY_SEC, 86400).
+
+-type filter_type() ::  nil | view | user | docids | mango.
+-type repstate() :: initializing | error | scheduled.
+
+
+-record(rdoc, {
+    id :: db_doc_id() | '_' | {any(), '_'},
+    state :: repstate() | '_',
+    rep :: #rep{} | nil | '_',
+    rid :: rep_id() | nil | '_',
+    filter :: filter_type() | '_',
+    info :: binary() | nil | '_',
+    errcnt :: non_neg_integer() | '_',
+    worker :: reference() | nil | '_',
+    last_updated :: erlang:timestamp() | '_'
+}).
+
+
+% couch_multidb_changes API callbacks
+
+db_created(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_deleted(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
+    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
+    Server.
+
+
+db_found(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_change(DbName, {ChangeProps} = Change, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    try
+        ok = process_change(DbName, Change)
+    catch
+        _Tag:Error ->
+            {RepProps} = get_json_value(doc, ChangeProps),
+            DocId = get_json_value(<<"_id">>, RepProps),
+            couch_replicator_docs:update_failed(DbName, DocId, Error)
+    end,
+    Server.
+
+
+-spec get_worker_ref(db_doc_id()) -> reference() | nil.
+get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [#rdoc{worker = WRef}] when is_reference(WRef) ->
+            WRef;
+        [#rdoc{worker = nil}] ->
+            nil;
+        [] ->
+            nil
+    end.
+
+
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
+process_change(DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
+    DocId = get_json_value(<<"_id">>, RepProps),
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    Id = {DbName, DocId},
+    case {Owner, get_json_value(deleted, Change, false)} of
+        {_, true} ->
+            ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+        {unstable, false} ->
+            couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+        {ThisNode, false} when ThisNode =:= node() ->
+            case get_json_value(<<"_replication_state">>, RepProps) of
+                undefined ->
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"triggered">> ->
+                    maybe_remove_state_fields(DbName, DocId),
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"completed">> ->
+                    ok = gen_server:call(?MODULE, {completed, Id}, infinity);
+                <<"error">> ->
+                    % Handle replications started from older versions of replicator
+                    % which wrote transient errors to replication docs
+                    maybe_remove_state_fields(DbName, DocId),
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"failed">> ->
+                    ok
+            end;
+        {Owner, false} ->
+            ok
+    end,
+    ok.
+
+
+maybe_remove_state_fields(DbName, DocId) 
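
The branching in `process_change/2` above can be summarized as a small decision function. This Python sketch only restates the quoted `case` expressions; the function name and the returned action labels are hypothetical and do not appear in the diff.

```python
def process_change_action(owner, this_node, deleted, replication_state):
    """Return a label for what process_change/2 does with one change.

    owner is the node that owns the document, or "unstable" while cluster
    membership is still settling. replication_state is the document's
    _replication_state field, or None when it is absent.
    """
    if deleted:
        return "remove"
    if owner == "unstable":
        return "skip_unstable"
    if owner != this_node:
        return "ignore"
    if replication_state is None:
        return "update"
    if replication_state in ("triggered", "error"):
        # Stale state fields are cleared before the document is processed.
        return "clear_state_fields_then_update"
    if replication_state == "completed":
        return "mark_completed"
    if replication_state == "failed":
        return "ignore"
    # The Erlang code has no clause for other values, so it raises there too.
    raise ValueError("unexpected replication state: %r" % (replication_state,))
```

Deletion is checked first, so a deleted document is removed regardless of which node owns it.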

[jira] [Commented] (COUCHDB-3324) Scheduling Replicator

2017-04-13 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/COUCHDB-3324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968014#comment-15968014
 ] 

ASF subversion and git services commented on COUCHDB-3324:
--

Commit ec36f9f046f13f67f999c1497e75ddb367b45d41 in couchdb's branch 
refs/heads/63012-scheduler from [~vatamane]
[ https://git-wip-us.apache.org/repos/asf?p=couchdb.git;h=ec36f9f ]

Implement fabric-based _scheduler/docs endpoint

Previously _scheduler/docs implementation was not optimal. All documents
were fetched via rpc:multicall from each of the nodes.

Switch implementation to mimic _all_docs behavior. The algorithm is roughly
as follows:

 * chttpd endpoint:
   - parses query args like it does for any view query
   - parses states to filter by, states are kept in the `extra` query arg

 * Call is made to couch_replicator_fabric. This is equivalent to
   fabric:all_docs. Here the typical fabric / rexi setup is happening.

 * Fabric worker is in `couch_replicator_fabric_rpc:docs/3`. This worker is
   similar to fabric_rpc's all_docs handler. However it is a bit more intricate
   because it handles both replication documents in a terminal state and those
   which are active.

   - Before emitting a row, it queries the state of the document to see if it
     is in a terminal state. If it is, it filters on that state and decides
     whether the row should be emitted.

   - If the state cannot be determined from the document itself, it tries to
     fetch the active state from the local node's doc processor via a key-based
     lookup. If it finds the state, it can also filter on it and emit or skip
     the row.

   - If the document cannot be found in the node's local doc processor ETS
     table, the row is emitted with a doc value of `undecided`. This will
     let the coordinator fetch the state by possibly querying other nodes'
     doc processors.

 * Coordinator then starts handling messages. This also mostly mimics all_docs.
   At this point the most interesting thing is handling `undecided` docs. If
   one is found, then `replicator:active_doc/2` is queried. There, all nodes
   where document shards live are queried. This is better than the previous
   implementation, where all nodes were queried all the time.

 * The final work happens in `couch_replicator_httpd`, where the emitting
   callback is. There only the doc is emitted (not keys, rows, values).
   Another thing that happens is the `Total` value is decremented to
   account for the always-present _design doc.

Because of this a bunch of code was removed, including an extra view which
was built and managed by the previous implementation.

As a bonus, other view-related parameters such as skip and limit seem to
work out of the box and don't have to be implemented ad hoc.

Also, most importantly, many thanks to Paul Davis for suggesting this approach.

Jira: COUCHDB-3324
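
The worker and coordinator steps described in this commit message can be sketched as two small functions. This is a Python illustration of the described flow, not the Erlang implementation: the function names, the row shape, and the set of terminal states are assumptions, and plain dictionaries stand in for each node's doc processor ETS table.

```python
TERMINAL_STATES = {"completed", "failed"}  # assumed set of terminal states


def worker_row(doc_id, doc_state, local_processor, wanted_states):
    """Decide what a worker emits for one replication document.

    Returns a row dict, or None when the row is filtered out.
    """
    if doc_state in TERMINAL_STATES:
        state = doc_state
    elif doc_id in local_processor:
        state = local_processor[doc_id]
    else:
        # Not known locally: let the coordinator resolve it.
        return {"id": doc_id, "doc": "undecided"}
    if wanted_states and state not in wanted_states:
        return None
    return {"id": doc_id, "doc": {"state": state}}


def resolve_undecided(row, shard_nodes):
    """Coordinator side: ask only the nodes hosting the document's shards."""
    if row["doc"] != "undecided":
        return row
    for node_table in shard_nodes:
        state = node_table.get(row["id"])
        if state is not None:
            return {"id": row["id"], "doc": {"state": state}}
    return row
```

Only rows the worker could not decide locally cost the coordinator extra lookups, which is the improvement over querying every node for every document.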




[GitHub] lornajane opened a new issue #126: Update published version of docs

2017-04-13 Thread git
lornajane opened a new issue #126: Update published version of docs
URL: https://github.com/apache/couchdb-documentation/issues/126
 
 
   The instructions for contributing to the CouchDB documentation (I'm reading 
this here: http://docs.couchdb.org/en/2.0.0/contributing.html) reference the 
main repo but the docs are now in (this!) separate repo.  The source here has 
already been updated, so would it be possible to regenerate the documentation 
to reflect this and any other recent changes?
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wqj2101 opened a new issue #40: Destroying An Undefined Doc Removes the Whole Database

2017-04-13 Thread git
wqj2101 opened a new issue #40: Destroying An Undefined Doc Removes the Whole 
Database
URL: https://github.com/apache/couchdb-nano/issues/40
 
 
   Recently discovered a bug when trying to delete a doc from a CouchDB 
database. The action of deleting a document with "undefined" inputs actually 
removes the whole database. The .destroy() call should really be 
restricted to destroying documents and not the whole DB itself.  If an 
undefined is sent in, I think nano should ideally return an error?  
   
   This is sample code to reproduce what happens:
   
   const nano = require( 'nano' )( CouchDB );
   const dbName = nano.db.use( 'dbName' );
   
   dbName.destroy( undefined, undefined, ( err ) => {
     // DB is destroyed at this point
   } );
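   
   A minimal sketch of the kind of guard asked for above, written as a caller-side wrapper rather than as a change to nano itself. `safeDestroy` and `makeFakeDb` are hypothetical names that do not exist in nano; the fake object only stands in for the handle returned by `nano.db.use()` so the snippet runs without a CouchDB server. The wrapper refuses to call `destroy` unless both the document name and the revision are non-empty strings.

```javascript
// Hypothetical guard around a nano-style database handle. `safeDestroy` is
// not part of nano; it only illustrates validating inputs before deleting.
function safeDestroy(db, docName, rev, callback) {
  const isNonEmptyString = (v) => typeof v === 'string' && v.length > 0;
  if (!isNonEmptyString(docName) || !isNonEmptyString(rev)) {
    // Report the problem through the callback without touching the database.
    callback(new Error('destroy requires a document name and a revision'));
    return;
  }
  db.destroy(docName, rev, callback);
}

// Stand-in for the object returned by nano.db.use(); it only records calls.
function makeFakeDb() {
  const calls = [];
  return {
    calls,
    destroy(docName, rev, callback) {
      calls.push([docName, rev]);
      callback(null, { ok: true });
    },
  };
}
```

   With the inputs from the report, `safeDestroy( db, undefined, undefined, cb )` calls `cb` with an error and never reaches `destroy`, so the database is left alone.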
 



[GitHub] iilyak commented on a change in pull request #474: 3364 fix view compactor unknown info

2017-04-13 Thread git
iilyak commented on a change in pull request #474: 3364 fix view compactor 
unknown info
URL: https://github.com/apache/couchdb/pull/474#discussion_r111420397
 
 

 ##
 File path: src/couch_index/src/couch_index_compactor.erl
 ##
 @@ -81,6 +85,14 @@ handle_cast(_Mesg, State) ->
 
 handle_info({'EXIT', Pid, normal}, #st{pid=Pid}=State) ->
     {noreply, State#st{pid=undefined}};
+handle_info({'EXIT', Pid, Reason}, #st{pid = Pid} = State) ->
+    #st{idx = Idx, mod = Mod} = State,
+    {ok, IdxState} = gen_server:call(Idx, {compaction_failed, Reason}),
 
 Review comment:
   > First, there are nothing inherently wrong with using gen_server's call. 
   
   I disagree, for a few reasons:
   1. I do believe that the message format changes more often.
   2. I can use a stable expression for grep (such as `:`) to 
find all places which need to be updated if we have to.
   3. If we have an API function in the module which implements the `gen_server` 
behavior, we can use guards to do basic argument validation. This is helpful 
because most of the time we want to crash the caller and not the gen_server 
process.
   
   Anyway, I am fine with merging it as is, since the couchdb code base uses 
`gen_server:call` everywhere. 
 



[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111410619
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##
 @@ -0,0 +1,1430 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+-export([
+    start_link/0
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3,
+   format_status/2
+]).
+
+-export([
+   add_job/1,
+   remove_job/1,
+   reschedule/0,
+   rep_state/1,
+   find_jobs_by_dbname/1,
+   find_jobs_by_doc/2,
+   job_summary/2,
+   health_threshold/0,
+   jobs/0,
+   job/1
+]).
+
+%% config_listener callbacks
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+%% for status updater process to allow hot code loading
+-export([
+    stats_updater_loop/1
+]).
+
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% types
+-type event_type() :: added | started | stopped | {crashed, any()}.
+-type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
+-type history() :: nonempty_list(event()).
+
+%% definitions
+-define(MAX_BACKOFF_EXPONENT, 10).
+-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(RELISTEN_DELAY, 5000).
+-define(STATS_UPDATE_WAIT, 5000).
+
+-define(DEFAULT_MAX_JOBS, 500).
+-define(DEFAULT_MAX_CHURN, 20).
+-define(DEFAULT_MAX_HISTORY, 20).
+-define(DEFAULT_SCHEDULER_INTERVAL, 6).
+
+
+-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
+-record(job, {
+    id :: job_id() | '$1' | '_',
+    rep :: #rep{} | '_',
+    pid :: undefined | pid() | '$1' | '_',
+    monitor :: undefined | reference() | '_',
+    history :: history() | '_'
+}).
+
+-record(stats_acc, {
+    pending_n = 0 :: non_neg_integer(),
+    running_n = 0 :: non_neg_integer(),
+    crashed_n = 0 :: non_neg_integer()
+}).
+
+
+%% public functions
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec add_job(#rep{}) -> ok.
+add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
+    Job = #job{
+        id = Rep#rep.id,
+        rep = Rep,
+        history = [{added, os:timestamp()}]
+    },
+    gen_server:call(?MODULE, {add_job, Job}, infinity).
+
+
+-spec remove_job(job_id()) -> ok.
+remove_job(Id) ->
+    gen_server:call(?MODULE, {remove_job, Id}, infinity).
+
+
+-spec reschedule() -> ok.
+% Trigger a manual reschedule. Used for testing and/or ops.
+reschedule() ->
+    gen_server:call(?MODULE, reschedule, infinity).
+
+
+-spec rep_state(rep_id()) -> #rep{} | nil.
+rep_state(RepId) ->
+    case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
+        {'EXIT',{badarg, _}} ->
+            nil;
+        Rep ->
+            Rep
+    end.
+
+
+-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
+job_summary(JobId, HealthThreshold) ->
+    case job_by_id(JobId) of
+        {ok, #job{pid = Pid, history = History, rep = Rep}} ->
+            ErrorCount = consecutive_crashes(History, HealthThreshold),
+            {State, Info} = case {Pid, ErrorCount} of
+                {undefined, 0}  ->
+                    {pending, null};
+                {undefined, ErrorCount} when ErrorCount > 0 ->
+                     [{{crashed, Error}, _When} | _] = History,
+                     ErrMsg = couch_replicator_utils:rep_error_to_binary(Error),
+                     {crashing, ErrMsg};
+                {Pid, ErrorCount} when is_pid(Pid) ->
+                     {running, null}
+            end,
+            [
+                {source, iolist_to_binary(ejson_url(Rep#rep.source))},
+                {target, iolist_to_binary(ejson_url(Rep#rep.target))},
+                {state, State},
+                {info, Info},
+                {error_count, ErrorCount},
+                {last_updated, last_updated(History)},
+                {start_time,
+                    couch_replicator_utils:iso8601(Rep#rep.start_time)},
+                {proxy, job_proxy_url(Rep#rep.source)}
+            ];
+        {error, not_found} ->
+            nil  % Job might have just completed
+    end.
+
+
+job_proxy_url(#httpdb{

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111409805
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111409552
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111409432
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111409125
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_scheduler.erl
 ##

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111407419
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator_doc_processor.erl
 ##
 @@ -0,0 +1,957 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_doc_processor).
+
+-behaviour(gen_server).
+-behaviour(couch_multidb_changes).
+
+-export([
+    start_link/0
+]).
+
+-export([
+   init/1,
+   terminate/2,
+   handle_call/3,
+   handle_info/2,
+   handle_cast/2,
+   code_change/3
+]).
+
+-export([
+    db_created/2,
+    db_deleted/2,
+    db_found/2,
+    db_change/3
+]).
+
+-export([
+    docs/1,
+    doc/2,
+    update_docs/0,
+    get_worker_ref/1,
+    notify_cluster_event/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+]).
+
+-define(DEFAULT_UPDATE_DOCS, false).
+-define(ERROR_MAX_BACKOFF_EXPONENT, 12).  % ~ 1 day on average
+-define(TS_DAY_SEC, 86400).
+
+-type filter_type() ::  nil | view | user | docids | mango.
+-type repstate() :: initializing | error | scheduled.
+
+
+-record(rdoc, {
+    id :: db_doc_id() | '_' | {any(), '_'},
+    state :: repstate() | '_',
+    rep :: #rep{} | nil | '_',
+    rid :: rep_id() | nil | '_',
+    filter :: filter_type() | '_',
+    info :: binary() | nil | '_',
+    errcnt :: non_neg_integer() | '_',
+    worker :: reference() | nil | '_',
+    last_updated :: erlang:timestamp() | '_'
+}).
+
+
+% couch_multidb_changes API callbacks
+
+db_created(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_deleted(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
+    ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity),
+    Server.
+
+
+db_found(DbName, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, dbs_found]),
+    couch_replicator_docs:ensure_rep_ddoc_exists(DbName),
+    Server.
+
+
+db_change(DbName, {ChangeProps} = Change, Server) ->
+    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
+    try
+        ok = process_change(DbName, Change)
+    catch
+        _Tag:Error ->
+            {RepProps} = get_json_value(doc, ChangeProps),
+            DocId = get_json_value(<<"_id">>, RepProps),
+            couch_replicator_docs:update_failed(DbName, DocId, Error)
+    end,
+    Server.
+
+
+-spec get_worker_ref(db_doc_id()) -> reference() | nil.
+get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
+    case ets:lookup(?MODULE, {DbName, DocId}) of
+        [#rdoc{worker = WRef}] when is_reference(WRef) ->
+            WRef;
+        [#rdoc{worker = nil}] ->
+            nil;
+        [] ->
+            nil
+    end.
+
+
+% Cluster membership change notification callback
+-spec notify_cluster_event(pid(), {cluster, any()}) -> ok.
+notify_cluster_event(Server, {cluster, _} = Event) ->
+    gen_server:cast(Server, Event).
+
+
+process_change(DbName, {Change}) ->
+    {RepProps} = JsonRepDoc = get_json_value(doc, Change),
+    DocId = get_json_value(<<"_id">>, RepProps),
+    Owner = couch_replicator_clustering:owner(DbName, DocId),
+    Id = {DbName, DocId},
+    case {Owner, get_json_value(deleted, Change, false)} of
+        {_, true} ->
+            ok = gen_server:call(?MODULE, {removed, Id}, infinity);
+        {unstable, false} ->
+            couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]);
+        {ThisNode, false} when ThisNode =:= node() ->
+            case get_json_value(<<"_replication_state">>, RepProps) of
+                undefined ->
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"triggered">> ->
+                    maybe_remove_state_fields(DbName, DocId),
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"completed">> ->
+                    ok = gen_server:call(?MODULE, {completed, Id}, infinity);
+                <<"error">> ->
+                    % Handle replications started from older versions of replicator
+                    % which wrote transient errors to replication docs
+                    maybe_remove_state_fields(DbName, DocId),
+                    ok = process_updated(Id, JsonRepDoc);
+                <<"failed">> ->
+                    ok
+            end;
+        {Owner, false} ->
+            ok
+    end,
+    ok.
+
+
+maybe_remove_state_fields(DbName, DocId) 

[GitHub] iilyak commented on a change in pull request #470: Scheduling Replicator

2017-04-13 Thread git
iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111405816
 
 

 ##
 File path: src/couch_replicator/src/couch_replicator.erl
 ##
 @@ -191,847 +129,314 @@ wait_for_result(RepId) ->
 end.
 
 
-cancel_replication({BaseId, Extension}) ->
-FullRepId = BaseId ++ Extension,
-couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-ok ->
-couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-ok ->
-{ok, {cancelled, ?l2b(FullRepId)}};
-{error, not_found} ->
-{ok, {cancelled, ?l2b(FullRepId)}};
-Error ->
-Error
-end;
-Error ->
-couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, Error]),
-Error
-end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-case lists:member(<<"_admin">>, Roles) of
-true ->
-cancel_replication(RepId);
-false ->
-case find_replicator(RepId) of
-{ok, Pid} ->
-case details(Pid) of
-{ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-cancel_replication(RepId);
-{ok, _} ->
-throw({unauthorized,
-<<"Can't cancel a replication triggered by another user">>});
-Error ->
-Error
-end;
-Error ->
-Error
-end
+-spec cancel_replication(rep_id()) ->
+{ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+FullRepId = BasedId ++ Extension,
+couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+case couch_replicator_scheduler:rep_state(RepId) of
+#rep{} ->
+ok = couch_replicator_scheduler:remove_job(RepId),
+couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+{ok, {cancelled, ?l2b(FullRepId)}};
+nil ->
+couch_log:notice("Replication '~s' not found", [FullRepId]),
+{error, not_found}
 end.
 
-find_replicator({BaseId, Ext} = _RepId) ->
-case lists:keysearch(
-BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
-{value, {_, Pid, _, _}} when is_pid(Pid) ->
-{ok, Pid};
-_ ->
-{error, not_found}
-end.
 
-details(Pid) ->
-case (catch gen_server:call(Pid, get_details)) of
-{ok, Rep} ->
-{ok, Rep};
-{'EXIT', {noproc, {gen_server, call, _}}} ->
-{error, not_found};
-Error ->
-throw(Error)
+-spec replication_states() -> [atom()].
+replication_states() ->
+?REPLICATION_STATES.
+
+
+-spec stream_terminal_docs_info(binary(), user_doc_cb(), any(), [atom()]) ->
+any().
+stream_terminal_docs_info(Db, Cb, UserAcc, States) ->
+DDoc = <<"_replicator">>,
+View = <<"terminal_states">>,
+QueryCb = fun handle_replicator_doc_query/2,
+Args = #mrargs{view_type = map, reduce = false},
+Acc = {Db, Cb, UserAcc, States},
+try fabric:query_view(Db, DDoc, View, QueryCb, Acc, Args) of
+{ok, {Db, Cb, UserAcc1, States}} ->
+UserAcc1
+catch
+error:database_does_not_exist ->
+UserAcc;
+error:{badmatch, {not_found, Reason}} ->
+Msg = "Could not find _design/~s ~s view in replicator db ~s : ~p",
+couch_log:error(Msg, [DDoc, View, Db, Reason]),
+couch_replicator_docs:ensure_cluster_rep_ddoc_exists(Db),
+timer:sleep(?DESIGN_DOC_CREATION_DELAY_MSEC),
+stream_terminal_docs_info(Db, Cb, UserAcc, States)
 end.
 
-init(InitArgs) ->
-{ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-process_flag(trap_exit, true),
-
-random:seed(os:timestamp()),
-
-#rep_state{
-source = Source,
-target = Target,
-source_name = SourceName,
-target_name = TargetName,
-start_seq = {_Ts, StartSeq},
-committed_seq = {_, CommittedSeq},
-highest_seq_done = {_, HighestSeq},
-checkpoint_interval = CheckpointInterval
-} = State = init_state(Rep),
-
-NumWorkers = get_value(worker_processes, Options),
-BatchSize = get_value(worker_batch_size, Options),
-{ok, ChangesQueue} = couch_work_queue:new([
-{max_items, BatchSize * NumWorkers * 2},
-{max_size, 100 * 1024 * NumWorkers}
-]),
-% This starts the _changes reader process. It adds the changes from
-% the source db to the ChangesQueue.
-{ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-StartSeq, Source, ChangesQueue, Options
-),
-% Changes manager - responsible for dequeing batches from the changes queue
-% and