On 29.01.2013 03:01, Brian Anderson wrote:
On 01/28/2013 05:29 PM, Graydon Hoare wrote:
On 13-01-28 04:56 PM, Brian Anderson wrote:

I think libuv is doing too much here. For example, if I don't want to
remove the socket from the event queue, just disable the callback, then
this is not possible. I'd prefer to just tell libuv that I am interested
in event X (on Windows: I/O completion, on UNIX: I/O availability).
Yet the optimization you suggest has to do with recycling the buffer,
not listening for one kind of event vs. another.

In general I'm not interested in trying to "get underneath" the
abstraction uv is providing. It's providing an IOCP-oriented interface,
I would like to code to that and make the rust IO library not have to
worry when it's on windows vs. unix. That's the point of the abstraction
uv provides, and it's valuable. If it means bouncing off epoll a few too
many times (or reallocating a buffer a few too many times), I'm not too
concerned. Those should both be O(1) operations.

Is it possible to do this optimization later or do we need to plan for
this ahead of time? I would prefer to use the uv API as it's presented
to start with.
The optimization to use a caller-provided buffer should (a) not be
necessary to get us started and (b) be equally possible on either
platform, unix or windows, _so long as_ we're actually sleeping a task
during its period of interest in IO (either the pre-readiness sleep or a
post-issue, pre-completion sleep). In other words, if we're simulating
sync IO, then we can use a task-local buffer. If we're _not_ simulating
sync IO (I sure hope we do!) then we should let uv allocate and free
dynamic buffers as it needs them.

But I really hope we wind up structuring it so it simulates sync IO.
We're providing a task abstraction. Users _want_ the sync IO abstraction
the same way they want the sequential control flow abstraction.

Presenting the scheduler-originating I/O as synchronous is what I intend. I am not sure that we can guarantee that a task is actually waiting for I/O when an I/O event occurs that that task is waiting for. A task may block on some other unrelated event while the event loop is doing I/O. Pseudocode:

let port = IOPort::connect(); // Assume we're doing I/O reads using something portlike
while port.recv() {
    // Block on a different port, while uv continues doing I/O on our behalf
    let intermediate_value = some_other_port.recv();
}

This is why I'm imagining that the scheduler will sometimes need to buffer.

I don't think so. Let me explain.

In any case, this is only a problem (and one that can be solved) if we want to be able to treat I/O like a port and wait for either one to resume our thread. I assume we do want this, so that we can listen on an I/O socket and, for example, for incoming messages at the same time.

The kernel provides a way to do (task-local) blocking I/O operations. A task cannot return from a read() call until data comes in, EOF is reached, or some other error condition occurs. This behaves basically like a blocking POSIX read() call, except that libuv converts it into an asynchronous read under the hood. To expose I/O as a port, we have to start a new task:

  let fd = open(...);
  let (po, ch) = streams::pipe();
  do task::spawn {
    loop {
      let buf: ~[u8] = vec::from_fn(1000, || 0);
      let nread = fd.read(buf, 1000);
      if nread > 0 {
        ch.send(Data(buf))
      }
      else if nread == 0 {
        ch.send(EOF);
        break; // stop reading after EOF
      }
      else {
        ch.send(Error);
        break; // stop reading after an error
      }
    }
  }

  // now we can treat `po` as a Port and call select() on it
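
For readers on a current toolchain, here is a minimal sketch of the same idea in present-day Rust. It is an illustration only, not the runtime design discussed in this thread: it uses an OS thread and std::sync::mpsc instead of tasks and streams::pipe, and the names IoMsg and spawn_reader are made up for the example.

```rust
use std::io::Read;
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Messages the reader thread forwards to the consumer (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum IoMsg {
    Data(Vec<u8>),
    Eof,
    Error,
}

/// Spawn a thread that does blocking reads of at most `buf_size` bytes and
/// forwards every result over a channel. The thread stops after EOF, after
/// an error, or when the receiving end has been dropped.
pub fn spawn_reader<R: Read + Send + 'static>(mut src: R, buf_size: usize) -> Receiver<IoMsg> {
    let (tx, rx) = channel();
    thread::spawn(move || loop {
        // A fresh buffer per read, sized by the caller rather than the I/O layer.
        let mut buf = vec![0u8; buf_size];
        let msg = match src.read(&mut buf) {
            Ok(0) => IoMsg::Eof,
            Ok(n) => {
                buf.truncate(n);
                IoMsg::Data(buf)
            }
            Err(_) => IoMsg::Error,
        };
        let done = !matches!(msg, IoMsg::Data(_));
        if tx.send(msg).is_err() || done {
            break;
        }
    });
    rx
}
```

The returned Receiver plays the role of `po` above: the consumer can block on it like on any other port, while the blocking read happens elsewhere.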


But I don't think channel I/O will be used that often.

Note that one big advantage is that we can specify the buffer size ourselves!
If we let libuv create a buffer for us, how would it know the buffer size?
The alloc_cb you provide to libuv in uv_read_start() gets a suggested_size
parameter passed, but this is 64k by default, and libuv cannot know what
kind of I/O protocol you are handling. When I do line-oriented I/O, I do
not need a full 64k buffer allocated for every read, which in the worst
case would hold only one byte if the sender is very slow (one byte each
second). And is 64k enough for receiving a very large packet? We clearly
want a way to tell the I/O system how large we expect the incoming packet
to be, otherwise this is completely useless IMHO.

We would still have one separate iotask per scheduler. This is a native
thread and runs the I/O loop. There is no way to do that inside the
scheduler as we would block any task while waiting for I/O.
The callbacks like on_read_cb would simply notify the scheduler
that the task that was responsible for doing this read operation
can now resume. As the scheduler lives in another thread
(the thread in which all tasks of that scheduler live)
and might be active, we need to do some locking here.
When the scheduler gets activated next time, either by
issuing a blocking I/O operation, giving up by using task::yield
or by waiting for a message on a port, or when sending a message
blocks, the scheduler can decide which task to schedule next
and consider those for which I/O has arrived as well.

One thing to consider is that we'd need a way to return the number
of bytes written to the buffer to the calling task of read().
We should store this in the same manner as the pointer to the buffer
and the buffer_size in the stream_t handle. This is safe, as one I/O
object is always exclusively used by one task. We can call this field
last_nread for example, and when the scheduler reactivates a
task blocked on a read I/O, we would simply return this field as number
of read bytes.
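
A small sketch of this hand-off, assuming OS threads and a condition variable in place of the task scheduler (the scheduler itself is out of scope here). IoHandle, complete_read and wait_for_read are hypothetical names; only the last_nread field mirrors the proposal.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for the I/O handle described above.
pub struct IoHandle {
    /// `Some(n)` once a read has completed; plays the role of `last_nread`.
    last_nread: Mutex<Option<isize>>,
    /// Signalled when `last_nread` has been filled in.
    completed: Condvar,
}

impl IoHandle {
    pub fn new() -> Arc<IoHandle> {
        Arc::new(IoHandle {
            last_nread: Mutex::new(None),
            completed: Condvar::new(),
        })
    }

    /// Called from the I/O thread (the role of on_read_cb): record the
    /// result and wake the reader.
    pub fn complete_read(&self, nread: isize) {
        *self.last_nread.lock().unwrap() = Some(nread);
        self.completed.notify_one();
    }

    /// Called by the single owning reader: block until the read completes,
    /// then return the stored byte count and clear it for the next read.
    pub fn wait_for_read(&self) -> isize {
        let mut slot = self.last_nread.lock().unwrap();
        loop {
            if let Some(nread) = slot.take() {
                return nread;
            }
            slot = self.completed.wait(slot).unwrap();
        }
    }
}
```

Storing the count in the handle only works because each handle has exactly one reader, which is the exclusivity assumption stated above.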

In short:

  * A task can block either on exactly *one* I/O object
     or on a channel/port.
  * Each I/O object belongs exclusively to one task.
  * I/O and Port/Chan are two different things.
  * I/O is "lower" than Port/Chan, but can easily be
     wrapped into a Port/Chan abstraction (see code above).
  * When a task blocks on an I/O event, it blocks until
     this I/O event arrives.
  * A task can only ever block on *one* I/O event.
  * For Channel I/O (I/O over Port/Chan) a separate task
     is needed for each connection object.
  * We have one iotask per scheduler.

Actually, what we are doing is reversing what the core provides
us by default. Right now, blocking I/O (read(), read_line()) is provided over a
Chan/Port system, while in the future blocking I/O will be provided by
the scheduler, and it is very easy to build a Chan/Port on top of this.

And I forgot: the scheduler of course also has to talk to the iotask. For example,
when a task blocks on I/O, the scheduler has to notify the iotask somehow
that it should update its event list (call uv_read_start() for example), which
would then be integrated on the next event loop iteration. Note that while the
iotask -> scheduler communication can be done using a simple mutex, as the
scheduler only runs for a very short period of time, the reverse communication
scheduler -> iotask must be non-blocking, as the iotask might
sleep for quite some time. Actually, we need a way to wake up the event
loop from the scheduler. Imagine there are two tasks.
One is blocked on I/O. This means that the iotask is blocked on I/O. When the
second task now also wants to do I/O, it cannot register its event in the
iotask, as the iotask is still blocked in epoll/whatever. If no I/O arrives for
the first task, the second would be blocked forever, waiting to register its
interest in I/O.
Luckily libuv provides a way to wake up the event loop from another thread
(in our case the scheduler thread): uv_async_init() and uv_async_send().

struct iotask {
  uv_loop_t *evloop;
  uv_async_t *async;
  list *pending_event_registrations;
};

struct iohandle {
  uv_stream_t stream;
  void *buffer;
  size_t buffer_size;
  ssize_t last_nread;
  task *owning_task;
  scheduler *sched;
  iotask *iotask; // back-pointer used by scheduler_read() below
};

uv_buf_t alloc_cb(io, _suggested_size) {
  return uv_buf_init(io->buffer, io->buffer_size);
}

on_read_cb(io, ssize_t nread, int status) {
  uv_read_stop(io);
  io->last_nread = nread;
  scheduler_notify(io);
}

async_cb(iot) {
    mutex_lock_on(iot->pending_event_registrations);
    foreach e in iot->pending_event_registrations {
      match e {
        DoRead(iohandle) -> uv_read_start(iohandle, alloc_cb, on_read_cb),
        ...
      }
    }
    mutex_unlock(iot->pending_event_registrations);
}

iotask_start(iotask *iot) {
   uv_async_init(iot->evloop, iot->async, async_cb);
   uv_run(iot->evloop, UV_RUN_DEFAULT); // this will run forever
}

scheduler_notify(io) {
  mutex_lock(io->sched);
  io->sched->unblock_task(io->owning_task);
  mutex_unlock(io->sched);
}

scheduler_read(io) {
  mutex_lock_on(io->iotask->pending_event_registrations);
  io->iotask->pending_event_registrations.append( DoRead(io) );
  mutex_unlock(io->iotask->pending_event_registrations);
  uv_async_send(io->iotask->async);
  // now block calling task and switch to other task
}

task() {
  io.read(buf, 1000);
  // this will do the following:
  // io.buffer = buf;
  // io.buffer_size = 1000;
  // scheduler_read(io);
}
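
The registration and wake-up path of the pseudocode above can be modelled without libuv. The following is only a sketch under that assumption: a Condvar stands in for uv_async_send(), draining the queue stands in for async_cb, and IoTask, Registration and the Shutdown variant are names invented for the example.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Requests the scheduler queues for the I/O thread (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Registration {
    StartRead(u32), // the u32 is a made-up handle id
    Shutdown,       // added so that the example loop can terminate
}

pub struct IoTask {
    pending: Mutex<Vec<Registration>>,
    wake: Condvar,
}

impl IoTask {
    pub fn new() -> Arc<IoTask> {
        Arc::new(IoTask {
            pending: Mutex::new(Vec::new()),
            wake: Condvar::new(),
        })
    }

    /// Scheduler side: queue a request, then wake the I/O thread. The lock
    /// is held only for the push, so the caller is never blocked for long.
    pub fn register(&self, r: Registration) {
        self.pending.lock().unwrap().push(r);
        self.wake.notify_one();
    }

    /// I/O thread side: sleep until woken, then drain the whole queue.
    /// Returns the handle ids for which a read was started, in order.
    pub fn run(&self) -> Vec<u32> {
        let mut started = Vec::new();
        loop {
            let batch = {
                let mut queue = self.pending.lock().unwrap();
                while queue.is_empty() {
                    queue = self.wake.wait(queue).unwrap();
                }
                std::mem::take(&mut *queue)
            };
            for r in batch {
                match r {
                    Registration::StartRead(id) => started.push(id),
                    Registration::Shutdown => return started,
                }
            }
        }
    }
}
```

Note that the queue is emptied on every wake-up, so a registration is acted on once; the second task from the scenario above gets its read started even though the I/O thread was asleep when it asked.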

This seems to be similar in concept to the code we have right now, except that we would not use channels for communication between iotask and scheduler, and that uv_read_stop is called in the iotask itself, i.e. no further communication is needed. Also there are no context switches, because scheduler and iotask are separate tasks. The only downside is that every read() needs to wake up the iotask. This might be an issue!

If we used Chan I/O instead:

* allocate a buffer for each read(). We only need a way to tell the system a per-allocation buffer size *and* a "total number of bytes" value that may be used before blocking (blocking here means calling uv_read_stop); then this is OK. The length of the queue (channels are buffered) would suffice here, as the total number of bytes before blocking
      would then simply be length_of_channel_buffer * per_alloc_buffer_size
* the on_read_cb would then simply acquire the lock for the associated channel and append the buffer to it.
* Only when the channel is blocked (full) do we have to call uv_read_stop. Once it is unblocked (someone recv()ed from it)
     we have to notice this and call uv_read_start again.

Hm, now that I think about it, forget everything I said in the beginning. I am starting to like this channel-based approach more, and I think it will perform much better, because the iotask is not interrupted anymore (no more uv_read_stop). It also seems simpler to implement, given that it is easy (and fast) to put something on a channel from the iotask (which is in C or low-level Rust).

Can I specify a size for a Channel, so it works like a SizedQueue? I guess this would be important to limit DoS attacks,
otherwise someone could flood our channel from the outside.
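
As an illustration of what a size-limited channel buys us, here is a sketch using the bounded channel from today's standard library (std::sync::mpsc::sync_channel), which did not exist in this form when this mail was written. The helper names push_or_pause and max_buffered_bytes are hypothetical.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Upper bound on buffered bytes: channel length times per-allocation size.
pub fn max_buffered_bytes(channel_len: usize, per_alloc_buffer_size: usize) -> usize {
    channel_len * per_alloc_buffer_size
}

/// Try to hand a filled buffer to the consumer without ever blocking.
/// `Err(buf)` gives the buffer back when the channel is full (or the
/// receiver is gone); that is the point at which the I/O side would stop
/// reading until the consumer has caught up.
pub fn push_or_pause(tx: &SyncSender<Vec<u8>>, buf: Vec<u8>) -> Result<(), Vec<u8>> {
    match tx.try_send(buf) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(buf)) | Err(TrySendError::Disconnected(buf)) => Err(buf),
    }
}
```

With a channel of length 2 and 1000-byte buffers, a peer can make us hold at most 2000 bytes before reading is paused, no matter how fast it sends.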

In case we want to read exactly n bytes from an I/O handle, we could also special-case this easily, so that it reuses an allocated buffer until it contains n bytes (or EOF) before pushing it to the channel. This would be quite easy to do, but we should not take this special case into account right now. Another thing we could do to optimize certain cases is to attach an explicit allocation method to the I/O handle, which is then responsible for buffer allocation.
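
The "fill one buffer until it holds n bytes or EOF" case can be sketched against any blocking reader. This uses the standard Read trait of current Rust rather than the I/O handle discussed here, and read_up_to is a name made up for the example.

```rust
use std::io::{self, Read};

/// Keep reading into one buffer until it holds `n` bytes or the source
/// reports EOF. Short reads are simply continued from where they stopped.
pub fn read_up_to<R: Read>(src: &mut R, n: usize) -> io::Result<Vec<u8>> {
    let mut buf = vec![0u8; n];
    let mut filled = 0;
    while filled < n {
        match src.read(&mut buf[filled..]) {
            Ok(0) => break, // EOF before n bytes arrived
            Ok(k) => filled += k,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    buf.truncate(filled);
    Ok(buf)
}
```

Only the completed buffer would then be pushed to the channel, so a slow sender costs one allocation per n bytes instead of one per read.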

Using I/O channels this will be a very cool system!!! It will be as fast as pure libuv minus a) allocation overhead (vs. hand-crafted code) and b) task switching (vs. simple callbacks). While a) is negligible with a good memory allocator (with a GC this would be a problem!),
b) is the cost we pay for easier coding and is exactly what we want!

This would be the "pseudo" code for the Channel I/O. Note that there would be special code in the ChanPort code, that invokes start_read again once a channel turns from full to not_full. I am assuming a SizedQueue here.

struct iotask {
  uv_loop_t *evloop;
  uv_async_t *async;
  list *pending_event_registrations;
};

struct iohandle {
  uv_stream_t stream;
  size_t buffer_size;
  chan *read_chan;
  iotask *iotask; // back-pointer used by scheduler_start_read() below
};

uv_buf_t alloc_cb(io, _suggested_size) {
  // ignore the suggested size and allocate what the handle asked for
  return uv_buf_init(malloc(io->buffer_size), io->buffer_size);
}

on_read_cb(io, uv_buf_t buf, ssize_t nread, int status) {
  lock(io->read_chan);
  // this will never block! NOTE that the iotask should be the only
  // one allowed to send to the read_chan (i.e. single writer!)
  io->read_chan->push(buf);

  if io->read_chan->full() {
    // if the channel is full, we have to tell libuv to stop reading.
    // when someone later consumes from the full channel, we need to call
    // scheduler_start_read(io) again.
    uv_read_stop(io);
  }

  unlock(io->read_chan);
}

async_cb(iot) {
    mutex_lock_on(iot->pending_event_registrations);
    foreach e in iot->pending_event_registrations {
      match e {
        StartRead(iohandle) -> uv_read_start(iohandle, alloc_cb, on_read_cb),
        ...
      }
    }
    mutex_unlock(iot->pending_event_registrations);
}

iotask_start(iotask *iot) {
   uv_async_init(iot->evloop, iot->async, async_cb);
   uv_run(iot->evloop, UV_RUN_DEFAULT); // this will run forever
}

scheduler_start_read(io) {
  mutex_lock_on(io->iotask->pending_event_registrations);
  io->iotask->pending_event_registrations.append( StartRead(io) );
  mutex_unlock(io->iotask->pending_event_registrations);
  uv_async_send(io->iotask->async);
  // return to calling task!!!
}

task() {
  io = SocketOpen(buffer_size: 1000); // this will call scheduler_start_read(io)
  // io should actually contain internally two channels/ports, one pair for
  // each direction
  while io.recv() {
    ...
  }
}

Regards,

  Michael
_______________________________________________
Rust-dev mailing list
[email protected]
https://mail.mozilla.org/listinfo/rust-dev
