On Tue, 2017-08-22 at 11:26 -0400, Frediano Ziglio wrote:
> > 
> > So I was reviewing this code and had a few comments but then I
> > realized
> > that we already have a basic flow control mechanism for char
> > devices.
> > RedCharDeviceClient has an implementation based on 'tokens'. Is
> > there
> > any reason that we can't use this instead of re-implementing it
> > here?
> > 
> 
> I think the main reason is that the token implementation requires
> to use the characters device as a pass-through device for the client
> while this device is not a pass-through.

Yes, it's not a pass-through, but when I looked, I had thought that it
would be possible to use the RedCharDeviceClient for our purpose. It's
not explicitly a pass-through: the data can be transformed (or dropped)
by the implementation of
RedCharDeviceClass::read_one_msg_from_device().

But now I see that some of our char device reads may result in many
RedPipeItem being sent (e.g. reading a STREAM_TYPE_FORMAT message 
results in us sending the following pipe items over the stream channel:
STREAM_DESTROY, SURFACE_DESTROY, SURFACE_CREATE, DISPLAY_MARK,
STREAM_CREATE, and STREAM_ACTIVATE_REPORT)...


> 
> This implementation does not surely solve the entire problem.
> The streaming in the DisplayChannel protocol is supposed to use
> stream reports on the server <-> client chat and this should be
> propagated to the guest trying to reduce the bandwidth usage
> (reducing frames and/or quality). Currently I enable these
> report but there's no handling of it. Also there's no messages
> for the guest for these information.
> 
> Basically this patch solve one part of the issue. It avoids
> the queue guest <-> server to grow undefinitely.

OK. Even so, it seems a bit of a shame to have to sort of re-implement
flow control here. Maybe there's no choice...


> 
> Frediano
> 
> > 
> > 
> > On Wed, 2017-06-14 at 16:40 +0100, Frediano Ziglio wrote:
> > > Do not allow the guest to fill host memory.
> > > Also having a huge queue mainly cause to have a higher video
> > > latency.
> > > 
> > > Signed-off-by: Frediano Ziglio <fzig...@redhat.com>
> > > ---
> > >  server/stream-channel.c | 41
> > > ++++++++++++++++++++++++++++++++++++++++-
> > >  server/stream-channel.h | 10 ++++++++++
> > >  server/stream-device.c  | 34 +++++++++++++++++++++++++++++++++-
> > >  3 files changed, 83 insertions(+), 2 deletions(-)
> > > 
> > > diff --git a/server/stream-channel.c b/server/stream-channel.c
> > > index 58c550e..966dd77 100644
> > > --- a/server/stream-channel.c
> > > +++ b/server/stream-channel.c
> > > @@ -68,9 +68,15 @@ struct StreamChannel {
> > >      /* size of the current video stream */
> > >      unsigned width, height;
> > >  
> > > +    StreamQueueStat queue_stat;
> > > +
> > >      /* callback to notify when a stream should be started or
> > > stopped
> > > */
> > >      stream_channel_start_proc start_cb;
> > >      void *start_opaque;
> > > +
> > > +    /* callback to notify when queue statistics changes */
> > > +    stream_channel_queue_stat_proc queue_cb;
> > > +    void *queue_opaque;
> > >  };
> > >  
> > >  struct StreamChannelClass {
> > > @@ -95,6 +101,7 @@ typedef struct StreamCreateItem {
> > >  
> > >  typedef struct StreamDataItem {
> > >      RedPipeItem base;
> > > +    StreamChannel *channel;
> > >      // NOTE: this must be the last field in the structure
> > >      SpiceMsgDisplayStreamData data;
> > >  } StreamDataItem;
> > > @@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel
> > > *channel, const StreamMsgFormat *fmt)
> > >      red_pipe_item_unref(&item->base);
> > >  }
> > >  
> > > +static inline void
> > > +stream_channel_update_queue_stat(StreamChannel *channel,
> > > +                                 int32_t num_diff, int32_t
> > > size_diff)
> > > +{
> > > +    channel->queue_stat.num_items += num_diff;
> > > +    channel->queue_stat.size += size_diff;
> > > +    if (channel->queue_cb) {
> > > +        channel->queue_cb(channel->queue_opaque, &channel-
> > > > queue_stat, channel);
> > > 
> > > +    }
> > > +}
> > > +
> > > +static void
> > > +data_item_free(RedPipeItem *base)
> > > +{
> > > +    StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem,
> > > base);
> > > +
> > > +    stream_channel_update_queue_stat(pipe_item->channel, -1,
> > > -pipe_item->data.data_size);
> > > +
> > > +    free(pipe_item);
> > > +}
> > > +
> > >  void
> > >  stream_channel_send_data(StreamChannel *channel, const void
> > > *data,
> > > size_t size)
> > >  {
> > > @@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel
> > > *channel, const void *data, size_t size)
> > >      RedChannel *red_channel = RED_CHANNEL(channel);
> > >  
> > >      StreamDataItem *item = spice_malloc(sizeof(*item) + size);
> > > -    red_pipe_item_init(&item->base,
> > > RED_PIPE_ITEM_TYPE_STREAM_DATA);
> > > +    red_pipe_item_init_full(&item->base,
> > > RED_PIPE_ITEM_TYPE_STREAM_DATA,
> > > +                            data_item_free);
> > >      item->data.base.id = channel->stream_id;
> > >      item->data.base.multi_media_time = reds_get_mm_time();
> > >      item->data.data_size = size;
> > > +    item->channel = channel;
> > > +    stream_channel_update_queue_stat(channel, 1, size);
> > >      // TODO try to optimize avoiding the copy
> > >      memcpy(item->data.data, data, size);
> > >      red_channel_pipes_new_add(red_channel, pipe_item_new_ref,
> > > item);
> > > @@ -479,6 +510,14 @@
> > > stream_channel_register_start_cb(StreamChannel
> > > *channel,
> > >  }
> > >  
> > >  void
> > > +stream_channel_register_queue_stat_cb(StreamChannel *channel,
> > > +                                      stream_channel_queue_stat_
> > > proc
> > > cb, void *opaque)
> > > +{
> > > +    channel->queue_cb = cb;
> > > +    channel->queue_opaque = opaque;
> > > +}
> > > +
> > > +void
> > >  stream_channel_reset(StreamChannel *channel)
> > >  {
> > >      RedChannel *red_channel = RED_CHANNEL(channel);
> > > diff --git a/server/stream-channel.h b/server/stream-channel.h
> > > index 8c9c0f1..ca20b6a 100644
> > > --- a/server/stream-channel.h
> > > +++ b/server/stream-channel.h
> > > @@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void
> > > *opaque, struct StreamMsgStartSto
> > >  void stream_channel_register_start_cb(StreamChannel *channel,
> > >                                        stream_channel_start_proc
> > > cb,
> > > void *opaque);
> > >  
> > > +typedef struct StreamQueueStat {
> > > +    uint32_t num_items;
> > > +    uint32_t size;
> > > +} StreamQueueStat;
> > > +
> > > +typedef void (*stream_channel_queue_stat_proc)(void *opaque,
> > > const
> > > StreamQueueStat *stats,
> > > +                                               StreamChannel
> > > *channel);
> > > +void stream_channel_register_queue_stat_cb(StreamChannel
> > > *channel,
> > > +                                           stream_channel_queue_
> > > stat
> > > _proc cb, void *opaque);
> > > +
> > >  G_END_DECLS
> > >  
> > >  #endif /* STREAM_CHANNEL_H_ */
> > > diff --git a/server/stream-device.c b/server/stream-device.c
> > > index b8a9dac..2b1e2f2 100644
> > > --- a/server/stream-device.c
> > > +++ b/server/stream-device.c
> > > @@ -44,6 +44,7 @@ struct StreamDevice {
> > >      uint8_t hdr_pos;
> > >      bool has_error;
> > >      bool opened;
> > > +    bool flow_stopped;
> > >      StreamChannel *channel;
> > >  };
> > >  
> > > @@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice
> > > *self, SpiceCharDeviceInstance *si
> > >      SpiceCharDeviceInterface *sif;
> > >      int n;
> > >  
> > > -    if (dev->has_error) {
> > > +    if (dev->has_error || dev->flow_stopped) {
> > >          return NULL;
> > >      }
> > >  
> > > @@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev,
> > > SpiceCharDeviceInstance *sin)
> > >          if (n <= 0) {
> > >              break;
> > >          }
> > > +        // TODO collect all message ??
> > > +        // up: we send a single frame together
> > > +        // down: guest can cause a crash
> > >          stream_channel_send_data(dev->channel, buf, n);
> > >          dev->hdr.size -= n;
> > >      }
> > > @@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque,
> > > StreamMsgStartStop *start,
> > >      red_char_device_write_buffer_add(char_dev, buf);
> > >  }
> > >  
> > > +static void
> > > +stream_device_stream_queue_stat(void *opaque, const
> > > StreamQueueStat
> > > *stats G_GNUC_UNUSED,
> > > +                                StreamChannel *channel
> > > G_GNUC_UNUSED)
> > > +{
> > > +    StreamDevice *dev = (StreamDevice *) opaque;
> > > +
> > > +    if (!dev->opened) {
> > > +        return;
> > > +    }
> > > +
> > > +    // very easy control flow... if any data stop
> > > +    // this seems a very small queue but as we use tcp
> > > +    // there's already that queue
> > > +    if (stats->num_items) {
> > > +        dev->flow_stopped = true;
> > > +        return;
> > > +    }
> > > +
> > > +    if (dev->flow_stopped) {
> > > +        dev->flow_stopped = false;
> > > +        // TODO resume flow...
> > > +        // avoid recursion if we need to call get data from data
> > > handling from
> > > +        // data handling
> > > +        red_char_device_wakeup(&dev->parent);
> > > +    }
> > > +}
> > > +
> > >  RedCharDevice *
> > >  stream_device_connect(RedsState *reds, SpiceCharDeviceInstance
> > > *sin)
> > >  {
> > > @@ -228,6 +259,7 @@ stream_device_connect(RedsState *reds,
> > > SpiceCharDeviceInstance *sin)
> > >      StreamDevice *dev = stream_device_new(sin, reds);
> > >      dev->channel = channel;
> > >      stream_channel_register_start_cb(channel,
> > > stream_device_stream_start, dev);
> > > +    stream_channel_register_queue_stat_cb(channel,
> > > stream_device_stream_queue_stat, dev);
> > >  
> > >      sif = spice_char_device_get_interface(sin);
> > >      if (sif->state) {
_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel

Reply via email to