This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch auto-delete-3
in repository https://gitbox.apache.org/repos/asf/couchdb.git

The following commit(s) were added to refs/heads/auto-delete-3 by this push:
     new c7af6527c teach replicator to make peer checkpoint
c7af6527c is described below

commit c7af6527c8fd25277cca782e9f36d492177a07cd
Author: Robert Newson <rnew...@apache.org>
AuthorDate: Thu Mar 20 17:08:55 2025 +0000

    teach replicator to make peer checkpoint
---
 .../src/couch_replicator_scheduler_job.erl | 36 ++++++++++++++++------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 62e604b5e..f92019cda 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -678,6 +678,7 @@ init_state(Rep) ->
     StartSeq = {0, StartSeq1},
 
     SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+    create_peer_checkpoint_doc_if_missing(Source, BaseId, SourceSeq),
 
     #doc{body = {CheckpointHistory}} = SourceLog,
     State = #rep_state{
@@ -809,7 +810,7 @@ do_checkpoint(State) ->
         src_starttime = SrcInstanceStartTime,
         tgt_starttime = TgtInstanceStartTime,
         stats = Stats,
-        rep_details = #rep{options = Options},
+        rep_details = #rep{id = {BaseId, _}, options = Options},
         session_id = SessionId
     } = State,
     case commit_to_both(Source, Target) of
@@ -886,7 +887,7 @@ do_checkpoint(State) ->
                 {TgtRevPos, TgtRevId} = update_checkpoint(
                     Target, TargetLog#doc{body = NewRepHistory}, target
                 ),
-                %% TODO update_checkpoint(Source, peer_checkpoint_doc(State), source),
+                update_checkpoint(Source, peer_checkpoint_doc(BaseId, NewSeq), source),
                 NewState = State#rep_state{
                     checkpoint_history = NewRepHistory,
                     committed_seq = NewTsSeq,
@@ -913,14 +914,29 @@ do_checkpoint(State) ->
             >>}
     end.
 
-peer_checkpoint_doc(#rep_state{} = State) ->
-    #rep_state{
-        session_id = SessionId
-    } = State,
-    #doc{
-        id = <<"peer-checkpoint-", SessionId/binary>>,
-        body = {[{<<"update_seq">>, State#rep_state.committed_seq}]}
-    }.
+create_peer_checkpoint_doc_if_missing(#httpdb{} = Db, BaseId, SourceSeq) when
+    is_list(BaseId), is_binary(SourceSeq)
+->
+    case couch_replicator_api_wrap:open_doc(Db, peer_checkpoint_id(BaseId), []) of
+        {ok, _} ->
+            ok;
+        {error, <<"not_found">>} ->
+            Doc = peer_checkpoint_doc(BaseId, SourceSeq),
+            case couch_replicator_api_wrap:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    ok;
+                {error, Reason} ->
+                    throw({checkpoint_commit_failure, Reason})
+            end;
+        {error, Reason} ->
+            throw({checkpoint_commit_failure, Reason})
+    end.
+
+peer_checkpoint_doc(BaseId, Seq) ->
+    #doc{id = peer_checkpoint_id(BaseId), body = {[{<<"update_seq">>, Seq}]}}.
+
+peer_checkpoint_id(BaseId) when is_list(BaseId) ->
+    ?l2b(?LOCAL_DOC_PREFIX ++ "peer-checkpoint-repl-" ++ BaseId).
 
 update_checkpoint(Db, Doc, DbType) ->
     try