While the {inflight} array should be tied to the IO object even
more tightly, that's not an easy task with our current code.  So
take some small steps by introducing a gcf_inflight helper to
validate the ownership of the process and to drain the inflight
array via the awaitpid callback.

This hopefully fixes problems with t/lei-q-save.t (still) hanging
occasionally on v2 outputs since git->cleanup/->DESTROY was getting
called in v2 shard workers.
---
 lib/PublicInbox/Gcf2Client.pm  |  6 ++-
 lib/PublicInbox/Git.pm         | 79 ++++++++++++++++++++--------------
 lib/PublicInbox/GitAsyncCat.pm |  2 +-
 lib/PublicInbox/IO.pm          | 17 ++++++--
 4 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 19d77e32..07ff7dcb 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -31,15 +31,16 @@ sub new  {
        $opt->{0} = $opt->{1} = $s2;
        my $cmd = [$^X, $^W ? ('-w') : (),
                        qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-       PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt));
        $self->{inflight} = [];
+       PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt),
+                       \&PublicInbox::Git::gcf_drain, $self->{inflight});
        $self->{epwatch} = \undef; # for Git->cleanup
        $self->SUPER::new($s1, EPOLLIN);
 }
 
 sub gcf2_async ($$$;$) {
        my ($self, $req, $cb, $arg) = @_;
-       my $inflight = $self->{inflight} or return $self->close;
+       my $inflight = $self->gcf_inflight or return;
        PublicInbox::Git::write_all($self, $req, \&cat_async_step, $inflight);
        push @$inflight, \$req, $cb, $arg; # ref prevents Git.pm retries
 }
@@ -49,6 +50,7 @@ sub alternates_changed {}
 
 no warnings 'once';
 
+*gcf_inflight = \&PublicInbox::Git::gcf_inflight; # for event_step
 *cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
 *event_step = \&PublicInbox::Git::event_step;
 *fail = \&PublicInbox::Git::fail;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 93736cf0..fe834210 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -140,6 +140,18 @@ sub last_check_err {
        $buf;
 }
 
+sub gcf_drain { # awaitpid cb
+       my ($pid, $inflight, $bc) = @_;
+       while (@$inflight) {
+               my ($req, $cb, $arg) = splice(@$inflight, 0, 3);
+               $req = $$req if ref($req);
+               $bc and $req =~ s/\A(?:contents|info) //;
+               $req =~ s/ .*//; # drop git_dir for Gcf2Client
+               eval { $cb->(undef, $req, undef, undef, $arg) };
+               warn "E: (in abort) $req: $@" if $@;
+       }
+}
+
 sub _sock_cmd {
        my ($self, $batch, $err_c) = @_;
        $self->{sock} and Carp::confess('BUG: {sock} exists');
@@ -162,8 +174,11 @@ sub _sock_cmd {
                $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
                                                $self->fail("tmpfile($id): $!");
        }
+       my $inflight = []; # TODO consider moving this into the IO object
        my $pid = spawn(\@cmd, undef, $opt);
-       $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
+       $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid,
+                               \&gcf_drain, $inflight, $self->{-bc});
+       $self->{inflight} = $inflight;
 }
 
 sub cat_async_retry ($$) {
@@ -171,8 +186,8 @@ sub cat_async_retry ($$) {
 
        # {inflight} may be non-existent, but if it isn't we delete it
        # here to prevent cleanup() from waiting:
-       delete $self->{inflight};
-       cleanup($self);
+       my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)};
+       $self->SUPER::close if $epwatch;
        my $new_inflight = batch_prepare($self);
 
        while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
@@ -180,13 +195,25 @@ sub cat_async_retry ($$) {
                $oid = \$oid if !@$new_inflight; # to indicate oid retried
                push @$new_inflight, $oid, $cb, $arg;
        }
+       $sock->close if $sock; # only safe once old_inflight is empty
        cat_async_step($self, $new_inflight); # take one step
 }
 
+sub gcf_inflight ($) {
+       my ($self) = @_;
+       if ($self->{sock}) {
+               return $self->{inflight} if $self->{sock}->owner_pid == $$;
+               delete @$self{qw(sock inflight)};
+       } else {
+               $self->close;
+       }
+       undef;
+}
+
 # returns true if prefetch is successful
 sub async_prefetch {
        my ($self, $oid, $cb, $arg) = @_;
-       my $inflight = $self->{inflight} or return;
+       my $inflight = gcf_inflight($self) or return;
        return if @$inflight;
        substr($oid, 0, 0) = 'contents ' if $self->{-bc};
        write_all($self, "$oid\n", \&cat_async_step, $inflight);
@@ -195,7 +222,7 @@ sub async_prefetch {
 
 sub cat_async_step ($$) {
        my ($self, $inflight) = @_;
-       die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+       croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
        my ($req, $cb, $arg) = @$inflight[0, 1, 2];
        my ($bref, $oid, $type, $size);
        my $head = $self->{sock}->my_readline;
@@ -237,11 +264,8 @@ sub cat_async_step ($$) {
 
 sub cat_async_wait ($) {
        my ($self) = @_;
-       return $self->close if !$self->{sock};
-       my $inflight = $self->{inflight} or return;
-       while (scalar(@$inflight)) {
-               cat_async_step($self, $inflight);
-       }
+       my $inflight = gcf_inflight($self) or return;
+       cat_async_step($self, $inflight) while (scalar(@$inflight));
 }
 
 sub batch_prepare ($) {
@@ -253,7 +277,6 @@ sub batch_prepare ($) {
        } else {
                _sock_cmd($self, 'batch');
        }
-       $self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -271,7 +294,7 @@ sub cat_file {
 
 sub check_async_step ($$) {
        my ($ck, $inflight) = @_;
-       die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+       croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
        my ($req, $cb, $arg) = @$inflight[0, 1, 2];
        chomp(my $line = $ck->{sock}->my_readline);
        my ($hex, $type, $size) = split(/ /, $line);
@@ -291,8 +314,7 @@ sub check_async_wait ($) {
        my ($self) = @_;
        return cat_async_wait($self) if $self->{-bc};
        my $ck = $self->{ck} or return;
-       return $ck->close if !$ck->{sock};
-       my $inflight = $ck->{inflight} or return;
+       my $inflight = gcf_inflight($ck) or return;
        check_async_step($ck, $inflight) while (scalar(@$inflight));
 }
 
@@ -312,7 +334,6 @@ sub check_async_begin ($) {
        } else {
                _sock_cmd($self = ck($self), 'batch-check', 1);
        }
-       $self->{inflight} = [];
 }
 
 sub write_all {
@@ -337,12 +358,13 @@ sub check_async ($$$$) {
        my $inflight;
        if ($self->{-bc}) { # likely as time goes on
 batch_command:
-               $inflight = $self->{inflight} // cat_async_begin($self);
+               $inflight = gcf_inflight($self) // cat_async_begin($self);
                substr($oid, 0, 0) = 'info ';
                write_all($self, "$oid\n", \&cat_async_step, $inflight);
        } else { # accounts for git upgrades while we're running:
                my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
-               $inflight = $ck->{inflight} // check_async_begin($self);
+               $inflight = ($ck ? gcf_inflight($ck) : undef)
+                                // check_async_begin($self);
                goto batch_command if $self->{-bc};
                write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
        }
@@ -417,8 +439,8 @@ sub date_parse {
 }
 
 sub _active ($) {
-       scalar(@{$_[0]->{inflight} // []}) ||
-               ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+       scalar(@{gcf_inflight($_[0]) // []}) ||
+               ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []}))
 }
 
 # check_async and cat_async may trigger the other, so ensure they're
@@ -493,13 +515,13 @@ sub pub_urls {
 sub cat_async_begin {
        my ($self) = @_;
        cleanup($self) if $self->alternates_changed;
-       die 'BUG: already in async' if $self->{inflight};
+       die 'BUG: already in async' if gcf_inflight($self);
        batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
        my ($self, $oid, $cb, $arg) = @_;
-       my $inflight = $self->{inflight} // cat_async_begin($self);
+       my $inflight = gcf_inflight($self) // cat_async_begin($self);
        substr($oid, 0, 0) = 'contents ' if $self->{-bc};
        write_all($self, $oid."\n", \&cat_async_step, $inflight);
        push(@$inflight, $oid, $cb, $arg);
@@ -596,8 +618,7 @@ sub cleanup_if_unlinked {
 
 sub event_step {
        my ($self) = @_;
-       $self->close if !$self->{sock}; # process died while requeued
-       my $inflight = $self->{inflight};
+       my $inflight = gcf_inflight($self);
        if ($inflight && @$inflight) {
                $self->cat_async_step($inflight);
                return $self->close unless $self->{sock};
@@ -619,18 +640,10 @@ sub watch_async ($) {
 
 sub close {
        my ($self) = @_;
-       if (my $q = $self->{inflight}) { # abort inflight requests
-               while (@$q) {
-                       my ($req, $cb, $arg) = splice(@$q, 0, 3);
-                       $req = $$req if ref($req);
-                       $self->{-bc} and $req =~ s/\A(?:contents|info) //;
-                       $req =~ s/ .*//; # drop git_dir for Gcf2Client
-                       eval { $cb->(undef, $req, undef, undef, $arg) };
-                       warn "E: (in abort) $req: $@" if $@;
-               }
-       }
+       my $sock = $self->{sock};
        delete @$self{qw(-bc err_c inflight)};
        delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+       $sock->close if $sock; # calls gcf_drain via awaitpid
 }
 
 package PublicInbox::GitCheck; # only for git <2.36
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index f8b2a9fc..09744b34 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -40,7 +40,7 @@ sub ibx_async_prefetch {
        my ($ibx, $oid, $cb, $arg) = @_;
        my $git = $ibx->git;
        if (!defined($ibx->{topdir}) && $GCF2C) {
-               if (!@{$GCF2C->{inflight} // []}) {
+               if (!@{$GCF2C->gcf_inflight // []}) {
                        $oid .= " $git->{git_dir}\n";
                        return $GCF2C->gcf2_async($oid, $cb, $arg); # true
                }
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index fcebac59..6593dcdf 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,8 +15,10 @@ use Errno qw(EINTR EAGAIN);
 
 sub waitcb { # awaitpid callback
        my ($pid, $errref, $cb, @args) = @_;
+       $errref //= \my $workaround_await_pids_clobbered;
        $$errref = $?; # sets .cerr for _close
-       $cb->($pid, @args) if $cb;
+       $cb->($pid, @args) if $cb; # may clobber $?
+       $? = $$errref;
 }
 
 sub attach_pid {
@@ -33,6 +35,11 @@ sub attached_pid {
        ${${*$io}{pi_io_reap} // []}[1];
 }
 
+sub owner_pid {
+       my ($io) = @_;
+       ${${*$io}{pi_io_reap} // [-1]}[0];
+}
+
 # caller cares about error result if they call close explicitly
 # reap->[2] may be set before this is called via waitcb
 sub close {
@@ -40,8 +47,12 @@ sub close {
        my $ret = $io->SUPER::close;
        my $reap = delete ${*$io}{pi_io_reap};
        return $ret unless $reap && $reap->[0] == $$;
-       ${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2]
-       ($? = ${$reap->[2]}) ? '' : $ret;
+       if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously
+               $? = ${$reap->[2]};
+       } else { # wait synchronously
+               my $w = awaitpid($reap->[1]);
+       }
+       $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
 }
 
 sub DESTROY {

Reply via email to