Jean Boussier <[email protected]> wrote:
> >> It seems socketpair(..., SOCK_SEQPACKET) can be used for
> >> packetized bidirectional IPC, perhaps with send_io + recv_io iff
> >> necessary.  There shouldn't be any need for new, fragile FS
> >> interactions.
> 
> > Hum. I'm not familiar with socketpair, I'll have to read on it.

Please keep [email protected] Cc-ed for archival purposes
in case I'm arrested/kidnapped/killed.  I'll quote and reply to
your other message separately, no need to resend.

> Could you clarify what you had in mind? Because I've read on socketpair, and
> from my understanding it's similar to anonymous pipes, as they have to
> be created prior
> to forking.

It's bidirectional, and you can use send_io/recv_io to send
pipes or any other IO objects across.  SOCK_SEQPACKET
(sequenced-packet) also frees you from having to deal with
splitting on message boundaries or interleaving.

Ruby's send_io/recv_io does have a weakness in that it can't
send a useful string buffer along with the IO object, but a little
C can be used with sendmsg/recvmsg to make it happen (but
sendmsg + send_io is probably fine)

Ruby's socket ext just sends a 1 byte dummy padding buffer, but
it can allow whatever the socket buffers can handle (over 200k,
IIRC).

This should allow the a socketpair created from a master to
communicate bidirectionally with any of its kids.

If you don't feel like writing C, it might be easier to have
two socketpairs:

1) for string buffer messages
2) another exclusively for send_io/recv_io

> So if we start forking from a worker, I don't see how the new worker and
> the master can open a line of communication without exchanging on the
> filesystem.

If you're sending one end of a pipe from the worker to master:

   m1, w1 = socketpair ...
   m2, w2 = socketpair ...

   w1.sendmsg("#$$-#{io.stat.ino}")
   w2.send_io(io)

And the master will:

   m1.recvmsg => "#{worker_pid}-#{inode}"
   m2.recv_io => map inode of received IO to worker

One major weakness of the above is the non-atomicity if a worker
dies in between w1.sendmsg and w2.send_io; the master would
be hanging on m2.recv_io.

This is why having a bit of C to bundle the IO object with
sendmsg/recvmsg in a single syscall would be nice (along with
saving 2 FDs).


Fwiw, here's the equivalent send/recv wrappers I wrote for Perl
Inline::C a while back, but it should be easy to translate into
Ruby's C API.

In case you can't infer what the Sv* functions do by name,
pretty much all the Perl C API is documented in the perlapi(1)
manpage.  Perl's API was easy enough for a doof like me to learn
after doing Ruby C for a bit...

/* allow send/recv of up to 10 IO objects at once */
#define SEND_FD_CAPA 10
#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
        struct cmsghdr hdr;
        char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
};

static int sleep_wait(unsigned *tries, int err)
{
        const struct timespec req = { 0, 100000000 }; /* 100ms */
        switch (err) {
        case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
                if (++*tries < 50) {
                        fprintf(stderr, "sleeping on sendmsg: %s (#%u)\n",
                                strerror(err), *tries);
                        nanosleep(&req, NULL);
                        return 1;
                }
        default:
                return 0;
        }
}

SV *send_foo(PerlIO *s, SV *svfds, SV *data, int flags)
{
        struct msghdr msg = { 0 };
        union my_cmsg cmsg = { 0 };
        STRLEN dlen = 0;
        struct iovec iov;
        ssize_t sent;
        AV *fds = (AV *)SvRV(svfds);
        I32 i, nfds = av_len(fds) + 1;
        int *fdp;
        unsigned tries = 0;

        if (SvOK(data)) {
                iov.iov_base = SvPV(data, dlen);
                iov.iov_len = dlen;
        }
        if (!dlen) { /* must be non-zero */
                iov.iov_base = &msg.msg_namelen; /* whatever */
                iov.iov_len = 1;
        }
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        if (nfds) {
                if (nfds > SEND_FD_CAPA) {
                        fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds);
                        nfds = SEND_FD_CAPA;
                }
                msg.msg_control = &cmsg.hdr;
                msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));
                cmsg.hdr.cmsg_level = SOL_SOCKET;
                cmsg.hdr.cmsg_type = SCM_RIGHTS;
                cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int));
                fdp = (int *)CMSG_DATA(&cmsg.hdr);
                for (i = 0; i < nfds; i++) {
                        SV **fd = av_fetch(fds, i, 0);
                        *fdp++ = SvIV(*fd);
                }
        }
        do {
                sent = sendmsg(PerlIO_fileno(s), &msg, flags);
        } while (sent < 0 && sleep_wait(&tries, errno));
        return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
}

void recv_foo(PerlIO *s, SV *buf, STRLEN n)
{
        union my_cmsg cmsg = { 0 };
        struct msghdr msg = { 0 };
        struct iovec iov;
        ssize_t i;
        Inline_Stack_Vars;
        Inline_Stack_Reset;

        if (!SvOK(buf))
                sv_setpvn(buf, "", 0);
        iov.iov_base = SvGROW(buf, n + 1);
        iov.iov_len = n;
        msg.msg_iov = &iov;
        msg.msg_iovlen = 1;
        msg.msg_control = &cmsg.hdr;
        msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);

        i = recvmsg(PerlIO_fileno(s), &msg, 0);
        if (i < 0)
                Inline_Stack_Push(&PL_sv_undef);
        else
                SvCUR_set(buf, i);
        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
                size_t len = cmsg.hdr.cmsg_len;
                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
                        Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
        }
        Inline_Stack_Done;
}

Reply via email to