The current implementation of server and client assumes that a single
data message contains an encoded frame.
This is not a problem for most encodings, but for MJPEG it causes
the client to fail decoding.
Collapse frame data into a single message before sending to the client.
This is done in the channel, as the channel code is responsible for taking
care of client protocol details. This would allow, for instance, supporting
chunked transfer to the client if it is implemented.

Signed-off-by: Frediano Ziglio <fzig...@redhat.com>
---
 server/stream-channel.c | 99 ++++++++++++++++++++++++++++++++++++++++++-------
 server/stream-channel.h | 12 ++++++
 server/stream-device.c  | 10 +++--
 3 files changed, 103 insertions(+), 18 deletions(-)

Changes since v1:
- explain why it is implemented that way;
- read multimedia time only once for first chunk of data.

diff --git a/server/stream-channel.c b/server/stream-channel.c
index 88f859f6..3a3b733f 100644
--- a/server/stream-channel.c
+++ b/server/stream-channel.c
@@ -44,6 +44,7 @@
 
 typedef struct StreamChannelClient StreamChannelClient;
 typedef struct StreamChannelClientClass StreamChannelClientClass;
+typedef struct StreamDataItem StreamDataItem;
 
 /* we need to inherit from CommonGraphicsChannelClient
  * to get buffer handling */
@@ -74,6 +75,10 @@ struct StreamChannel {
 
     StreamQueueStat queue_stat;
 
+    /* pending partial data item */
+    StreamDataItem *data_item;
+    uint32_t data_item_pos;
+
     /* callback to notify when a stream should be started or stopped */
     stream_channel_start_proc start_cb;
     void *start_opaque;
@@ -104,12 +109,12 @@ typedef struct StreamCreateItem {
     SpiceMsgDisplayStreamCreate stream_create;
 } StreamCreateItem;
 
-typedef struct StreamDataItem {
+struct StreamDataItem {
     RedPipeItem base;
     StreamChannel *channel;
     // NOTE: this must be the last field in the structure
     SpiceMsgDisplayStreamData data;
-} StreamDataItem;
+};
 
 #define PRIMARY_SURFACE_ID 0
 
@@ -129,6 +134,16 @@ stream_channel_client_init(StreamChannelClient *client)
     client->stream_id = -1;
 }
 
+static void
+stream_channel_unref_data_item(StreamChannel *channel)
+{
+    if (channel->data_item) {
+        red_pipe_item_unref(&channel->data_item->base);
+        channel->data_item = NULL;
+        channel->data_item_pos = 0;
+    }
+}
+
 static void
 request_new_stream(StreamChannel *channel, StreamMsgStartStop *start)
 {
@@ -152,6 +167,7 @@ stream_channel_client_on_disconnect(RedChannelClient *rcc)
     channel->stream_id = -1;
     channel->width = 0;
     channel->height = 0;
+    stream_channel_unref_data_item(channel);
 
     // send stream stop to device
     StreamMsgStartStop stop = { 0, };
@@ -424,6 +440,16 @@ stream_channel_constructed(GObject *object)
     reds_register_channel(reds, red_channel);
 }
 
+static void
+stream_channel_finalize(GObject *object)
+{
+    StreamChannel *channel = STREAM_CHANNEL(object);
+
+    stream_channel_unref_data_item(channel);
+
+    G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object);
+}
+
 static void
 stream_channel_class_init(StreamChannelClass *klass)
 {
@@ -431,6 +457,7 @@ stream_channel_class_init(StreamChannelClass *klass)
     RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
 
     object_class->constructed = stream_channel_constructed;
+    object_class->finalize = stream_channel_finalize;
 
     channel_class->parser = 
spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL);
     channel_class->handle_message = handle_message;
@@ -503,11 +530,46 @@ data_item_free(RedPipeItem *base)
 {
     StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
 
-    stream_channel_update_queue_stat(pipe_item->channel, -1, 
-pipe_item->data.data_size);
+    if (pipe_item->channel->data_item != pipe_item) {
+        stream_channel_update_queue_stat(pipe_item->channel, -1, 
-pipe_item->data.data_size);
+    }
 
     g_free(pipe_item);
 }
 
+static StreamDataItem*
+stream_channel_new_data_item(StreamChannel *channel, size_t size, uint32_t 
mm_time)
+{
+    stream_channel_unref_data_item(channel);
+
+    StreamDataItem *item = g_malloc(sizeof(*item) + size);
+    red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
+                            data_item_free);
+    item->data.base.id = channel->stream_id;
+    item->data.base.multi_media_time = mm_time;
+    item->data.data_size = size;
+    item->channel = channel;
+
+    channel->data_item = item;
+    channel->data_item_pos = 0;
+
+    return item;
+}
+
+void
+stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t 
mm_time)
+{
+    // see stream_channel_send_data comment
+    if (channel->stream_id < 0) {
+        return;
+    }
+
+    // TODO this collects all chunks in a single message
+    // up: we send a single frame together (more compatible)
+    // down: guest can cause a crash due to DoS. As a safe measure we limit 
the maximum message
+    stream_channel_new_data_item(channel, MIN(size, 32*1024*1024), mm_time);
+}
+
 void
 stream_channel_send_data(StreamChannel *channel, const void *data, size_t 
size, uint32_t mm_time)
 {
@@ -520,17 +582,25 @@ stream_channel_send_data(StreamChannel *channel, const 
void *data, size_t size,
 
     RedChannel *red_channel = RED_CHANNEL(channel);
 
-    StreamDataItem *item = g_malloc(sizeof(*item) + size);
-    red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
-                            data_item_free);
-    item->data.base.id = channel->stream_id;
-    item->data.base.multi_media_time = mm_time;
-    item->data.data_size = size;
-    item->channel = channel;
-    stream_channel_update_queue_stat(channel, 1, size);
-    // TODO try to optimize avoiding the copy
-    memcpy(item->data.data, data, size);
-    red_channel_pipes_add(red_channel, &item->base);
+    while (size) {
+        StreamDataItem *item = channel->data_item;
+
+        if (!item) {
+            item = stream_channel_new_data_item(channel, size, mm_time);
+        }
+
+        size_t copy_size = item->data.data_size - channel->data_item_pos;
+        copy_size = MIN(copy_size, size);
+        // TODO try to optimize avoiding the copy
+        memcpy(item->data.data + channel->data_item_pos, data, copy_size);
+        size -= copy_size;
+        channel->data_item_pos += copy_size;
+        if (channel->data_item_pos == item->data.data_size) {
+            channel->data_item = NULL;
+            stream_channel_update_queue_stat(channel, 1, item->data.data_size);
+            red_channel_pipes_add(red_channel, &item->base);
+        }
+    }
 }
 
 void
@@ -570,6 +640,7 @@ stream_channel_reset(StreamChannel *channel)
     channel->stream_id = -1;
     channel->width = 0;
     channel->height = 0;
+    stream_channel_unref_data_item(channel);
 
     if (!red_channel_is_connected(red_channel)) {
         return;
diff --git a/server/stream-channel.h b/server/stream-channel.h
index e8bec80b..18a1bdea 100644
--- a/server/stream-channel.h
+++ b/server/stream-channel.h
@@ -60,6 +60,18 @@ struct StreamMsgStartStop;
 
 void stream_channel_change_format(StreamChannel *channel,
                                   const struct StreamMsgFormat *fmt);
+
+/**
+ * Tell the channel that a new data packet is starting.
+ * This can be used to group all chunks together.
+ */
+void stream_channel_start_data(StreamChannel *channel,
+                               size_t size,
+                               uint32_t mm_time);
+
+/**
+ * Send to channel a chunk of data.
+ */
 void stream_channel_send_data(StreamChannel *channel,
                               const void *data, size_t size,
                               uint32_t mm_time);
diff --git a/server/stream-device.c b/server/stream-device.c
index 6cf29d37..c84ebfac 100644
--- a/server/stream-device.c
+++ b/server/stream-device.c
@@ -273,11 +273,13 @@ handle_msg_data(StreamDevice *dev, 
SpiceCharDeviceInstance *sin)
         if (n <= 0) {
             break;
         }
-        // TODO collect all message ??
-        // up: we send a single frame together
-        // down: guest can cause a crash
-        stream_channel_send_data(dev->stream_channel, buf, n, 
reds_get_mm_time());
+        uint32_t mm_time = reds_get_mm_time();
+        if (dev->msg_pos == 0) {
+            stream_channel_start_data(dev->stream_channel, dev->hdr.size, 
mm_time);
+        }
+        stream_channel_send_data(dev->stream_channel, buf, n, mm_time);
         dev->hdr.size -= n;
+        dev->msg_pos += n;
     }
 
     return dev->hdr.size == 0;
-- 
2.14.3

_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel

Reply via email to