davisp commented on a change in pull request #470: Scheduling Replicator URL: https://github.com/apache/couchdb/pull/470#discussion_r110434761
########## File path: src/couch_replicator/src/couch_replicator_docs.erl ########## @@ -0,0 +1,736 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_docs). + +-export([parse_rep_doc/1, parse_rep_doc/2, parse_rep_db/3]). +-export([parse_rep_doc_without_id/1, parse_rep_doc_without_id/2]). +-export([before_doc_update/2, after_doc_read/2]). +-export([ensure_rep_db_exists/0, ensure_rep_ddoc_exists/1]). +-export([ensure_cluster_rep_ddoc_exists/1]). +-export([ + remove_state_fields/2, + update_doc_completed/4, + update_failed/4, + update_rep_id/1 +]). +-export([update_triggered/2, update_error/2]). + + +-define(REP_DB_NAME, <<"_replicator">>). +-define(REP_DESIGN_DOC, <<"_design/_replicator">>). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("ibrowse/include/ibrowse.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include("couch_replicator.hrl"). + +-include("couch_replicator_js_functions.hrl"). + +-import(couch_util, [ + get_value/2, + get_value/3, + to_binary/1 +]). + +-import(couch_replicator_utils, [ + get_json_value/2, + get_json_value/3 +]). + + +-define(OWNER, <<"owner">>). +-define(CTX, {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}}). + +-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})). 
% Strip every replicator-maintained bookkeeping field from a replication
% document. A value of `undefined` instructs update_rep_doc/3 to delete the
% corresponding key from the document body.
remove_state_fields(DbName, DocId) ->
    update_rep_doc(DbName, DocId, [
        {<<"_replication_state">>, undefined},
        {<<"_replication_state_time">>, undefined},
        {<<"_replication_state_reason">>, undefined},
        {<<"_replication_start_time">>, undefined},
        {<<"_replication_id">>, undefined},
        {<<"_replication_stats">>, undefined}
    ]).


% Mark a replication document as successfully completed and record the final
% statistics. Bumps the `completed_state_updates` counter.
-spec update_doc_completed(binary(), binary(), [_], erlang:timestamp()) -> any().
update_doc_completed(DbName, DocId, Stats, StartTime) ->
    StartedAt = couch_replicator_utils:iso8601(StartTime),
    update_rep_doc(DbName, DocId, [
        {<<"_replication_state">>, <<"completed">>},
        {<<"_replication_state_reason">>, undefined},
        {<<"_replication_start_time">>, StartedAt},
        {<<"_replication_stats">>, {Stats}}
    ]),
    couch_stats:increment_counter([couch_replicator, docs,
        completed_state_updates]).


% Mark a replication document as failed, recording a reason derived from
% Error. Bumps the `failed_state_updates` counter.
-spec update_failed(binary(), binary(), any(), erlang:timestamp()) -> any().
update_failed(DbName, DocId, Error, StartTime) ->
    Reason = error_reason(Error),
    couch_log:error("Error processing replication doc `~s` from `~s`: ~s",
        [DocId, DbName, Reason]),
    StartedAt = couch_replicator_utils:iso8601(StartTime),
    update_rep_doc(DbName, DocId, [
        {<<"_replication_state">>, <<"failed">>},
        {<<"_replication_start_time">>, StartedAt},
        {<<"_replication_stats">>, undefined},
        {<<"_replication_state_reason">>, Reason}
    ]),
    couch_stats:increment_counter([couch_replicator, docs,
        failed_state_updates]).


% Mark a replication document as triggered and store the computed replication
% id. Clears stats and any previous error reason.
-spec update_triggered(#rep{}, rep_id()) -> ok.
update_triggered(#rep{db_name = DbName, doc_id = DocId,
        start_time = StartTime}, {BaseId, Ext}) ->
    StartedAt = couch_replicator_utils:iso8601(StartTime),
    update_rep_doc(DbName, DocId, [
        {<<"_replication_state">>, <<"triggered">>},
        {<<"_replication_state_reason">>, undefined},
        {<<"_replication_id">>, iolist_to_binary([BaseId, Ext])},
        {<<"_replication_start_time">>, StartedAt},
        {<<"_replication_stats">>, undefined}
    ]),
    ok.
% Mark a replication document as errored, recording the reason and, when the
% replication id is known, the id as well. Clears stats.
-spec update_error(#rep{}, any()) -> ok.
update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
    Reason = error_reason(Error),
    IdBin = case RepId of
        {BaseId, Ext} ->
            iolist_to_binary([BaseId, Ext]);
        _Other ->
            % Id was never computed (e.g. the doc failed to parse).
            null
    end,
    update_rep_doc(DbName, DocId, [
        {<<"_replication_state">>, <<"error">>},
        {<<"_replication_state_reason">>, Reason},
        {<<"_replication_stats">>, undefined},
        {<<"_replication_id">>, IdBin}
    ]),
    ok.


% Open the replicator database, creating it first if it does not exist, and
% make sure the replicator design document is present in it.
-spec ensure_rep_db_exists() -> {ok, #db{}}.
ensure_rep_db_exists() ->
    Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of
        {ok, OpenedDb} ->
            OpenedDb;
        _Error ->
            {ok, CreatedDb} = couch_db:create(?REP_DB_NAME, [?CTX, sys_db]),
            CreatedDb
    end,
    ok = ensure_rep_ddoc_exists(?REP_DB_NAME),
    {ok, Db}.


% Create or refresh the replicator design document, but only on the shard
% that mem3 says owns it; other shards are a no-op.
-spec ensure_rep_ddoc_exists(binary()) -> ok.
ensure_rep_ddoc_exists(RepDb) ->
    case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of
        true ->
            ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC);
        false ->
            ok
    end.


% Create the design doc if missing, or rewrite it when its body (minus _rev)
% differs from the latest definition. Update conflicts are ignored: another
% node won the race and a later pass will reconcile.
-spec ensure_rep_ddoc_exists(binary(), binary()) -> ok.
ensure_rep_ddoc_exists(RepDb, DDocId) ->
    case open_rep_doc(RepDb, DDocId) of
        {not_found, no_db_file} ->
            % Database was deleted out from under us; nothing to do.
            ok;
        {not_found, _Reason} ->
            % No design doc yet: create it from scratch.
            DDoc = couch_doc:from_json_obj(
                {replication_design_doc_props(DDocId)}),
            couch_log:notice("creating replicator ddoc", []),
            {ok, _Rev} = save_rep_doc(RepDb, DDoc);
        {ok, Doc} ->
            Latest = replication_design_doc_props(DDocId),
            {Props0} = couch_doc:to_json_obj(Doc, []),
            {value, {_, Rev}, Props} = lists:keytake(<<"_rev">>, 1, Props0),
            case compare_ejson({Props}, {Latest}) of
                true ->
                    ok;
                false ->
                    DDoc = couch_doc:from_json_obj(
                        {[{<<"_rev">>, Rev} | Latest]}),
                    couch_log:notice("updating replicator ddoc", []),
                    try
                        {ok, _} = save_rep_doc(RepDb, DDoc)
                    catch
                        throw:conflict ->
                            %% ignore, we'll retry next time
                            ok
                    end
            end
    end,
    ok.
% Ensure the replicator design doc exists on the shard of a clustered
% replicator database that owns the design doc id.
-spec ensure_cluster_rep_ddoc_exists(binary()) -> ok.
ensure_cluster_rep_ddoc_exists(RepDb) ->
    DDocId = ?REP_DESIGN_DOC,
    [#shard{name = DbShard} | _] = mem3:shards(RepDb, DDocId),
    ensure_rep_ddoc_exists(DbShard, DDocId).


% Deep-compare two EJSON objects, ignoring the order of object properties.
-spec compare_ejson({[_]}, {[_]}) -> boolean().
compare_ejson(EJson1, EJson2) ->
    EjsonSorted1 = couch_replicator_filters:ejsort(EJson1),
    EjsonSorted2 = couch_replicator_filters:ejsort(EJson2),
    EjsonSorted1 == EjsonSorted2.


% Build the EJSON property list for the replicator design document: the
% validate_doc_update function plus a view over replications in terminal
% states. (Returns the list directly; the original bound it to an unused
% variable, causing a compiler warning.)
-spec replication_design_doc_props(binary()) -> [_].
replication_design_doc_props(DDocId) ->
    TerminalViewEJson = {[
        {<<"map">>, ?REP_DB_TERMINAL_STATE_VIEW_MAP_FUN},
        {<<"reduce">>, <<"_count">>}
    ]},
    [
        {<<"_id">>, DDocId},
        {<<"language">>, <<"javascript">>},
        {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN},
        {<<"views">>, {[
            {<<"terminal_states">>, TerminalViewEJson}
        ]}}
    ].


% Note: parse_rep_doc can handle filtered replications. During parsing of the
% replication doc it will possibly make remote http requests to the source
% database. If fetching or parsing of filter docs fails, parse_rep_doc throws
% a {filter_fetch_error, Error} exception. This exception should be
% considered transient in respect to the contents of the document itself,
% since it depends on network availability of the source db and other
% factors.
-spec parse_rep_doc({[_]}) -> #rep{}.
parse_rep_doc(RepDoc) ->
    {ok, Rep} = try
        parse_rep_doc(RepDoc, rep_user_ctx(RepDoc))
    catch
        throw:{error, Reason} ->
            throw({bad_rep_doc, Reason});
        throw:{filter_fetch_error, Reason} ->
            % Re-thrown as-is so callers can treat it as transient.
            throw({filter_fetch_error, Reason});
        Tag:Err ->
            throw({bad_rep_doc, to_binary({Tag, Err})})
    end,
    Rep.


% Same as parse_rep_doc/1 but does not calculate the replication id, so no
% remote filter fetch is needed and no filter_fetch_error can be thrown.
-spec parse_rep_doc_without_id({[_]}) -> #rep{}.
parse_rep_doc_without_id(RepDoc) ->
    {ok, Rep} = try
        parse_rep_doc_without_id(RepDoc, rep_user_ctx(RepDoc))
    catch
        throw:{error, Reason} ->
            throw({bad_rep_doc, Reason});
        Tag:Err ->
            throw({bad_rep_doc, to_binary({Tag, Err})})
    end,
    Rep.
% Parse a replication document into a #rep{} record and compute its
% replication id, unless this is a cancelation request that already names
% one. Computing the id may contact the source db to fetch a filter.
-spec parse_rep_doc({[_]}, #user_ctx{}) -> {ok, #rep{}}.
parse_rep_doc(Doc, UserCtx) ->
    {ok, Rep} = parse_rep_doc_without_id(Doc, UserCtx),
    Cancel = get_value(cancel, Rep#rep.options, false),
    Id = get_value(id, Rep#rep.options, nil),
    % A cancel request carrying an explicit id is returned untouched; every
    % other doc (cancel without id, or a regular replication) gets its id
    % computed from the body contents.
    case Cancel andalso Id =/= nil of
        true ->
            {ok, Rep};
        false ->
            {ok, update_rep_id(Rep)}
    end.


% Parse a replication doc into a #rep{} record without computing the
% replication id. For a cancelation-by-id request only the options and user
% context are filled in.
-spec parse_rep_doc_without_id({[_]}, #user_ctx{}) -> {ok, #rep{}}.
parse_rep_doc_without_id({Props}, UserCtx) ->
    Proxy = get_value(<<"proxy">>, Props, <<>>),
    Opts = make_options(Props),
    CancelById = get_value(cancel, Opts, false) andalso
        get_value(id, Opts, nil) =/= nil,
    case CancelById of
        true ->
            {ok, #rep{options = Opts, user_ctx = UserCtx}};
        false ->
            {ok, rep_record_from_props(Props, Proxy, Opts, UserCtx)}
    end.


% Resolve source/target endpoints and filter metadata into a fully
% populated #rep{} record; throws on bad view type or unparsable filter.
rep_record_from_props(Props, Proxy, Opts, UserCtx) ->
    Source = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
    Target = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
    {Type, View} = case couch_replicator_filters:view_type(Props, Opts) of
        {error, Error} ->
            throw({bad_request, Error});
        Result ->
            Result
    end,
    % Check if can parse filter code, if not throw exception
    case couch_replicator_filters:parse(Opts) of
        {error, FilterError} ->
            throw({error, FilterError});
        {ok, _Filter} ->
            ok
    end,
    #rep{
        source = Source,
        target = Target,
        options = Opts,
        user_ctx = UserCtx,
        type = Type,
        view = View,
        doc_id = get_value(<<"_id">>, Props, null)
    }.


% Update a #rep{} record with a replication id. Calculating the id might
% involve fetching a filter from the source db, and so it could fail
% intermittently. In case of a failure to fetch the filter this function
% will throw a `{filter_fetch_error, Reason}` exception.
update_rep_id(Rep) ->
    Rep#rep{id = couch_replicator_ids:replication_id(Rep)}.
% Apply a list of {Key, Value} updates to a replication document. A value of
% `undefined` deletes the key. On edit conflict the document is re-read and
% the update retried after a randomized, exponentially growing backoff.
update_rep_doc(RepDbName, RepDocId, KVs) ->
    update_rep_doc(RepDbName, RepDocId, KVs, 1).


update_rep_doc(RepDbName, RepDocId, KVs, Wait) when is_binary(RepDocId) ->
    try
        case open_rep_doc(RepDbName, RepDocId) of
            {ok, LastRepDoc} ->
                update_rep_doc(RepDbName, LastRepDoc, KVs, Wait * 2);
            _ ->
                % Doc is gone (e.g. deleted by the user); nothing to update.
                ok
        end
    catch
        throw:conflict ->
            Msg = "Conflict when updating replication document `~s`. Retrying.",
            couch_log:error(Msg, [RepDocId]),
            % Back off 100ms..min(128, Wait)*100ms. Use `rand`: the old
            % `random` module is deprecated (OTP 18+) and `rand` self-seeds
            % per process.
            ok = timer:sleep(rand:uniform(erlang:min(128, Wait)) * 100),
            update_rep_doc(RepDbName, RepDocId, KVs, Wait * 2)
    end;
update_rep_doc(RepDbName, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
    NewRepDocBody = lists:foldl(
        fun({K, undefined}, Body) ->
                % `undefined` means remove the field entirely.
                lists:keydelete(K, 1, Body);
            ({<<"_replication_state">> = K, State} = KV, Body) ->
                case get_json_value(K, Body) of
                State ->
                    % State unchanged; leave the state timestamp alone.
                    Body;
                _ ->
                    Body1 = lists:keystore(K, 1, Body, KV),
                    Timestamp = couch_replicator_utils:iso8601(os:timestamp()),
                    lists:keystore(
                        <<"_replication_state_time">>, 1, Body1,
                        {<<"_replication_state_time">>, Timestamp})
                end;
            ({K, _V} = KV, Body) ->
                lists:keystore(K, 1, Body, KV)
        end,
        RepDocBody, KVs),
    case NewRepDocBody of
        RepDocBody ->
            ok;
        _ ->
            % Might not succeed - when the replication doc is deleted right
            % before this update (not an error, ignore).
            save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
    end.


% Open DocId from DbName as a sys db with admin context, returning
% {ok, #doc{}} or couch_db's not_found error. Always closes the db handle.
open_rep_doc(DbName, DocId) ->
    case couch_db:open_int(DbName, [?CTX, sys_db]) of
        {ok, Db} ->
            try
                couch_db:open_doc(Db, DocId, [ejson_body])
            after
                couch_db:close(Db)
            end;
        Else ->
            Else
    end.


% Save Doc to DbName as a sys db with admin context. Throws `conflict` on
% edit conflicts. Always closes the db handle.
save_rep_doc(DbName, Doc) ->
    {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
    try
        couch_db:update_doc(Db, Doc, [])
    after
        couch_db:close(Db)
    end.


-spec rep_user_ctx({[_]}) -> #user_ctx{}.
% Extract the user context a replication runs under from the `user_ctx`
% property of the replication doc; defaults to an empty #user_ctx{}.
rep_user_ctx({RepDoc}) ->
    case get_json_value(<<"user_ctx">>, RepDoc) of
        undefined ->
            #user_ctx{};
        {UserCtx} ->
            #user_ctx{
                name = get_json_value(<<"name">>, UserCtx, null),
                roles = get_json_value(<<"roles">>, UserCtx, [])
            }
    end.


% Turn an endpoint specification - either a URL binary or an object with
% url/auth/headers properties - into an #httpdb{} record; a plain local
% database name is passed through as a binary.
-spec parse_rep_db({[_]} | binary(), binary(), [_]) -> #httpd{} | binary().
parse_rep_db({Props}, Proxy, Options) ->
    ProxyParams = parse_proxy_params(Proxy),
    ProxyURL = case ProxyParams of
        [] -> undefined;
        _ -> binary_to_list(Proxy)
    end,
    Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
    {AuthProps} = get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
    Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
    DefaultHeaders = (#httpdb{})#httpdb.headers,
    OAuth = case get_value(<<"oauth">>, AuthProps) of
        undefined ->
            nil;
        {OauthProps} ->
            #oauth{
                consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
                token = ?b2l(get_value(<<"token">>, OauthProps)),
                token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
                consumer_secret = ?b2l(get_value(<<"consumer_secret">>,
                    OauthProps)),
                signature_method =
                    case get_value(<<"signature_method">>, OauthProps) of
                        undefined -> hmac_sha1;
                        <<"PLAINTEXT">> -> plaintext;
                        <<"HMAC-SHA1">> -> hmac_sha1;
                        <<"RSA-SHA1">> -> rsa_sha1
                    end
            }
    end,
    #httpdb{
        url = Url,
        oauth = OAuth,
        headers = lists:ukeymerge(1, Headers, DefaultHeaders),
        ibrowse_options = lists:keysort(1,
            [{socket_options, get_value(socket_options, Options)} |
                ProxyParams ++ ssl_params(Url)]),
        timeout = get_value(connection_timeout, Options),
        http_connections = get_value(http_connections, Options),
        retries = get_value(retries, Options),
        proxy_url = ProxyURL
    };
parse_rep_db(<<"http://", _/binary>> = Url, Proxy, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
parse_rep_db(<<"https://", _/binary>> = Url, Proxy, Options) ->
    parse_rep_db({[{<<"url">>, Url}]}, Proxy, Options);
parse_rep_db(<<DbName/binary>>, _Proxy, _Options) ->
    DbName;
parse_rep_db(undefined, _Proxy, _Options) ->
    throw({error, <<"Missing replicator database">>}).


% Append a trailing "/" to Url (binary or string) unless one is already
% present. An empty URL is returned unchanged; previously it crashed with
% badarg in lists:last/1.
-spec maybe_add_trailing_slash(binary() | list()) -> list().
maybe_add_trailing_slash(Url) when is_binary(Url) ->
    maybe_add_trailing_slash(?b2l(Url));
maybe_add_trailing_slash([]) ->
    [];
maybe_add_trailing_slash(Url) ->
    case lists:last(Url) of
        $/ ->
            Url;
        _ ->
            Url ++ "/"
    end.


% Convert user-supplied replication doc properties into the internal option
% list, filling in defaults from the "replicator" config section.
-spec make_options([_]) -> [_].
make_options(Props) ->
    Options0 = lists:ukeysort(1, convert_options(Props)),
    Options = check_options(Options0),
    DefWorkers = config:get("replicator", "worker_processes", "4"),
    DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
    DefConns = config:get("replicator", "http_connections", "20"),
    DefTimeout = config:get("replicator", "connection_timeout", "30000"),
    DefRetries = config:get("replicator", "retries_per_request", "10"),
    UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
    DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
        "30000"),
    {ok, DefSocketOptions} = couch_util:parse_term(
        config:get("replicator", "socket_options",
            "[{keepalive, true}, {nodelay, false}]")),
    % User-supplied options win over the config-derived defaults.
    lists:ukeymerge(1, Options, lists:keysort(1, [
        {connection_timeout, list_to_integer(DefTimeout)},
        {retries, list_to_integer(DefRetries)},
        {http_connections, list_to_integer(DefConns)},
        {socket_options, DefSocketOptions},
        {worker_batch_size, list_to_integer(DefBatchSize)},
        {worker_processes, list_to_integer(DefWorkers)},
        {use_checkpoints, list_to_existing_atom(UseCheckpoints)},
        {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
    ])).


-spec convert_options([_]) -> [_].
% Translate binary JSON keys from a replication doc into an internal
% proplist with atom keys, validating and converting values along the way.
% Unknown keys are dropped for forward compatibility.
convert_options([]) ->
    [];
convert_options([{<<"cancel">>, V} | _Rest]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `cancel` must be a boolean">>});
convert_options([{<<"cancel">>, V} | Rest]) ->
    [{cancel, V} | convert_options(Rest)];
convert_options([{IdOpt, V} | Rest]) when IdOpt =:= <<"_local_id">>;
        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
    [{id, couch_replicator_ids:convert(V)} | convert_options(Rest)];
convert_options([{<<"create_target">>, V} | _Rest]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `create_target` must be a boolean">>});
convert_options([{<<"create_target">>, V} | Rest]) ->
    [{create_target, V} | convert_options(Rest)];
convert_options([{<<"continuous">>, V} | _Rest]) when not is_boolean(V) ->
    throw({bad_request, <<"parameter `continuous` must be a boolean">>});
convert_options([{<<"continuous">>, V} | Rest]) ->
    [{continuous, V} | convert_options(Rest)];
convert_options([{<<"filter">>, V} | Rest]) ->
    [{filter, V} | convert_options(Rest)];
convert_options([{<<"query_params">>, V} | Rest]) ->
    [{query_params, V} | convert_options(Rest)];
convert_options([{<<"doc_ids">>, null} | Rest]) ->
    convert_options(Rest);
convert_options([{<<"doc_ids">>, V} | _Rest]) when not is_list(V) ->
    throw({bad_request, <<"parameter `doc_ids` must be an array">>});
convert_options([{<<"doc_ids">>, V} | Rest]) ->
    % Ensure same behaviour as old replicator: accept a list of percent
    % encoded doc IDs.
    DocIds = lists:usort([?l2b(couch_httpd:unquote(Id)) || Id <- V]),
    [{doc_ids, DocIds} | convert_options(Rest)];
convert_options([{<<"selector">>, V} | _Rest]) when not is_tuple(V) ->
    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
convert_options([{<<"selector">>, V} | Rest]) ->
    [{selector, V} | convert_options(Rest)];
convert_options([{<<"worker_processes">>, V} | Rest]) ->
    [{worker_processes, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([{<<"worker_batch_size">>, V} | Rest]) ->
    [{worker_batch_size, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([{<<"http_connections">>, V} | Rest]) ->
    [{http_connections, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([{<<"connection_timeout">>, V} | Rest]) ->
    [{connection_timeout, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([{<<"retries_per_request">>, V} | Rest]) ->
    [{retries, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([{<<"socket_options">>, V} | Rest]) ->
    {ok, SocketOptions} = couch_util:parse_term(V),
    [{socket_options, SocketOptions} | convert_options(Rest)];
convert_options([{<<"since_seq">>, V} | Rest]) ->
    [{since_seq, V} | convert_options(Rest)];
convert_options([{<<"use_checkpoints">>, V} | Rest]) ->
    [{use_checkpoints, V} | convert_options(Rest)];
convert_options([{<<"checkpoint_interval">>, V} | Rest]) ->
    [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(Rest)];
convert_options([_ | Rest]) ->
    % Skip unknown option.
    convert_options(Rest).


-spec check_options([_]) -> [_].
+check_options(Options) -> + DocIds = lists:keyfind(doc_ids, 1, Options), + Filter = lists:keyfind(filter, 1, Options), + Selector = lists:keyfind(selector, 1, Options), + case {DocIds, Filter, Selector} of + {false, false, false} -> Options; + {false, false, _} -> Options; + {false, _, false} -> Options; + {_, false, false} -> Options; + _ -> + throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"}) + end. + +-spec parse_proxy_params(binary() | [_]) -> [_]. +parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> + parse_proxy_params(?b2l(ProxyUrl)); +parse_proxy_params([]) -> + []; +parse_proxy_params(ProxyUrl) -> + #url{ + host = Host, + port = Port, + username = User, + password = Passwd, + protocol = Protocol + } = ibrowse_lib:parse_url(ProxyUrl), + [{proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port}] ++ + case is_list(User) andalso is_list(Passwd) of Review comment: This is an odd pattern. On the one hand it makes the ++ on the previous line more obvious. On the other its a made up style. You might want to consider something like: ```erlang [ {proxy_protocol, Protocol}, {proxy_host, Host}, {proxy_port, Port} ] ++ case is_list(User) andalso is_list(Passwd) of ... ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
