iilyak commented on a change in pull request #470: Scheduling Replicator
URL: https://github.com/apache/couchdb/pull/470#discussion_r112502305
 
 

 ##########
 File path: src/couch_replicator/src/couch_replicator.erl
 ##########
 @@ -191,847 +126,244 @@ wait_for_result(RepId) ->
     end.
 
 
-cancel_replication({BaseId, Extension}) ->
-    FullRepId = BaseId ++ Extension,
-    couch_log:notice("Canceling replication `~s`...", [FullRepId]),
-    case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
-    ok ->
-        couch_log:notice("Replication `~s` canceled.", [FullRepId]),
-        case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
-            ok ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            {error, not_found} ->
-                {ok, {cancelled, ?l2b(FullRepId)}};
-            Error ->
-                Error
-        end;
-    Error ->
-        couch_log:error("Error canceling replication `~s`: ~p", [FullRepId, 
Error]),
-        Error
-    end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
-    case lists:member(<<"_admin">>, Roles) of
-    true ->
-        cancel_replication(RepId);
-    false ->
-        case find_replicator(RepId) of
-        {ok, Pid} ->
-            case details(Pid) of
-            {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
-                cancel_replication(RepId);
-            {ok, _} ->
-                throw({unauthorized,
-                    <<"Can't cancel a replication triggered by another 
user">>});
-            Error ->
-                Error
-            end;
-        Error ->
-            Error
-        end
-    end.
-
-find_replicator({BaseId, Ext} = _RepId) ->
-    case lists:keysearch(
-        BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) 
of
-    {value, {_, Pid, _, _}} when is_pid(Pid) ->
-            {ok, Pid};
-    _ ->
-            {error, not_found}
+-spec cancel_replication(rep_id()) ->
+    {ok, {cancelled, binary()}} | {error, not_found}.
+cancel_replication({BasedId, Extension} = RepId) ->
+    FullRepId = BasedId ++ Extension,
+    couch_log:notice("Canceling replication '~s' ...", [FullRepId]),
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{} ->
+        ok = couch_replicator_scheduler:remove_job(RepId),
+        couch_log:notice("Replication '~s' cancelled", [FullRepId]),
+        {ok, {cancelled, ?l2b(FullRepId)}};
+    nil ->
+        couch_log:notice("Replication '~s' not found", [FullRepId]),
+        {error, not_found}
     end.
 
-details(Pid) ->
-    case (catch gen_server:call(Pid, get_details)) of
-    {ok, Rep} ->
-        {ok, Rep};
-    {'EXIT', {noproc, {gen_server, call, _}}} ->
-        {error, not_found};
-    Error ->
-        throw(Error)
-    end.
-
-init(InitArgs) ->
-    {ok, InitArgs, 0}.
 
-do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
-    process_flag(trap_exit, true),
+-spec replication_states() -> [atom()].
+replication_states() ->
+    ?REPLICATION_STATES.
 
-    random:seed(os:timestamp()),
 
-    #rep_state{
-        source = Source,
-        target = Target,
-        source_name = SourceName,
-        target_name = TargetName,
-        start_seq = {_Ts, StartSeq},
-        committed_seq = {_, CommittedSeq},
-        highest_seq_done = {_, HighestSeq},
-        checkpoint_interval = CheckpointInterval
-    } = State = init_state(Rep),
-
-    NumWorkers = get_value(worker_processes, Options),
-    BatchSize = get_value(worker_batch_size, Options),
-    {ok, ChangesQueue} = couch_work_queue:new([
-        {max_items, BatchSize * NumWorkers * 2},
-        {max_size, 100 * 1024 * NumWorkers}
-    ]),
-    % This starts the _changes reader process. It adds the changes from
-    % the source db to the ChangesQueue.
-    {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
-        StartSeq, Source, ChangesQueue, Options
-    ),
-    % Changes manager - responsible for dequeing batches from the changes queue
-    % and deliver them to the worker processes.
-    ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
-    % This starts the worker processes. They ask the changes queue manager for 
a
-    % a batch of _changes rows to process -> check which revs are missing in 
the
-    % target, and for the missing ones, it copies them from the source to the 
target.
-    MaxConns = get_value(http_connections, Options),
-    Workers = lists:map(
-        fun(_) ->
-            couch_stats:increment_counter([couch_replicator, workers_started]),
-            {ok, Pid} = couch_replicator_worker:start_link(
-                self(), Source, Target, ChangesManager, MaxConns),
-            Pid
-        end,
-        lists:seq(1, NumWorkers)),
-
-    couch_task_status:add_task([
-        {type, replication},
-        {user, UserCtx#user_ctx.name},
-        {replication_id, ?l2b(BaseId ++ Ext)},
-        {database, Rep#rep.db_name},
-        {doc_id, Rep#rep.doc_id},
-        {source, ?l2b(SourceName)},
-        {target, ?l2b(TargetName)},
-        {continuous, get_value(continuous, Options, false)},
-        {revisions_checked, 0},
-        {missing_revisions_found, 0},
-        {docs_read, 0},
-        {docs_written, 0},
-        {changes_pending, get_pending_count(State)},
-        {doc_write_failures, 0},
-        {source_seq, HighestSeq},
-        {checkpointed_source_seq, CommittedSeq},
-        {checkpoint_interval, CheckpointInterval}
-    ]),
-    couch_task_status:set_update_frequency(1000),
-
-    % Until OTP R14B03:
-    %
-    % Restarting a temporary supervised child implies that the original 
arguments
-    % (#rep{} record) specified in the MFA component of the supervisor
-    % child spec will always be used whenever the child is restarted.
-    % This implies the same replication performance tunning parameters will
-    % always be used. The solution is to delete the child spec (see
-    % cancel_replication/1) and then start the replication again, but this is
-    % unfortunately not immune to race conditions.
-
-    couch_log:notice("Replication `~p` is using:~n"
-        "~c~p worker processes~n"
-        "~ca worker batch size of ~p~n"
-        "~c~p HTTP connections~n"
-        "~ca connection timeout of ~p milliseconds~n"
-        "~c~p retries per request~n"
-        "~csocket options are: ~s~s",
-        [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
-            MaxConns, $\t, get_value(connection_timeout, Options),
-            $\t, get_value(retries, Options),
-            $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
-            case StartSeq of
-            ?LOWEST_SEQ ->
-                "";
-            _ ->
-                io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
-            end]),
-
-    couch_log:debug("Worker pids are: ~p", [Workers]),
-
-    couch_replicator_manager:replication_started(Rep),
-
-    {ok, State#rep_state{
-            changes_queue = ChangesQueue,
-            changes_manager = ChangesManager,
-            changes_reader = ChangesReader,
-            workers = Workers
-        }
-    }.
-
-adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
-    Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
-    couch_log:notice(Msg, [RepId]),
-    Src#httpdb{http_connections = 2};
-
-adjust_maxconn(Src, _RepId) ->
-    Src.
-
-handle_info(shutdown, St) ->
-    {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
-    couch_log:error("Source database is down. Reason: ~p", [Why]),
-    {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
-    couch_log:error("Target database is down. Reason: ~p", [Why]),
-    {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
-    couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
-    {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) 
->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) 
->
-    couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
-    couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
-    {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
-    {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
-    couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
-    couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
-    {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
-    case Workers -- [Pid] of
-    Workers ->
-        couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
-        {noreply, State#rep_state{workers = Workers}};
-        %% not clear why a stop was here before
-        %%{stop, {unknown_process_died, Pid, normal}, State};
-    [] ->
-        catch unlink(State#rep_state.changes_manager),
-        catch exit(State#rep_state.changes_manager, kill),
-        do_last_checkpoint(State);
-    Workers2 ->
-        {noreply, State#rep_state{workers = Workers2}}
-    end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
-    State2 = cancel_timer(State),
-    case lists:member(Pid, Workers) of
-    false ->
-        {stop, {unknown_process_died, Pid, Reason}, State2};
-    true ->
-        couch_stats:increment_counter([couch_replicator, worker_deaths]),
-        couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
-        {stop, {worker_died, Pid, Reason}, State2}
-    end;
-
-handle_info(timeout, InitArgs) ->
-    try do_init(InitArgs) of {ok, State} ->
-        {noreply, State}
-    catch Class:Error ->
-        Stack = erlang:get_stacktrace(),
-        {stop, shutdown, {error, Class, Error, Stack, InitArgs}}
+-spec strip_url_creds(binary() | {[_]}) -> binary().
+strip_url_creds(Endpoint) ->
+    case couch_replicator_docs:parse_rep_db(Endpoint, [], []) of
+        #httpdb{url=Url} ->
+            iolist_to_binary(couch_util:url_strip_password(Url));
+        LocalDb when is_binary(LocalDb) ->
+            LocalDb
     end.
 
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
-    {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
-    gen_server:reply(From, ok),
-    NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
-    {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
-    #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = 
HighestDone,
-        current_through_seq = ThroughSeq, stats = Stats} = State) ->
-    gen_server:reply(From, ok),
-    {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
-    [] ->
-        {Seq, []};
-    [Seq | Rest] ->
-        {Seq, Rest};
-    [_ | _] ->
-        {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
-    end,
-    NewHighestDone = lists:max([HighestDone, Seq]),
-    NewThroughSeq = case NewSeqsInProgress of
-    [] ->
-        lists:max([NewThroughSeq0, NewHighestDone]);
-    _ ->
-        NewThroughSeq0
-    end,
-    couch_log:debug("Worker reported seq ~p, through seq was ~p, "
-        "new through seq is ~p, highest seq done was ~p, "
-        "new highest seq done is ~p~n"
-        "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
-        [Seq, ThroughSeq, NewThroughSeq, HighestDone,
-            NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    NewState = State#rep_state{
-        stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
-        current_through_seq = NewThroughSeq,
-        seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone
-    },
-    update_task(NewState),
-    {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
-    #rep_state{source = #db{name = DbName} = Source} = State) ->
-    {ok, NewSource} = couch_db:reopen(Source),
-    {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
-    #rep_state{target = #db{name = DbName} = Target} = State) ->
-    {ok, NewTarget} = couch_db:reopen(Target),
-    {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
-    #rep_state{rep_details = #rep{} = Rep} = State,
-    case couch_replicator_manager:continue(Rep) of
-    {true, _} ->
-        case do_checkpoint(State) of
-        {ok, NewState} ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, 
success]),
-            {noreply, NewState#rep_state{timer = start_timer(State)}};
-        Error ->
-            couch_stats:increment_counter([couch_replicator, checkpoints, 
failure]),
-            {stop, Error, State}
-        end;
-    {false, Owner} ->
-        couch_replicator_manager:replication_usurped(Rep, Owner),
-        {stop, shutdown, State}
-    end;
-
-handle_cast({report_seq, Seq},
-    #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
-    NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
-    {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
-    {ok, State}.
-
 
-headers_strip_creds([], Acc) ->
-    lists:reverse(Acc);
-headers_strip_creds([{Key, Value0} | Rest], Acc) ->
-    Value = case string:to_lower(Key) of
-    "authorization" ->
-        "****";
-    _ ->
-        Value0
-    end,
-    headers_strip_creds(Rest, [{Key, Value} | Acc]).
-
-
-httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
-    HttpDb#httpdb{
-        url = couch_util:url_strip_password(Url),
-        headers = headers_strip_creds(Headers, [])
-    };
-httpdb_strip_creds(LocalDb) ->
-    LocalDb.
-
-
-rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
-    Rep#rep{
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = 
Target} = State) ->
-    % #rep_state contains the source and target at the top level and also
-    % in the nested #rep_details record
-    State#rep_state{
-        rep_details = rep_strip_creds(Rep),
-        source = httpdb_strip_creds(Source),
-        target = httpdb_strip_creds(Target)
-    }.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
-    checkpoint_history = CheckpointHistory} = State) ->
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
-    couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
-    % cancelled replication throught ?MODULE:cancel_replication/1
-    couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
-    terminate_cleanup(State);
-
-terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
-    #rep{id=RepId} = InitArgs,
-    couch_stats:increment_counter([couch_replicator, failed_starts]),
-    CleanInitArgs = rep_strip_creds(InitArgs),
-    couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
-             [Class, Error, CleanInitArgs, Stack]),
-    case Error of
-    {unauthorized, DbUri} ->
-        NotifyError = {unauthorized, <<"unauthorized to access or create 
database ", DbUri/binary>>};
-    {db_not_found, DbUri} ->
-        NotifyError = {db_not_found, <<"could not open ", DbUri/binary>>};
-    _ ->
-        NotifyError = Error
-    end,
-    couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError);
-terminate(Reason, State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason).
-
-terminate_cleanup(State) ->
-    update_task(State),
-    stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
-    stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
-    couch_replicator_api_wrap:db_close(State#rep_state.source),
-    couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-format_status(_Opt, [_PDict, State]) ->
-    [{data, [{"State", state_strip_creds(State)}]}].
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
-    {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
-    highest_seq_done = Seq} = State) ->
-    case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
-    {ok, NewState} ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, 
success]),
-        {stop, normal, cancel_timer(NewState)};
-    Error ->
-        couch_stats:increment_counter([couch_replicator, checkpoints, 
failure]),
-        {stop, Error, State}
+-spec job(binary()) -> {ok, {[_]}} | {error, not_found}.
+job(JobId0) when is_binary(JobId0) ->
+    JobId = couch_replicator_ids:convert(JobId0),
+    {Res, _Bad} = rpc:multicall(couch_replicator_scheduler, job, [JobId]),
+    case [JobInfo || {ok, JobInfo} <- Res] of
+        [JobInfo| _] ->
+            {ok, JobInfo};
+        [] ->
+            {error, not_found}
     end.
 
 
-start_timer(State) ->
-    After = State#rep_state.checkpoint_interval,
-    case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
-    {ok, Ref} ->
-        Ref;
-    Error ->
-        couch_log:error("Replicator, error scheduling checkpoint:  ~p", 
[Error]),
-        nil
+-spec active_doc(binary(), binary()) -> {ok, {[_]}} | {error, not_found}.
+active_doc(DbName, DocId) ->
+    Nodes = try
+        lists:usort([Shard#shard.node || Shard <- mem3:shards(DbName)])
+    catch
+        % Might be a local database
+        error:database_does_not_exist ->
+            [node() | nodes()]
+    end,
+    {Res, _Bad} = rpc:multicall(Nodes, couch_replicator_doc_processor, doc,
+        [DbName, DocId]),
+    case [DocInfo || {ok, DocInfo} <- Res] of
+        [DocInfo | _] ->
+            {ok, DocInfo};
+        [] ->
+            {error, not_found}
     end.
 
 
-cancel_timer(#rep_state{timer = nil} = State) ->
-    State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
-    {ok, cancel} = timer:cancel(Timer),
-    State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
-    #rep{
-        id = {BaseId, _Ext},
-        source = Src0, target = Tgt,
-        options = Options, user_ctx = UserCtx,
-        type = Type, view = View
-    } = Rep,
-    % Adjust minimum number of http source connections to 2 to avoid deadlock
-    Src = adjust_maxconn(Src0, BaseId),
-    {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, 
UserCtx}]),
-    {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, 
UserCtx}],
-        get_value(create_target, Options, false)),
-
-    {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
-    {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
-    [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
-    {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
-    StartSeq = {0, StartSeq1},
-
-    SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
-
-    #doc{body={CheckpointHistory}} = SourceLog,
-    State = #rep_state{
-        rep_details = Rep,
-        source_name = couch_replicator_api_wrap:db_uri(Source),
-        target_name = couch_replicator_api_wrap:db_uri(Target),
-        source = Source,
-        target = Target,
-        history = History,
-        checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
-        start_seq = StartSeq,
-        current_through_seq = StartSeq,
-        committed_seq = StartSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = httpd_util:rfc1123_date(),
-        src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
-        tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
-        session_id = couch_uuids:random(),
-        source_db_compaction_notifier =
-            start_db_compaction_notifier(Source, self()),
-        target_db_compaction_notifier =
-            start_db_compaction_notifier(Target, self()),
-        source_monitor = db_monitor(Source),
-        target_monitor = db_monitor(Target),
-        source_seq = SourceSeq,
-        use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options,
-                                        ?DEFAULT_CHECKPOINT_INTERVAL),
-        type = Type,
-        view = View
-    },
-    State#rep_state{timer = start_timer(State)}.
-
-
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
-    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
-    fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
-
-
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
-    lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
-    case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
-    {error, <<"not_found">>} when Vsn > 1 ->
-        OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
-        fold_replication_logs(Dbs, Vsn - 1,
-            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
-    {error, <<"not_found">>} ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | 
Acc]);
-    {ok, Doc} when LogId =:= NewId ->
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
-    {ok, Doc} ->
-        MigratedLog = #doc{id = NewId, body = Doc#doc.body},
-        fold_replication_logs(
-            Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+-spec doc(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc(RepDb, DocId, UserCtx) ->
+    case active_doc(RepDb, DocId) of
+        {ok, DocInfo} ->
+            {ok, DocInfo};
+        {error, not_found} ->
+            doc_from_db(RepDb, DocId, UserCtx)
     end.
 
 
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
-    spawn_link(fun() ->
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
-    end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
-    receive
-    {get_changes, From} ->
-        case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
-        closed ->
-            From ! {closed, self()};
-        {ok, Changes} ->
-            #doc_info{high_seq = Seq} = lists:last(Changes),
-            ReportSeq = {Ts, Seq},
-            ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
-            From ! {changes, self(), Changes, ReportSeq}
-        end,
-        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+-spec doc_from_db(binary(), binary(), [_]) -> {ok, {[_]}} | {error, not_found}.
+doc_from_db(RepDb, DocId, UserCtx) ->
+    case fabric:open_doc(RepDb, DocId, [UserCtx, ejson_body]) of
+        {ok, Doc} ->
+            {ok, info_from_doc(RepDb, couch_doc:to_json_obj(Doc, []))};
+         {not_found, _Reason} ->
+            {error, not_found}
     end.
 
 
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
-    NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, 
false}]} },
-    {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) 
->
-    update_task(State),
-    {ok, State};
-do_checkpoint(State) ->
-    #rep_state{
-        source_name=SourceName,
-        target_name=TargetName,
-        source = Source,
-        target = Target,
-        history = OldHistory,
-        start_seq = {_, StartSeq},
-        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
-        source_log = SourceLog,
-        target_log = TargetLog,
-        rep_starttime = ReplicationStartTime,
-        src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime,
-        stats = Stats,
-        rep_details = #rep{options = Options},
-        session_id = SessionId
-    } = State,
-    case commit_to_both(Source, Target) of
-    {source_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
-    {target_error, Reason} ->
-         {checkpoint_commit_failure,
-             <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
-    {SrcInstanceStartTime, TgtInstanceStartTime} ->
-        couch_log:notice("recording a checkpoint for `~s` -> `~s` at source 
update_seq ~p",
-            [SourceName, TargetName, NewSeq]),
-        StartTime = ?l2b(ReplicationStartTime),
-        EndTime = ?l2b(httpd_util:rfc1123_date()),
-        NewHistoryEntry = {[
-            {<<"session_id">>, SessionId},
-            {<<"start_time">>, StartTime},
-            {<<"end_time">>, EndTime},
-            {<<"start_last_seq">>, StartSeq},
-            {<<"end_last_seq">>, NewSeq},
-            {<<"recorded_seq">>, NewSeq},
-            {<<"missing_checked">>, 
couch_replicator_stats:missing_checked(Stats)},
-            {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
-            {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-            {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
-            {<<"doc_write_failures">>, 
couch_replicator_stats:doc_write_failures(Stats)}
-        ]},
-        BaseHistory = [
-            {<<"session_id">>, SessionId},
-            {<<"source_last_seq">>, NewSeq},
-            {<<"replication_id_version">>, ?REP_ID_VERSION}
-        ] ++ case get_value(doc_ids, Options) of
-        undefined ->
-            [];
-        _DocIds ->
-            % backwards compatibility with the result of a replication by
-            % doc IDs in versions 0.11.x and 1.0.x
-            % TODO: deprecate (use same history format, simplify code)
-            [
-                {<<"start_time">>, StartTime},
-                {<<"end_time">>, EndTime},
-                {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
-                {<<"docs_written">>, 
couch_replicator_stats:docs_written(Stats)},
-                {<<"doc_write_failures">>, 
couch_replicator_stats:doc_write_failures(Stats)}
-            ]
-        end,
-        % limit history to 50 entries
-        NewRepHistory = {
-            BaseHistory ++
-            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 
50)}]
-        },
-
-        try
-            {SrcRevPos, SrcRevId} = update_checkpoint(
-                Source, SourceLog#doc{body = NewRepHistory}, source),
-            {TgtRevPos, TgtRevId} = update_checkpoint(
-                Target, TargetLog#doc{body = NewRepHistory}, target),
-            NewState = State#rep_state{
-                checkpoint_history = NewRepHistory,
-                committed_seq = NewTsSeq,
-                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
-                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
-            },
-            update_task(NewState),
-            {ok, NewState}
-        catch throw:{checkpoint_commit_failure, _} = Failure ->
-            Failure
-        end;
-    {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Target database out of sync. "
-            "Try to increase max_dbs_open at the target's server.">>};
-    {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source database out of sync. "
-            "Try to increase max_dbs_open at the source's server.">>};
-    {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
-        {checkpoint_commit_failure, <<"Source and target databases out of "
-            "sync. Try to increase max_dbs_open at both servers.">>}
+-spec info_from_doc(binary(), {[_]}) -> {[_]}.
+info_from_doc(RepDb, {Props}) ->
+    DocId = get_value(<<"_id">>, Props),
+    Source = get_value(<<"source">>, Props),
+    Target = get_value(<<"target">>, Props),
+    State0 = state_atom(get_value(<<"_replication_state">>, Props, null)),
+    StateTime = get_value(<<"_replication_state_time">>, Props, null),
+    {State1, StateInfo, ErrorCount, StartTime} = case State0 of
+        completed ->
+            {InfoP} = get_value(<<"_replication_stats">>, Props, {[]}),
+            case lists:keytake(<<"start_time">>, 1, InfoP) of
+                {value, {_, Time}, InfoP1} ->
+                    {State0, {InfoP1}, 0, Time};
+                false ->
+                    case lists:keytake(start_time, 1, InfoP) of
+                        {value, {_, Time}, InfoP1} ->
+                            {State0, {InfoP1}, 0, Time};
+                        false ->
+                            {State0, {InfoP}, 0, null}
+                        end
+            end;
+        failed ->
+            Info = get_value(<<"_replication_state_reason">>, Props, null),
+            {State0, Info, 1, StateTime};
+        _OtherState ->
+            {null, null, 0, null}
+    end,
+    {[
+        {doc_id, DocId},
+        {database, RepDb},
+        {id, null},
+        {source, strip_url_creds(Source)},
+        {target, strip_url_creds(Target)},
+        {state, State1},
+        {error_count, ErrorCount},
+        {info, StateInfo},
+        {start_time, StartTime},
+        {last_updated, StateTime}
+     ]}.
+
+
+state_atom(<<"triggered">>) ->
+    triggered;  % This handles a legacy case were document wasn't converted yet
+state_atom(State) when is_binary(State) ->
+    erlang:binary_to_existing_atom(State, utf8);
+state_atom(State) when is_atom(State) ->
+    State.
+
+
+-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
+check_authorization(RepId, #user_ctx{name = Name} = Ctx) ->
+    case couch_replicator_scheduler:rep_state(RepId) of
+    #rep{user_ctx = #user_ctx{name = Name}} ->
+        ok;
+    #rep{} ->
+        couch_httpd:verify_is_server_admin(Ctx);
 
 Review comment:
   My concern is that `couch_db_plugin:check_is_admin/1` wouldn't be called, [as we do here, for example](https://github.com/apache/couchdb/blob/master/src/couch/src/couch_db.erl#L434), which means we wouldn't be able to override the check mechanics. We can solve this by modifying `couch_httpd`.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to