Re: [PATCH v3 2/2] net/colo-compare.c: handling of the full primary or secondary queue

2020-03-27 Thread Derek Su
Lukas Straub  於 2020年3月28日 週六 上午2:28寫道:
>
> On Sat, 28 Mar 2020 02:20:21 +0800
> Derek Su  wrote:
>
> > Lukas Straub  於 2020年3月28日 週六 上午1:46寫道:
> > >
> > > On Wed, 25 Mar 2020 17:43:54 +0800
> > > Derek Su  wrote:
> > >
> > > > > The previous handling of a full primary or secondary queue only dropped
> > > > > the packet. If there are lots of clients connected to the guest VM,
> > > > > the "drop" will lead to the loss of the network connection
> > > > > until the next checkpoint.
> > > >
> > > > > To address the issue, this patch first drops the packet.
> > > > > Then it sends all queued primary packets, removes all queued secondary
> > > > > packets, and does a checkpoint.
> > > >
> > > > Signed-off-by: Derek Su 
> > > > ---
> > > >  net/colo-compare.c | 41 ++---
> > > >  1 file changed, 30 insertions(+), 11 deletions(-)
> > > >
> > > > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > > > index cdd87b2aa8..1a52f50fbe 100644
> > > > --- a/net/colo-compare.c
> > > > +++ b/net/colo-compare.c
> > > > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> > > >  [SECONDARY_IN] = "secondary",
> > > >  };
> > > >
> > > > +enum {
> > > > +QUEUE_INSERT_ERR = -1,
> > > > +QUEUE_INSERT_OK = 0,
> > > > +QUEUE_INSERT_FULL = 1,
> > > > +};
> > > > +
> > > >  static int compare_chr_send(CompareState *s,
> > > >  const uint8_t *buf,
> > > >  uint32_t size,
> > > > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, 
> > > > Packet *pkt, uint32_t *max_ack)
> > > >  }
> > > >
> > > >  /*
> > > > - * Return 0 on success, if return -1 means the pkt
> > > > - * is unsupported(arp and ipv6) and will be sent later
> > > > + * Return QUEUE_INSERT_OK on success.
> > > > + * If return QUEUE_INSERT_FULL means list is full, and
> > > > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > > > + * will be sent later
> > > >   */
> > > >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > > >  {
> > > > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int 
> > > > mode, Connection **con)
> > > >  if (parse_packet_early(pkt)) {
> > > >  packet_destroy(pkt, NULL);
> > > >  pkt = NULL;
> > > > -return -1;
> > > > +return QUEUE_INSERT_ERR;
> > > >  }
> > > >  fill_connection_key(pkt, );
> > > >
> > > > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int 
> > > > mode, Connection **con)
> > > >   "drop packet", colo_mode[mode]);
> > > >  packet_destroy(pkt, NULL);
> > > >  pkt = NULL;
> > > > +return QUEUE_INSERT_FULL;
> > > >  }
> > > >
> > > >  *con = conn;
> > > >
> > > > -return 0;
> > > > +return QUEUE_INSERT_OK;
> > > >  }
> > > >
> > > >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > > > @@ -995,17 +1004,22 @@ static void 
> > > > compare_pri_rs_finalize(SocketReadState *pri_rs)
> > > >  {
> > > >  CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> > > >  Connection *conn = NULL;
> > > > +int ret;
> > > >
> > > > -if (packet_enqueue(s, PRIMARY_IN, )) {
> > > > +ret = packet_enqueue(s, PRIMARY_IN, );
> > > > +if (ret == QUEUE_INSERT_OK) {
> > > > +/* compare packet in the specified connection */
> > > > +colo_compare_connection(conn, s);
> > > > +} else if (ret == QUEUE_INSERT_FULL) {
> > > > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > > > +colo_compare_inconsistency_notify(s);
> > > > +} else {
> > > >  trace_colo_compare_main("primary: unsupported packet in");
> > > >  compare_chr_send(s,
> > > >   pri_rs->buf,
> > > >   pri_rs->packet_len,
> > > >   pri_rs->vnet_hdr_len,
> > > >   false);
> > > > -} else {
> > > > -/* compare packet in the specified connection */
> > > > -colo_compare_connection(conn, s);
> > > >  }
> > > >  }
> > > >
> > > > @@ -1013,12 +1027,17 @@ static void 
> > > > compare_sec_rs_finalize(SocketReadState *sec_rs)
> > > >  {
> > > >  CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> > > >  Connection *conn = NULL;
> > > > +int ret;
> > > >
> > > > -if (packet_enqueue(s, SECONDARY_IN, )) {
> > > > -trace_colo_compare_main("secondary: unsupported packet in");
> > > > -} else {
> > > > +ret = packet_enqueue(s, SECONDARY_IN, );
> > > > +if (ret == QUEUE_INSERT_OK) {
> > > >  /* compare packet in the specified connection */
> > > >  colo_compare_connection(conn, s);
> > > > +} else if (ret == QUEUE_INSERT_FULL) {
> > > > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > > > +colo_compare_inconsistency_notify(s);
> > > > +} else {
> > > > +trace_colo_compare_main("secondary: unsupported packet in");
> > > >  

Re: [PATCH v3 2/2] net/colo-compare.c: handling of the full primary or secondary queue

2020-03-27 Thread Lukas Straub
On Sat, 28 Mar 2020 02:20:21 +0800
Derek Su  wrote:

> Lukas Straub  於 2020年3月28日 週六 上午1:46寫道:
> >
> > On Wed, 25 Mar 2020 17:43:54 +0800
> > Derek Su  wrote:
> >  
> > > > The previous handling of a full primary or secondary queue only dropped
> > > > the packet. If there are lots of clients connected to the guest VM,
> > > > the "drop" will lead to the loss of the network connection
> > > > until the next checkpoint.
> > >
> > > > To address the issue, this patch first drops the packet.
> > > > Then it sends all queued primary packets, removes all queued secondary
> > > > packets, and does a checkpoint.
> > >
> > > Signed-off-by: Derek Su 
> > > ---
> > >  net/colo-compare.c | 41 ++---
> > >  1 file changed, 30 insertions(+), 11 deletions(-)
> > >
> > > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > > index cdd87b2aa8..1a52f50fbe 100644
> > > --- a/net/colo-compare.c
> > > +++ b/net/colo-compare.c
> > > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> > >  [SECONDARY_IN] = "secondary",
> > >  };
> > >
> > > +enum {
> > > +QUEUE_INSERT_ERR = -1,
> > > +QUEUE_INSERT_OK = 0,
> > > +QUEUE_INSERT_FULL = 1,
> > > +};
> > > +
> > >  static int compare_chr_send(CompareState *s,
> > >  const uint8_t *buf,
> > >  uint32_t size,
> > > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet 
> > > *pkt, uint32_t *max_ack)
> > >  }
> > >
> > >  /*
> > > - * Return 0 on success, if return -1 means the pkt
> > > - * is unsupported(arp and ipv6) and will be sent later
> > > + * Return QUEUE_INSERT_OK on success.
> > > + * If return QUEUE_INSERT_FULL means list is full, and
> > > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > > + * will be sent later
> > >   */
> > >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> > >  {
> > > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, 
> > > Connection **con)
> > >  if (parse_packet_early(pkt)) {
> > >  packet_destroy(pkt, NULL);
> > >  pkt = NULL;
> > > -return -1;
> > > +return QUEUE_INSERT_ERR;
> > >  }
> > >  fill_connection_key(pkt, );
> > >
> > > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int 
> > > mode, Connection **con)
> > >   "drop packet", colo_mode[mode]);
> > >  packet_destroy(pkt, NULL);
> > >  pkt = NULL;
> > > +return QUEUE_INSERT_FULL;
> > >  }
> > >
> > >  *con = conn;
> > >
> > > -return 0;
> > > +return QUEUE_INSERT_OK;
> > >  }
> > >
> > >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > > @@ -995,17 +1004,22 @@ static void 
> > > compare_pri_rs_finalize(SocketReadState *pri_rs)
> > >  {
> > >  CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> > >  Connection *conn = NULL;
> > > +int ret;
> > >
> > > -if (packet_enqueue(s, PRIMARY_IN, )) {
> > > +ret = packet_enqueue(s, PRIMARY_IN, );
> > > +if (ret == QUEUE_INSERT_OK) {
> > > +/* compare packet in the specified connection */
> > > +colo_compare_connection(conn, s);
> > > +} else if (ret == QUEUE_INSERT_FULL) {
> > > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > > +colo_compare_inconsistency_notify(s);
> > > +} else {
> > >  trace_colo_compare_main("primary: unsupported packet in");
> > >  compare_chr_send(s,
> > >   pri_rs->buf,
> > >   pri_rs->packet_len,
> > >   pri_rs->vnet_hdr_len,
> > >   false);
> > > -} else {
> > > -/* compare packet in the specified connection */
> > > -colo_compare_connection(conn, s);
> > >  }
> > >  }
> > >
> > > @@ -1013,12 +1027,17 @@ static void 
> > > compare_sec_rs_finalize(SocketReadState *sec_rs)
> > >  {
> > >  CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> > >  Connection *conn = NULL;
> > > +int ret;
> > >
> > > -if (packet_enqueue(s, SECONDARY_IN, )) {
> > > -trace_colo_compare_main("secondary: unsupported packet in");
> > > -} else {
> > > +ret = packet_enqueue(s, SECONDARY_IN, );
> > > +if (ret == QUEUE_INSERT_OK) {
> > >  /* compare packet in the specified connection */
> > >  colo_compare_connection(conn, s);
> > > +} else if (ret == QUEUE_INSERT_FULL) {
> > > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > > +colo_compare_inconsistency_notify(s);
> > > +} else {
> > > +trace_colo_compare_main("secondary: unsupported packet in");
> > >  }
> > >  }
> > >  
> >
> > Hi,
> > I don't think we have to flush here because the (post-)checkpoint event 
> > will flush the packets for us.
> >  
> 
> Hi,
> Yes, the periodic checkpoint can flush the packets.
> 
> But if many clients send many packets to the VM,
> there is a high 

Re: [PATCH v3 2/2] net/colo-compare.c: handling of the full primary or secondary queue

2020-03-27 Thread Derek Su
Lukas Straub  於 2020年3月28日 週六 上午1:46寫道:
>
> On Wed, 25 Mar 2020 17:43:54 +0800
> Derek Su  wrote:
>
> > > The previous handling of a full primary or secondary queue only dropped
> > > the packet. If there are lots of clients connected to the guest VM,
> > > the "drop" will lead to the loss of the network connection
> > > until the next checkpoint.
> >
> > > To address the issue, this patch first drops the packet.
> > > Then it sends all queued primary packets, removes all queued secondary
> > > packets, and does a checkpoint.
> >
> > Signed-off-by: Derek Su 
> > ---
> >  net/colo-compare.c | 41 ++---
> >  1 file changed, 30 insertions(+), 11 deletions(-)
> >
> > diff --git a/net/colo-compare.c b/net/colo-compare.c
> > index cdd87b2aa8..1a52f50fbe 100644
> > --- a/net/colo-compare.c
> > +++ b/net/colo-compare.c
> > @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
> >  [SECONDARY_IN] = "secondary",
> >  };
> >
> > +enum {
> > +QUEUE_INSERT_ERR = -1,
> > +QUEUE_INSERT_OK = 0,
> > +QUEUE_INSERT_FULL = 1,
> > +};
> > +
> >  static int compare_chr_send(CompareState *s,
> >  const uint8_t *buf,
> >  uint32_t size,
> > @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet 
> > *pkt, uint32_t *max_ack)
> >  }
> >
> >  /*
> > - * Return 0 on success, if return -1 means the pkt
> > - * is unsupported(arp and ipv6) and will be sent later
> > + * Return QUEUE_INSERT_OK on success.
> > + * If return QUEUE_INSERT_FULL means list is full, and
> > + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> > + * will be sent later
> >   */
> >  static int packet_enqueue(CompareState *s, int mode, Connection **con)
> >  {
> > @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, 
> > Connection **con)
> >  if (parse_packet_early(pkt)) {
> >  packet_destroy(pkt, NULL);
> >  pkt = NULL;
> > -return -1;
> > +return QUEUE_INSERT_ERR;
> >  }
> >  fill_connection_key(pkt, );
> >
> > @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, 
> > Connection **con)
> >   "drop packet", colo_mode[mode]);
> >  packet_destroy(pkt, NULL);
> >  pkt = NULL;
> > +return QUEUE_INSERT_FULL;
> >  }
> >
> >  *con = conn;
> >
> > -return 0;
> > +return QUEUE_INSERT_OK;
> >  }
> >
> >  static inline bool after(uint32_t seq1, uint32_t seq2)
> > @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState 
> > *pri_rs)
> >  {
> >  CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> >  Connection *conn = NULL;
> > +int ret;
> >
> > -if (packet_enqueue(s, PRIMARY_IN, )) {
> > +ret = packet_enqueue(s, PRIMARY_IN, );
> > +if (ret == QUEUE_INSERT_OK) {
> > +/* compare packet in the specified connection */
> > +colo_compare_connection(conn, s);
> > +} else if (ret == QUEUE_INSERT_FULL) {
> > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > +colo_compare_inconsistency_notify(s);
> > +} else {
> >  trace_colo_compare_main("primary: unsupported packet in");
> >  compare_chr_send(s,
> >   pri_rs->buf,
> >   pri_rs->packet_len,
> >   pri_rs->vnet_hdr_len,
> >   false);
> > -} else {
> > -/* compare packet in the specified connection */
> > -colo_compare_connection(conn, s);
> >  }
> >  }
> >
> > @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState 
> > *sec_rs)
> >  {
> >  CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> >  Connection *conn = NULL;
> > +int ret;
> >
> > -if (packet_enqueue(s, SECONDARY_IN, )) {
> > -trace_colo_compare_main("secondary: unsupported packet in");
> > -} else {
> > +ret = packet_enqueue(s, SECONDARY_IN, );
> > +if (ret == QUEUE_INSERT_OK) {
> >  /* compare packet in the specified connection */
> >  colo_compare_connection(conn, s);
> > +} else if (ret == QUEUE_INSERT_FULL) {
> > +g_queue_foreach(>conn_list, colo_flush_packets, s);
> > +colo_compare_inconsistency_notify(s);
> > +} else {
> > +trace_colo_compare_main("secondary: unsupported packet in");
> >  }
> >  }
> >
>
> Hi,
> I don't think we have to flush here because the (post-)checkpoint event will 
> flush the packets for us.
>

Hi,
Yes, the periodic checkpoint can flush the packets.

But if many clients send many packets to the VM,
there is a high probability that packets are dropped because the
primary/secondary queues are always full.
This causes many retransmissions between the clients and the VM and
degrades the service response to the clients.

Sincerely,
Derek Su

> Regards,
> Lukas Straub



Re: [PATCH v3 2/2] net/colo-compare.c: handling of the full primary or secondary queue

2020-03-27 Thread Lukas Straub
On Wed, 25 Mar 2020 17:43:54 +0800
Derek Su  wrote:

> The previous handling of a full primary or secondary queue only dropped
> the packet. If there are lots of clients connected to the guest VM,
> the "drop" will lead to the loss of the network connection
> until the next checkpoint.
> 
> To address the issue, this patch first drops the packet.
> Then it sends all queued primary packets, removes all queued secondary
> packets, and does a checkpoint.
> 
> Signed-off-by: Derek Su 
> ---
>  net/colo-compare.c | 41 ++---
>  1 file changed, 30 insertions(+), 11 deletions(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index cdd87b2aa8..1a52f50fbe 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -125,6 +125,12 @@ static const char *colo_mode[] = {
>  [SECONDARY_IN] = "secondary",
>  };
>  
> +enum {
> +QUEUE_INSERT_ERR = -1,
> +QUEUE_INSERT_OK = 0,
> +QUEUE_INSERT_FULL = 1,
> +};
> +
>  static int compare_chr_send(CompareState *s,
>  const uint8_t *buf,
>  uint32_t size,
> @@ -211,8 +217,10 @@ static int colo_insert_packet(GQueue *queue, Packet 
> *pkt, uint32_t *max_ack)
>  }
>  
>  /*
> - * Return 0 on success, if return -1 means the pkt
> - * is unsupported(arp and ipv6) and will be sent later
> + * Return QUEUE_INSERT_OK on success.
> + * If return QUEUE_INSERT_FULL means list is full, and
> + * QUEUE_INSERT_ERR means the pkt is unsupported(arp and ipv6) and
> + * will be sent later
>   */
>  static int packet_enqueue(CompareState *s, int mode, Connection **con)
>  {
> @@ -234,7 +242,7 @@ static int packet_enqueue(CompareState *s, int mode, 
> Connection **con)
>  if (parse_packet_early(pkt)) {
>  packet_destroy(pkt, NULL);
>  pkt = NULL;
> -return -1;
> +return QUEUE_INSERT_ERR;
>  }
>  fill_connection_key(pkt, );
>  
> @@ -258,11 +266,12 @@ static int packet_enqueue(CompareState *s, int mode, 
> Connection **con)
>   "drop packet", colo_mode[mode]);
>  packet_destroy(pkt, NULL);
>  pkt = NULL;
> +return QUEUE_INSERT_FULL;
>  }
>  
>  *con = conn;
>  
> -return 0;
> +return QUEUE_INSERT_OK;
>  }
>  
>  static inline bool after(uint32_t seq1, uint32_t seq2)
> @@ -995,17 +1004,22 @@ static void compare_pri_rs_finalize(SocketReadState 
> *pri_rs)
>  {
>  CompareState *s = container_of(pri_rs, CompareState, pri_rs);
>  Connection *conn = NULL;
> +int ret;
>  
> -if (packet_enqueue(s, PRIMARY_IN, &conn)) {
> +ret = packet_enqueue(s, PRIMARY_IN, &conn);
> +if (ret == QUEUE_INSERT_OK) {
> +/* compare packet in the specified connection */
> +colo_compare_connection(conn, s);
> +} else if (ret == QUEUE_INSERT_FULL) {
> +g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +colo_compare_inconsistency_notify(s);
> +} else {
>  trace_colo_compare_main("primary: unsupported packet in");
>  compare_chr_send(s,
>   pri_rs->buf,
>   pri_rs->packet_len,
>   pri_rs->vnet_hdr_len,
>   false);
> -} else {
> -/* compare packet in the specified connection */
> -colo_compare_connection(conn, s);
>  }
>  }
>  
> @@ -1013,12 +1027,17 @@ static void compare_sec_rs_finalize(SocketReadState 
> *sec_rs)
>  {
>  CompareState *s = container_of(sec_rs, CompareState, sec_rs);
>  Connection *conn = NULL;
> +int ret;
>  
> -if (packet_enqueue(s, SECONDARY_IN, &conn)) {
> -trace_colo_compare_main("secondary: unsupported packet in");
> -} else {
> +ret = packet_enqueue(s, SECONDARY_IN, &conn);
> +if (ret == QUEUE_INSERT_OK) {
>  /* compare packet in the specified connection */
>  colo_compare_connection(conn, s);
> +} else if (ret == QUEUE_INSERT_FULL) {
> +g_queue_foreach(&s->conn_list, colo_flush_packets, s);
> +colo_compare_inconsistency_notify(s);
> +} else {
> +trace_colo_compare_main("secondary: unsupported packet in");
>  }
>  }
>  

Hi,
I don't think we have to flush here because the (post-)checkpoint event will 
flush the packets for us.

Regards,
Lukas Straub


pgpojyvNhZArC.pgp
Description: OpenPGP digital signature