nickva commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r112815052
##########
File path: src/couch_replicator/src/couch_replicator.erl
##########
@@ -191,847 +120,244 @@ wait_for_result(RepId) ->
end.
-cancel_replication({BaseId, Extension}) ->
- FullRepId = BaseId ++ Extension,
- couch_log:notice("Canceling replication `~s`...", [FullRepId]),
- case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
- ok ->
- couch_log:notice("Replication `~s` canceled.", [FullRepId]),
- case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
- ok ->
- {ok, {cancelled, ?l2b(FullRepId)}};
- {error, not_found} ->
- {ok, {cancelled, ?l2b(FullRepId)}};
- Error ->
- Error
- end;
- Error ->
- couch_log:error("Error canceling replication `~s`: ~p", [FullRepId,
Error]),
- Error
- end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
- case lists:member(<<"_admin">>, Roles) of
- true ->
- cancel_replication(RepId);
- false ->
- case find_replicator(RepId) of
- {ok, Pid} ->
- case details(Pid) of
- {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
- cancel_replication(RepId);
- {ok, _} ->
- throw({unauthorized,
- <<"Can't cancel a replication triggered by another
user">>});
- Error ->
- Error
- end;
- Error ->
- Error
- end
- end.
-
-find_replicator({BaseId, Ext} = _RepId) ->
- case lists:keysearch(
- BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup))
of
- {value, {_, Pid, _, _}} when is_pid(Pid) ->
- {ok, Pid};
- _ ->
- {error, not_found}
- end.
-
-details(Pid) ->
- case (catch gen_server:call(Pid, get_details)) of
- {ok, Rep} ->
- {ok, Rep};
- {'EXIT', {noproc, {gen_server, call, _}}} ->
- {error, not_found};
- Error ->
- throw(Error)
+-spec cancel_replication(rep_id()) ->
+ {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+ FullRepId = BasedId ++ Extension,
+ couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+ case couch_replicator_scheduler:rep_state(RepId) of
+ #rep{} ->
+ ok = couch_replicator_scheduler:remove_job(RepId),
+ couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+ {ok, {cancelled, ?l2b(FullRepId)}};
+ nil ->
+ couch_log:notice("Replication '~s' not found", [FullRepId]),
+ {error, not_found}
end.
-init(InitArgs) ->
- {ok, InitArgs, 0}.
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
- process_flag(trap_exit, true),
-
- random:seed(os:timestamp()),
-
- #rep_state{
- source = Source,
- target = Target,
- source_name = SourceName,
- target_name = TargetName,
- start_seq = {_Ts, StartSeq},
- committed_seq = {_, CommittedSeq},
- highest_seq_done = {_, HighestSeq},
- checkpoint_interval = CheckpointInterval
- } = State = init_state(Rep),
-
- NumWorkers = get_value(worker_processes, Options),
- BatchSize = get_value(worker_batch_size, Options),
- {ok, ChangesQueue} = couch_work_queue:new([
- {max_items, BatchSize * NumWorkers * 2},
- {max_size, 100 * 1024 * NumWorkers}
- ]),
- % This starts the _changes reader process. It adds the changes from
- % the source db to the ChangesQueue.
- {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
- StartSeq, Source, ChangesQueue, Options
- ),
- % Changes manager - responsible for dequeing batches from the changes queue
- % and deliver them to the worker processes.
- ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
- % This starts the worker processes. They ask the changes queue manager for
a
- % a batch of _changes rows to process -> check which revs are missing in
the
- % target, and for the missing ones, it copies them from the source to the
target.
- MaxConns = get_value(http_connections, Options),
- Workers = lists:map(
- fun(_) ->
- couch_stats:increment_counter([couch_replicator, workers_started]),
- {ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns),
- Pid
- end,
- lists:seq(1, NumWorkers)),
+-spec replication_states() -> [atom()].
+replication_states() ->
+ ?REPLICATION_STATES.
- couch_task_status:add_task([
- {type, replication},
- {user, UserCtx#user_ctx.name},
- {replication_id, ?l2b(BaseId ++ Ext)},
- {database, Rep#rep.db_name},
- {doc_id, Rep#rep.doc_id},
- {source, ?l2b(SourceName)},
- {target, ?l2b(TargetName)},
- {continuous, get_value(continuous, Options, false)},
- {revisions_checked, 0},
- {missing_revisions_found, 0},
- {docs_read, 0},
- {docs_written, 0},
- {changes_pending, get_pending_count(State)},
- {doc_write_failures, 0},
- {source_seq, HighestSeq},
- {checkpointed_source_seq, CommittedSeq},
- {checkpoint_interval, CheckpointInterval}
- ]),
- couch_task_status:set_update_frequency(1000),
- % Until OTP R14B03:
- %
- % Restarting a temporary supervised child implies that the original
arguments
- % (#rep{} record) specified in the MFA component of the supervisor
- % child spec will always be used whenever the child is restarted.
- % This implies the same replication performance tunning parameters will
- % always be used. The solution is to delete the child spec (see
- % cancel_replication/1) and then start the replication again, but this is
- % unfortunately not immune to race conditions.
-
- couch_log:notice("Replication `~p` is using:~n"
- "~c~p worker processes~n"
- "~ca worker batch size of ~p~n"
- "~c~p HTTP connections~n"
- "~ca connection timeout of ~p milliseconds~n"
- "~c~p retries per request~n"
- "~csocket options are: ~s~s",
- [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
- MaxConns, $\t, get_value(connection_timeout, Options),
- $\t, get_value(retries, Options),
- $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
- case StartSeq of
- ?LOWEST_SEQ ->
- "";
- _ ->
- io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
- end]),
-
- couch_log:debug("Worker pids are: ~p", [Workers]),
-
- couch_replicator_manager:replication_started(Rep),
-
- {ok, State#rep_state{
- changes_queue = ChangesQueue,
- changes_manager = ChangesManager,
- changes_reader = ChangesReader,
- workers = Workers
- }
- }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
- Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
- couch_log:notice(Msg, [RepId]),
- Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
- Src.
-
-handle_info(shutdown, St) ->
- {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
- couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
- couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State)
->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State)
->
- couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
- couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
- couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
- couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
- case Workers -- [Pid] of
- Workers ->
- couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
- {noreply, State#rep_state{workers = Workers}};
- %% not clear why a stop was here before
- %%{stop, {unknown_process_died, Pid, normal}, State};
- [] ->
- catch unlink(State#rep_state.changes_manager),
- catch exit(State#rep_state.changes_manager, kill),
- do_last_checkpoint(State);
- Workers2 ->
- {noreply, State#rep_state{workers = Workers2}}
- end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
- State2 = cancel_timer(State),
- case lists:member(Pid, Workers) of
- false ->
- {stop, {unknown_process_died, Pid, Reason}, State2};
- true ->
- couch_stats:increment_counter([couch_replicator, worker_deaths]),
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
- end;
-
-handle_info(timeout, InitArgs) ->
- try do_init(InitArgs) of {ok, State} ->
- {noreply, State}
- catch Class:Error ->
- Stack = erlang:get_stacktrace(),
- {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
+-spec strip_url_creds(binary() | {[_]}) -> binary().
+strip_url_creds(Endpoint) ->
+ case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+ #httpdb{url=Url} ->
+ iolist_to_binary(couch_util:url_strip_password(Url));
+ LocalDb when is_binary(LocalDb) ->
+ LocalDb
end.
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
- {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
- {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
- #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done =
HighestDone,
- current_through_seq = ThroughSeq, stats = Stats} = State) ->
- gen_server:reply(From, ok),
- {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
- [] ->
- {Seq, []};
- [Seq | Rest] ->
- {Seq, Rest};
- [_ | _] ->
- {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
- end,
- NewHighestDone = lists:max([HighestDone, Seq]),
- NewThroughSeq = case NewSeqsInProgress of
- [] ->
- lists:max([NewThroughSeq0, NewHighestDone]);
- _ ->
- NewThroughSeq0
- end,
- couch_log:debug("Worker reported seq ~p, through seq was ~p, "
- "new through seq is ~p, highest seq done was ~p, "
- "new highest seq done is ~p~n"
- "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
- [Seq, ThroughSeq, NewThroughSeq, HighestDone,
- NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
- NewState = State#rep_state{
- stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
- current_through_seq = NewThroughSeq,
- seqs_in_progress = NewSeqsInProgress,
- highest_seq_done = NewHighestDone
- },
- update_task(NewState),
- {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
- #rep_state{source = #db{name = DbName} = Source} = State) ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
- #rep_state{target = #db{name = DbName} = Target} = State) ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
- #rep_state{rep_details = #rep{} = Rep} = State,
- case couch_replicator_manager:continue(Rep) of
- {true, _} ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints,
success]),
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints,
failure]),
- {stop, Error, State}
- end;
- {false, Owner} ->
- couch_replicator_manager:replication_usurped(Rep, Owner),
- {stop, shutdown, State}
- end;
-
-handle_cast({report_seq, Seq},
- #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
- NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
- {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
- {ok, State}.
-
-
-headers_strip_creds([], Acc) ->
- lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
- Value = case string:to_lower(Key) of
- "authorization" ->
- "****";
- _ ->
- Value0
- end,
- headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
- HttpDb#httpdb{
- url = couch_util:url_strip_password(Url),
- headers = headers_strip_creds(Headers, [])
- };
-httpdb_strip_creds(LocalDb) ->
- LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
- Rep#rep{
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target =
Target} = State) ->
- % #rep_state contains the source and target at the top level and also
- % in the nested #rep_details record
- State#rep_state{
- rep_details = rep_strip_creds(Rep),
- source = httpdb_strip_creds(Source),
- target = httpdb_strip_creds(Target)
- }.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
- checkpoint_history = CheckpointHistory} = State) ->
- terminate_cleanup(State),
- couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
- couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
- % cancelled replication throught ?MODULE:cancel_replication/1
- couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
- terminate_cleanup(State);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
- #rep{id=RepId} = InitArgs,
- couch_stats:increment_counter([couch_replicator, failed_starts]),
- CleanInitArgs = rep_strip_creds(InitArgs),
- couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
- [Class, Error, CleanInitArgs, Stack]),
- case Error of
- {unauthorized, DbUri} ->
- NotifyError = {unauthorized, <<"unauthorized to access or create
database ", DbUri/binary>>};
- {db_not_found, DbUri} ->
- NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
- _ ->
- NotifyError = Error
- end,
- couch_replicator_notifier:notify({error, RepId, NotifyError}),
- couch_replicator_manager:replication_error(InitArgs, NotifyError);
-terminate(Reason, State) ->
- #rep_state{
- source_name = Source,
- target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
- } = State,
- couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
- [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
- terminate_cleanup(State),
- couch_replicator_notifier:notify({error, RepId, Reason}),
- couch_replicator_manager:replication_error(Rep, Reason).
-
-terminate_cleanup(State) ->
- update_task(State),
- stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
- stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
- couch_replicator_api_wrap:db_close(State#rep_state.source),
- couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-format_status(_Opt, [_PDict, State]) ->
- [{data, [{"State", state_strip_creds(State)}]}].
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
- {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = Seq} = State) ->
- case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
- {ok, NewState} ->
- couch_stats:increment_counter([couch_replicator, checkpoints,
success]),
- {stop, normal, cancel_timer(NewState)};
- Error ->
- couch_stats:increment_counter([couch_replicator, checkpoints,
failure]),
- {stop, Error, State}
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+ JobId = couch_replicator_ids:convert(JobId0),
+ {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
+ case [JobInfo || {ok, JobInfo} <- Res] of
+ [JobInfo| _] ->
+ {ok, JobInfo};
+ [] ->
+ {error, not_found}
end.
-start_timer(State) ->
- After = State#rep_state.checkpoint_interval,
- case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
- {ok, Ref} ->
- Ref;
- Error ->
- couch_log:error("Replicator, error scheduling checkpoint: ~p",
[Error]),
- nil
+-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+active_doc(DbName, DocId) ->
Review comment:
I pushed my fix for this as described above. The first check is local, where
the workers run. Then, if not found, the coordinator checks only the owner. Then,
if that fails, it checks all of the shards' nodes. This way, in most cases there
will be only one node queried for undecided responses.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services