On Tue, 2004-08-10 at 22:42, Dan Sugalski wrote:
> Part of me's tempted to just define our own set of functions, but the
> problem there is that we then put the onus on the embedding app to
> conform to us, which I'm not sure is the right way to go about things.
When the standard APIs are all so completely different (kqueue, epoll,
aio, /dev/poll) and seriously broken (select, poll), I think you pretty
much HAVE to define your own API! If you standardize on kqueue,
implementing epoll becomes incredibly difficult, and vice versa. The
only solution that I am aware of is to introduce a sane API that works
with all possible underlying implementations.
I had a similar need for another project about a year ago. I wanted my
application to have a single I/O API, no matter which underlying
implementation was being used. I looked at a few libraries at the time
that claimed to do exactly this, but they were all very heavyweight and
intrusive. So, I rolled my own. Here's a quick description...
The API:
Each file is represented by a file descriptor and a callback routine.
These are grouped into an io_atom structure. io_atoms are intended to
be embedded into other data structures, so all io_atom
allocation/deallocation is handled by the caller.
typedef struct io_atom {
io_proc proc;
int fd;
} io_atom;
To use it:
typedef struct my_connection {
io_atom io;
int parse_state;
... etc.
} my_connection;
io_add starts monitoring an atom. Use flags to specify the events that
you are interested in (IO_EXCEPT appears to be very platform
dependent... IO_READ is also used to get incoming connections from a
listening socket).
int io_add(io_atom *atom, int flags);
Change the events an atom is monitoring using io_set.
Stop monitoring an atom with io_del.
int io_set(io_atom *atom, int flags);
int io_del(io_atom *atom);
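One common use of io_set is to watch IO_WRITE only while output is pending, since a socket that is always writable would otherwise keep waking the event loop. The following is a minimal sketch of that pattern, not code from the library: the io_set here is a hypothetical stand-in that merely records the requested flags, and out_connection, queue_output and output_sent are made-up names for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define IO_READ  0x01
#define IO_WRITE 0x02

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom {
    io_proc proc;
    int fd;
} io_atom;

/* Hypothetical stand-in for the real io_set: it only records the
 * flags so the sketch can run without a backend. */
static int g_watched_flags;
int io_set(io_atom *atom, int flags)
{
    (void)atom;
    g_watched_flags = flags;
    return 0;
}

/* Hypothetical connection with a small pending-output buffer. */
typedef struct {
    io_atom io;
    char out[256];
    size_t out_len;
} out_connection;

/* Queue data and start watching for writability.
 * Returns 0 on success, -1 if the data does not fit. */
int queue_output(out_connection *c, const char *data, size_t len)
{
    if (len > sizeof(c->out) - c->out_len) return -1;
    memcpy(c->out + c->out_len, data, len);
    c->out_len += len;
    return io_set(&c->io, IO_READ | IO_WRITE);
}

/* Call after `sent` bytes were written. Stops watching IO_WRITE once
 * the buffer is empty so the event loop does not spin. */
int output_sent(out_connection *c, size_t sent)
{
    assert(sent <= c->out_len);
    memmove(c->out, c->out + sent, c->out_len - sent);
    c->out_len -= sent;
    return io_set(&c->io, c->out_len ? (IO_READ | IO_WRITE) : IO_READ);
}
```

In a real callback, output_sent would be called from the IO_WRITE branch with the byte count returned by write().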
Whenever an event occurs on a filehandle, the atom's io_proc
notification is called. The flags tell what happened.
#define IO_READ 0x01
#define IO_WRITE 0x02
#define IO_EXCEPT 0x04
typedef void (*io_proc)(struct io_atom *atom, int flags);
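To illustrate how these three flags might carry over to a backend other than select, here is a rough sketch of a translation layer for poll(2). The function names io_flags_to_poll and poll_to_io_flags are hypothetical. Reporting POLLHUP and POLLERR as IO_READ is an assumption, chosen so the callback discovers the condition from read()'s return value; a real backend might prefer IO_EXCEPT.

```c
#include <poll.h>

#define IO_READ   0x01
#define IO_WRITE  0x02
#define IO_EXCEPT 0x04

/* Translate IO_* interest flags into poll(2) event bits. */
short io_flags_to_poll(int flags)
{
    short events = 0;
    if (flags & IO_READ)   events |= POLLIN;
    if (flags & IO_WRITE)  events |= POLLOUT;
    if (flags & IO_EXCEPT) events |= POLLPRI;
    return events;
}

/* Translate poll(2) results back into IO_* flags.
 * Assumption: hangups and errors are reported as IO_READ so the
 * callback finds out about them when it calls read(). */
int poll_to_io_flags(short revents)
{
    int flags = 0;
    if (revents & (POLLIN | POLLHUP | POLLERR)) flags |= IO_READ;
    if (revents & POLLOUT) flags |= IO_WRITE;
    if (revents & POLLPRI) flags |= IO_EXCEPT;
    return flags;
}
```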
Your application waits for events and dispatches them using io_wait. A
timeout of 0 means return immediately after dispatching all pending
events; a timeout of MAXINT means no timeout, so io_wait blocks until
an event arrives. Timeout is in milliseconds.
int io_wait(int timeout);
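The timeout convention can be sketched as a small conversion helper for a select-based backend. This is only an illustration: io_timeout_to_tv is a hypothetical name, INT_MAX from <limits.h> stands in for MAXINT, and clamping negative values to zero is an assumption rather than documented behavior.

```c
#include <limits.h>
#include <stddef.h>
#include <sys/time.h>

/* Convert a millisecond timeout to the form select() expects.
 * Returns NULL for "wait forever", otherwise a pointer to *tv. */
struct timeval *io_timeout_to_tv(int timeout_ms, struct timeval *tv)
{
    if (timeout_ms == INT_MAX) return NULL;
    if (timeout_ms < 0) timeout_ms = 0;  /* assumption: negative means "don't wait" */
    tv->tv_sec  = timeout_ms / 1000;
    tv->tv_usec = (timeout_ms % 1000) * 1000;
    return tv;
}
```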
Finally, because some of these implementations require dynamic memory
(kqueue), we need:
void io_init(); // inits internal data structures
void io_exit(); // releases all dynamic memory used
If you call io_exit without calling io_del on all the added FDs first,
the files are not closed. They are simply not monitored anymore.
TODO:
Some more work needs to be put into standardizing what error messages
can be returned by what calls and exactly what they mean. Extensive
documentation. Generalization.
Write a clearer demo! :)
Perhaps TODO:
In my application, the io_atom appears first in every struct, so it's
real easy and memory-efficient to convert the atom into its containing
structure:
typedef struct connection {
io_atom io; ///< I/O information (set by rot_connect)
struct in_addr remote_addr;
int remote_port;
...
} connection;
void callback(struct io_atom *io, int flags)
{
connection *conn = (connection*)io;
do_stuff(conn, flags);
}
This might not be true in the general case... Maybe the io_atom should
include a void* client-specified refcon?
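One alternative to both the "atom first" rule and a refcon pointer is to recover the containing structure with offsetof, which works wherever the atom sits and costs no extra memory. The sketch below is only an illustration: IO_CONTAINER_OF, late_atom_connection and late_atom_proc are hypothetical names, not part of the library.

```c
#include <stddef.h>

struct io_atom;
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom {
    io_proc proc;
    int fd;
} io_atom;

/* Hypothetical helper: recover the enclosing struct from a pointer to
 * one of its members, wherever that member sits. */
#define IO_CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical struct in which the atom is deliberately not first. */
typedef struct {
    int parse_state;
    io_atom io;
} late_atom_connection;

static int g_last_state;  /* records what the callback saw */

void late_atom_proc(io_atom *atom, int flags)
{
    late_atom_connection *conn =
        IO_CONTAINER_OF(atom, late_atom_connection, io);
    (void)flags;
    g_last_state = conn->parse_state;
}
```

The macro only works if the callback knows which struct type the atom is embedded in, which is the same requirement the plain cast has.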
In summary...
I'm quite happy with this design, but the code is marginal right now.
Nevertheless, I've banged together a mediocre networking demo. Baware
the boogs!! I figure I should have some time in a few weeks to solidify
this and get it into Parrot if nobody solves the IO issue before then.
My girlfriend is moving to Boston, so that should help a lot. :)
- Scott
P.S. This is all licensed under the same terms as Parrot: GPL or
Artistic 2.0, your choice.
// io.h
// Scott Bronson
// 2 Oct 2003
// This is the generic Async I/O API. It can be implemented using
// select, poll, epoll, kqueue, aio, and /dev/poll (hopefully).
//
// This code is licensed under the same terms as Parrot itself.
#define IO_READ 0x01
#define IO_WRITE 0x02
#define IO_EXCEPT 0x04
// Tells how many incoming connections we can handle at once
// (the backlog parameter to listen)
#define STD_LISTEN_SIZE 128
struct io_atom;
/**
* This routine is called whenever there is action on an atom.
*
* @param atom The atom that the proc is being called on.
* @param flags What sort of action is happening.
*/
typedef void (*io_proc)(struct io_atom *atom, int flags);
typedef struct io_atom {
io_proc proc;
int fd;
} io_atom;
void io_init();
void io_exit();
int io_add(io_atom *atom, int flags);
int io_set(io_atom *atom, int flags);
int io_del(io_atom *atom);
/// Waits for an event, then handles it. Stops waiting if timeout occurs.
/// Specify MAXINT for no timeout.
int io_wait(int timeout);
// select.c
// Scott Bronson
// 4 October 2003
//
// Uses select to satisfy gatekeeper's network I/O
// Because of select's internal limitations, MAXFDS is 1024.
//
// This code is licensed under the same terms as Parrot itself.
#include <stdio.h>
#include <errno.h>
#include <values.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include "io.h"
static io_atom* connections[FD_SETSIZE];
static fd_set fd_read, fd_write, fd_except;
static int max_fd; // the highest-numbered filedescriptor in connections.
// Clears the descriptor sets. Call once before any other io_ routine.
void io_init()
{
FD_ZERO(&fd_read);
FD_ZERO(&fd_write);
FD_ZERO(&fd_except);
}
void io_exit()
{
// nothing to do
}
static void install(int fd, int flags)
{
if(flags & IO_READ) {
FD_SET(fd, &fd_read);
} else {
FD_CLR(fd, &fd_read);
}
if(flags & IO_WRITE) {
FD_SET(fd, &fd_write);
} else {
FD_CLR(fd, &fd_write);
}
if(flags & IO_EXCEPT) {
FD_SET(fd, &fd_except);
} else {
FD_CLR(fd, &fd_except);
}
}
int io_add(io_atom *atom, int flags)
{
int fd = atom->fd;
if(fd < 0 || fd >= FD_SETSIZE) {
return -ERANGE;
}
if(connections[fd]) {
return -EALREADY;
}
connections[fd] = atom;
install(fd, flags);
if(fd > max_fd) max_fd = fd;
return 0;
}
int io_set(io_atom *atom, int flags)
{
int fd = atom->fd;
if(fd < 0 || fd >= FD_SETSIZE) {
return -ERANGE;
}
if(!connections[fd]) {
return -ENOENT; // atom was never added
}
install(fd, flags);
return 0;
}
int io_del(io_atom *atom)
{
int fd = atom->fd;
if(fd < 0 || fd >= FD_SETSIZE) {
return -ERANGE;
}
if(!connections[fd]) {
return -ENOENT; // atom was never added
}
install(fd, 0);
connections[fd] = NULL;
while((max_fd >= 0) && (connections[max_fd] == NULL)) {
max_fd -= 1;
}
return 0;
}
// Wait for events, then dispatch them.
// timeout is in milliseconds. MAXINT == forever.
int io_wait(int timeout)
{
struct timeval tv;
struct timeval *tvp = &tv;
int num, i, flags;
if(timeout == MAXINT) {
tvp = NULL;
} else {
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
}
fd_set rfds = fd_read;
fd_set wfds = fd_write;
fd_set efds = fd_except;
num = select(1+max_fd, &rfds, &wfds, &efds, tvp);
if(num < 0) {
perror("select");
return num;
}
for(i=0; i <= max_fd; i++) {
flags = 0;
if(FD_ISSET(i, &rfds)) flags |= IO_READ;
if(FD_ISSET(i, &wfds)) flags |= IO_WRITE;
if(FD_ISSET(i, &efds)) flags |= IO_EXCEPT;
if(flags) {
if(connections[i]) {
(*connections[i]->proc)(connections[i], flags);
} else {
// what do we do -- event on an unknown connection?
printf("Got an event on an unknown connection %d!\n", i);
}
}
}
return num;
}
// iotest.c
// Scott Bronson
// 11 Aug 2004
//
// Compile: "cc -Wall iotest.c io_select.c -o iotest"
// Run it, then "telnet localhost 21314" a bunch of times.
//
// This example doesn't really show off asynchronous I/O, the whole point
// of the io_atom library. And it's 99% obtuse networking code. So,
// really, it's a pretty darn poor demo. But hopefully it's better than
// nothing.
//
// This code is licensed under the same terms as parrot itself.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <values.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "io.h"
#define PORT 21314
io_atom g_accepter; // the listening socket
char g_char = 'A';
char g_readbuf[1024];
typedef struct {
io_atom io;
char c;
int chars_processed;
} connection;
int set_nonblock(int sd)
{
int flags = 1;
// set nonblocking IO on the socket
if(ioctl(sd, FIONBIO, &flags)) {
// ioctl failed -- try alternative
flags = fcntl(sd, F_GETFL, 0);
if(flags < 0) {
return -1;
}
if(fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
return -1;
}
}
return 0;
}
void connection_close(connection *conn)
{
io_del(&conn->io);
close(conn->io.fd);
free(conn);
}
void connection_proc(io_atom *ioa, int flags)
{
connection *conn = (connection*)ioa;
int fd = conn->io.fd;
int len;
if(flags & IO_READ) {
do {
len = read(fd, g_readbuf, sizeof(g_readbuf));
} while (len < 0 && errno == EINTR); // stupid posix
if(len > 0) {
write(fd, g_readbuf, len);
conn->chars_processed += len;
} else if(len == 0) {
// A 0-length read means remote has closed normally
connection_close(conn);
return;
} else {
// handle an error on the socket
if(errno == EAGAIN) {
// nothing to read? weird.
} else if(errno == EWOULDBLOCK) {
// with glibc EAGAIN==EWOULDBLOCK so this is probably dead code
} else {
// there's some sort of read error on this stream.
connection_close(conn);
return;
}
}
}
if(flags & IO_WRITE) {
// there's more space in the write buffer
// so continue writing.
}
if(flags & IO_EXCEPT) {
// I think this is also used for OOB.
// recv (fd1, &c, 1, MSG_OOB);
connection_close(conn);
return;
}
}
void accept_proc(io_atom *ioa, int flags)
{
connection *conn;
struct sockaddr_in pin;
socklen_t plen;
int sd;
// since the accepter only has IO_READ anyway, there's no need to
// check the flags param.
plen = sizeof(pin);
while((sd = accept(ioa->fd, (struct sockaddr*)&pin, &plen)) < 0) {
if(errno == EINTR) {
// This call was interrupted by a signal. Try again and
// see if we receive a connection.
continue;
}
if(errno == EAGAIN || errno == EWOULDBLOCK) {
// socket is marked nonblocking but no pending connections
// are present. Weird. I guess we should succeed but do nothing.
return;
}
// Probably there is a network error pending for this
// connection (already!). Should probably just ignore it...?
return;
}
if(set_nonblock(sd) < 0) {
printf("Could not set nonblocking: %s.\n", strerror(errno));
close(sd);
return;
}
conn = malloc(sizeof(connection));
if(!conn) {
close(sd);
return;
}
conn->io.fd = sd;
conn->io.proc = connection_proc;
if(io_add(&conn->io, IO_READ) < 0) {
perror("io_add_main");
close(sd);
exit(1);
}
printf("connection opened from %s port %d given fd %d\n",
inet_ntoa(pin.sin_addr), ntohs(pin.sin_port), conn->io.fd);
}
int main(int argc, char **argv)
{
int sd;
struct sockaddr_in sin;
io_init();
printf("Opening listening socket...\n");
if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket");
exit(1);
}
if(set_nonblock(sd) < 0) {
perror("setnonblocking");
close(sd);
exit(1);
}
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sd, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
close(sd);
exit(1);
}
if (listen(sd, STD_LISTEN_SIZE) == -1) {
perror("listen");
close(sd);
exit(1);
}
g_accepter.fd = sd;
g_accepter.proc = accept_proc;
if(io_add(&g_accepter, IO_READ) < 0) {
perror("io_add_main");
close(sd);
exit(1);
}
printf("Listening on port %d, fd %d.\n", PORT, g_accepter.fd);
for(;;) {
io_wait(MAXINT);
}
io_exit();
return 0;
}