Ack.

On 01/13/2011 06:01 AM, Alon Levy wrote:
---
  server/main_channel.c |  778 ++++++++++++++++++++++++++++---------------------
  1 files changed, 441 insertions(+), 337 deletions(-)

diff --git a/server/main_channel.c b/server/main_channel.c
index e42f173..f1fb4c6 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -38,6 +38,7 @@
  #include "common/messages.h"
  #include "reds.h"
  #include "main_channel.h"
+#include "red_channel.h"
  #include "generated_marshallers.h"

  #define ZERO_BUF_SIZE 4096
@@ -55,34 +56,67 @@ static uint8_t zero_page[ZERO_BUF_SIZE] = {0};

  typedef struct RedsOutItem RedsOutItem;
  struct RedsOutItem {
-    RingItem link;
-    SpiceMarshaller *m;
-    SpiceDataHeader *header;
+    PipeItem base;
  };

-typedef struct RedsOutgoingData {
-    Ring pipe;
-    RedsOutItem *item;
-    int vec_size;
-    struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
-    struct iovec *vec;
-} RedsOutgoingData;
-
-// TODO - remove and use red_channel.h
-typedef struct IncomingHandler {
-    spice_parse_channel_func_t parser;
+typedef struct PingPipeItem {
+    PipeItem base;
+    int size;
+} PingPipeItem;
+
+typedef struct MouseModePipeItem {
+    PipeItem base;
+    int current_mode;
+    int is_client_mouse_allowed;
+} MouseModePipeItem;
+
+typedef struct TokensPipeItem {
+    PipeItem base;
+    int tokens;
+} TokensPipeItem;
+
+typedef struct AgentDataPipeItem {
+    PipeItem base;
+    uint8_t* data;
+    size_t len;
+    spice_marshaller_item_free_func free_data;
      void *opaque;
-    int shut;
-    uint8_t buf[RECEIVE_BUF_SIZE];
-    uint32_t end_pos;
-    void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
-} IncomingHandler;
+} AgentDataPipeItem;
+
+typedef struct InitPipeItem {
+    PipeItem base;
+    int connection_id;
+    int display_channels_hint;
+    int current_mouse_mode;
+    int is_client_mouse_allowed;
+    int multi_media_time;
+    int ram_hint;
+} InitPipeItem;
+
+typedef struct NotifyPipeItem {
+    PipeItem base;
+    uint8_t *mess;
+    int mess_len;
+} NotifyPipeItem;
+
+typedef struct MigrateBeginPipeItem {
+    PipeItem base;
+    int port;
+    int sport;
+    char *host;
+    uint16_t cert_pub_key_type;
+    uint32_t cert_pub_key_len;
+    uint8_t *cert_pub_key;
+} MigrateBeginPipeItem;
+
+typedef struct MultiMediaTimePipeItem {
+    PipeItem base;
+    int time;
+} MultiMediaTimePipeItem;

  typedef struct MainChannel {
-    RedsStreamContext *peer;
-    IncomingHandler in_handler;
-    RedsOutgoingData outgoing;
-    uint64_t serial; //migrate me
+    RedChannel base;
+    uint8_t recv_buf[RECEIVE_BUF_SIZE];
      uint32_t ping_id;
      uint32_t net_test_id;
      int net_test_stage;
@@ -98,45 +132,13 @@ enum NetTestStage {
  static uint64_t latency = 0;
  uint64_t bitrate_per_sec = ~0;

-static void main_channel_out_item_free(RedsOutItem *item);
-
-static void main_reset_outgoing(MainChannel *main_chan)
-{
-    RedsOutgoingData *outgoing =&main_chan->outgoing;
-    RingItem *ring_item;
-
-    if (outgoing->item) {
-        main_channel_out_item_free(outgoing->item);
-        outgoing->item = NULL;
-    }
-    while ((ring_item = ring_get_tail(&outgoing->pipe))) {
-        RedsOutItem *out_item = (RedsOutItem *)ring_item;
-        ring_remove(ring_item);
-        main_channel_out_item_free(out_item);
-    }
-    outgoing->vec_size = 0;
-    outgoing->vec = outgoing->vec_buf;
-}
-
-// ALON from reds_disconnect
  static void main_disconnect(MainChannel *main_chan)
  {
-    if (!main_chan || !main_chan->peer) {
-        return;
-    }
-    main_reset_outgoing(main_chan);
-    core->watch_remove(main_chan->peer->watch);
-    main_chan->peer->watch = NULL;
-    main_chan->peer->cb_free(main_chan->peer);
-    main_chan->peer = NULL;
-    main_chan->in_handler.shut = TRUE;
-    main_chan->serial = 0;
      main_chan->ping_id = 0;
      main_chan->net_test_id = 0;
      main_chan->net_test_stage = NET_TEST_STAGE_INVALID;
-    main_chan->in_handler.end_pos = 0;
+    red_channel_destroy(&main_chan->base);

-    // TODO: Should probably reset these on the ping start, not here
      latency = 0;
      bitrate_per_sec = ~0;
  }
@@ -149,233 +151,199 @@ void main_channel_start_net_test(Channel *channel)
          return;
      }

-    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)&&
-                            main_channel_push_ping(channel, 0)&&
-                            main_channel_push_ping(channel, NET_TEST_BYTES)) {
+    if (main_channel_push_ping(channel, NET_TEST_WARMUP_BYTES)
+        && main_channel_push_ping(channel, 0)
+        && main_channel_push_ping(channel, NET_TEST_BYTES)) {
          main_chan->net_test_id = main_chan->ping_id - 2;
          main_chan->net_test_stage = NET_TEST_STAGE_WARMUP;
      }
  }

-static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static RedsOutItem *main_pipe_item_new(MainChannel *main_chan, int type)
  {
-    for (;;) {
-        uint8_t *buf = handler->buf;
-        uint32_t pos = handler->end_pos;
-        uint8_t *end = buf + pos;
-        SpiceDataHeader *header;
-        int n;
-        n = peer->cb_read(peer->ctx, buf + pos, RECEIVE_BUF_SIZE - pos);
-        if (n<= 0) {
-            if (n == 0) {
-                return -1;
-            }
-            switch (errno) {
-            case EAGAIN:
-                return 0;
-            case EINTR:
-                break;
-            case EPIPE:
-                return -1;
-            default:
-                red_printf("%s", strerror(errno));
-                return -1;
-            }
-        } else {
-            pos += n;
-            end = buf + pos;
-            while (buf + sizeof(SpiceDataHeader) <= end &&
-                   buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
-                uint8_t *data = (uint8_t *)(header+1);
-                size_t parsed_size;
-                uint8_t *parsed;
-                message_destructor_t parsed_free;
-
-                buf += sizeof(SpiceDataHeader) + header->size;
-                parsed = handler->parser(data, data + header->size, header->type,
-                                         SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
-                if (parsed == NULL) {
-                    red_printf("failed to parse message type %d", header->type);
-                    return -1;
-                }
-                handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
-                parsed_free(parsed);
-                if (handler->shut) {
-                    return -1;
-                }
-            }
-            memmove(handler->buf, buf, (handler->end_pos = end - buf));
-        }
-    }
+    RedsOutItem *item = spice_malloc(sizeof(RedsOutItem));
+
+    red_channel_pipe_item_init(&main_chan->base,&item->base, type);
+    return item;
  }

-static RedsOutItem *new_out_item(MainChannel *main_chan, uint32_t type)
+static MouseModePipeItem *main_mouse_mode_item_new(MainChannel *main_chan,
+    int current_mode, int is_client_mouse_allowed)
  {
-    RedsOutItem *item;
-
-    item = spice_new(RedsOutItem, 1);
-    ring_item_init(&item->link);
+    MouseModePipeItem *item = spice_malloc(sizeof(MouseModePipeItem));

-    item->m = spice_marshaller_new();
-    item->header = (SpiceDataHeader *)
-        spice_marshaller_reserve_space(item->m, sizeof(SpiceDataHeader));
-    spice_marshaller_set_base(item->m, sizeof(SpiceDataHeader));
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_MAIN_MOUSE_MODE);
+    item->current_mode = current_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    return item;
+}

-    item->header->serial = ++main_chan->serial;
-    item->header->type = type;
-    item->header->sub_list = 0;
+static PingPipeItem *main_ping_item_new(MainChannel *channel, int size)
+{
+    PingPipeItem *item = spice_malloc(sizeof(PingPipeItem));

+    red_channel_pipe_item_init(&channel->base,&item->base, SPICE_MSG_PING);
+    item->size = size;
      return item;
  }

-static void main_channel_out_item_free(RedsOutItem *item)
+static TokensPipeItem *main_tokens_item_new(MainChannel *main_chan, int tokens)
  {
-    spice_marshaller_destroy(item->m);
-    free(item);
+    TokensPipeItem *item = spice_malloc(sizeof(TokensPipeItem));
+
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_MAIN_AGENT_TOKEN);
+    item->tokens = tokens;
+    return item;
  }

-static struct iovec *main_channel_iovec_skip(struct iovec vec[], int skip, int *vec_size)
+static AgentDataPipeItem *main_agent_data_item_new(MainChannel *channel,
+           uint8_t* data, size_t len,
+           spice_marshaller_item_free_func free_data, void *opaque)
  {
-    struct iovec *now = vec;
+    AgentDataPipeItem *item = spice_malloc(sizeof(AgentDataPipeItem));

-    while (skip&&  skip>= now->iov_len) {
-        skip -= now->iov_len;
-        --*vec_size;
-        now++;
-    }
-    now->iov_base = (uint8_t *)now->iov_base + skip;
-    now->iov_len -= skip;
-    return now;
+    red_channel_pipe_item_init(&channel->base, &item->base, SPICE_MSG_MAIN_AGENT_DATA);
+    item->data = data;
+    item->len = len;
+    item->free_data = free_data;
+    item->opaque = opaque;
+    return item;
  }

-static int main_channel_send_data(MainChannel *main_chan)
+static InitPipeItem *main_init_item_new(MainChannel *main_chan,
+    int connection_id, int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
+    int ram_hint)
  {
-    RedsOutgoingData *outgoing =&main_chan->outgoing;
-    int n;
-
-    if (!outgoing->item) {
-        return TRUE;
-    }
+    InitPipeItem *item = spice_malloc(sizeof(InitPipeItem));

-    ASSERT(outgoing->vec_size);
-    for (;;) {
-        if ((n = main_chan->peer->cb_writev(main_chan->peer->ctx, outgoing->vec, outgoing->vec_size)) == -1) {
-            switch (errno) {
-            case EAGAIN:
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE);
-                return FALSE;
-            case EINTR:
-                break;
-            case EPIPE:
-                reds_disconnect();
-                return FALSE;
-            default:
-                red_printf("%s", strerror(errno));
-                reds_disconnect();
-                return FALSE;
-            }
-        } else {
-            outgoing->vec = main_channel_iovec_skip(outgoing->vec, n, &outgoing->vec_size);
-            if (!outgoing->vec_size) {
-                main_channel_out_item_free(outgoing->item);
-                outgoing->item = NULL;
-                outgoing->vec = outgoing->vec_buf;
-                return TRUE;
-            }
-        }
-    }
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_MAIN_INIT);
+    item->connection_id = connection_id;
+    item->display_channels_hint = display_channels_hint;
+    item->current_mouse_mode = current_mouse_mode;
+    item->is_client_mouse_allowed = is_client_mouse_allowed;
+    item->multi_media_time = multi_media_time;
+    item->ram_hint = ram_hint;
+    return item;
  }

-static void main_channel_push(MainChannel *main_chan)
+static NotifyPipeItem *main_notify_item_new(MainChannel *main_chan,
+                                        uint8_t *mess, const int mess_len)
  {
-    RedsOutgoingData *outgoing =&main_chan->outgoing;
-    RingItem *ring_item;
-    RedsOutItem *item;
+    NotifyPipeItem *item = spice_malloc(sizeof(NotifyPipeItem));

-    for (;;) {
-        if (!main_chan->peer || outgoing->item || !(ring_item = ring_get_tail(&outgoing->pipe))) {
-            return;
-        }
-        ring_remove(ring_item);
-        outgoing->item = item = (RedsOutItem *)ring_item;
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_NOTIFY);
+    item->mess = mess;
+    item->mess_len = mess_len;
+    return item;
+}

-        spice_marshaller_flush(item->m);
-        item->header->size = spice_marshaller_get_total_size(item->m) - sizeof(SpiceDataHeader);
+static MigrateBeginPipeItem *main_migrate_begin_item_new(
+    MainChannel *main_chan, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
+{
+    MigrateBeginPipeItem *item = spice_malloc(sizeof(MigrateBeginPipeItem));

-        outgoing->vec_size = spice_marshaller_fill_iovec(item->m,
-                                                         outgoing->vec_buf,
-                                                         REDS_MAX_SEND_IOVEC, 0);
-        main_channel_send_data(main_chan);
-    }
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    item->port = port;
+    item->sport = sport;
+    item->host = host;
+    item->cert_pub_key_type = cert_pub_key_type;
+    item->cert_pub_key_len = cert_pub_key_len;
+    item->cert_pub_key = cert_pub_key;
+    return item;
  }

-static void main_channel_push_pipe_item(MainChannel *main_chan, RedsOutItem *item)
+static MultiMediaTimePipeItem *main_multi_media_time_item_new(
+    MainChannel *main_chan, int time)
  {
-    ring_add(&main_chan->outgoing.pipe,&item->link);
-    main_channel_push(main_chan);
+    MultiMediaTimePipeItem *item;
+
+    item = spice_malloc(sizeof(MultiMediaTimePipeItem));
+    red_channel_pipe_item_init(&main_chan->base,&item->base,
+                               SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
+    item->time = time;
+    return item;
  }

  static void main_channel_push_channels(MainChannel *main_chan)
  {
-    SpiceMsgChannels* channels_info;
      RedsOutItem *item;

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
-    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels) + reds_num_of_channels() * sizeof(SpiceChannelId));
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_CHANNELS_LIST);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_channels(MainChannel *main_chan)
+{
+    SpiceMsgChannels* channels_info;
+
+    channels_info = (SpiceMsgChannels *)spice_malloc(sizeof(SpiceMsgChannels)
+                            + reds_num_of_channels() * sizeof(SpiceChannelId));
      reds_fill_channels(channels_info);
-    spice_marshall_msg_main_channels_list(item->m, channels_info);
+    spice_marshall_msg_main_channels_list(
+        main_chan->base.send_data.marshaller, channels_info);
      free(channels_info);
-    main_channel_push_pipe_item(main_chan, item);
  }

  int main_channel_push_ping(Channel *channel, int size)
  {
-    struct timespec time_space;
-    RedsOutItem *item;
-    SpiceMsgPing ping;
      MainChannel *main_chan = channel->data;
-
-    if (!main_chan) {
+    PingPipeItem *item;
+
+    if (main_chan == NULL) {
          return FALSE;
      }
-    item = new_out_item(main_chan, SPICE_MSG_PING);
+    item = main_ping_item_new(main_chan, size);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+    return TRUE;
+}
+
+static void main_channel_marshall_ping(MainChannel *main_chan, int size)
+{
+    struct timespec time_space;
+    SpiceMsgPing ping;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
      ping.id = ++main_chan->ping_id;
      clock_gettime(CLOCK_MONOTONIC,&time_space);
      ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
-    spice_marshall_msg_ping(item->m,&ping);
+    spice_marshall_msg_ping(m,&ping);

      while (size>  0) {
          int now = MIN(ZERO_BUF_SIZE, size);
          size -= now;
-        spice_marshaller_add_ref(item->m, zero_page, now);
+        spice_marshaller_add_ref(m, zero_page, now);
      }
+}

-    main_channel_push_pipe_item(main_chan, item);
+void main_channel_push_mouse_mode(Channel *channel, int current_mode,
+                                  int is_client_mouse_allowed)
+{
+    MainChannel *main_chan = channel->data;
+    MouseModePipeItem *item;

-    return TRUE;
+    item = main_mouse_mode_item_new(main_chan, current_mode,
+                                    is_client_mouse_allowed);
+    red_channel_pipe_add(&main_chan->base,&item->base);
  }

-void main_channel_push_mouse_mode(Channel *channel, int current_mode, int is_client_mouse_allowed)
+static void main_channel_marshall_mouse_mode(MainChannel *main_chan, int current_mode, int is_client_mouse_allowed)
  {
      SpiceMsgMainMouseMode mouse_mode;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MOUSE_MODE);
      mouse_mode.supported_modes = SPICE_MOUSE_MODE_SERVER;
      if (is_client_mouse_allowed) {
          mouse_mode.supported_modes |= SPICE_MOUSE_MODE_CLIENT;
      }
      mouse_mode.current_mode = current_mode;
-
-    spice_marshall_msg_main_mouse_mode(item->m,&mouse_mode);
-
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_mouse_mode(main_chan->base.send_data.marshaller,
+                                       &mouse_mode);
  }

  void main_channel_push_agent_connected(Channel *channel)
@@ -383,186 +351,319 @@ void main_channel_push_agent_connected(Channel *channel)
      RedsOutItem *item;
      MainChannel *main_chan = channel->data;

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
-    main_channel_push_pipe_item(main_chan, item);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_CONNECTED);
+    red_channel_pipe_add(&main_chan->base,&item->base);
  }

  void main_channel_push_agent_disconnected(Channel *channel)
  {
-    SpiceMsgMainAgentDisconnect disconnect;
      RedsOutItem *item;
      MainChannel *main_chan = channel->data;

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    item = main_pipe_item_new(main_chan, SPICE_MSG_MAIN_AGENT_DISCONNECTED);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_agent_disconnected(MainChannel *main_chan)
+{
+    SpiceMsgMainAgentDisconnect disconnect;
+
      disconnect.error_code = SPICE_LINK_ERR_OK;
-    spice_marshall_msg_main_agent_disconnected(item->m,&disconnect);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_disconnected(
+        main_chan->base.send_data.marshaller,&disconnect);
  }

  void main_channel_push_tokens(Channel *channel, uint32_t num_tokens)
  {
-    SpiceMsgMainAgentTokens tokens;
-    RedsOutItem *item;
      MainChannel *main_chan = channel->data;
+    TokensPipeItem *item = main_tokens_item_new(main_chan, num_tokens);
+
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_tokens(MainChannel *main_chan, uint32_t num_tokens)
+{
+    SpiceMsgMainAgentTokens tokens;

-    if (!main_chan) {
-        return;
-    }
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_TOKEN);
      tokens.num_tokens = num_tokens;
-    spice_marshall_msg_main_agent_token(item->m,&tokens);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_agent_token(
+        main_chan->base.send_data.marshaller,&tokens);
  }

  void main_channel_push_agent_data(Channel *channel, uint8_t* data, size_t len,
             spice_marshaller_item_free_func free_data, void *opaque)
  {
-    RedsOutItem *item;
      MainChannel *main_chan = channel->data;
+    AgentDataPipeItem *item;
+
+    item = main_agent_data_item_new(main_chan, data, len, free_data, opaque);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_AGENT_DATA);
-    spice_marshaller_add_ref_full(item->m, data, len, free_data, opaque);
-    main_channel_push_pipe_item(main_chan, item);
+static void main_channel_marshall_agent_data(MainChannel *main_chan,
+                                  AgentDataPipeItem *item)
+{
+    spice_marshaller_add_ref_full(main_chan->base.send_data.marshaller,
+        item->data, item->len, item->free_data, item->opaque);
  }

  static void main_channel_push_migrate_data_item(MainChannel *main_chan)
  {
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MIGRATE_DATA);
-    SpiceMarshaller *m = item->m;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE_DATA);
+
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_migrate_data_item(MainChannel *main_chan)
+{
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
      MainMigrateData *data = (MainMigrateData *)spice_marshaller_reserve_space(m, sizeof(MainMigrateData));

      reds_marshall_migrate_data_item(m, data); // TODO: from reds split. ugly separation.
-    data->serial = main_chan->serial;
+    data->serial = red_channel_get_message_serial(&main_chan->base);
      data->ping_id = main_chan->ping_id;
-    main_channel_push_pipe_item(main_chan, item);
  }

-static void main_channel_receive_migrate_data(MainChannel *main_chan, MainMigrateData *data, uint8_t *end)
+static void main_channel_receive_migrate_data(MainChannel *main_chan,
+                                  MainMigrateData *data, uint8_t *end)
  {
-    main_chan->serial = data->serial;
+    red_channel_set_message_serial(&main_chan->base, data->serial);
      main_chan->ping_id = data->ping_id;
  }

-void main_channel_push_init(Channel *channel, int connection_id, int display_channels_hint,
-    int current_mouse_mode, int is_client_mouse_allowed, int multi_media_time,
+void main_channel_push_init(Channel *channel, int connection_id,
+    int display_channels_hint, int current_mouse_mode,
+    int is_client_mouse_allowed, int multi_media_time,
      int ram_hint)
  {
-    RedsOutItem *item;
-    SpiceMsgMainInit init;
+    InitPipeItem *item;
      MainChannel *main_chan = channel->data;

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_INIT);
-    init.session_id = connection_id;
-    init.display_channels_hint = display_channels_hint;
-    init.current_mouse_mode = current_mouse_mode;
+    item = main_init_item_new(main_chan,
+             connection_id, display_channels_hint, current_mouse_mode,
+             is_client_mouse_allowed, multi_media_time, ram_hint);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_init(MainChannel *main_chan,
+                                       InitPipeItem *item)
+{
+    SpiceMsgMainInit init;
+
+    init.session_id = item->connection_id;
+    init.display_channels_hint = item->display_channels_hint;
+    init.current_mouse_mode = item->current_mouse_mode;
      init.supported_mouse_modes = SPICE_MOUSE_MODE_SERVER;
-    if (is_client_mouse_allowed) {
+    if (item->is_client_mouse_allowed) {
          init.supported_mouse_modes |= SPICE_MOUSE_MODE_CLIENT;
      }
      init.agent_connected = reds_has_vdagent();
      init.agent_tokens = REDS_AGENT_WINDOW_SIZE;
-    init.multi_media_time = multi_media_time;
-    init.ram_hint = ram_hint;
-    spice_marshall_msg_main_init(item->m,&init);
-    main_channel_push_pipe_item(main_chan, item);
+    init.multi_media_time = item->multi_media_time;
+    init.ram_hint = item->ram_hint;
+    spice_marshall_msg_main_init(main_chan->base.send_data.marshaller,&init);
  }

  void main_channel_push_notify(Channel *channel, uint8_t *mess, const int mess_len)
  {
-    // TODO possible free-then-use bug - caller frees mess after this, but is that pointer being
-    // used by spice_marshaller?
-    RedsOutItem *item;
-    SpiceMsgNotify notify;
      MainChannel *main_chan = channel->data;
+    NotifyPipeItem *item = main_notify_item_new(main_chan, mess, mess_len);
+
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}

-    item = new_out_item(main_chan, SPICE_MSG_NOTIFY);
-    notify.time_stamp = get_time_stamp();
+static void main_channel_marshall_notify(MainChannel *main_chan, NotifyPipeItem *item)
+{
+    SpiceMsgNotify notify;
+    SpiceMarshaller *m = main_chan->base.send_data.marshaller;
+
+    notify.time_stamp = get_time_stamp(); // TODO - move to main_new_notify_item
      notify.severity = SPICE_NOTIFY_SEVERITY_WARN;
      notify.visibilty = SPICE_NOTIFY_VISIBILITY_HIGH;
      notify.what = SPICE_WARN_GENERAL;
-    notify.message_len = mess_len;
-    spice_marshall_msg_notify(item->m,&notify);
-    spice_marshaller_add(item->m, mess, mess_len + 1);
-    main_channel_push_pipe_item(main_chan, item);
+    notify.message_len = item->mess_len;
+    spice_marshall_msg_notify(m,&notify);
+    spice_marshaller_add(m, item->mess, item->mess_len + 1);
  }

-void main_channel_push_migrate_begin(Channel *channel, int port, int sport, char *host,
-    uint16_t cert_pub_key_type, uint32_t cert_pub_key_len, uint8_t *cert_pub_key)
+void main_channel_push_migrate_begin(Channel *channel, int port, int sport,
+    char *host, uint16_t cert_pub_key_type, uint32_t cert_pub_key_len,
+    uint8_t *cert_pub_key)
  {
      MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_BEGIN);
+    MigrateBeginPipeItem *item = main_migrate_begin_item_new(main_chan, port,
+        sport, host, cert_pub_key_type, cert_pub_key_len, cert_pub_key);
+
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_migrate_begin(MainChannel *main_chan,
+    MigrateBeginPipeItem *item)
+{
      SpiceMsgMainMigrationBegin migrate;

-    migrate.port = port;
-    migrate.sport = sport;
-    migrate.host_size = strlen(host) + 1;
-    migrate.host_data = (uint8_t *)host;
-    migrate.pub_key_type = cert_pub_key_type;
-    migrate.pub_key_size = cert_pub_key_len;
-    migrate.pub_key_data = cert_pub_key;
-    spice_marshall_msg_main_migrate_begin(item->m,&migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    migrate.port = item->port;
+    migrate.sport = item->sport;
+    migrate.host_size = strlen(item->host) + 1;
+    migrate.host_data = (uint8_t *)item->host;
+    migrate.pub_key_type = item->cert_pub_key_type;
+    migrate.pub_key_size = item->cert_pub_key_len;
+    migrate.pub_key_data = item->cert_pub_key;
+    spice_marshall_msg_main_migrate_begin(main_chan->base.send_data.marshaller,
+                                          &migrate);
  }

  void main_channel_push_migrate(Channel *channel)
  {
-    RedsOutItem *item;
-    SpiceMsgMigrate migrate;
      MainChannel *main_chan = channel->data;
+    RedsOutItem *item = main_pipe_item_new(main_chan, SPICE_MSG_MIGRATE);
+
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static void main_channel_marshall_migrate(MainChannel *main_chan)
+{
+    SpiceMsgMigrate migrate;

-    item = new_out_item(main_chan, SPICE_MSG_MIGRATE);
      migrate.flags = SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
-    spice_marshall_msg_migrate(item->m,&migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_migrate(main_chan->base.send_data.marshaller,&migrate);
  }

  void main_channel_push_migrate_cancel(Channel *channel)
  {
      MainChannel *main_chan = channel->data;
-    RedsOutItem *item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_CANCEL);
+    RedsOutItem *item = main_pipe_item_new(main_chan,
+                                           SPICE_MSG_MAIN_MIGRATE_CANCEL);

-    main_channel_push_pipe_item(main_chan, item);
+    red_channel_pipe_add(&main_chan->base,&item->base);
  }

  void main_channel_push_multi_media_time(Channel *channel, int time)
  {
-    SpiceMsgMainMultiMediaTime time_mes;
-    RedsOutItem *item;
      MainChannel *main_chan = channel->data;

-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MULTI_MEDIA_TIME);
-    time_mes.time = time;
-    spice_marshall_msg_main_multi_media_time(item->m,&time_mes);
-    main_channel_push_pipe_item(main_chan, item);
+    MultiMediaTimePipeItem *item =
+        main_multi_media_time_item_new(main_chan, time);
+    red_channel_pipe_add(&main_chan->base,&item->base);
+}
+
+static PipeItem *main_migrate_switch_item_new(MainChannel *main_chan)
+{
+    PipeItem *item = spice_malloc(sizeof(*item));
+
+    red_channel_pipe_item_init(&main_chan->base, item,
+                               SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+    return item;
  }

  void main_channel_push_migrate_switch(Channel *channel)
  {
+    MainChannel *main_chan = channel->data;
+
+    red_channel_pipe_add(&main_chan->base,
+        main_migrate_switch_item_new(main_chan));
+}
+
+static void main_channel_marshall_migrate_switch(MainChannel *main_chan)
+{
      SpiceMsgMainMigrationSwitchHost migrate;
-    RedsOutItem *item;
-    MainChannel *main_chan;
-
-    if (!channel) {
-        return;
-    }
-    main_chan = channel->data;
+
      red_printf("");
-    item = new_out_item(main_chan, SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST);
+
      reds_fill_mig_switch(&migrate);
-    spice_marshall_msg_main_migrate_switch_host(item->m,&migrate);
-    main_channel_push_pipe_item(main_chan, item);
+    spice_marshall_msg_main_migrate_switch_host(
+        main_chan->base.send_data.marshaller,&migrate);
+
      reds_mig_release();
  }

-static void main_channel_handle_message(void *opaque, size_t size, uint32_t type, void *message)
+static void main_channel_marshall_multi_media_time(MainChannel *main_chan,
+    MultiMediaTimePipeItem *item)
+{
+    SpiceMsgMainMultiMediaTime time_mes;
+
+    time_mes.time = item->time;
+    spice_marshall_msg_main_multi_media_time(
+        main_chan->base.send_data.marshaller,&time_mes);
+}
+
+static void main_channel_send_item(RedChannel *channel, PipeItem *base)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    red_channel_reset_send_data(channel);
+    red_channel_init_send_data(channel, base->type, base);
+    switch (base->type) {
+        case SPICE_MSG_MAIN_CHANNELS_LIST:
+            main_channel_marshall_channels(main_chan);
+            break;
+        case SPICE_MSG_PING:
+            main_channel_marshall_ping(main_chan,
+                SPICE_CONTAINEROF(base, PingPipeItem, base)->size);
+            break;
+        case SPICE_MSG_MAIN_MOUSE_MODE:
+            {
+                MouseModePipeItem *item =
+                    SPICE_CONTAINEROF(base, MouseModePipeItem, base);
+                main_channel_marshall_mouse_mode(main_chan,
+                    item->current_mode, item->is_client_mouse_allowed);
+                break;
+            }
+        case SPICE_MSG_MAIN_AGENT_DISCONNECTED:
+            main_channel_marshall_agent_disconnected(main_chan);
+            break;
+        case SPICE_MSG_MAIN_AGENT_TOKEN:
+            main_channel_marshall_tokens(main_chan,
+                SPICE_CONTAINEROF(base, TokensPipeItem, base)->tokens);
+            break;
+        case SPICE_MSG_MAIN_AGENT_DATA:
+            main_channel_marshall_agent_data(main_chan,
+                SPICE_CONTAINEROF(base, AgentDataPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE_DATA:
+            main_channel_marshall_migrate_data_item(main_chan);
+            break;
+        case SPICE_MSG_MAIN_INIT:
+            main_channel_marshall_init(main_chan,
+                SPICE_CONTAINEROF(base, InitPipeItem, base));
+            break;
+        case SPICE_MSG_NOTIFY:
+            main_channel_marshall_notify(main_chan,
+                SPICE_CONTAINEROF(base, NotifyPipeItem, base));
+            break;
+        case SPICE_MSG_MIGRATE:
+            main_channel_marshall_migrate(main_chan);
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_BEGIN:
+            main_channel_marshall_migrate_begin(main_chan,
+                SPICE_CONTAINEROF(base, MigrateBeginPipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MULTI_MEDIA_TIME:
+            main_channel_marshall_multi_media_time(main_chan,
+                SPICE_CONTAINEROF(base, MultiMediaTimePipeItem, base));
+            break;
+        case SPICE_MSG_MAIN_MIGRATE_SWITCH_HOST:
+            main_channel_marshall_migrate_switch(main_chan);
+            break;
+    };
+    red_channel_begin_send_message(channel);
+}
+
+static void main_channel_release_pipe_item(RedChannel *channel,
+    PipeItem *base, int item_pushed)
+{
+    free(base);
+}
+
+static int main_channel_handle_parsed(RedChannel *channel, size_t size, uint32_t type, void *message)
  {
-    MainChannel *main_chan = opaque;
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);

      switch (type) {
      case SPICE_MSGC_MAIN_AGENT_START:
          red_printf("agent start");
          if (!main_chan) {
-            return;
+            return FALSE;
          }
          reds_on_main_agent_start(main_chan);
          break;
@@ -650,28 +751,29 @@ static void main_channel_handle_message(void *opaque, size_t size, uint32_t type
      default:
          red_printf("unexpected type %d", type);
      }
+    return TRUE;
  }

-static void main_channel_event(int fd, int event, void *data)
+static void main_channel_on_error(RedChannel *channel)
  {
-    MainChannel *main_chan = data;
+    reds_disconnect();
+}

-    if (event & SPICE_WATCH_EVENT_READ) {
-        if (handle_incoming(main_chan->peer, &main_chan->in_handler)) {
-            main_disconnect(main_chan);
-            reds_disconnect();
-        }
-    }
-    if (event & SPICE_WATCH_EVENT_WRITE) {
-        RedsOutgoingData *outgoing = &main_chan->outgoing;
-        if (main_channel_send_data(main_chan)) {
-            main_channel_push(main_chan);
-            if (!outgoing->item && main_chan->peer) {
-                core->watch_update_mask(main_chan->peer->watch,
-                                        SPICE_WATCH_EVENT_READ);
-            }
-        }
-    }
+static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
+{
+    MainChannel *main_chan = SPICE_CONTAINEROF(channel, MainChannel, base);
+
+    return main_chan->recv_buf;
+}
+
+static void main_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
+                                               uint8_t *msg)
+{
+}
+
+static int main_channel_config_socket(RedChannel *channel)
+{
+    return TRUE;
  }

  static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
@@ -679,41 +781,44 @@ static void main_channel_link(Channel *channel, RedsStreamContext *peer, int mig
                          uint32_t *caps)
  {
      MainChannel *main_chan;
-
-    main_chan = spice_malloc0(sizeof(MainChannel));
+    red_printf("");
+    ASSERT(channel->data == NULL);
+
+    main_chan = (MainChannel*)red_channel_create_parser(
+        sizeof(*main_chan), peer, core, migration, FALSE /* handle_acks */
+        ,main_channel_config_socket
+        ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
+        ,main_channel_handle_parsed
+        ,main_channel_alloc_msg_rcv_buf
+        ,main_channel_release_msg_rcv_buf
+        ,main_channel_send_item
+        ,main_channel_release_pipe_item
+        ,main_channel_on_error
+        ,main_channel_on_error);
+    ASSERT(main_chan);
      channel->data = main_chan;
-    main_chan->peer = peer;
-    main_chan->in_handler.shut = FALSE;
-    main_chan->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
-    main_chan->in_handler.opaque = main_chan;
-    main_chan->in_handler.handle_message = main_channel_handle_message;
-    ring_init(&main_chan->outgoing.pipe);
-    main_chan->outgoing.vec = main_chan->outgoing.vec_buf;
-    peer->watch = core->watch_add(peer->socket,
-                                  SPICE_WATCH_EVENT_READ,
-                                  main_channel_event, main_chan);
  }

  int main_channel_getsockname(Channel *channel, struct sockaddr *sa, socklen_t *salen)
  {
      MainChannel *main_chan = channel->data;

-    return main_chan ? getsockname(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getsockname(main_chan->base.peer->socket, sa, salen) : -1;
  }

  int main_channel_getpeername(Channel *channel, struct sockaddr *sa, socklen_t *salen)
  {
      MainChannel *main_chan = channel->data;

-    return main_chan ? getpeername(main_chan->peer->socket, sa, salen) : -1;
+    return main_chan ? getpeername(main_chan->base.peer->socket, sa, salen) : -1;
  }

  void main_channel_close(Channel *channel)
  {
      MainChannel *main_chan = channel->data;

-    if (main_chan && main_chan->peer) {
-        close(main_chan->peer->socket);
+    if (main_chan && main_chan->base.peer) {
+        close(main_chan->base.peer->socket);
      }
  }

@@ -722,9 +827,8 @@ static void main_channel_shutdown(Channel *channel)
      MainChannel *main_chan = channel->data;

      if (main_chan != NULL) {
-        main_disconnect(main_chan); // TODO - really here? reset peer etc.
+        main_disconnect(main_chan);
      }
-    free(main_chan);
  }

  static void main_channel_migrate()
_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
http://lists.freedesktop.org/mailman/listinfo/spice-devel

Reply via email to