So I was reviewing this code and had a few comments but then I realized
that we already have a basic flow control mechanism for char devices.
RedCharDeviceClient has an implementation based on 'tokens'. Is there
any reason that we can't use this instead of re-implementing it here?



On Wed, 2017-06-14 at 16:40 +0100, Frediano Ziglio wrote:
> Do not allow the guest to fill host memory.
> Also having a huge queue mainly cause to have a higher video
> latency.
> 
> Signed-off-by: Frediano Ziglio <fzig...@redhat.com>
> ---
>  server/stream-channel.c | 41
> ++++++++++++++++++++++++++++++++++++++++-
>  server/stream-channel.h | 10 ++++++++++
>  server/stream-device.c  | 34 +++++++++++++++++++++++++++++++++-
>  3 files changed, 83 insertions(+), 2 deletions(-)
> 
> diff --git a/server/stream-channel.c b/server/stream-channel.c
> index 58c550e..966dd77 100644
> --- a/server/stream-channel.c
> +++ b/server/stream-channel.c
> @@ -68,9 +68,15 @@ struct StreamChannel {
>      /* size of the current video stream */
>      unsigned width, height;
>  
> +    StreamQueueStat queue_stat;
> +
>      /* callback to notify when a stream should be started or stopped
> */
>      stream_channel_start_proc start_cb;
>      void *start_opaque;
> +
> +    /* callback to notify when queue statistics changes */
> +    stream_channel_queue_stat_proc queue_cb;
> +    void *queue_opaque;
>  };
>  
>  struct StreamChannelClass {
> @@ -95,6 +101,7 @@ typedef struct StreamCreateItem {
>  
>  typedef struct StreamDataItem {
>      RedPipeItem base;
> +    StreamChannel *channel;
>      // NOTE: this must be the last field in the structure
>      SpiceMsgDisplayStreamData data;
>  } StreamDataItem;
> @@ -450,6 +457,27 @@ stream_channel_change_format(StreamChannel
> *channel, const StreamMsgFormat *fmt)
>      red_pipe_item_unref(&item->base);
>  }
>  
> +static inline void
> +stream_channel_update_queue_stat(StreamChannel *channel,
> +                                 int32_t num_diff, int32_t
> size_diff)
> +{
> +    channel->queue_stat.num_items += num_diff;
> +    channel->queue_stat.size += size_diff;
> +    if (channel->queue_cb) {
> +        channel->queue_cb(channel->queue_opaque, &channel-
> >queue_stat, channel);
> +    }
> +}
> +
> +static void
> +data_item_free(RedPipeItem *base)
> +{
> +    StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
> +
> +    stream_channel_update_queue_stat(pipe_item->channel, -1,
> -pipe_item->data.data_size);
> +
> +    free(pipe_item);
> +}
> +
>  void
>  stream_channel_send_data(StreamChannel *channel, const void *data,
> size_t size)
>  {
> @@ -460,10 +488,13 @@ stream_channel_send_data(StreamChannel
> *channel, const void *data, size_t size)
>      RedChannel *red_channel = RED_CHANNEL(channel);
>  
>      StreamDataItem *item = spice_malloc(sizeof(*item) + size);
> -    red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA);
> +    red_pipe_item_init_full(&item->base,
> RED_PIPE_ITEM_TYPE_STREAM_DATA,
> +                            data_item_free);
>      item->data.base.id = channel->stream_id;
>      item->data.base.multi_media_time = reds_get_mm_time();
>      item->data.data_size = size;
> +    item->channel = channel;
> +    stream_channel_update_queue_stat(channel, 1, size);
>      // TODO try to optimize avoiding the copy
>      memcpy(item->data.data, data, size);
>      red_channel_pipes_new_add(red_channel, pipe_item_new_ref, item);
> @@ -479,6 +510,14 @@ stream_channel_register_start_cb(StreamChannel
> *channel,
>  }
>  
>  void
> +stream_channel_register_queue_stat_cb(StreamChannel *channel,
> +                                      stream_channel_queue_stat_proc
> cb, void *opaque)
> +{
> +    channel->queue_cb = cb;
> +    channel->queue_opaque = opaque;
> +}
> +
> +void
>  stream_channel_reset(StreamChannel *channel)
>  {
>      RedChannel *red_channel = RED_CHANNEL(channel);
> diff --git a/server/stream-channel.h b/server/stream-channel.h
> index 8c9c0f1..ca20b6a 100644
> --- a/server/stream-channel.h
> +++ b/server/stream-channel.h
> @@ -65,6 +65,16 @@ typedef void (*stream_channel_start_proc)(void
> *opaque, struct StreamMsgStartSto
>  void stream_channel_register_start_cb(StreamChannel *channel,
>                                        stream_channel_start_proc cb,
> void *opaque);
>  
> +typedef struct StreamQueueStat {
> +    uint32_t num_items;
> +    uint32_t size;
> +} StreamQueueStat;
> +
> +typedef void (*stream_channel_queue_stat_proc)(void *opaque, const
> StreamQueueStat *stats,
> +                                               StreamChannel
> *channel);
> +void stream_channel_register_queue_stat_cb(StreamChannel *channel,
> +                                           stream_channel_queue_stat
> _proc cb, void *opaque);
> +
>  G_END_DECLS
>  
>  #endif /* STREAM_CHANNEL_H_ */
> diff --git a/server/stream-device.c b/server/stream-device.c
> index b8a9dac..2b1e2f2 100644
> --- a/server/stream-device.c
> +++ b/server/stream-device.c
> @@ -44,6 +44,7 @@ struct StreamDevice {
>      uint8_t hdr_pos;
>      bool has_error;
>      bool opened;
> +    bool flow_stopped;
>      StreamChannel *channel;
>  };
>  
> @@ -67,7 +68,7 @@ stream_device_read_msg_from_dev(RedCharDevice
> *self, SpiceCharDeviceInstance *si
>      SpiceCharDeviceInterface *sif;
>      int n;
>  
> -    if (dev->has_error) {
> +    if (dev->has_error || dev->flow_stopped) {
>          return NULL;
>      }
>  
> @@ -165,6 +166,9 @@ handle_msg_data(StreamDevice *dev,
> SpiceCharDeviceInstance *sin)
>          if (n <= 0) {
>              break;
>          }
> +        // TODO collect all message ??
> +        // up: we send a single frame together
> +        // down: guest can cause a crash
>          stream_channel_send_data(dev->channel, buf, n);
>          dev->hdr.size -= n;
>      }
> @@ -218,6 +222,33 @@ stream_device_stream_start(void *opaque,
> StreamMsgStartStop *start,
>      red_char_device_write_buffer_add(char_dev, buf);
>  }
>  
> +static void
> +stream_device_stream_queue_stat(void *opaque, const StreamQueueStat
> *stats G_GNUC_UNUSED,
> +                                StreamChannel *channel
> G_GNUC_UNUSED)
> +{
> +    StreamDevice *dev = (StreamDevice *) opaque;
> +
> +    if (!dev->opened) {
> +        return;
> +    }
> +
> +    // very easy control flow... if any data stop
> +    // this seems a very small queue but as we use tcp
> +    // there's already that queue
> +    if (stats->num_items) {
> +        dev->flow_stopped = true;
> +        return;
> +    }
> +
> +    if (dev->flow_stopped) {
> +        dev->flow_stopped = false;
> +        // TODO resume flow...
> +        // avoid recursion if we need to call get data from data
> handling from
> +        // data handling
> +        red_char_device_wakeup(&dev->parent);
> +    }
> +}
> +
>  RedCharDevice *
>  stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin)
>  {
> @@ -228,6 +259,7 @@ stream_device_connect(RedsState *reds,
> SpiceCharDeviceInstance *sin)
>      StreamDevice *dev = stream_device_new(sin, reds);
>      dev->channel = channel;
>      stream_channel_register_start_cb(channel,
> stream_device_stream_start, dev);
> +    stream_channel_register_queue_stat_cb(channel,
> stream_device_stream_queue_stat, dev);
>  
>      sif = spice_char_device_get_interface(sin);
>      if (sif->state) {
_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel

Reply via email to