The current implementation of server and client assumes that a single
data message contains an encoded frame.
This is not a problem for most encoding but for MJPEG this causes
the client to fail decoding.
Collapse frame data into a single message before sending to the client.
This is done in the channel as the channel code is responsible to take care
of client protocol details. This allows for instance to support chunked
transfer to client if implemented.

Signed-off-by: Frediano Ziglio <fzig...@redhat.com>
---
 server/red-stream-device.c | 10 +++--
 server/stream-channel.c    | 83 +++++++++++++++++++++++++++++++++-----
 server/stream-channel.h    | 12 ++++++
 3 files changed, 90 insertions(+), 15 deletions(-)

diff --git a/server/red-stream-device.c b/server/red-stream-device.c
index df6a366f..864be99e 100644
--- a/server/red-stream-device.c
+++ b/server/red-stream-device.c
@@ -314,11 +314,13 @@ handle_msg_data(StreamDevice *dev, 
SpiceCharDeviceInstance *sin)
         if (n <= 0) {
             break;
         }
-        // TODO collect all message ??
-        // up: we send a single frame together
-        // down: guest can cause a crash
-        stream_channel_send_data(dev->stream_channel, buf, n, 
reds_get_mm_time());
+        uint32_t mm_time = reds_get_mm_time();
+        if (dev->msg_pos == 0) {
+            stream_channel_start_data(dev->stream_channel, dev->hdr.size, 
mm_time);
+        }
+        stream_channel_send_data(dev->stream_channel, buf, n, mm_time);
         dev->hdr.size -= n;
+        dev->msg_pos += n;
     }
 
     return dev->hdr.size == 0;
diff --git a/server/stream-channel.c b/server/stream-channel.c
index fc409ee6..0ac7ba1d 100644
--- a/server/stream-channel.c
+++ b/server/stream-channel.c
@@ -44,6 +44,7 @@
 
 typedef struct StreamChannelClient StreamChannelClient;
 typedef struct StreamChannelClientClass StreamChannelClientClass;
+typedef struct StreamDataItem StreamDataItem;
 
 /* we need to inherit from CommonGraphicsChannelClient
  * to get buffer handling */
@@ -74,6 +75,10 @@ struct StreamChannel {
 
     StreamQueueStat queue_stat;
 
+    /* pending partial data item */
+    StreamDataItem *data_item;
+    uint32_t data_item_pos;
+
     /* callback to notify when a stream should be started or stopped */
     stream_channel_start_proc start_cb;
     void *start_opaque;
@@ -105,12 +110,12 @@ typedef struct StreamCreateItem {
     SpiceMsgDisplayStreamCreate stream_create;
 } StreamCreateItem;
 
-typedef struct StreamDataItem {
+struct StreamDataItem {
     RedPipeItem base;
     StreamChannel *channel;
     // NOTE: this must be the last field in the structure
     SpiceMsgDisplayStreamData data;
-} StreamDataItem;
+};
 
 #define PRIMARY_SURFACE_ID 0
 
@@ -130,6 +135,16 @@ stream_channel_client_init(StreamChannelClient *client)
     client->stream_id = -1;
 }
 
+static void
+stream_channel_unref_data_item(StreamChannel *channel)
+{
+    if (channel->data_item) {
+        red_pipe_item_unref(&channel->data_item->base);
+        channel->data_item = NULL;
+        channel->data_item_pos = 0;
+    }
+}
+
 static void
 request_new_stream(StreamChannel *channel, StreamMsgStartStop *start)
 {
@@ -153,6 +168,7 @@ stream_channel_client_on_disconnect(RedChannelClient *rcc)
     channel->stream_id = -1;
     channel->width = 0;
     channel->height = 0;
+    stream_channel_unref_data_item(channel);
 
     // send stream stop to device
     StreamMsgStartStop stop = { 0, };
@@ -452,6 +468,16 @@ stream_channel_constructed(GObject *object)
     reds_register_channel(reds, red_channel);
 }
 
+static void
+stream_channel_finalize(GObject *object)
+{
+    StreamChannel *channel = STREAM_CHANNEL(object);
+
+    stream_channel_unref_data_item(channel);
+
+    G_OBJECT_CLASS(stream_channel_parent_class)->finalize(object);
+}
+
 static void
 stream_channel_class_init(StreamChannelClass *klass)
 {
@@ -459,6 +485,7 @@ stream_channel_class_init(StreamChannelClass *klass)
     RedChannelClass *channel_class = RED_CHANNEL_CLASS(klass);
 
     object_class->constructed = stream_channel_constructed;
+    object_class->finalize = stream_channel_finalize;
 
     channel_class->parser = 
spice_get_client_channel_parser(SPICE_CHANNEL_DISPLAY, NULL);
     channel_class->handle_message = handle_message;
@@ -532,14 +559,18 @@ data_item_free(RedPipeItem *base)
 {
     StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base);
 
-    stream_channel_update_queue_stat(pipe_item->channel, -1, 
-pipe_item->data.data_size);
+    if (pipe_item->channel->data_item != pipe_item) {
+        stream_channel_update_queue_stat(pipe_item->channel, -1, 
-pipe_item->data.data_size);
+    }
 
     g_free(pipe_item);
 }
 
-static StreamDataItem*
-stream_data_item_new(StreamChannel *channel, size_t size, uint32_t mm_time)
+static void
+stream_channel_init_data_item(StreamChannel *channel, size_t size, uint32_t 
mm_time)
 {
+    stream_channel_unref_data_item(channel);
+
     StreamDataItem *item = g_malloc(sizeof(*item) + size);
     red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA,
                             data_item_free);
@@ -548,7 +579,22 @@ stream_data_item_new(StreamChannel *channel, size_t size, 
uint32_t mm_time)
     item->data.data_size = size;
     item->channel = channel;
 
-    return item;
+    channel->data_item = item;
+    channel->data_item_pos = 0;
+}
+
+void
+stream_channel_start_data(StreamChannel *channel, size_t size, uint32_t 
mm_time)
+{
+    // see stream_channel_send_data comment
+    if (channel->stream_id < 0) {
+        return;
+    }
+
+    // TODO this collects all chunks in a single message
+    // up: we send a single frame together (more compatible)
+    // down: guest can cause a crash due to DoS. As a safe measure we limit 
the maximum message
+    stream_channel_init_data_item(channel, MIN(size, 32*1024*1024), mm_time);
 }
 
 void
@@ -563,11 +609,25 @@ stream_channel_send_data(StreamChannel *channel, const 
void *data, size_t size,
 
     RedChannel *red_channel = RED_CHANNEL(channel);
 
-    StreamDataItem *item = stream_data_item_new(channel, size, mm_time);
-    stream_channel_update_queue_stat(channel, 1, size);
-    // TODO try to optimize avoiding the copy
-    memcpy(item->data.data, data, size);
-    red_channel_pipes_add(red_channel, &item->base);
+    while (size) {
+        if (channel->data_item == NULL) {
+            stream_channel_init_data_item(channel, size, mm_time);
+        }
+
+        StreamDataItem *item = channel->data_item;
+
+        size_t copy_size = item->data.data_size - channel->data_item_pos;
+        copy_size = MIN(copy_size, size);
+        // TODO try to optimize avoiding the copy
+        memcpy(item->data.data + channel->data_item_pos, data, copy_size);
+        size -= copy_size;
+        channel->data_item_pos += copy_size;
+        if (channel->data_item_pos == item->data.data_size) {
+            channel->data_item = NULL;
+            stream_channel_update_queue_stat(channel, 1, item->data.data_size);
+            red_channel_pipes_add(red_channel, &item->base);
+        }
+    }
 }
 
 void
@@ -607,6 +667,7 @@ stream_channel_reset(StreamChannel *channel)
     channel->stream_id = -1;
     channel->width = 0;
     channel->height = 0;
+    stream_channel_unref_data_item(channel);
 
     if (!red_channel_is_connected(red_channel)) {
         return;
diff --git a/server/stream-channel.h b/server/stream-channel.h
index e8bec80b..18a1bdea 100644
--- a/server/stream-channel.h
+++ b/server/stream-channel.h
@@ -60,6 +60,18 @@ struct StreamMsgStartStop;
 
 void stream_channel_change_format(StreamChannel *channel,
                                   const struct StreamMsgFormat *fmt);
+
+/**
+ * Tell the channel that a new data packet is starting.
+ * This can be used to group all chunks together.
+ */
+void stream_channel_start_data(StreamChannel *channel,
+                               size_t size,
+                               uint32_t mm_time);
+
+/**
+ * Send to channel a chunk of data.
+ */
 void stream_channel_send_data(StreamChannel *channel,
                               const void *data, size_t size,
                               uint32_t mm_time);
-- 
2.17.0

_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel

Reply via email to