nickva commented on a change in pull request #1972: Introduce Shard Splitting To CouchDB URL: https://github.com/apache/couchdb/pull/1972#discussion_r270205719
########## File path: src/mem3/src/mem3_rep.erl ########## @@ -369,58 +414,92 @@ calculate_start_seq(Acc) -> Seq = TargetSeq, History = couch_util:get_value(<<"history">>, TProps, {[]}) end, - Acc1#acc{seq = Seq, history = History}; + Tgt1#tgt{seq = Seq, history = History}; {not_found, _} -> - compare_epochs(Acc1) + compare_epochs(Db, Tgt1) end. -compare_epochs(Acc) -> + +push_changes(#acc{} = Acc0) -> #acc{ - db = Db, - target = #shard{node=Node, name=Name} - } = Acc, + db = Db0, + seq = Seq + } = Acc0, + + % Avoid needless rewriting the internal replication + % checkpoint document if nothing is replicated. + UpdateSeq = couch_db:get_update_seq(Db0), + if Seq < UpdateSeq -> ok; true -> + throw({finished, 0}) + end, + + with_src_db(Acc0, fun(Db) -> + Acc1 = Acc0#acc{db = Db}, + Fun = fun ?MODULE:changes_enumerator/2, + {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1), + {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2), + {ok, couch_db:count_changes_since(Db, LastSeq)} + end). + + +compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) -> + #shard{node = Node, name = Name} = TgtShard, UUID = couch_db:get_uuid(Db), Epochs = couch_db:get_epochs(Db), Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs), - Acc#acc{seq = Seq, history = {[]}}. + Tgt#tgt{seq = Seq, history = {[]}}. + changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) -> {ok, FDI} = couch_db:get_full_doc_info(Db, DocId), changes_enumerator(FDI, Acc); -changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) -> - #doc_info{ - high_seq=Seq, - revs=Revs - } = couch_doc:to_doc_info(FDI), - {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of - keep -> {C + length(Revs), [FDI | Infos]}; - discard -> {C, Infos} +changes_enumerator(#full_doc_info{}=FDI, #acc{}=Acc0) -> + #acc{revcount = C, targets = Targets0, hashfun = HashFun} = Acc0, + #doc_info{high_seq=Seq, revs=Revs} = couch_doc:to_doc_info(FDI), + {Count, Targets} = case filter_doc(Acc0#acc.filter, FDI) of + keep -> {C + length(Revs), changes_append_fdi(FDI, Targets0, HashFun)}; + discard -> {C, Targets0} end, - Acc1 = Acc0#acc{ - seq=Seq, - revcount=Count, - infos=NewInfos - }, + Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets}, Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end, {Go, Acc1}. -replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> - case find_missing_revs(Acc) of - [] -> - ok; - Missing -> - lists:map(fun(Chunk) -> - Docs = open_docs(Acc, Chunk), +changes_append_fdi(#full_doc_info{id = Id} = FDI, Targets, HashFun) -> + case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of + not_in_range -> + Targets; Review comment: Updated with a fixup commit to do this ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services