Ensure the callback is always guarded by `eval' to catch
exceptions and to force a ->close (EPOLL_CTL_DEL).

We also don't want to blindly set O_NONBLOCK on TTYs since their
O_NONBLOCK semantics aren't well-defined by POSIX.  We can also
drop EPOLLET (edge-triggered) use to reduce the need to make
->requeue calls on our end.
---
 lib/PublicInbox/InputPipe.pm | 48 ++++++++++++++++++++++++------------
 1 file changed, 32 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index 60a9f01f..39aefab2 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -1,35 +1,51 @@
 # Copyright (C) all contributors <[email protected]>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# for reading pipes and sockets off the DS event loop
+# for reading pipes, sockets, and TTYs off the DS event loop
 package PublicInbox::InputPipe;
 use v5.12;
 use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::Syscall qw(EPOLLIN);
 
 sub consume {
        my ($in, $cb, @args) = @_;
        my $self = bless { cb => $cb, args => \@args }, __PACKAGE__;
-       eval { $self->SUPER::new($in, EPOLLIN|EPOLLET) };
-       return $self->requeue if $@; # regular file
-       $in->blocking(0); # pipe or socket
+       eval { $self->SUPER::new($in, EPOLLIN) };
+       if ($@) { # regular file (but not w/ select|IO::Poll backends)
+               $self->{-need_rq} = 1; # ->close and ->event_step check this
+               $self->requeue;
+       } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+               $in->blocking(0); # -S (socket) reuses the stat from -p
+       } # TODO: tty
+}
+
+sub close { # overrides the parent class ->close
+       my ($self) = @_; # -need_rq is set by consume when SUPER::new died
+       $self->{-need_rq} ? delete($self->{sock}) : $self->SUPER::close
 }
 
 sub event_step {
        my ($self) = @_;
        my $r = sysread($self->{sock} // return, my $rbuf, 65536);
-       if ($r) {
-               $self->{cb}->(@{$self->{args}}, $rbuf);
-               return $self->requeue; # may be regular file or pipe
-       }
-       if (defined($r)) { # EOF
-               $self->{cb}->(@{$self->{args}}, '');
-       } elsif ($!{EAGAIN}) {
-               return;
-       } else { # another error
-               $self->{cb}->(@{$self->{args}}, undef)
+       eval { # a die from the callback is caught here and handled below
+               if ($r) { # got data: hand it to the callback
+                       $self->{cb}->(@{$self->{args}}, $rbuf);
+                       $self->requeue if $self->{-need_rq};
+               } elsif (defined($r)) { # EOF
+                       $self->{cb}->(@{$self->{args}}, ''); # empty string signals EOF
+                       $self->close
+               } elsif ($!{EAGAIN}) { # rely on EPOLLIN
+               } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
+                       $self->requeue if $self->{-need_rq};
+               } else { # another error
+                       $self->{cb}->(@{$self->{args}}, undef); # undef signals the error
+                       $self->close;
+               }
+       };
+       if ($@) { # something inside the eval block died
+               warn "E: $@";
+               $self->close;
        }
-       $self->{sock}->blocking ? delete($self->{sock}) : $self->close
 }
 
 1;

Reply via email to