>> From: Wang Yong <wang.yong...@zte.com.cn> >>
>> Process pactkets in the IOThread which arrived over the socket. >> we use qio_channel_set_aio_fd_handler to set the handlers on the >> IOThread AioContext.then the packets from the primary and the secondary >> are processed in the IOThread. >> Finally remove the colo-compare thread using the IOThread instead. >> >> Signed-off-by: Wang Yong<wang.yong...@zte.com.cn> >> Signed-off-by: Wang Guang<wang.guan...@zte.com.cn> >> --- >> net/colo-compare.c | 133 ++++++++++++++++++++++++++++++++++++----------------- >> net/colo.h | 1 + >> 2 files changed, 91 insertions(+), 43 deletions(-) >> >> diff --git a/net/colo-compare.c b/net/colo-compare.c >> index b0942a4..e3af791 100644 >> --- a/net/colo-compare.c >> +++ b/net/colo-compare.c >> @@ -29,6 +29,7 @@ >> #include "qemu/sockets.h" >> #include "qapi-visit.h" >> #include "net/colo.h" >> +#include "io/channel.h" >> #include "sysemu/iothread.h" >> >> #define TYPE_COLO_COMPARE "colo-compare" >> @@ -82,11 +83,6 @@ typedef struct CompareState { >> GQueue conn_list >> /* hashtable to save connection */ >> GHashTable *connection_track_table >> - /* compare thread, a thread for each NIC */ >> - QemuThread thread >> - >> - GMainContext *worker_context >> - GMainLoop *compare_loop >> >> /*compare iothread*/ >> IOThread *iothread >> @@ -95,6 +91,14 @@ typedef struct CompareState { >> QEMUTimer *packet_check_timer >> } CompareState >> >> +typedef struct { >> + Chardev parent >> + QIOChannel *ioc /*I/O channel */ >We probably don't want to manipulate char backend's internal io channel. >All need here is to access the frontend API (char-fe.c) I believe, and >hide the internal implementation. char-fd.c ? These API can only watch events in the qemu main thread, not in the IOThread. I had to use the qio_channel_socket_set_aio_fd_handler function to monitor the char event in the IOThread,so the io channel is used here. ->qio_channel_socket_set_aio_fd_handler ->aio_set_fd_handler Thanks >> +} CompareChardev >> + >> +#define COMPARE_CHARDEV(obj) \ >> + OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET) >> + >> typedef struct CompareClass { >> ObjectClass parent_class >> } CompareClass >> @@ -107,6 +111,12 @@ enum { >> static int compare_chr_send(CharBackend *out, >> const uint8_t *buf, >> uint32_t size) >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b, >> + AioContext *ctx, >> + IOCanReadHandler *fd_can_read, >> + IOReadHandler *fd_read, >> + IOEventHandler *fd_event, >> + void *opaque) >> >> static gint seq_sorter(Packet *a, Packet *b, gpointer data) >> { >> @@ -534,6 +544,30 @@ err: >> return ret < 0 ? ret : -EIO >> } >> >> +static void compare_chr_read(void *opaque) >> +{ >> + Chardev *chr = opaque >> + uint8_t buf[CHR_READ_BUF_LEN] >> + int len, size >> + int max_size >> + >> + max_size = qemu_chr_be_can_write(chr) >> + if (max_size <= 0) { >> + return >> + } >> + >> + len = sizeof(buf) >> + if (len > max_size) { >> + len = max_size >> + } >> + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len) >> + if (size == 0) { >> + return >> + } else if (size > 0) { >> + qemu_chr_be_write(chr, buf, size) >> + } >> +} >> + >> static int compare_chr_can_read(void *opaque) >> { >> return COMPARE_READ_LEN_MAX >> @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) >> >> ret = net_fill_rstate(&s->pri_rs, buf, size) >> if (ret == -1) { >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, >> - NULL, NULL, true) >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, >> + NULL, NULL, NULL, NULL) >> error_report("colo-compare primary_in error") >> } >> } >> @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) >> >> ret = net_fill_rstate(&s->sec_rs, buf, size) >> if (ret == -1) { >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, >> - NULL, NULL, true) >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, >> + NULL, NULL, NULL, NULL) >> error_report("colo-compare secondary_in error") >> } >> } >> @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s) >> } >> } >> >> -static void *colo_compare_thread(void *opaque) >> -{ >> - CompareState *s = opaque >> - >> - s->worker_context = g_main_context_new() >> - >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, >> - compare_pri_chr_in, NULL, s, s->worker_context, true) >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, >> - compare_sec_chr_in, NULL, s, s->worker_context, true) >> - >> - s->compare_loop = g_main_loop_new(s->worker_context, FALSE) >> - >> - g_main_loop_run(s->compare_loop) >> - >> - g_main_loop_unref(s->compare_loop) >> - g_main_context_unref(s->worker_context) >> - return NULL >> -} >> >> static void colo_compare_iothread(CompareState *s) >> { >> object_ref(OBJECT(s->iothread)) >> s->ctx = iothread_get_aio_context(s->iothread) >> >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, >> + compare_chr_can_read, >> + compare_pri_chr_in, >> + NULL, >> + s) >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, >> + compare_chr_can_read, >> + compare_sec_chr_in, >> + NULL, >> + s) >> + >> colo_compare_timer_init(s) >> } >> >> +static void compare_chr_set_aio_fd_handlers(CharBackend *b, >> + AioContext *ctx, >> + IOCanReadHandler *fd_can_read, >> + IOReadHandler *fd_read, >> + IOEventHandler *fd_event, >> + void *opaque) >> +{ >> + CompareChardev *s >> + >> + if (!b->chr) { >> + return >> + } >> + s = COMPARE_CHARDEV(b->chr) >> + if (!s->ioc) { >> + return >> + } >So this is hacky, you can refer how vhost-user validate udp socket char >backend. I will investigate. Thanks >> + >> + b->chr_can_read = fd_can_read >> + b->chr_read = fd_read >> + b->chr_event = fd_event >> + b->opaque = opaque >> + remove_fd_in_watch(b->chr) >> + >> + if (b->chr_read) { >> + qio_channel_set_aio_fd_handler(s->ioc, ctx, >> + compare_chr_read, NULL, b->chr) >> + } else { >> + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL) >So instead of doing such hack, how about passing a AioContext * instead >of GMainContext * to qemu_chr_fe_set_handlers? IOThread AioContext ->GSource -> GMainContext is NULL if we still use the qemu_chr_fe_set_handlers, it will use the qemu main thread' GMainContext, then io will still be processed in the qemu main thread. so I encapsulate a function(compare_chr_set_aio_fd_handlers) to monitor char fd in the IOThread. Thanks >Thanks >> + } >> +} >> + >> static char *compare_get_pri_indev(Object *obj, Error **errp) >> { >> CompareState *s = COLO_COMPARE(obj) >> @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) >> { >> CompareState *s = COLO_COMPARE(uc) >> Chardev *chr >> - char thread_name[64] >> - static int compare_id >> >> if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { >> error_setg(errp, "colo compare needs 'primary_in' ," >> @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) >> g_free, >> connection_destroy) >> >> - sprintf(thread_name, "colo-compare %d", compare_id) >> - qemu_thread_create(&s->thread, thread_name, >> - colo_compare_thread, s, >> - QEMU_THREAD_JOINABLE) >> - compare_id++ >> - >> colo_compare_iothread(s) >> >> return >> @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj) >> { >> CompareState *s = COLO_COMPARE(obj) >> >> - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, >> - s->worker_context, true) >> - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, >> - s->worker_context, true) >> + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, >> + NULL, NULL, NULL, NULL) >> + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, >> + NULL, NULL, NULL, NULL) >> + >> qemu_chr_fe_deinit(&s->chr_out) >> colo_compare_timer_del(s) >> >> - g_main_loop_quit(s->compare_loop) >> - qemu_thread_join(&s->thread) >> - >> /* Release all unhandled packets after compare thead exited */ >> g_queue_foreach(&s->conn_list, colo_flush_packets, s) >> >> diff --git a/net/colo.h b/net/colo.h >> index 7c524f3..936dea1 100644 >> --- a/net/colo.h >> +++ b/net/colo.h >> @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable *connection_track_table, >> void connection_hashtable_reset(GHashTable *connection_track_table) >> Packet *packet_new(const void *data, int size) >> void packet_destroy(void *opaque, void *user_data) >> +void remove_fd_in_watch(Chardev *chr) >> >> #endif /* QEMU_COLO_PROXY_H */ 原始邮件 发件人: <jasow...@redhat.com> 收件人:王勇10170530 <zhang.zhanghaili...@huawei.com> <zhangchen.f...@cn.fujitsu.com> 抄送人: <lizhij...@cn.fujitsu.com> <qemu-devel@nongnu.org>王广10165992 日 期 :2017年06月07日 16:35 主 题 :Re: [PATCHv2 02/04] colo-compare: Process pactkets in the IOThread ofthe primary On 2017年06月05日 18:44, Yong Wang wrote: > From: Wang Yong <wang.yong...@zte.com.cn> > > Process pactkets in the IOThread which arrived over the socket. > we use qio_channel_set_aio_fd_handler to set the handlers on the > IOThread AioContext.then the packets from the primary and the secondary > are processed in the IOThread. > Finally remove the colo-compare thread using the IOThread instead. > > Signed-off-by: Wang Yong<wang.yong...@zte.com.cn> > Signed-off-by: Wang Guang<wang.guan...@zte.com.cn> > --- > net/colo-compare.c | 133 ++++++++++++++++++++++++++++++++++++----------------- > net/colo.h | 1 + > 2 files changed, 91 insertions(+), 43 deletions(-) > > diff --git a/net/colo-compare.c b/net/colo-compare.c > index b0942a4..e3af791 100644 > --- a/net/colo-compare.c > +++ b/net/colo-compare.c > @@ -29,6 +29,7 @@ > #include "qemu/sockets.h" > #include "qapi-visit.h" > #include "net/colo.h" > +#include "io/channel.h" > #include "sysemu/iothread.h" > > #define TYPE_COLO_COMPARE "colo-compare" > @@ -82,11 +83,6 @@ typedef struct CompareState { > GQueue conn_list > /* hashtable to save connection */ > GHashTable *connection_track_table > - /* compare thread, a thread for each NIC */ > - QemuThread thread > - > - GMainContext *worker_context > - GMainLoop *compare_loop > > /*compare iothread*/ > IOThread *iothread > @@ -95,6 +91,14 @@ typedef struct CompareState { > QEMUTimer *packet_check_timer > } CompareState > > +typedef struct { > + Chardev parent > + QIOChannel *ioc /*I/O channel */ We probably don't want to manipulate char backend's internal io channel. All need here is to access the frontend API (char-fe.c) I believe, and hide the internal implementation. > +} CompareChardev > + > +#define COMPARE_CHARDEV(obj) \ > + OBJECT_CHECK(CompareChardev, (obj), TYPE_CHARDEV_SOCKET) > + > typedef struct CompareClass { > ObjectClass parent_class > } CompareClass > @@ -107,6 +111,12 @@ enum { > static int compare_chr_send(CharBackend *out, > const uint8_t *buf, > uint32_t size) > +static void compare_chr_set_aio_fd_handlers(CharBackend *b, > + AioContext *ctx, > + IOCanReadHandler *fd_can_read, > + IOReadHandler *fd_read, > + IOEventHandler *fd_event, > + void *opaque) > > static gint seq_sorter(Packet *a, Packet *b, gpointer data) > { > @@ -534,6 +544,30 @@ err: > return ret < 0 ? ret : -EIO > } > > +static void compare_chr_read(void *opaque) > +{ > + Chardev *chr = opaque > + uint8_t buf[CHR_READ_BUF_LEN] > + int len, size > + int max_size > + > + max_size = qemu_chr_be_can_write(chr) > + if (max_size <= 0) { > + return > + } > + > + len = sizeof(buf) > + if (len > max_size) { > + len = max_size > + } > + size = CHARDEV_GET_CLASS(chr)->chr_sync_read(chr, (void *)buf, len) > + if (size == 0) { > + return > + } else if (size > 0) { > + qemu_chr_be_write(chr, buf, size) > + } > +} > + > static int compare_chr_can_read(void *opaque) > { > return COMPARE_READ_LEN_MAX > @@ -550,8 +584,8 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) > > ret = net_fill_rstate(&s->pri_rs, buf, size) > if (ret == -1) { > - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, > - NULL, NULL, true) > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, > + NULL, NULL, NULL, NULL) > error_report("colo-compare primary_in error") > } > } > @@ -567,8 +601,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) > > ret = net_fill_rstate(&s->sec_rs, buf, size) > if (ret == -1) { > - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, > - NULL, NULL, true) > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, > + NULL, NULL, NULL, NULL) > error_report("colo-compare secondary_in error") > } > } > @@ -605,34 +639,57 @@ static void colo_compare_timer_del(CompareState *s) > } > } > > -static void *colo_compare_thread(void *opaque) > -{ > - CompareState *s = opaque > - > - s->worker_context = g_main_context_new() > - > - qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, > - compare_pri_chr_in, NULL, s, s->worker_context, true) > - qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, > - compare_sec_chr_in, NULL, s, s->worker_context, true) > - > - s->compare_loop = g_main_loop_new(s->worker_context, FALSE) > - > - g_main_loop_run(s->compare_loop) > - > - g_main_loop_unref(s->compare_loop) > - g_main_context_unref(s->worker_context) > - return NULL > -} > > static void colo_compare_iothread(CompareState *s) > { > object_ref(OBJECT(s->iothread)) > s->ctx = iothread_get_aio_context(s->iothread) > > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, > + compare_chr_can_read, > + compare_pri_chr_in, > + NULL, > + s) > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, > + compare_chr_can_read, > + compare_sec_chr_in, > + NULL, > + s) > + > colo_compare_timer_init(s) > } > > +static void compare_chr_set_aio_fd_handlers(CharBackend *b, > + AioContext *ctx, > + IOCanReadHandler *fd_can_read, > + IOReadHandler *fd_read, > + IOEventHandler *fd_event, > + void *opaque) > +{ > + CompareChardev *s > + > + if (!b->chr) { > + return > + } > + s = COMPARE_CHARDEV(b->chr) > + if (!s->ioc) { > + return > + } So this is hacky, you can refer how vhost-user validate udp socket char backend. > + > + b->chr_can_read = fd_can_read > + b->chr_read = fd_read > + b->chr_event = fd_event > + b->opaque = opaque > + remove_fd_in_watch(b->chr) > + > + if (b->chr_read) { > + qio_channel_set_aio_fd_handler(s->ioc, ctx, > + compare_chr_read, NULL, b->chr) > + } else { > + qio_channel_set_aio_fd_handler(s->ioc, ctx, NULL, NULL, NULL) So instead of doing such hack, how about passing a AioContext * instead of GMainContext * to qemu_chr_fe_set_handlers? Thanks > + } > +} > + > static char *compare_get_pri_indev(Object *obj, Error **errp) > { > CompareState *s = COLO_COMPARE(obj) > @@ -736,8 +793,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) > { > CompareState *s = COLO_COMPARE(uc) > Chardev *chr > - char thread_name[64] > - static int compare_id > > if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { > error_setg(errp, "colo compare needs 'primary_in' ," > @@ -776,12 +831,6 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) > g_free, > connection_destroy) > > - sprintf(thread_name, "colo-compare %d", compare_id) > - qemu_thread_create(&s->thread, thread_name, > - colo_compare_thread, s, > - QEMU_THREAD_JOINABLE) > - compare_id++ > - > colo_compare_iothread(s) > > return > @@ -834,16 +883,14 @@ static void colo_compare_finalize(Object *obj) > { > CompareState *s = COLO_COMPARE(obj) > > - qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, > - s->worker_context, true) > - qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, > - s->worker_context, true) > + compare_chr_set_aio_fd_handlers(&s->chr_pri_in, s->ctx, > + NULL, NULL, NULL, NULL) > + compare_chr_set_aio_fd_handlers(&s->chr_sec_in, s->ctx, > + NULL, NULL, NULL, NULL) > + > qemu_chr_fe_deinit(&s->chr_out) > colo_compare_timer_del(s) > > - g_main_loop_quit(s->compare_loop) > - qemu_thread_join(&s->thread) > - > /* Release all unhandled packets after compare thead exited */ > g_queue_foreach(&s->conn_list, colo_flush_packets, s) > > diff --git a/net/colo.h b/net/colo.h > index 7c524f3..936dea1 100644 > --- a/net/colo.h > +++ b/net/colo.h > @@ -84,5 +84,6 @@ Connection *connection_get(GHashTable *connection_track_table, > void connection_hashtable_reset(GHashTable *connection_track_table) > Packet *packet_new(const void *data, int size) > void packet_destroy(void *opaque, void *user_data) > +void remove_fd_in_watch(Chardev *chr) > > #endif /* QEMU_COLO_PROXY_H */