 File path: src/couch_replicator/src/couch_replicator_docs.erl
 @@ -0,0 +1,756 @@
+    parse_rep_doc/1,
+    parse_rep_doc/2,
+    parse_rep_db/3,
+    parse_rep_doc_without_id/1,
+    parse_rep_doc_without_id/2,
+    before_doc_update/2,
+    after_doc_read/2,
+    ensure_rep_db_exists/0,
+    ensure_rep_ddoc_exists/1,
+    ensure_cluster_rep_ddoc_exists/1,
+    remove_state_fields/2,
+    update_doc_completed/3,
+    update_failed/3,
+    update_rep_id/1,
+    update_triggered/2,
+    update_error/2
+-import(couch_util, [
+    get_value/2,
+    get_value/3,
+    to_binary/1
+-import(couch_replicator_utils, [
+    get_json_value/2,
+    get_json_value/3
+% Name of the replicator database that ensure_rep_db_exists/0 opens or creates.
+-define(REP_DB_NAME, <<"_replicator">>).
+% Id of the design document maintained by the ensure_*_ddoc_exists functions.
+-define(REP_DESIGN_DOC, <<"_design/_replicator">>).
+% JSON key `owner`. NOTE(review): not referenced in the visible part of this
+% module; presumably a replication doc field -- confirm.
+-define(OWNER, <<"owner">>).
+% user_ctx option carrying the `_admin` and `_replicator` roles; passed when
+% opening or creating the replicator database.
+-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}).
+% Store {K, V} in the tuple list L, keyed on the first element: an existing
+% entry for K is replaced, otherwise the pair is appended.
+-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
+% Set every `_replication_*` bookkeeping field of the given replication
+% document to `undefined` through update_rep_doc/3.
+% NOTE(review): presumably update_rep_doc drops `undefined` fields from the
+% document -- confirm against its implementation.
+remove_state_fields(DbName, DocId) ->
+    StateFields = [
+        <<"_replication_state">>,
+        <<"_replication_state_time">>,
+        <<"_replication_state_reason">>,
+        <<"_replication_id">>,
+        <<"_replication_stats">>
+    ],
+    Cleared = [{Field, undefined} || Field <- StateFields],
+    update_rep_doc(DbName, DocId, Cleared).
+% Store the `completed` state together with the final Stats on a replication
+% document, then bump the completed_state_updates counter. The return value
+% is whatever couch_stats:increment_counter/1 returns.
+-spec update_doc_completed(binary(), binary(), [_]) -> any().
+update_doc_completed(DbName, DocId, Stats) ->
+    CompletedFields = [
+        {<<"_replication_state">>, <<"completed">>},
+        {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_stats">>, {Stats}}
+    ],
+    update_rep_doc(DbName, DocId, CompletedFields),
+    Metric = [couch_replicator, docs, completed_state_updates],
+    couch_stats:increment_counter(Metric).
+% Record a failure on a replication document: log the reason, store the
+% `failed` state with that reason (stats are set to `undefined`), and bump
+% the failed_state_updates counter.
+-spec update_failed(binary(), binary(), any()) -> any().
+update_failed(DbName, DocId, Error) ->
+    Reason = error_reason(Error),
+    LogFmt = "Error processing replication doc `~s` from `~s`: ~s",
+    couch_log:error(LogFmt, [DocId, DbName, Reason]),
+    FailedFields = [
+        {<<"_replication_state">>, <<"failed">>},
+        {<<"_replication_stats">>, undefined},
+        {<<"_replication_state_reason">>, Reason}
+    ],
+    update_rep_doc(DbName, DocId, FailedFields),
+    Metric = [couch_replicator, docs, failed_state_updates],
+    couch_stats:increment_counter(Metric).
+% Mark the replication document referenced by Rep as `triggered` and store
+% its replication id, i.e. Base and Ext concatenated into a single binary.
+-spec update_triggered(#rep{}, rep_id()) -> ok.
+update_triggered(Rep, {Base, Ext}) ->
+    #rep{db_name = DbName, doc_id = DocId} = Rep,
+    RepIdBin = iolist_to_binary([Base, Ext]),
+    TriggeredFields = [
+        {<<"_replication_state">>, <<"triggered">>},
+        {<<"_replication_state_reason">>, undefined},
+        {<<"_replication_id">>, RepIdBin},
+        {<<"_replication_stats">>, undefined}
+    ],
+    update_rep_doc(DbName, DocId, TriggeredFields),
+    ok.
+% Store the `error` state and its reason on the replication document that
+% the given #rep{} refers to. Stats are set to `undefined`.
+-spec update_error(#rep{}, any()) -> ok.
+update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
+    Reason = error_reason(Error),
+    %% Only a {Base, Ext} pair is turned into a binary id; any other value
+    %% is stored as `null`.
+    StoredRepId = case RepId of
+        {IdBase, IdExt} -> iolist_to_binary([IdBase, IdExt]);
+        _NoId -> null
+    end,
+    ErrorFields = [
+        {<<"_replication_state">>, <<"error">>},
+        {<<"_replication_state_reason">>, Reason},
+        {<<"_replication_stats">>, undefined},
+        {<<"_replication_id">>, StoredRepId}
+    ],
+    update_rep_doc(DbName, DocId, ErrorFields),
+    ok.
+% Open the replicator database, creating it when opening does not return
+% {ok, Db}, then make sure its design document is in place.
+-spec ensure_rep_db_exists() -> {ok, #db{}}.
+ensure_rep_db_exists() ->
+    OpenOpts = [?CTX, sys_db, nologifmissing],
+    {ok, Db} = case couch_db:open_int(?REP_DB_NAME, OpenOpts) of
+        {ok, _} = Opened ->
+            Opened;
+        _Error ->
+            %% Any failure to open is answered by trying to create the db.
+            couch_db:create(?REP_DB_NAME, [?CTX, sys_db])
+    end,
+    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
+    {ok, Db}.
+% Ensure the replicator design document exists in RepDb, but only when
+% mem3:belongs/2 returns true for this database and the design doc id;
+% otherwise do nothing.
+-spec ensure_rep_ddoc_exists(binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb) ->
+    Belongs = mem3:belongs(RepDb, ?REP_DESIGN_DOC),
+    case Belongs of
+        false ->
+            ok;
+        true ->
+            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC)
+    end.
+% Create the design document DDocId in RepDb when it is missing, or rewrite
+% it when the stored copy differs from replication_design_doc_props/1.
+% A missing database file is ignored, and so is a conflict on update.
+-spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
+ensure_rep_ddoc_exists(RepDb, DDocId) ->
+    case open_rep_doc(RepDb, DDocId) of
+        {not_found, no_db_file} ->
+            %% database was deleted.
+            ok;
+        {not_found, _Reason} ->
+            FreshProps = replication_design_doc_props(DDocId),
+            FreshDDoc = couch_doc:from_json_obj({FreshProps}),
+            couch_log:notice("creating replicator ddoc", []),
+            {ok, _Rev} = save_rep_doc(RepDb, FreshDDoc);
+        {ok, StoredDoc} ->
+            Expected = replication_design_doc_props(DDocId),
+            {StoredProps} = couch_doc:to_json_obj(StoredDoc, []),
+            %% The revision is set aside so that only the content is compared.
+            {value, {_, StoredRev}, Content} =
+                lists:keytake(<<"_rev">>, 1, StoredProps),
+            UpToDate = compare_ejson({Content}, {Expected}),
+            case UpToDate of
+                true ->
+                    ok;
+                false ->
+                    %% Reuse the stored revision so the save is an update.
+                    Replacement = couch_doc:from_json_obj(
+                        {[{<<"_rev">>, StoredRev} | Expected]}),
+                    couch_log:notice("updating replicator ddoc", []),
+                    try
+                        {ok, _} = save_rep_doc(RepDb, Replacement)
+                    catch
+                        throw:conflict ->
+                            %% ignore, we'll retry next time
+                            ok
+                    end
+            end
+    end,
+    ok.
+% Ensure the replicator design document exists in the first shard that
+% mem3:shards/2 returns for RepDb and the design doc id.
+-spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
+ensure_cluster_rep_ddoc_exists(RepDb) ->
+    [#shard{name = ShardName} | _Rest] = mem3:shards(RepDb, ?REP_DESIGN_DOC),
+    ensure_rep_ddoc_exists(ShardName, ?REP_DESIGN_DOC).
+% Two EJSON objects are considered equal when their
+% couch_replicator_filters:ejsort/1 results compare equal with `==`.
+-spec compare_ejson({[_]}, {[_]}) -> boolean().
+compare_ejson(EJson1, EJson2) ->
+    Normalize = fun couch_replicator_filters:ejsort/1,
+    Normalize(EJson1) == Normalize(EJson2).
+% Property list of the replicator design document with the given id: a
+% javascript ddoc whose validate_doc_update is ?REP_DB_DOC_VALIDATE_FUN.
+-spec replication_design_doc_props(binary()) -> [_].
+replication_design_doc_props(DDocId) ->
+    Id = {<<"_id">>, DDocId},
+    Language = {<<"language">>, <<"javascript">>},
+    Validator = {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN},
+    [Id, Language, Validator].
+% Note: parse_rep_doc can handle filtered replications. While parsing the
+% replication doc it may make remote HTTP requests to the source database.
+% If fetching or parsing the filter docs fails, parse_rep_doc throws a
+% {filter_fetch_error, Error} exception. This exception should be considered
+% transient with respect to the contents of the document itself, since it
+% depends on network availability of the source db and other factors.
+-spec parse_rep_doc({[_]}) -> #rep{}.
+parse_rep_doc(RepDoc) ->
+    {ok, Parsed} =
+        try
+            parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
+        catch
+            %% Filter fetch failures are re-thrown untouched.
+            throw:{filter_fetch_error, _} = FetchError ->
+                throw(FetchError);
+            throw:{error, Why} ->
+                throw({bad_rep_doc, Why});
+            %% Anything else, whatever its class, is reported as bad_rep_doc.
+            Class:Term ->
+                throw({bad_rep_doc, couch_util:to_binary({Class, Term})})
+        end,
+    Parsed.
+% Parse a replication document into a #rep{} without computing its
+% replication id, using the user context derived by rep_user_ctx/1.
+% Every failure is re-thrown as {bad_rep_doc, Reason}.
+-spec parse_rep_doc_without_id({[_]}) -> #rep{}.
+parse_rep_doc_without_id(RepDoc) ->
+    {ok, Parsed} =
+        try
+            parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
+        catch
+            throw:{error, Why} ->
+                throw({bad_rep_doc, Why});
+            %% Anything else, whatever its class, is reported as bad_rep_doc.
+            Class:Term ->
+                throw({bad_rep_doc, couch_util:to_binary({Class, Term})})
+        end,
+    Parsed.
+% Parse a replication document and, unless it is a cancel request that
+% already names an id, compute and attach its replication id.
+-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc(Doc, UserCtx) ->
+    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
+    Options = Rep#rep.options,
+    IsCancel = couch_util:get_value(cancel, Options, false),
+    GivenId = couch_util:get_value(id, Options, nil),
+    case {IsCancel, GivenId} of
+        {true, nil} ->
+            % Cancel request with no id, must parse id out of body contents
+            {ok, update_rep_id(Rep)};
+        {true, _} ->
+            % Cancel request with an id specified, so do not parse id from body
+            {ok, Rep};
+        {false, _} ->
+            % Not a cancel request, regular replication doc
+            {ok, update_rep_id(Rep)}
+    end.
+% Build a #rep{} from the properties of a replication document, leaving the
+% replication id unset. Throws {bad_request, Error} when the view type cannot
+% be determined and {error, Reason} when the filter options do not parse.
+-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
+parse_rep_doc_without_id({Props}, UserCtx) ->
+    Proxy = couch_util:get_value(<<"proxy">>, Props, <<>>),
+    Opts = make_options(Props),
+    CancelById = couch_util:get_value(cancel, Opts, false) andalso
+        (couch_util:get_value(id, Opts, nil) =/= nil),
+    case CancelById of
+        true ->
+            %% A cancel request naming an id carries only options and user ctx.
+            {ok, #rep{options = Opts, user_ctx = UserCtx}};
+        false ->
+            SourceSpec = couch_util:get_value(<<"source">>, Props),
+            Source = parse_rep_db(SourceSpec, Proxy, Opts),
+            TargetSpec = couch_util:get_value(<<"target">>, Props),
+            Target = parse_rep_db(TargetSpec, Proxy, Opts),
+            {Type, View} =
+                case couch_replicator_filters:view_type(Props, Opts) of
+                    {error, ViewError} ->
+                        throw({bad_request, ViewError});
+                    TypeAndView ->
+                        TypeAndView
+                end,
+            Rep = #rep{
+                source = Source,
+                target = Target,
+                options = Opts,
+                user_ctx = UserCtx,
+                type = Type,
+                view = View,
+                doc_id = couch_util:get_value(<<"_id">>, Props, null)
+            },
+            %% Check if can parse filter code, if not throw exception
+            case couch_replicator_filters:parse(Opts) of
+                {ok, _Filter} ->
+                    ok;
+                {error, FilterError} ->
+                    throw({error, FilterError})
+            end,
+            {ok, Rep}
+    end.
+% Update a #rep{} record with a replication_id. Calculating the id might
+% involve fetching a filter from the source db, and so it could fail
+% intermittently. In case of a failure to fetch the filter this function
+% will throw a `{filter_fetch_error, Reason}` exception.
+update_rep_id(Rep) ->
+    %% The id is derived from the record itself and stored in its `id` field.
+    Rep#rep{id = couch_replicator_ids:replication_id(Rep)}.
+% Apply the key/value updates KVs to a replication document, starting the
+% retry wait factor of update_rep_doc/4 at 1.
+update_rep_doc(RepDbName, RepDocId, KVs) ->
+    InitialWait = 1,
+    update_rep_doc(RepDbName, RepDocId, KVs, InitialWait).
+update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
+    try
+        case open_rep_doc(RepDbName, RepDocId) of
+            {ok, LastRepDoc} ->
+                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
+            _ ->
+                ok
+        end
+    catch
+        throw:conflict ->
+            Msg = "Conflict when updating replication doc `~s`. Retrying.",
+            couch_log:error(Msg, [RepDocId]),
+            ok = timer:sleep(random:uniform(erlang:min(128, Wait)) * 100),
+            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
 Review comment:
   This breaks tail-recursiveness of the function. 
