Replace _local doc sequence with revision trees

There are a number of cases in which _local docs might need to be merged between nodes. One motivating case is clusters that might wish to move _local docs across nodes to maintain replication checkpoints with external CouchDB instances. The previous _local docs strategy of a single linear sequence breaks down in this situation.
This new behavior should be indistinguishable from the previous behavior assuming clients did not try to introspect the _rev value for _local documents. It should be impossible for normal HTTP clients to introduce different behavior than previously existed because there's no support for non-linear updates at the HTTP level. This update is merely an internal refactoring for special cases like clusters. Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/eb4138f1 Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/eb4138f1 Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/eb4138f1 Branch: refs/heads/new-security-object Commit: eb4138f196f15364e293128cacc2c5011bb28b69 Parents: 0ab5ebd Author: Paul Joseph Davis <dav...@apache.org> Authored: Thu Jan 19 18:10:05 2012 -0600 Committer: Paul Joseph Davis <dav...@apache.org> Committed: Wed Jan 25 01:14:07 2012 -0600 ---------------------------------------------------------------------- src/couch_replicator/src/couch_replicator.erl | 9 +- src/couchdb/couch_db.erl | 21 +++- src/couchdb/couch_db_updater.erl | 116 ++++++++++---------- 3 files changed, 81 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couch_replicator/src/couch_replicator.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl index 1f7c08a..53acfd5 100644 --- a/src/couch_replicator/src/couch_replicator.erl +++ b/src/couch_replicator/src/couch_replicator.erl @@ -580,15 +580,18 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> fold_replication_logs(Dbs, Vsn - 1, ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc); {error, <<"not_found">>} -> + Doc0 = #doc{id = NewId}, + Doc1 = Doc0#doc{revs = {1, 
[couch_db:new_revid(Doc0)]}}, fold_replication_logs( - Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]); + Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc1 | Acc]); {ok, Doc} when LogId =:= NewId -> fold_replication_logs( Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]); {ok, Doc} -> - MigratedLog = #doc{id = NewId, body = Doc#doc.body}, + Log0 = #doc{id = NewId, body = Doc#doc.body}, + Log1 = Log0#doc{revs = {1, [couch_db:new_revid(Log0)]}}, fold_replication_logs( - Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc]) + Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Log1 | Acc]) end. http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couchdb/couch_db.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index ae21bfa..a6903c4 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -717,16 +717,18 @@ update_docs(Db, Docs, Options, interactive_edit) -> % associate reference with each doc in order to track duplicates Docs2 = lists:map(fun(Doc) -> {Doc, make_ref()} end,Docs), - {Docs3, NonRepDocs} = lists:foldl( + {Docs3, NonRepDocs0} = lists:foldl( fun({#doc{id=Id},_Ref}=Doc, {DocsAcc, NonRepDocsAcc}) -> case Id of <<?LOCAL_DOC_PREFIX, _/binary>> -> - {DocsAcc, [Doc | NonRepDocsAcc]}; + {DocsAcc, [[Doc] | NonRepDocsAcc]}; Id-> {[Doc | DocsAcc], NonRepDocsAcc} end end, {[], []}, Docs2), + {NonRepDocs, _} = new_revs(NonRepDocs0, [], []), + DocBuckets = before_docs_update(Db, group_alike_docs(Docs3)), case (Db#db.validate_doc_funs /= []) orelse @@ -826,8 +828,9 @@ collect_results(UpdatePid, MRef, ResultsAcc) -> end. 
write_and_commit(#db{update_pid=UpdatePid}=Db, DocBuckets1, - NonRepDocs, Options0) -> + NonRepDocs1, Options0) -> DocBuckets = prepare_doc_summaries(Db, DocBuckets1), + NonRepDocs = prepare_doc_summaries(Db, NonRepDocs1), Options = set_commit_option(Options0), MergeConflicts = lists:member(merge_conflicts, Options), FullCommit = lists:member(full_commit, Options), @@ -1182,9 +1185,15 @@ open_doc_revs_int(Db, IdRevs, Options) -> open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) -> case couch_btree:lookup(local_btree(Db), [Id]) of - [{ok, {_, {Rev, BodyData}}}] -> - Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData}, - apply_open_options({ok, Doc}, Options); + [{ok, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo}] -> + #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} = + DocInfo = couch_doc:to_doc_info(FullDocInfo), + {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]), + Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath), + apply_open_options( + {ok, Doc#doc{ + meta=doc_meta_info(DocInfo, RevTree, Options) + }}, Options); [not_found] -> {not_found, missing} end; http://git-wip-us.apache.org/repos/asf/couchdb/blob/eb4138f1/src/couchdb/couch_db_updater.erl ---------------------------------------------------------------------- diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 0bfe951..862c48a 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -183,12 +183,8 @@ handle_call({compact_done, CompactFilepath}, _From, #db{filepath=Filepath}=Db) - case Db#db.update_seq == NewSeq of true -> % suck up all the local docs into memory and write them to the new db - {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs), - - NewDb2 = commit_data(NewDb#db{ - local_docs_btree = NewLocalBtree, + NewDb1 = 
copy_local_docs(Db, NewDb), + NewDb2 = commit_data(NewDb1#db{ main_pid = Db#db.main_pid, filepath = Filepath, instance_start_time = Db#db.instance_start_time, @@ -449,7 +445,11 @@ init_db(DbName, Filepath, Fd, ReaderFd, Header0, Options) -> {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}, {compression, Compression}]), {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd, - [{compression, Compression}]), + [ + {split, fun(X) -> btree_by_id_split(X) end}, + {join, fun(X,Y) -> btree_by_id_join(X,Y) end}, + {compression, Compression} + ]), case Header#db_header.security_ptr of nil -> Security = [], @@ -680,45 +680,34 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> update_local_docs(Db, []) -> {ok, Db}; update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || {_Client, {#doc{id=Id}, _Ref}} <- Docs], - OldDocLookups = couch_btree:lookup(Btree, Ids), - BtreeEntries = lists:zipwith( - fun({Client, {#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, Ref}}, OldDocLookup) -> - case PrevRevs of - [RevStr|_] -> - PrevRev = list_to_integer(?b2l(RevStr)); - [] -> - PrevRev = 0 - end, - OldRev = - case OldDocLookup of - {ok, {_, {OldRev0, _}}} -> OldRev0; - not_found -> 0 - end, - case OldRev == PrevRev of - true -> - case Delete of - false -> - send_result(Client, Ref, {ok, - {0, ?l2b(integer_to_list(PrevRev + 1))}}), - {update, {Id, {PrevRev + 1, Body}}}; - true -> - send_result(Client, Ref, - {ok, {0, <<"0">>}}), - {remove, Id} - end; - false -> + Options = [{revs_limit, Db#db.revs_limit}], + ZipFun = fun + (_Id, {ok, FullDocInfo}) -> FullDocInfo; + (Id, not_found) -> #full_doc_info{id=Id} + end, + FoldFun = fun({OldInfo, {Client, [{NewDoc, Ref}]}}, Acc) -> + case couch_doc:merge(OldInfo, NewDoc, Options) of + {ok, NewInfo} -> + #doc_info{ + revs=[#rev_info{rev={NewPos, NewRev}} | _] + } = couch_doc:to_doc_info(NewInfo), + send_result(Client, Ref, {ok, {NewPos, NewRev}}), + [NewInfo | Acc]; + 
{ok, NewInfo, NewRev} -> + send_result(Client, Ref, {ok, NewRev}), + [NewInfo | Acc]; + conflict -> send_result(Client, Ref, conflict), - ignore - end - end, Docs, OldDocLookups), - - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], - - {ok, Btree2} = - couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - + Acc + end + end, + Ids = [Id || {_Client, [{#doc{id=Id}, _Ref}]} <- Docs], + OldInfos0 = couch_btree:lookup(Btree, Ids), + OldInfos = lists:zipwith(ZipFun, Ids, OldInfos0), + Pairs = lists:zip(OldInfos, Docs), + NewInfos = lists:foldl(FoldFun, [], Pairs), + {ok, FlushedInfos} = flush_trees(Db, NewInfos, []), + {ok, Btree2} = couch_btree:add(Btree, FlushedInfos), {ok, Db#db{local_docs_btree = Btree2}}. @@ -852,6 +841,31 @@ copy_docs(Db, #db{updater_fd = DestFd} = NewDb, InfoBySeq0, Retry) -> docinfo_by_seq_btree=DocInfoBTree}. +copy_local_docs(Db, #db{revs_limit=Limit, updater_fd = DestFd}=NewDb) -> + FoldFun = fun(#full_doc_info{rev_tree=RevTree}=Info, Acc) -> + NewRevTree0 = couch_key_tree:map(fun + (_, _, branch) -> + ?REV_MISSING; + (_Rev, LeafVal, leaf) -> + IsDel = element(1, LeafVal), + Sp = element(2, LeafVal), + Seq = element(3, LeafVal), + {_Body, AttsInfo} = Summary = copy_doc_attachments( + Db, Sp, DestFd), + SummaryChunk = make_doc_summary(NewDb, Summary), + {ok, Pos, SummarySize} = couch_file:append_raw_chunk( + DestFd, SummaryChunk), + TotalLeafSize = lists:foldl( + fun({_, _, _, AttLen, _, _, _, _}, S) -> S + AttLen end, + SummarySize, AttsInfo), + {IsDel, Pos, Seq, TotalLeafSize} + end, RevTree), + NewRevTree = couch_key_tree:stem(NewRevTree0, Limit), + {ok, [Info#full_doc_info{rev_tree=NewRevTree} | Acc]} + end, + {ok, _, NewInfos} = couch_btree:foldl(Db#db.local_docs_btree, FoldFun, []), + {ok, LocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, NewInfos), + NewDb#db{local_docs_btree=LocalBtree}. 
copy_compact(Db, NewDb0, Retry) -> FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header], @@ -911,17 +925,7 @@ copy_compact(Db, NewDb0, Retry) -> NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), TotalChanges = couch_task_status:get(changes_done), - % copy misc header values - if NewDb3#db.security /= Db#db.security -> - {ok, Ptr, _} = couch_file:append_term( - NewDb3#db.updater_fd, Db#db.security, - [{compression, NewDb3#db.compression}]), - NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr}; - true -> - NewDb4 = NewDb3 - end, - - commit_data(NewDb4#db{update_seq=Db#db.update_seq}). + commit_data(NewDb3#db{update_seq=Db#db.update_seq}). start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) -> CompactFile = Filepath ++ ".compact",