nickva commented on a change in pull request #1972: Introduce Shard Splitting 
To CouchDB
URL: https://github.com/apache/couchdb/pull/1972#discussion_r270163090
 
 

 ##########
 File path: src/mem3/src/mem3_rep.erl
 ##########
 @@ -369,58 +414,92 @@ calculate_start_seq(Acc) ->
                     Seq = TargetSeq,
                     History = couch_util:get_value(<<"history">>, TProps, {[]})
             end,
-            Acc1#acc{seq = Seq, history = History};
+            Tgt1#tgt{seq = Seq, history = History};
         {not_found, _} ->
-            compare_epochs(Acc1)
+            compare_epochs(Db, Tgt1)
     end.
 
-compare_epochs(Acc) ->
+
+push_changes(#acc{} = Acc0) ->
     #acc{
-        db = Db,
-        target = #shard{node=Node, name=Name}
-    } = Acc,
+        db = Db0,
+        seq = Seq
+    } = Acc0,
+
+    % Avoid needlessly rewriting the internal replication
+    % checkpoint document if nothing is replicated.
+    UpdateSeq = couch_db:get_update_seq(Db0),
+    if Seq < UpdateSeq -> ok; true ->
+        throw({finished, 0})
+    end,
+
+    with_src_db(Acc0, fun(Db) ->
+        Acc1 = Acc0#acc{db = Db},
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+        {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2),
+        {ok, couch_db:count_changes_since(Db, LastSeq)}
+    end).
+
+
+compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) ->
+    #shard{node = Node, name = Name} = TgtShard,
     UUID = couch_db:get_uuid(Db),
     Epochs = couch_db:get_epochs(Db),
     Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
-    Acc#acc{seq = Seq, history = {[]}}.
+    Tgt#tgt{seq = Seq, history = {[]}}.
+
 
 changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
     changes_enumerator(FDI, Acc);
-changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
-    #doc_info{
-        high_seq=Seq,
-        revs=Revs
-    } = couch_doc:to_doc_info(FDI),
-    {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of
-        keep -> {C + length(Revs), [FDI | Infos]};
-        discard -> {C, Infos}
+changes_enumerator(#full_doc_info{}=FDI, #acc{}=Acc0) ->
+    #acc{revcount = C, targets = Targets0, hashfun = HashFun} = Acc0,
+    #doc_info{high_seq=Seq, revs=Revs} = couch_doc:to_doc_info(FDI),
+    {Count, Targets} = case filter_doc(Acc0#acc.filter, FDI) of
+        keep -> {C + length(Revs), changes_append_fdi(FDI, Targets0, HashFun)};
+        discard -> {C, Targets0}
     end,
-    Acc1 = Acc0#acc{
-        seq=Seq,
-        revcount=Count,
-        infos=NewInfos
-    },
+    Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets},
     Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
     {Go, Acc1}.
 
 
-replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
-    case find_missing_revs(Acc) of
-    [] ->
-        ok;
-    Missing ->
-        lists:map(fun(Chunk) ->
-            Docs = open_docs(Acc, Chunk),
+changes_append_fdi(#full_doc_info{id = Id} = FDI, Targets, HashFun) ->
+    case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of
+        not_in_range ->
+            Targets;
 
 Review comment:
  Yeah, for cases where we replicate from [0, 10] -> [5, 15], we'd only want to
send the docs in the [5, 10] range to the [5, 15] target. This won't technically
happen until we have shard merging.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to