nickva commented on a change in pull request #1972: Introduce Shard Splitting To CouchDB URL: https://github.com/apache/couchdb/pull/1972#discussion_r270163090
########## File path: src/mem3/src/mem3_rep.erl ########## @@ -369,58 +414,92 @@ calculate_start_seq(Acc) -> Seq = TargetSeq, History = couch_util:get_value(<<"history">>, TProps, {[]}) end, - Acc1#acc{seq = Seq, history = History}; + Tgt1#tgt{seq = Seq, history = History}; {not_found, _} -> - compare_epochs(Acc1) + compare_epochs(Db, Tgt1) end. -compare_epochs(Acc) -> + +push_changes(#acc{} = Acc0) -> #acc{ - db = Db, - target = #shard{node=Node, name=Name} - } = Acc, + db = Db0, + seq = Seq + } = Acc0, + + % Avoid needless rewriting the internal replication + % checkpoint document if nothing is replicated. + UpdateSeq = couch_db:get_update_seq(Db0), + if Seq < UpdateSeq -> ok; true -> + throw({finished, 0}) + end, + + with_src_db(Acc0, fun(Db) -> + Acc1 = Acc0#acc{db = Db}, + Fun = fun ?MODULE:changes_enumerator/2, + {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1), + {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2), + {ok, couch_db:count_changes_since(Db, LastSeq)} + end). + + +compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) -> + #shard{node = Node, name = Name} = TgtShard, UUID = couch_db:get_uuid(Db), Epochs = couch_db:get_epochs(Db), Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs), - Acc#acc{seq = Seq, history = {[]}}. + Tgt#tgt{seq = Seq, history = {[]}}. 
+ changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) -> {ok, FDI} = couch_db:get_full_doc_info(Db, DocId), changes_enumerator(FDI, Acc); -changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) -> - #doc_info{ - high_seq=Seq, - revs=Revs - } = couch_doc:to_doc_info(FDI), - {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of - keep -> {C + length(Revs), [FDI | Infos]}; - discard -> {C, Infos} +changes_enumerator(#full_doc_info{}=FDI, #acc{}=Acc0) -> + #acc{revcount = C, targets = Targets0, hashfun = HashFun} = Acc0, + #doc_info{high_seq=Seq, revs=Revs} = couch_doc:to_doc_info(FDI), + {Count, Targets} = case filter_doc(Acc0#acc.filter, FDI) of + keep -> {C + length(Revs), changes_append_fdi(FDI, Targets0, HashFun)}; + discard -> {C, Targets0} end, - Acc1 = Acc0#acc{ - seq=Seq, - revcount=Count, - infos=NewInfos - }, + Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets}, Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end, {Go, Acc1}. -replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> - case find_missing_revs(Acc) of - [] -> - ok; - Missing -> - lists:map(fun(Chunk) -> - Docs = open_docs(Acc, Chunk), +changes_append_fdi(#full_doc_info{id = Id} = FDI, Targets, HashFun) -> + case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of + not_in_range -> + Targets; Review comment: Yeah, for cases where we replicate from [0, 10] -> [5, 15], we'd only want to send docs in the [5, 10] range to the [5, 15] target. This won't technically happen until we have shard merging. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services