On Sat, Feb 12, 2011 at 08:56:40PM +0100, Marc-André Lureau wrote: > Ack. I understand that SpiceWatch is not being used, and instead > red_worker uses its own loop in red_worker_main(). >
Yes, we have our own epoll-based event loop in the red_worker thread, which doesn't use watches at all. Actually, this was annoying when I tried to do a packet aggregation test (which needs a timeout). > On Fri, Feb 11, 2011 at 6:23 PM, Alon Levy <[email protected]> wrote: > > --- > > server/red_worker.c | 437 > > +++++---------------------------------------------- > > 1 files changed, 40 insertions(+), 397 deletions(-) > > > > diff --git a/server/red_worker.c b/server/red_worker.c > > index cd3d1a8..b28709f 100644 > > --- a/server/red_worker.c > > +++ b/server/red_worker.c > > @@ -57,6 +57,7 @@ > > #include "demarshallers.h" > > #include "generated_marshallers.h" > > #include "zlib_encoder.h" > > +#include "red_channel.h" > > > > //#define COMPRESS_STAT > > //#define DUMP_BITMAP > > @@ -252,11 +253,6 @@ enum { > > PIPE_ITEM_TYPE_DESTROY_SURFACE, > > }; > > > > -typedef struct PipeItem { > > - RingItem link; > > - int type; > > -} PipeItem; > > - > > typedef struct VerbItem { > > PipeItem base; > > uint16_t verb; > > @@ -347,81 +343,6 @@ typedef struct LocalCursor { > > #define PALETTE_CACHE_HASH_MASK (PALETTE_CACHE_HASH_SIZE - 1) > > #define PALETTE_CACHE_HASH_KEY(id) ((id) & PALETTE_CACHE_HASH_MASK) > > > > -typedef struct RedChannel RedChannel; > > -typedef void (*channel_disconnect_proc)(RedChannel *channel); > > -typedef void (*channel_hold_pipe_item_proc)(RedChannel *channel, PipeItem > > *item); > > -typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem > > *item); > > -typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, > > PipeItem *item, int item_pushed); > > -typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t > > size, uint16_t type, void *message); > > - > > -#define MAX_SEND_VEC 100 > > - > > -typedef int (*get_outgoing_msg_size_proc)(void *opaque); > > -typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int > > *vec_size, int pos); > > -typedef void (*on_outgoing_error_proc)(void 
*opaque); > > -typedef void (*on_outgoing_block_proc)(void *opaque); > > -typedef void (*on_outgoing_msg_done_proc)(void *opaque); > > - > > -typedef struct OutgoingHandler { > > - void *opaque; > > - struct iovec vec_buf[MAX_SEND_VEC]; > > - int vec_size; > > - struct iovec *vec; > > - int pos; > > - int size; > > - get_outgoing_msg_size_proc get_msg_size; > > - prepare_outgoing_proc prepare; > > - on_outgoing_error_proc on_error; > > - on_outgoing_block_proc on_block; > > - on_outgoing_msg_done_proc on_msg_done; > > -#ifdef RED_STATISTICS > > - uint64_t *out_bytes_counter; > > -#endif > > -} OutgoingHandler; > > - > > -struct RedChannel { > > - spice_parse_channel_func_t parser; > > - RedsStreamContext *peer; > > - int migrate; > > - > > - Ring pipe; > > - uint32_t pipe_size; > > - > > - struct { > > - uint32_t client_window; > > - uint32_t generation; > > - uint32_t client_generation; > > - uint32_t messages_window; > > - } ack_data; > > - > > - struct { > > - int blocked; > > - uint64_t serial; > > - SpiceDataHeader *header; > > - SpiceMarshaller *marshaller; > > - uint32_t size; > > - uint32_t pos; > > - void *item; > > - } send_data; > > - > > - struct { > > - uint8_t buf[RECIVE_BUF_SIZE]; > > - SpiceDataHeader *message; > > - uint8_t *now; > > - uint8_t *end; > > - } incoming; > > - > > - OutgoingHandler outgoing; > > - > > - channel_disconnect_proc disconnect; > > - channel_hold_pipe_item_proc hold_item; > > - channel_release_pipe_item_proc release_item; > > - channel_handle_parsed_proc handle_parsed; > > - channel_send_pipe_item_proc send_item; > > - > > - int during_send; > > -}; > > - > > typedef struct ImageItem { > > PipeItem link; > > int refs; > > @@ -637,6 +558,7 @@ typedef struct CommonChannel { > > EventListener listener; > > uint32_t id; > > struct RedWorker *worker; > > + uint8_t recv_buf[RECIVE_BUF_SIZE]; > > } CommonChannel; > > > > > > @@ -990,7 +912,6 @@ typedef struct BitmapData { > > > > static void red_draw_qxl_drawable(RedWorker *worker, 
Drawable *drawable); > > static void red_current_flush(RedWorker *worker, int surface_id); > > -static void red_channel_push(RedChannel *channel); > > #ifdef DRAW_ALL > > #define red_update_area(worker, rect, surface_id) > > #define red_draw_drawable(worker, item) > > @@ -1004,9 +925,7 @@ static void red_display_release_stream(DisplayChannel > > *display, StreamAgent *age > > static inline void red_detach_stream(RedWorker *worker, Stream *stream); > > static void red_stop_stream(RedWorker *worker, Stream *stream); > > static inline void red_stream_maintenance(RedWorker *worker, Drawable > > *candidate, Drawable *sect); > > -static inline void red_channel_begin_send_message(RedChannel *channel); > > static inline void display_begin_send_message(DisplayChannel *channel); > > -static void red_channel_receive(RedChannel *channel); > > static void red_release_pixmap_cache(DisplayChannel *channel); > > static void red_release_glz(DisplayChannel *channel); > > static void red_freeze_glz(DisplayChannel *channel); > > @@ -1204,36 +1123,12 @@ static void show_draw_item(RedWorker *worker, > > DrawItem *draw_item, const char *p > > draw_item->base.rgn.extents.y2); > > } > > > > -static void red_channel_init_send_data(RedChannel *channel, uint16_t type, > > PipeItem *item) > > -{ > > - if (item) { > > - channel->hold_item(channel, item); > > - ASSERT(channel->send_data.item == NULL); > > - channel->send_data.item = item; > > - } > > - channel->send_data.header->type = type; > > -} > > - > > static inline void red_pipe_item_init(PipeItem *item, int type) > > { > > ring_item_init(&item->link); > > item->type = type; > > } > > > > -static inline void red_channel_pipe_add(RedChannel *channel, PipeItem > > *item) > > -{ > > - ASSERT(channel); > > - channel->pipe_size++; > > - ring_add(&channel->pipe, &item->link); > > -} > > - > > -static inline void red_channel_pipe_add_after(RedChannel *channel, > > PipeItem *item, PipeItem *pos) > > -{ > > - ASSERT(channel && pos); > > - 
channel->pipe_size++; > > - ring_add_after(&item->link, &pos->link); > > -} > > - > > static inline int pipe_item_is_linked(PipeItem *item) > > { > > return ring_item_is_linked(&item->link); > > @@ -1392,19 +1287,16 @@ static void release_upgrade_item(RedWorker* worker, > > UpgradeItem *item) > > } > > } > > > > -static void red_channel_pipe_clear(RedChannel *channel) > > +static uint8_t *common_alloc_recv_buf(RedChannel *channel, SpiceDataHeader > > *msg_header) > > { > > - PipeItem *item; > > + CommonChannel *common = SPICE_CONTAINEROF(channel, CommonChannel, > > base); > > > > - ASSERT(channel); > > - if (channel->send_data.item) { > > - channel->release_item(channel, channel->send_data.item, TRUE); > > - } > > - while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { > > - ring_remove(&item->link); > > - channel->release_item(channel, item, FALSE); > > - } > > - channel->pipe_size = 0; > > + return common->recv_buf; > > +} > > + > > +static void common_release_recv_buf(RedChannel *channel, SpiceDataHeader > > *msg_header, uint8_t* msg) > > +{ > > + return; > > } > > > > #define CLIENT_PIXMAPS_CACHE > > @@ -6041,19 +5933,6 @@ static void fill_cursor(CursorChannel > > *cursor_channel, SpiceCursor *red_cursor, > > } > > } > > > > -static inline void red_channel_reset_send_data(RedChannel *channel) > > -{ > > - spice_marshaller_reset(channel->send_data.marshaller); > > - channel->send_data.header = (SpiceDataHeader *) > > - spice_marshaller_reserve_space(channel->send_data.marshaller, > > sizeof(SpiceDataHeader)); > > - spice_marshaller_set_base(channel->send_data.marshaller, > > sizeof(SpiceDataHeader)); > > - channel->send_data.pos = 0; > > - channel->send_data.header->type = 0; > > - channel->send_data.header->size = 0; > > - channel->send_data.header->sub_list = 0; > > - channel->send_data.header->serial = ++channel->send_data.serial; > > -} > > - > > static inline void red_display_reset_send_data(DisplayChannel *channel) > > { > > 
red_channel_reset_send_data((RedChannel *)channel); > > @@ -7307,82 +7186,6 @@ static void inline channel_release_res(RedChannel > > *channel) > > channel->send_data.item = NULL; > > } > > > > -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec > > *vec, > > - int *vec_size, int pos) > > -{ > > - RedChannel *channel = (RedChannel *)opaque; > > - > > - *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller, > > - vec, MAX_SEND_VEC, pos); > > -} > > - > > -static void red_channel_peer_on_out_block(void *opaque) > > -{ > > - RedChannel *channel = (RedChannel *)opaque; > > - > > - channel->send_data.blocked = TRUE; > > -} > > - > > -static void red_channel_peer_on_out_msg_done(void *opaque) > > -{ > > - RedChannel *channel = (RedChannel *)opaque; > > - > > - channel->send_data.size = 0; > > - if (channel->send_data.item) { > > - channel->release_item(channel, channel->send_data.item, TRUE); > > - channel->send_data.item = NULL; > > - } > > - channel->send_data.blocked = FALSE; > > -} > > - > > -static void red_peer_handle_outgoing(RedsStreamContext *peer, > > OutgoingHandler *handler) > > -{ > > - int n; > > - > > - ASSERT(peer); > > - if (handler->size == 0) { > > - handler->vec = handler->vec_buf; > > - handler->size = handler->get_msg_size(handler->opaque); > > - if (!handler->size) { // nothing to be sent > > - return; > > - } > > - } > > - for (;;) { > > - handler->prepare(handler->opaque, handler->vec, > > &handler->vec_size, handler->pos); > > - if ((n = peer->cb_writev(peer->ctx, handler->vec, > > handler->vec_size)) == -1) { > > - switch (errno) { > > - case EAGAIN: > > - handler->on_block(handler->opaque); > > - return; > > - case EINTR: > > - break; > > - case EPIPE: > > - handler->on_error(handler->opaque); > > - return; > > - default: > > - red_printf("%s", strerror(errno)); > > - handler->on_error(handler->opaque); > > - return; > > - } > > - } else { > > - handler->pos += n; > > - 
stat_inc_counter(handler->out_bytes_counter, n); > > - if (handler->pos == handler->size) { // finished writing data > > - handler->on_msg_done(handler->opaque); > > - handler->vec = handler->vec_buf; > > - handler->pos = 0; > > - handler->size = 0; > > - return; > > - } > > - } > > - } > > -} > > - > > -static void red_channel_send(RedChannel *channel) > > -{ > > - red_peer_handle_outgoing(channel->peer, &channel->outgoing); > > -} > > - > > static void display_channel_push_release(DisplayChannel *channel, uint8_t > > type, uint64_t id, > > uint64_t* sync_data) > > { > > @@ -7407,16 +7210,6 @@ static void > > display_channel_push_release(DisplayChannel *channel, uint8_t type, > > free_list->res->resources[free_list->res->count++].id = id; > > } > > > > -static inline void red_channel_begin_send_message(RedChannel *channel) > > -{ > > - spice_marshaller_flush(channel->send_data.marshaller); > > - channel->send_data.size = > > spice_marshaller_get_total_size(channel->send_data.marshaller); > > - channel->send_data.header->size = channel->send_data.size - > > sizeof(SpiceDataHeader); > > - channel->ack_data.messages_window++; > > - channel->send_data.header = NULL; /* avoid writing to this until we > > have a new message */ > > - red_channel_send(channel); > > -} > > - > > static inline void display_begin_send_message(DisplayChannel *channel) > > { > > FreeList *free_list = &channel->send_data.free_list; > > @@ -8298,25 +8091,6 @@ static void red_send_surface_destroy(DisplayChannel > > *display, uint32_t surface_i > > red_channel_begin_send_message(channel); > > } > > > > -static inline int red_channel_waiting_for_ack(RedChannel *channel) > > -{ > > - return (channel->ack_data.messages_window > > > channel->ack_data.client_window * 2); > > -} > > - > > -static inline PipeItem *red_channel_pipe_get(RedChannel *channel) > > -{ > > - PipeItem *item; > > - if (!channel || channel->send_data.blocked || > > - red_channel_waiting_for_ack(channel) || > > - !(item = (PipeItem 
*)ring_get_tail(&channel->pipe))) { > > - return NULL; > > - } > > - > > - --channel->pipe_size; > > - ring_remove(&item->link); > > - return item; > > -} > > - > > static void display_channel_send_item(RedChannel *base, PipeItem > > *pipe_item) > > { > > DisplayChannel *display_channel = (DisplayChannel > > *)red_ref_channel(base); > > @@ -8410,29 +8184,6 @@ static void display_channel_send_item(RedChannel > > *base, PipeItem *pipe_item) > > red_unref_channel(&display_channel->common.base); > > } > > > > -void red_channel_push(RedChannel *channel) > > -{ > > - PipeItem *pipe_item; > > - > > - if (!channel) { > > - return; > > - } > > - if (!channel->during_send) { > > - channel->during_send = TRUE; > > - } else { > > - return; > > - } > > - > > - if (channel->send_data.blocked) { > > - red_channel_send(channel); > > - } > > - > > - while ((pipe_item = red_channel_pipe_get(channel))) { > > - channel->send_item(channel, pipe_item); > > - } > > - channel->during_send = FALSE; > > -} > > - > > static void cursor_channel_send_item(RedChannel *channel, PipeItem > > *pipe_item) > > { > > CursorChannel *cursor_channel = SPICE_CONTAINEROF(channel, > > CursorChannel, common.base); > > @@ -8556,11 +8307,7 @@ void red_show_tree(RedWorker *worker) > > } > > } > > > > -static inline int red_channel_is_connected(RedChannel *channel) > > -{ > > - return !!channel->peer; > > -} > > - > > +// TODO: move to red_channel > > static void red_disconnect_channel(RedChannel *channel) > > { > > channel_release_res(channel); > > @@ -8570,7 +8317,7 @@ static void red_disconnect_channel(RedChannel > > *channel) > > > > channel->peer = NULL; > > channel->send_data.blocked = FALSE; > > - channel->send_data.size = channel->send_data.pos = 0; > > + channel->send_data.size = 0; > > spice_marshaller_reset(channel->send_data.marshaller); > > red_unref_channel(channel); > > } > > @@ -8963,26 +8710,6 @@ static void on_new_display_channel(RedWorker *worker) > > } > > } > > > > -static int 
red_channel_handle_message(RedChannel *channel, uint32_t size, > > uint16_t type, void *message) > > -{ > > - switch (type) { > > - case SPICE_MSGC_ACK_SYNC: > > - channel->ack_data.client_generation = *(uint32_t *)message; > > - break; > > - case SPICE_MSGC_ACK: > > - if (channel->ack_data.client_generation == > > channel->ack_data.generation) { > > - channel->ack_data.messages_window -= > > channel->ack_data.client_window; > > - } > > - break; > > - case SPICE_MSGC_DISCONNECTING: > > - break; > > - default: > > - red_printf("invalid message type %u", type); > > - return FALSE; > > - } > > - return TRUE; > > -} > > - > > static GlzSharedDictionary *_red_find_glz_dictionary(uint8_t dict_id) > > { > > RingItem *now; > > @@ -9289,78 +9016,6 @@ static int display_channel_handle_message(RedChannel > > *channel, uint32_t size, ui > > } > > } > > > > -static void red_channel_receive(RedChannel *channel) > > -{ > > - for (;;) { > > - ssize_t n; > > - n = channel->incoming.end - channel->incoming.now; > > - ASSERT(n); > > - ASSERT(channel->peer); > > - if ((n = channel->peer->cb_read(channel->peer->ctx, > > channel->incoming.now, n)) <= 0) { > > - if (n == 0) { > > - channel->disconnect(channel); > > - return; > > - } > > - ASSERT(n == -1); > > - switch (errno) { > > - case EAGAIN: > > - return; > > - case EINTR: > > - break; > > - case EPIPE: > > - channel->disconnect(channel); > > - return; > > - default: > > - red_printf("%s", strerror(errno)); > > - channel->disconnect(channel); > > - return; > > - } > > - } else { > > - channel->incoming.now += n; > > - for (;;) { > > - SpiceDataHeader *header = channel->incoming.message; > > - uint8_t *data = (uint8_t *)(header+1); > > - size_t parsed_size; > > - uint8_t *parsed; > > - message_destructor_t parsed_free; > > - > > - n = channel->incoming.now - (uint8_t *)header; > > - if (n < sizeof(SpiceDataHeader) || > > - n < sizeof(SpiceDataHeader) + header->size) { > > - break; > > - } > > - parsed = channel->parser((void *)data, 
data + > > header->size, header->type, > > - SPICE_VERSION_MINOR, > > &parsed_size, &parsed_free); > > - > > - if (parsed == NULL) { > > - red_printf("failed to parse message type %d", > > header->type); > > - channel->disconnect(channel); > > - return; > > - } > > - > > - if (!channel->handle_parsed(channel, parsed_size, > > header->type, parsed)) { > > - free(parsed); > > - channel->disconnect(channel); > > - return; > > - } > > - parsed_free(parsed); > > - channel->incoming.message = (SpiceDataHeader *)((uint8_t > > *)header + > > - > > sizeof(SpiceDataHeader) + > > - > > header->size); > > - } > > - > > - if (channel->incoming.now == (uint8_t > > *)channel->incoming.message) { > > - channel->incoming.now = channel->incoming.buf; > > - channel->incoming.message = (SpiceDataHeader > > *)channel->incoming.buf; > > - } else if (channel->incoming.now == channel->incoming.end) { > > - memcpy(channel->incoming.buf, channel->incoming.message, > > n); > > - channel->incoming.now = channel->incoming.buf + n; > > - channel->incoming.message = (SpiceDataHeader > > *)channel->incoming.buf; > > - } > > - } > > - } > > -} > > - > > int common_channel_config_socket(RedChannel *channel) > > { > > int flags; > > @@ -9391,19 +9046,25 @@ static void > > free_common_channel_from_listener(EventListener *ctx) > > > > free(common); > > } > > - > > -static void red_channel_default_peer_on_error(RedChannel *channel) > > +void worker_watch_update_mask(SpiceWatch *watch, int event_mask) > > { > > - channel->disconnect(channel); > > } > > > > -static int red_channel_peer_get_out_msg_size(void *opaque) > > +SpiceWatch *worker_watch_add(int fd, int event_mask, SpiceWatchFunc func, > > void *opaque) > > { > > - RedChannel *channel = (RedChannel *)opaque; > > + return NULL; // apparently allowed? 
> > +} > > > > - return channel->send_data.size; > > +void worker_watch_remove(SpiceWatch *watch) > > +{ > > } > > > > +SpiceCoreInterface worker_core = { > > + .watch_update_mask = worker_watch_update_mask, > > + .watch_add = worker_watch_add, > > + .watch_remove = worker_watch_remove, > > +}; > > + > > static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t > > channel_id, > > RedsStreamContext *peer, int migrate, > > event_listener_action_proc handler, > > @@ -9417,47 +9078,31 @@ static RedChannel *__new_channel(RedWorker *worker, > > int size, uint32_t channel_i > > RedChannel *channel; > > CommonChannel *common; > > > > - ASSERT(size >= sizeof(*channel)); > > - common = spice_malloc0(size); > > - channel = &common->base; > > - ASSERT(common == (CommonChannel*)channel); > > - channel->peer = peer; > > - if (!common_channel_config_socket(channel)) { > > + channel = red_channel_create_parser(size, peer, &worker_core, migrate, > > + TRUE /* handle_acks */, > > + common_channel_config_socket, > > + > > spice_get_client_channel_parser(channel_id, NULL), > > + handle_parsed, > > + common_alloc_recv_buf, > > + common_release_recv_buf, > > + hold_item, > > + send_item, > > + release_item, > > + red_channel_default_peer_on_error, > > + red_channel_default_peer_on_error); > > + common = (CommonChannel *)channel; > > + if (!channel) { > > goto error; > > } > > common->id = worker->id; > > - channel->parser = spice_get_client_channel_parser(channel_id, NULL); > > common->listener.refs = 1; > > common->listener.action = handler; > > common->listener.free = free_common_channel_from_listener; > > - channel->disconnect = disconnect; > > - channel->send_item = send_item; > > - channel->hold_item = hold_item; > > - channel->release_item = release_item; > > - channel->handle_parsed = handle_parsed; > > - channel->peer = peer; > > common->worker = worker; > > - channel->ack_data.messages_window = ~0; // blocks send message (maybe > > use send_data.blocked + > > - // 
block flags) > > + // TODO: Should this be distinctive for the Display/Cursor channels? > > doesn't > > + // make sense, does it? > > channel->ack_data.client_window = IS_LOW_BANDWIDTH() ? > > WIDE_CLIENT_ACK_WINDOW : > > > > NARROW_CLIENT_ACK_WINDOW; > > - channel->ack_data.client_generation = ~0; > > - channel->incoming.message = (SpiceDataHeader *)channel->incoming.buf; > > - channel->incoming.now = channel->incoming.buf; > > - channel->incoming.end = channel->incoming.buf + > > sizeof(channel->incoming.buf); > > - ring_init(&channel->pipe); > > - channel->send_data.marshaller = spice_marshaller_new(); > > - > > - channel->outgoing.opaque = channel; > > - channel->outgoing.pos = 0; > > - channel->outgoing.size = 0; > > - channel->outgoing.out_bytes_counter = 0; > > - > > - channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; > > - channel->outgoing.prepare = red_channel_peer_prepare_out_msg; > > - channel->outgoing.on_block = red_channel_peer_on_out_block; > > - channel->outgoing.on_error = > > (on_outgoing_error_proc)red_channel_default_peer_on_error; > > - channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; > > > > event.events = EPOLLIN | EPOLLOUT | EPOLLET; > > event.data.ptr = &common->listener; > > @@ -9466,8 +9111,6 @@ static RedChannel *__new_channel(RedWorker *worker, > > int size, uint32_t channel_i > > goto error; > > } > > > > - channel->migrate = migrate; > > - > > return channel; > > > > error: > > -- > > 1.7.4 > > > > _______________________________________________ > > Spice-devel mailing list > > [email protected] > > http://lists.freedesktop.org/mailman/listinfo/spice-devel > > > > > > -- > Marc-André Lureau _______________________________________________ Spice-devel mailing list [email protected] http://lists.freedesktop.org/mailman/listinfo/spice-devel
