This brings the wq_* SOCK_SEQPACKET API functionality
on par with the ipc_do (pipe-based) API.
---
lib/PublicInbox/IPC.pm | 36 ++++++++++++++++++++++++++++++++----
t/ipc.t | 6 ++++++
2 files changed, 38 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9efe551b..d5e37719 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -182,6 +182,13 @@ sub ipc_lock_init {
$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
}
+sub _wait_return ($$) { # read one reply from $r_res; return its value or die
+ my ($r_res, $sub) = @_; # $sub is only used in the error message below
+ my $ret = _get_rec($r_res) // die "no response on $sub"; # undef => no reply
+ die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; # rethrow wrapped exception
+ wantarray ? @$ret : $$ret; # our own call context; wq_do calls us last so its caller's context applies
+}
+
# call $self->$sub(@args), on a worker if ipc_worker_spawn was used
sub ipc_do {
my ($self, $sub, @args) = @_;
@@ -191,9 +198,7 @@ sub ipc_do {
if (defined(wantarray)) {
my $r_res = $self->{-ipc_res} or die 'no ipc_res';
_send_rec($w_req, [ wantarray, $sub, @args ]);
- my $ret = _get_rec($r_res) // die "no response on $sub";
- die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
- wantarray ? @$ret : $$ret;
+ _wait_return($r_res, $sub);
} else { # likely, fire-and-forget into pipe
_send_rec($w_req, [ undef , $sub, @args ]);
}
@@ -298,7 +303,7 @@ sub wq_io_do { # always async
$!{ETOOMANYREFS} and
croak "sendmsg: $! (check RLIMIT_NOFILE)";
$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
- croak("sendmsg: $!");
+ croak("sendmsg: $!");
}
} else {
@$self{0..$#$ios} = @$ios;
@@ -308,6 +313,29 @@ sub wq_io_do { # always async
}
}
+sub wq_sync_run { # run $self->$sub(@args) under eval, pass result and $@ to ipc_return
+ my ($self, $wantarray, $sub, @args) = @_; # wq_do passes its own wantarray here
+ if ($wantarray) { # true => call $sub in list context
+ my @ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \@ret, $@); # NOTE(review): {0} is presumably the first handle given to wq_io_do - confirm
+ } else { # '' => wantscalar
+ my $ret = eval { $self->$sub(@args) };
+ ipc_return($self->{0}, \$ret, $@); # scalar result goes out as a scalar ref
+ }
+}
+
+sub wq_do { # call $sub through wq_io_do; wait for a result only if the caller wants one
+ my ($self, $sub, @args) = @_;
+ if (defined(wantarray)) { # scalar or list context: a result is expected
+ pipe(my ($r, $w)) or die "pipe: $!"; # one-off pipe for this call's reply
+ wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
+ undef $w; # drop our write end; presumably so $r sees EOF if no reply is written
+ _wait_return($r, $sub); # must stay the final statement so our caller's context propagates
+ } else { # void context: no reply pipe, nothing to wait for
+ wq_io_do($self, $sub, [], @args);
+ }
+}
+
sub _wq_worker_start ($$$) {
my ($self, $oldset, $fields) = @_;
my ($bcast1, $bcast2);
diff --git a/t/ipc.t b/t/ipc.t
index 7983fdc0..202b1cc6 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -161,6 +161,12 @@ SKIP: {
is(waitpid($pid, 0), $pid, 'waitpid complete');
is($?, 0, 'child wq producer exited');
}
+ my @ary = $ipc->wq_do('test_array');
+ is_deeply(\@ary, [ qw(test array) ], 'wq_do wantarray');
+ is(my $s = $ipc->wq_do('test_scalar'), 'scalar', 'defined wantarray');
+ my $exp = bless ['blessed'], 'PublicInbox::WTF';
+ my $ret = eval { $ipc->wq_do('test_die', $exp) };
+ is_deeply($@, $exp, 'die with blessed ref');
}
$ipc->wq_close;
--
unsubscribe: one-click, see List-Unsubscribe header
archive: https://public-inbox.org/meta/