On Mon, Nov 28, 2011 at 01:13:45PM +0000, Daniel P. Berrange wrote: > From: "Daniel P. Berrange" <[email protected]> > > The GIO GInputStream/GOutputStream async model for I/O does not > work for working with non-blocking bi-directional streams. To > allow that to be done more effectively, add an API to allow > main loop watches to be registered against streams. > > Since the libvirt level virStreamEventAddCallback API only allows > a single callback to be registered to a stream at any time, the > GVirStream object needs to be multiplexing of multiple watches into > a single libvirt level callback. > > Watches can be removed in the normal way with g_source_remove > > * libvirt-gobject/libvirt-gobject-stream.c, > libvirt-gobject/libvirt-gobject-stream.h, > libvirt-gobject/libvirt-gobject.sym: Add gvir_stream_add_watch > --- > libvirt-gobject/libvirt-gobject-stream.c | 180 > ++++++++++++++++++++++++++++++ > libvirt-gobject/libvirt-gobject-stream.h | 17 +++ > libvirt-gobject/libvirt-gobject.sym | 1 + > 3 files changed, 198 insertions(+), 0 deletions(-) > > diff --git a/libvirt-gobject/libvirt-gobject-stream.c > b/libvirt-gobject/libvirt-gobject-stream.c > index 0d1c2d1..03b2c84 100644 > --- a/libvirt-gobject/libvirt-gobject-stream.c > +++ b/libvirt-gobject/libvirt-gobject-stream.c > @@ -46,8 +46,20 @@ struct _GVirStreamPrivate > virStreamPtr handle; > GInputStream *input_stream; > GOutputStream *output_stream; > + > + gboolean eventRegistered; > + int eventLast; > + GList *sources; > }; > > +typedef struct { > + GSource source; > + GVirStreamIOCondition cond; > + GVirStreamIOCondition newCond; > + GVirStream *stream; > +} GVirStreamSource; > + > + > G_DEFINE_TYPE(GVirStream, gvir_stream, G_TYPE_IO_STREAM); > > > @@ -186,6 +198,7 @@ static void gvir_stream_finalize(GObject *object) > { > GVirStream *self = GVIR_STREAM(object); > GVirStreamPrivate *priv = self->priv; > + GList *tmp; > > DEBUG("Finalize GVirStream=%p", self); > > @@ -199,6 +212,14 @@ static void gvir_stream_finalize(GObject *object) > virStreamFree(priv->handle); > } > > + tmp = priv->sources; > + while (tmp) { > + GVirStreamSource *source = tmp->data; > + g_source_remove(g_source_get_id((GSource*)source));
I think g_source_destroy can be used here
> + tmp = tmp->next;
> + }
> + g_list_free(priv->sources);
> +
> G_OBJECT_CLASS(gvir_stream_parent_class)->finalize(object);
> }
>
> @@ -448,3 +469,162 @@ gvir_stream_send_all(GVirStream *self,
> GVirStreamSourceFunc func, gpointer user_
>
> return r;
> }
> +
> +
> +static void gvir_stream_handle_events(virStreamPtr st G_GNUC_UNUSED,
> + int events,
> + void *opaque)
> +{
> + GVirStream *stream = GVIR_STREAM(opaque);
> + GVirStreamPrivate *priv = stream->priv;
> + GList *tmp = priv->sources;
> +
> + while (tmp) {
> + GVirStreamSource *source = tmp->data;
> + source->newCond = 0;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE) {
> + if (events & VIR_STREAM_EVENT_READABLE)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_READABLE;
> + if (events & VIR_STREAM_EVENT_HANGUP)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> + if (events & VIR_STREAM_EVENT_ERROR)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> + }
> + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE) {
> + if (events & VIR_STREAM_EVENT_WRITABLE)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_WRITABLE;
> + if (events & VIR_STREAM_EVENT_HANGUP)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_HANGUP;
> + if (events & VIR_STREAM_EVENT_ERROR)
> + source->newCond |= GVIR_STREAM_IO_CONDITION_ERROR;
> + }
> + tmp = tmp->next;
> + }
> +
> +}
> +
> +
> +static void gvir_stream_update_events(GVirStream *stream)
> +{
> + GVirStreamPrivate *priv = stream->priv;
> + int mask = 0;
> + GList *tmp = priv->sources;
> +
> + while (tmp) {
> + GVirStreamSource *source = tmp->data;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_READABLE)
> + mask |= VIR_STREAM_EVENT_READABLE;
> + if (source->cond & GVIR_STREAM_IO_CONDITION_WRITABLE)
> + mask |= VIR_STREAM_EVENT_WRITABLE;
> + tmp = tmp->next;
> + }
> +
> + if (mask) {
> + if (priv->eventRegistered) {
> + virStreamEventUpdateCallback(priv->handle, mask);
> + } else {
> + virStreamEventAddCallback(priv->handle, mask,
> + gvir_stream_handle_events,
> + g_object_ref(stream),
> + g_object_unref);
> + priv->eventRegistered = TRUE;
> + }
> + } else {
> + if (priv->eventRegistered) {
> + virStreamEventRemoveCallback(priv->handle);
> + priv->eventRegistered = FALSE;
> + }
> + }
> +}
> +
> +static gboolean gvir_stream_source_prepare(GSource *source,
> + gint *timeout)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + if (gsource->newCond) {
> + *timeout = 0;
> + return TRUE;
> + }
> + *timeout = -1;
> + return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_check(GSource *source)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + if (gsource->newCond)
> + return TRUE;
> + return FALSE;
> +}
> +
> +static gboolean gvir_stream_source_dispatch(GSource *source,
> + GSourceFunc callback,
> + gpointer user_data)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + GVirStreamIOFunc func = (GVirStreamIOFunc)callback;
> + gboolean ret;
> + ret = func(gsource->stream, gsource->newCond, user_data);
> + gsource->newCond = 0;
> + return ret;
> +}
> +
> +static void gvir_stream_source_finalize(GSource *source)
> +{
> + GVirStreamSource *gsource = (GVirStreamSource*)source;
> + GVirStreamPrivate *priv = gsource->stream->priv;
> + GList *tmp, *prev = NULL;
> +
> + tmp = priv->sources;
> + while (tmp) {
> + if (tmp->data == source) {
> + if (prev) {
> + prev->next = tmp->next;
> + } else {
> + priv->sources = tmp->next;
> + }
> + tmp->next = NULL;
> + g_list_free(tmp);
> + break;
> + }
> +
> + prev = tmp;
> + tmp = tmp->next;
> + }
isn't it doing the same as g_list_remove?
> +
> + gvir_stream_update_events(gsource->stream);
> +}
> +
> +GSourceFuncs gvir_stream_source_funcs = {
> + .prepare = gvir_stream_source_prepare,
> + .check = gvir_stream_source_check,
> + .dispatch = gvir_stream_source_dispatch,
> + .finalize = gvir_stream_source_finalize,
> +};
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + GVirStreamIOFunc func,
> + gpointer opaque,
> + GDestroyNotify notify)
Dunno if it's worth having both gvir_stream_add_watch and
gvir_stream_add_watch_full to be consistent with most glib source functions
(g_timeout_add, g_idle_add, g_io_add_watch, ...). The notify argument would
only be in the _full variant.
> +{
> + GVirStreamPrivate *priv = stream->priv;
> + gint id;
> + GVirStreamSource *source =
> (GVirStreamSource*)g_source_new(&gvir_stream_source_funcs,
> +
> sizeof(GVirStreamSource));
> +
> + source->stream = stream;
> + source->cond = cond;
> +
> + priv->sources = g_list_append(priv->sources, source);
> +
> + gvir_stream_update_events(source->stream);
> +
> + g_source_set_callback((GSource*)source, (GSourceFunc)func, opaque,
> notify);
> + g_source_attach((GSource*)source, g_main_context_default());
> +
> + id = g_source_get_id((GSource*)source);
g_source_attach returns this id which is of type guint.
> + g_source_unref((GSource*)source);
> +
> + return id;
> +}
> diff --git a/libvirt-gobject/libvirt-gobject-stream.h
> b/libvirt-gobject/libvirt-gobject-stream.h
> index 5a1ee68..e0004b2 100644
> --- a/libvirt-gobject/libvirt-gobject-stream.h
> +++ b/libvirt-gobject/libvirt-gobject-stream.h
> @@ -93,6 +93,23 @@ typedef gint (* GVirStreamSourceFunc)(GVirStream *stream,
> GType gvir_stream_get_type(void);
> GType gvir_stream_handle_get_type(void);
>
> +typedef enum {
> + GVIR_STREAM_IO_CONDITION_READABLE = (1 << 0),
> + GVIR_STREAM_IO_CONDITION_WRITABLE = (1 << 1),
> + GVIR_STREAM_IO_CONDITION_HANGUP = (1 << 2),
> + GVIR_STREAM_IO_CONDITION_ERROR = (1 << 3),
> +} GVirStreamIOCondition;
> +
> +typedef gboolean (*GVirStreamIOFunc)(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + gpointer opaque);
> +
> +gint gvir_stream_add_watch(GVirStream *stream,
> + GVirStreamIOCondition cond,
> + GVirStreamIOFunc func,
> + gpointer opaque,
> + GDestroyNotify notify);
> +
> gssize gvir_stream_receive_all(GVirStream *stream, GVirStreamSinkFunc func,
> gpointer user_data, GError **error);
> gssize gvir_stream_receive(GVirStream *stream, gchar *buffer, gsize size,
> GCancellable *cancellable, GError **error);
>
> diff --git a/libvirt-gobject/libvirt-gobject.sym
> b/libvirt-gobject/libvirt-gobject.sym
> index 78b3935..6261865 100644
> --- a/libvirt-gobject/libvirt-gobject.sym
> +++ b/libvirt-gobject/libvirt-gobject.sym
> @@ -126,6 +126,7 @@ LIBVIRT_GOBJECT_0.0.1 {
> gvir_stream_get_type;
> gvir_stream_receive_all;
> gvir_stream_handle_get_type;
> + gvir_stream_add_watch;
>
> local:
> *;
> --
> 1.7.6.4
>
> --
> libvir-list mailing list
> [email protected]
> https://www.redhat.com/mailman/listinfo/libvir-list
pgp3Lo6huuYLz.pgp
Description: PGP signature
-- libvir-list mailing list [email protected] https://www.redhat.com/mailman/listinfo/libvir-list
