iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r111405816
 
 

 ##########
 File path: src/couch_replicator/src/couch_replicator.erl
 ##########
 @@ -191,847 +129,314 @@ wait_for_result(RepId) ->
     end.
 
 
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, 
Error]),
-        Error
-    end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        case find_replicator(RepId) of
-        {ok, Pid} ->
-            case details(Pid) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another 
user">>});
-            Error ->
-                Error
-            end;
-        Error ->
-            Error
-        end
+-spec cancel_replication(rep_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+    FullRepId = BasedId ++ Extension,
+    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{} ->
+        ok = couch_replicator_scheduler:remove_job(RepId),
+        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+        {ok, {cancelled, ?l2b(FullRepId)}};
+    nil ->
+        couch_log:notice("Replication '~s' not found", [FullRepId]),
+        {error, not_found}
     end.
 
-find_replicator({BaseId, Ext} = _RepId) ->
-    case lists:keysearch(
-        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) 
of
-    {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            {ok, Pid};
-    _ ->
-            {error, not_found}
-    end.
 
-details(Pid) ->
-    case (catch gen_server:call(Pid, get_details)) of
-    {ok, Rep} ->
-        {ok, Rep};
-    {'EXIT', {noproc, {gen_server, call, _}}} ->
-        {error, not_found};
-    Error ->
-        throw(Error)
+-spec replication_states() -> [atom()].
+replication_states() ->
+    ?REPLICATION_STATES.
+
+
+-spec stream_terminal_docs_info(binary(), user_doc_cb(), any(), [atom()]) ->
+    any().
+stream_terminal_docs_info(Db, Cb, UserAcc, States) ->
+    DDoc = <<"_replicator">>,
+    View = <<"terminal_states">>,
+    QueryCb = fun handle_replicator_doc_query/2,
+    Args = #mrargs{view_type = map, reduce = false},
+    Acc = {Db, Cb, UserAcc, States},
+    try fabric:query_view(Db, DDoc, View, QueryCb, Acc, Args) of
+    {ok, {Db, Cb, UserAcc1, States}} ->
+        UserAcc1
+    catch
+        error:database_does_not_exist ->
+            UserAcc;
+        error:{badmatch, {not_found, Reason}} ->
+            Msg = "Could not find _design/~s ~s view in replicator db ~s : ~p",
+            couch_log:error(Msg, [DDoc, View, Db, Reason]),
+            couch_replicator_docs:ensure_cluster_rep_ddoc_exists(Db),
+            timer:sleep(?DESIGN_DOC_CREATION_DELAY_MSEC),
+            stream_terminal_docs_info(Db, Cb, UserAcc, States)
     end.
 
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
-
-    random:seed(os:timestamp()),
-
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        committed_seq = {_, CommittedSeq},
-        highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-        StartSeq, Source, ChangesQueue, Options
-    ),
-    % Changes manager - responsible for dequeing batches from the changes queue
-    % and deliver them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for 
a
-    % a batch of _changes rows to process -> check which revs are missing in 
the
-    % target, and for the missing ones, it copies them from the source to the 
target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            couch_stats:increment_counter([couch_replicator, workers_started]),
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
 
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, 0},
-        {source_seq, HighestSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {checkpoint_interval, CheckpointInterval}
-    ]),
-    couch_task_status:set_update_frequency(1000),
-
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original 
arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    couch_log:notice("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    couch_log:debug("Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
-    Src.
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
-    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) 
->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) 
->
-    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
-    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
-    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
+-spec stream_active_docs_info(user_doc_cb(), any(), [atom()]) -> any().
+stream_active_docs_info(Cb, UserAcc, States) ->
+    Nodes = lists:sort([node() | nodes()]),
+    stream_active_docs_info(Nodes, Cb, UserAcc, States).
+
+
+-spec stream_active_docs_info([node()], user_doc_cb(), any(), [atom()]) ->
+    any().
+stream_active_docs_info([], _Cb, UserAcc, _States) ->
+    UserAcc;
+stream_active_docs_info([Node | Nodes], Cb, UserAcc, States) ->
+    case rpc:call(Node, couch_replicator_doc_processor, docs, [States]) of
+        {badrpc, Reason} ->
+            ErrMsg = "Could not get replicator docs from ~p. Error: ~p",
+            couch_log:error(ErrMsg, [Node, Reason]),
+            stream_active_docs_info(Nodes, Cb, UserAcc, States);
+        Results ->
+            UserAcc1 = lists:foldl(Cb, UserAcc, Results),
+            stream_active_docs_info(Nodes, Cb, UserAcc1, States)
     end.
 
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
 
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = 
HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+-spec handle_replicator_doc_query
+    ({row, [_]} , query_acc()) -> {ok, query_acc()};
+    ({error, any()}, query_acc()) -> {error, any()};
+    ({meta, any()}, query_acc()) -> {ok,  query_acc()};
+    (complete, query_acc()) -> {ok, query_acc()}.
+handle_replicator_doc_query({row, Props}, {Db, Cb, UserAcc, States}) ->
+    DocId = get_value(id, Props),
+    DocStateBin = get_value(key, Props),
+    DocState = erlang:binary_to_existing_atom(DocStateBin, utf8),
 
 Review comment:
   Are there other options rather than to use binary_to_existing_atom? 
binary_to_existing_atom is not hot-code upgrade friendly since it introduces an 
order in which modules need to be loaded to make it work. 
   ```
   state_to_atom(<<"initializing">>) -> initializing;
   state_to_atom(<<"error">>) -> error;
   state_to_atom(<<"running">>) -> running;
   state_to_atom(<<"pending">>) -> pending;
   state_to_atom(<<"crashing">>) -> crashing;
   ...
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to