Maybe we can use it in other places, maybe not.  I'm not
bothering to support `$/ = undef' or `$/ = \$integer' cases
for now since (AFAIK) we won't need them.
---
 Not entirely sure if there are other places to use it just
 yet, but I've already worked around this in other places.

 MANIFEST                        |  1 +
 lib/PublicInbox/Gcf2Client.pm   |  5 +--
 lib/PublicInbox/Git.pm          | 75 +++++++--------------------------
 lib/PublicInbox/ProcessIO.pm    |  5 ++-
 lib/PublicInbox/ProcessIORBF.pm | 55 ++++++++++++++++++++++++
 5 files changed, 77 insertions(+), 64 deletions(-)
 create mode 100644 lib/PublicInbox/ProcessIORBF.pm

diff --git a/MANIFEST b/MANIFEST
index 3df48667..fc1a122b 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -321,6 +321,7 @@ lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
 lib/PublicInbox/ProcessIO.pm
 lib/PublicInbox/ProcessIONBF.pm
+lib/PublicInbox/ProcessIORBF.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index f63a0335..1215b75b 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev 
isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIO;
+use PublicInbox::ProcessIORBF;
 use autodie qw(socketpair);
 
 # fields:
@@ -28,12 +28,11 @@ sub new  {
        # ensure the child process has the same @INC we do:
        my $env = { PERL5LIB => join(':', @INC) };
        socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
-       $s1->blocking(0);
        $opt->{0} = $opt->{1} = $s2;
        my $cmd = [$^X, $^W ? ('-w') : (),
                        qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
        my $pid = spawn($cmd, $env, $opt);
-       my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+       my $sock = PublicInbox::ProcessIORBF->maybe_new($pid, $s1);
        $self->{inflight} = [];
        $self->{epwatch} = \undef; # for Git->cleanup
        $self->SUPER::new($sock, EPOLLIN);
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 9c26d8bf..b0214f0c 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,21 +14,19 @@ use autodie qw(socketpair read);
 use POSIX ();
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
 use PublicInbox::Spawn qw(spawn popen_rd which);
-use PublicInbox::ProcessIONBF;
+use PublicInbox::ProcessIORBF;
 use PublicInbox::Tmpfile;
-use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
 use PublicInbox::SHA qw(sha_all);
 our %HEXLEN2SHA = (40 => 1, 64 => 256);
 our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
 our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN read_all);
 our $in_cleanup;
-our $RDTIMEO = 60_000; # milliseconds
 our $async_warn; # true in read-only daemons
 
 # committerdate:unix is git 2.9.4+ (2017-05-05), so using raw instead
@@ -165,46 +163,7 @@ sub _sock_cmd {
                                                $self->fail("tmpfile($id): $!");
        }
        my $pid = spawn(\@cmd, undef, $opt);
-       $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
-}
-
-sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
-
-sub my_read ($$$) {
-       my ($fh, $rbuf, $len) = @_;
-       my $left = $len - length($$rbuf);
-       my $r;
-       while ($left > 0) {
-               $r = sysread($fh, $$rbuf, $left, length($$rbuf));
-               if ($r) {
-                       $left -= $r;
-               } elsif (defined($r)) { # EOF
-                       return 0;
-               } else {
-                       next if ($! == EAGAIN and poll_in($fh));
-                       next if $! == EINTR; # may be set by sysread or poll_in
-                       return; # unrecoverable error
-               }
-       }
-       my $no_pad = substr($$rbuf, 0, $len, '');
-       \$no_pad;
-}
-
-sub my_readline ($$) {
-       my ($fh, $rbuf) = @_;
-       while (1) {
-               if ((my $n = index($$rbuf, "\n")) >= 0) {
-                       return substr($$rbuf, 0, $n + 1, '');
-               }
-               my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
-               # return whatever's left on EOF
-               return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
-               next if ($! == EAGAIN and poll_in($fh));
-               next if $! == EINTR; # may be set by sysread or poll_in
-               return; # unrecoverable error
-       }
+       $self->{sock} = PublicInbox::ProcessIORBF->new($pid, $s1);
 }
 
 sub cat_async_retry ($$) {
@@ -238,18 +197,16 @@ sub cat_async_step ($$) {
        my ($self, $inflight) = @_;
        die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
        my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-       my $rbuf = delete($self->{rbuf}) // \(my $new = '');
        my ($bref, $oid, $type, $size);
-       my $head = my_readline($self->{sock}, $rbuf);
+       my $head = readline($self->{sock});
        my $cmd = ref($req) ? $$req : $req;
        # ->fail may be called via Gcf2Client.pm
        my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
        if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
                ($oid, $type, $size) = ($1, $2, $3 + 0);
                unless ($info) { # --batch-command
-                       $bref = my_read($self->{sock}, $rbuf, $size + 1) or
-                               $self->fail(defined($bref) ?
-                                               'read EOF' : "read: $!");
+                       $bref = eval { \read_all($self->{sock}, $size + 1) };
+                       $self->fail("$oid: $@") if $@;
                        chop($$bref) eq "\n" or
                                        $self->fail('LF missing after blob');
                }
@@ -272,7 +229,6 @@ sub cat_async_step ($$) {
                my $err = $! ? " ($!)" : '';
                $self->fail("bad result from async cat-file: $head$err");
        }
-       $self->{rbuf} = $rbuf if $$rbuf ne '';
        splice(@$inflight, 0, 3); # don't retry $cb on ->fail
        eval { $cb->($bref, $oid, $type, $size, $arg) };
        async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -315,17 +271,15 @@ sub check_async_step ($$) {
        my ($ck, $inflight) = @_;
        die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
        my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-       my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
-       chomp(my $line = my_readline($ck->{sock}, $rbuf));
+       chomp(my $line = readline($ck->{sock}));
        my ($hex, $type, $size) = split(/ /, $line);
 
        # git <2.21 would show `dangling' (2.21+ shows `ambiguous')
        # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
        if ($hex eq 'dangling') {
-               my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
-               $ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+               eval { read_all($ck->{sock}, $type + 1) };
+               $ck->fail("$hex: $@") if $@;
        }
-       $ck->{rbuf} = $rbuf if $$rbuf ne '';
        splice(@$inflight, 0, 3); # don't retry $cb on ->fail
        eval { $cb->(undef, $hex, $type, $size, $arg) };
        async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -560,7 +514,7 @@ sub read_all ($;$$) {
        my ($fh, $len, $bref) = @_;
        $bref //= \(my $buf);
        my $r = read($fh, $$bref, $len //= -s $fh);
-       croak("$fh read ($r != $len)") if $len != $r;
+       croak("$fh read ($r != $len) (\$!=$!)") if $len != $r;
        $$bref;
 }
 
@@ -638,7 +592,7 @@ sub cleanup_if_unlinked {
        my $ret = 0;
        for my $obj ($self, ($self->{ck} // ())) {
                my $sock = $obj->{sock} // next;
-               my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
+               my PublicInbox::ProcessIORBF $p = tied *$sock;
                my $pid = $p->{pid} // next;
                open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
                while (<$fh>) {
@@ -659,10 +613,11 @@ sub event_step {
        my $inflight = $self->{inflight};
        if ($inflight && @$inflight) {
                $self->cat_async_step($inflight);
-               return $self->close unless $self->{sock};
+               my $sock = $self->{sock} or return $self->close;
                # don't loop here to keep things fair, but we must requeue
                # if there's already-read data in rbuf
-               $self->requeue if exists($self->{rbuf});
+               my PublicInbox::ProcessIORBF $rbf = tied *$sock;
+               $self->requeue if exists($rbf->{rbuf});
        }
 }
 
@@ -686,7 +641,7 @@ sub close {
                        warn "E: (in abort) $req: $@" if $@;
                }
        }
-       delete @$self{qw(-bc err_c inflight rbuf)};
+       delete @$self{qw(-bc err_c inflight)};
        delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
 }
 
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index ea5d3e6c..4f4aaa80 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -2,7 +2,10 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe or socket,
-# see perltie(1) for details.
+# see perltie(1) for details.  This uses perlio (Perl internals) for
+# buffered reads; subclass PublicInbox::ProcessIONBF uses unbuffered
+# reads, while subclass PublicInbox::ProcessIORBF buffers reads in our
+# own Perl code, not via perlio, allowing for use in event loops
 package PublicInbox::ProcessIO;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
diff --git a/lib/PublicInbox/ProcessIORBF.pm b/lib/PublicInbox/ProcessIORBF.pm
new file mode 100644
index 00000000..2369e668
--- /dev/null
+++ b/lib/PublicInbox/ProcessIORBF.pm
@@ -0,0 +1,55 @@
+# Copyright (C) all contributors <[email protected]>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Synchronous buffering reads for non-blocking IO since perlio doesn't
+# expose PerlIO_get_cnt to determine if we can rely on epoll/kevent.
+package PublicInbox::ProcessIORBF;
+use v5.12;
+use parent qw(PublicInbox::ProcessIONBF);
+use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
+use bytes qw(length substr);
+
+sub retry_read ($) { # ($self): true if a failed sysread on {fh} is worth retrying, decided from $!
+       ($! == EAGAIN and # nothing available yet: wait until {fh} polls readable
+               IO::Poll::_poll(-1, fileno($_[0]->{fh}), my $ev = POLLIN)) || # NOTE(review): -1 timeout presumably means wait indefinitely -- confirm
+       ($! == EINTR); # may be set by _poll (or sysread)
+}
+
+# this only supports $/ when it's "\n", don't bother with \integer and
+# undef cases since we only use this for git cat-file (and maybe fast-import)
+sub READLINE { # ($self): return the next line up to and including "\n", buffering extra bytes in {rbuf}
+       my $rbuf = delete($_[0]->{rbuf}) // ''; # start from bytes left over by an earlier READ/READLINE
+       while (1) {
+               my $n = index($rbuf, "\n");
+               if ($n >= 0) {
+                       my $ret = substr($rbuf, 0, $n + 1, ''); # cut the line (with its "\n") out of the buffer
+                       $_[0]->{rbuf} = $rbuf if $rbuf ne ''; # {rbuf} only exists while unread data is pending
+                       return $ret;
+               }
+               $n = sysread($_[0]->{fh}, $rbuf, 65536, length($rbuf)) and next; # append up to 65536 more bytes, then rescan
+               return $rbuf if defined($n); # EOF: return everything buffered, which may be ''
+               return ($rbuf eq '' ? undef : $rbuf) if !retry_read($_[0]); # unrecoverable error: partial data if any, else undef
+       }
+}
+
+sub READ { # ($self, $buf, $len) offset is not supported by us
+       my $rbuf = delete($_[0]->{rbuf}) // ''; # start from bytes left over by an earlier READ/READLINE
+       my $left = $_[2] - length($rbuf); # how much of $len is not already buffered
+       while ($left > 0) {
+               my $r = sysread($_[0]->{fh}, $rbuf, $left, length($rbuf));
+               if ($r) {
+                       $left -= $r
+               } elsif (defined($r)) {
+                       last # EOF
+               } elsif (!retry_read($_[0])) { # unrecoverable error
+                       last if $rbuf ne ''; # return whatever's left
+                       return ($_[1] = undef); # nothing buffered: undef the caller's buffer and return undef
+               } # else: loop and retry
+       }
+       $_[1] = substr($rbuf, 0, $_[2], ''); # give the caller at most $len through the aliased $_[1]
+       $_[0]->{rbuf} = $rbuf if $rbuf ne ''; # keep any surplus for the next READ/READLINE
+       length($_[1]); # return value: length of what was delivered (short on EOF or error)
+}
+
+1;

Reply via email to