davisp commented on a change in pull request #470: Scheduling Replicator URL: https://github.com/apache/couchdb/pull/470#discussion_r110713419
########## File path: src/couch_replicator/src/couch_replicator_doc_processor.erl ########## @@ -0,0 +1,946 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_doc_processor). + +-behaviour(gen_server). +-behaviour(couch_multidb_changes). + +-export([ + start_link/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3 +]). + +-export([ + db_created/2, + db_deleted/2, + db_found/2, + db_change/3 +]). + +-export([ + docs/1, + doc/2, + update_docs/0, + get_worker_ref/1, + notify_cluster_event/2 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator.hrl"). + +-import(couch_replicator_utils, [ + get_json_value/2, + get_json_value/3 +]). + +-define(DEFAULT_UPDATE_DOCS, false). +-define(ERROR_MAX_BACKOFF_EXPONENT, 12). % ~ 1 day on average +-define(TS_DAY_SEC, 86400). + +-type filter_type() :: nil | view | user | docids | mango. +-type repstate() :: initializing | error | scheduled. + + +-record(rdoc, { + id :: db_doc_id() | '_' | {any(), '_'}, + state :: repstate() | '_', + rep :: #rep{} | nil | '_', + rid :: rep_id() | nil | '_', + filter :: filter_type() | '_', + info :: binary() | nil | '_', + errcnt :: non_neg_integer() | '_', + worker :: reference() | nil | '_', + last_updated :: erlang:timestamp() | '_' +}). 
+ + +% couch_multidb_changes API callbacks + +db_created(DbName, Server) -> + couch_stats:increment_counter([couch_replicator, docs, dbs_created]), + couch_replicator_docs:ensure_rep_ddoc_exists(DbName), + Server. + + +db_deleted(DbName, Server) -> + couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]), + ok = gen_server:call(?MODULE, {clean_up_replications, DbName}, infinity), + Server. + + +db_found(DbName, Server) -> + couch_stats:increment_counter([couch_replicator, docs, dbs_found]), + couch_replicator_docs:ensure_rep_ddoc_exists(DbName), + Server. + + +db_change(DbName, {ChangeProps} = Change, Server) -> + couch_stats:increment_counter([couch_replicator, docs, db_changes]), + try + ok = process_change(DbName, Change) + catch + _Tag:Error -> + {RepProps} = get_json_value(doc, ChangeProps), + DocId = get_json_value(<<"_id">>, RepProps), + couch_replicator_docs:update_failed(DbName, DocId, Error) + end, + Server. + + +-spec get_worker_ref(db_doc_id()) -> reference() | nil. +get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> + case ets:lookup(?MODULE, {DbName, DocId}) of + [#rdoc{worker = WRef}] when is_reference(WRef) -> + WRef; + [#rdoc{worker = nil}] -> + nil; + [] -> + nil + end. + + +% Cluster membership change notification callback +-spec notify_cluster_event(pid(), {cluster, any()}) -> ok. +notify_cluster_event(Server, {cluster, _} = Event) -> + gen_server:cast(Server, Event). 
+ + +% Private helpers for multidb changes API, these route updates into the doc +% processor gen_server + +process_change(DbName, {Change}) -> + {RepProps} = JsonRepDoc = get_json_value(doc, Change), + DocId = get_json_value(<<"_id">>, RepProps), + Owner = couch_replicator_clustering:owner(DbName, DocId), + Id = {DbName, DocId}, + case {Owner, get_json_value(deleted, Change, false)} of + {_, true} -> + ok = gen_server:call(?MODULE, {removed, Id}, infinity); + {unstable, false} -> + couch_log:notice("Not starting '~s' as cluster is unstable", [DocId]); + {ThisNode, false} when ThisNode =:= node() -> + case get_json_value(<<"_replication_state">>, RepProps) of + undefined -> + ok = process_updated(Id, JsonRepDoc); + <<"triggered">> -> + maybe_remove_state_fields(DbName, DocId), + ok = process_updated(Id, JsonRepDoc); + <<"completed">> -> + ok = gen_server:call(?MODULE, {completed, Id}, infinity); + <<"error">> -> + % Handle replications started from older versions of replicator + % which wrote transient errors to replication docs + maybe_remove_state_fields(DbName, DocId), + ok = process_updated(Id, JsonRepDoc); + <<"failed">> -> + ok + end; + {Owner, false} -> + ok + end, + ok. + + +maybe_remove_state_fields(DbName, DocId) -> + case update_docs() of + true -> + ok; + false -> + couch_replicator_docs:remove_state_fields(DbName, DocId) + end. + + +process_updated({DbName, _DocId} = Id, JsonRepDoc) -> + % Parsing replication doc (but not calculating the id) could throw an + % exception which would indicate this document is malformed. This exception + % should propagate to db_change function and will be recorded as permanent + % failure in the document. User will have to delete and re-create the + % document to fix the problem. 
+ Rep0 = couch_replicator_docs:parse_rep_doc_without_id(JsonRepDoc), + Rep = Rep0#rep{db_name = DbName, start_time = os:timestamp()}, + Filter = case couch_replicator_filters:parse(Rep#rep.options) of + {ok, nil} -> + nil; + {ok, {user, _FName, _QP}} -> + user; + {ok, {view, _FName, _QP}} -> + view; + {ok, {docids, _DocIds}} -> + docids; + {ok, {mango, _Selector}} -> + mango; + {error, FilterError} -> + throw(FilterError) + end, + gen_server:call(?MODULE, {updated, Id, Rep, Filter}, infinity). + + +% Doc processor gen_server API and callbacks + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init([]) -> + ?MODULE = ets:new(?MODULE, [ordered_set, named_table, {keypos, #rdoc.id}]), + couch_replicator_clustering:link_cluster_event_listener(?MODULE, + notify_cluster_event, [self()]), + {ok, nil}. + + +terminate(_Reason, _State) -> + ok. + + +handle_call({updated, Id, Rep, Filter}, _From, State) -> + ok = updated_doc(Id, Rep, Filter), + {reply, ok, State}; + +handle_call({removed, Id}, _From, State) -> + ok = removed_doc(Id), + {reply, ok, State}; + +handle_call({completed, Id}, _From, State) -> + true = ets:delete(?MODULE, Id), + {reply, ok, State}; + +handle_call({clean_up_replications, DbName}, _From, State) -> + ok = removed_db(DbName), + {reply, ok, State}. + +handle_cast({cluster, unstable}, State) -> + % Ignoring unstable state transition + {noreply, State}; + +handle_cast({cluster, stable}, State) -> + % Membership changed recheck all the replication document ownership + nil = ets:foldl(fun cluster_membership_foldl/2, nil, ?MODULE), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {error, unexpected_message, Msg}, State}. + + +handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref, + result = Res}}, State) -> + ok = worker_returned(Ref, Id, Res), + {noreply, State}; + +handle_info(_Msg, State) -> + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ + +% Doc processor gen_server private helper functions + +% Handle doc update -- add to ets, then start a worker to try to turn it into +% a replication job. In most cases it will succeed quickly but for filtered +% replications or if there are duplicates, it could take longer +% (theoretically indefinitely) until a replication could be started. Before +% adding replication job, make sure to delete all old jobs associated with +% same document. +-spec updated_doc(db_doc_id(), #rep{}, filter_type()) -> ok. +updated_doc(Id, Rep, Filter) -> + case normalize_rep(current_rep(Id)) == normalize_rep(Rep) of + false -> + removed_doc(Id), + Row = #rdoc{ + id = Id, + state = initializing, + rep = Rep, + rid = nil, + filter = Filter, + info = nil, + errcnt = 0, + worker = nil, + last_updated = os:timestamp() + }, + true = ets:insert(?MODULE, Row), + ok = maybe_start_worker(Id); + true -> + ok + end. + + +% Return current #rep{} record if any. If replication hasn't been submitted +% to the scheduler yet, #rep{} record will be in the document processor's +% ETS table, otherwise query scheduler for the #rep{} record. +-spec current_rep({binary(), binary()}) -> #rep{} | nil. +current_rep({DbName, DocId}) when is_binary(DbName), is_binary(DocId) -> + case ets:lookup(?MODULE, {DbName, DocId}) of + [] -> + nil; + [#rdoc{state = scheduled, rep = nil, rid = JobId}] -> + % When replication is scheduled, #rep{} record which can be quite + % large compared to other bits in #rdoc is removed in order to avoid + % having to keep 2 copies of it. So have to fetch it from the + % scheduler. + couch_replicator_scheduler:rep_state(JobId); + [#rdoc{rep = Rep}] -> + Rep + end. + + +% Normalize a #rep{} record such that it doesn't contain time dependent fields +% pids (like httpc pools), and options / props are sorted. This function would be +% used during comparisons. +-spec normalize_rep(#rep{} | nil) -> #rep{} | nil. 
+normalize_rep(nil) -> + nil; + +normalize_rep(#rep{} = Rep)-> + #rep{ + source = couch_replicator_api_wrap:normalize_db(Rep#rep.source), + target = couch_replicator_api_wrap:normalize_db(Rep#rep.target), + options = Rep#rep.options, % already sorted in make_options/1 + type = Rep#rep.type, + view = Rep#rep.view, + doc_id = Rep#rep.doc_id, + db_name = Rep#rep.db_name + }. + + +-spec worker_returned(reference(), db_doc_id(), rep_start_result()) -> ok. +worker_returned(Ref, Id, {ok, RepId}) -> + case ets:lookup(?MODULE, Id) of + [#rdoc{worker = Ref} = Row] -> + Row0 = Row#rdoc{ + state = scheduled, + errcnt = 0, + worker = nil, + last_updated = os:timestamp() + }, + NewRow = case Row0 of + #rdoc{rid = RepId, filter = user} -> + % Filtered replication id didn't change. + Row0; + #rdoc{rid = nil, filter = user} -> + % Calculated new replication id for a filtered replication. Make + % sure to schedule another check as filter code could change. + % Replication starts could have been failing, so also clear + % error count. + Row0#rdoc{rid = RepId}; + #rdoc{rid = OldRepId, filter = user} -> + % Replication id of existing replication job with filter has + % changed. Remove old replication job from scheduler and + % schedule check to check for future changes. + ok = couch_replicator_scheduler:remove_job(OldRepId), + Msg = io_lib:format("Replication id changed: ~p -> ~p", [ + OldRepId, RepId]), + Row0#rdoc{rid = RepId, info = couch_util:to_binary(Msg)}; + #rdoc{rid = nil} -> + % Calculated new replication id for non-filtered replication. + % Remove replication doc body, after this we won't needed any Review comment: "we won't needed" seems like it's missing a word. And assuming it's "we won't be needed" or something it's not entirely clear what's not needed or what we're doing about it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
