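We don't actually need to create extra pipes to pass around.
Sending records via send/sendmsg isn't noticeably more expensive
than writing to a pipe, so split prune_do into prune_init,
prune_one and prune_commit, and dispatch each pruned term to
its shard with wq_do.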
---
 lib/PublicInbox/CodeSearchIdx.pm | 60 +++++++++++++++-----------------
 1 file changed, 29 insertions(+), 31 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 2700744d..f9251be5 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -758,28 +758,31 @@ sub scan_git_dirs ($) {
        progress($self, "scanning $n code repositories...");
 }
 
-sub prune_do { # via wq_io_do in IDX_SHARDS
+sub prune_init { # via wq_do in IDX_SHARDS, before any prune_one
        my ($self) = @_;
-       my $gone = delete $self->{0} // die 'BUG: no {0} gone input';
-       my $prune_op_p = delete $self->{1} // die 'BUG: no {1} op_p';
+       $self->{nr_prune} = 0;
        $TXN_BYTES = $BATCH_BYTES;
        $self->begin_txn_lazy;
-       my $xdb = $self->{xdb};
-       my $nr = 0;
-       local $/ = "\0";
-       while (my $p = <$gone>) { # Q$cmt or P$git_dir
-               chomp $p;
-               my @docids = docids_by_postlist($self, $p);
-               for (@docids) {
-                       $TXN_BYTES -= $xdb->get_doclength($_) * 42;
-                       $xdb->delete_document($_);
-               }
-               ++$nr;
-               $TXN_BYTES < 0 and
-                       cidx_ckpoint($self, "prune [$self->{shard}] $nr");
+}
+
+sub prune_one { # via wq_do in IDX_SHARDS, $term is Q$cmt or P$git_dir
+       my ($self, $term) = @_;
+       my $xdb = $self->{xdb}; # hoisted: invariant across the loop
+       for (docids_by_postlist($self, $term)) {
+               $TXN_BYTES -= $xdb->get_doclength($_) * 42;
+               $xdb->delete_document($_);
        }
-       send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
+       ++$self->{nr_prune};
+       $TXN_BYTES < 0 and
+               cidx_ckpoint($self, "prune [$self->{shard}] $self->{nr_prune}");
+}
+
+sub prune_commit { # via wq_io_do in IDX_SHARDS, {0} is PktOp op_p
+       my ($self) = @_;
+       my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
+       my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
        cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
+       send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
 }
 
 sub shards_active { # post_loop_do
@@ -1025,25 +1028,20 @@ EOM
 sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
        my ($self, $comm_rd) = @_;
        return if $DO_QUIT;
-       my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{prune_done} = [ $self ];
-       my @gone;
-       for my $n (0..$#IDX_SHARDS) {
-               pipe(my ($r, $w)) or die "pipe: $!";
-               push @gone, $w;
-               $IDX_SHARDS[$n]->wq_io_do('prune_do', [$r, $p->{op_p}]);
-       }
+       $_->wq_do('prune_init') for @IDX_SHARDS; # before any prune_one
        while (defined(my $cmt = <$comm_rd>)) {
-               chomp $cmt;
-               my $n = hex(substr($cmt, 0, 8)) % scalar(@gone);
-               print { $gone[$n] } 'Q', $cmt, "\0" or die "print: $!";
+               $cmt =~ s/\n\z// or die "BUG: no LF in comm output ($cmt)";
+               my $n = hex(substr($cmt, 0, 8)) % scalar(@IDX_SHARDS);
+               $IDX_SHARDS[$n]->wq_do('prune_one', 'Q'.$cmt);
                last if $DO_QUIT;
        }
        for my $git_dir (@GIT_DIR_GONE) {
-               my $n = git_dir_hash($git_dir) % scalar(@gone);
-               print { $gone[$n] } 'P', $git_dir, "\0" or die "print: $!";
+               my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS);
+               $IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
        }
-       for (@gone) { close $_ or die "close: $!" };
+       my ($c, $p) = PublicInbox::PktOp->pair; # shards reply via op_p
+       $c->{ops}->{prune_done} = [ $self ];
+       $_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
 }
 
 sub init_associate_prefork ($) {

Reply via email to