Hi,

I’m trying to write a small benchmark program using zeromq-4.0.4 that will
be used as a

prototype for a higher-level library. The test program creates a pair of
asynchronous sockets,

sends a bunch of messages with no acknowledgement, and at the end reads a
reply.


Surprisingly, this test program does not compare favourably with an
equivalent direct

implementation over TCP. I have the following timings for sending 10,000
messages of the

given size on the localhost:


packet   tcp   zmq       ratio

size                               zmq/tcp

(bytes)  (us)  (us)      (the smaller is better)

1        4597   4385         1.28
10       5505   4012       0.72

20       4507   3680       0.81

30       5672   4465       0.78

40       3949   7498       1.89

64       3795   4426       1.16

79       3785   4642       1.22

85       3753   7369       1.96
100      3789   7586      2.00
528      5547   8646      1.55
600      4695  13498     2.87
1000     5273  13378    2.53
1024     5489  15279    2.78
1360     5663  23681    4.18
2000     6633  31957    4.81
10000   25243 104826   4.15



Although the deviation is large (not shown in the table), the general
ratio remains the same:

ZeroMQ is on par with TCP for small messages and performs much worse with
bigger ones.


Both the small message and big message results are surprising to me. For
one, I would have

thought that any ZeroMQ overhead would be increasingly insignificant as one
scales up the

message size, yet these results seem to suggest the opposite. Second, I
would have thought

that ZeroMQ’s batching makes a huge difference in throughput, especially
when sending lots

of small messages. Yet I am seeing no case where ZeroMQ actually has better
throughput

than TCP. Mind, I am discussing *throughput* here, not *latency*, which I
do understand

can only be slightly worse in ZeroMQ as compared to TCP.


Is there something I am misunderstanding here? I have gone through several
iterations of

my benchmarks, but perhaps you can point out any problems with them?

Also, I wanted to ask about the best pattern for the following benchmark.
This benchmark

is the core of another program in which a number of hosts may
be connected by

“one-direction” connections and send asynchronous messages over those
connections. The only

requirement is that messages must not be lost (or thrown away because the
HWM was reached),

or else there must be a notification about missing messages, so that the
library can close the connection and

notify the user about the problem.


So far I’ve found that the PUSH->PULL pattern works well here; however,
maybe it’s

possible to use some other pattern to reduce the cost, because with the
PUSH->PULL pattern, on

each node I have to create 1 PULL socket and N PUSH sockets (one per
remote host we are

connected to).


I hope that the attached files will be shown on the mailing list.


Thanks in advance.

---

Alexander Vershilov
mail-to: [email protected]
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <math.h>

// Returns the current wall-clock time from gettimeofday() as a number of
// microseconds (seconds scaled by 1e6 plus the microsecond field).
// Intended to be used as the difference between two readings.
// NOTE(review): the original author warns that readings may not be
// consistent across CPUs/threads -- confirm before comparing values taken
// on different threads.
double timestamp() {
  struct timeval now;
  gettimeofday(&now, NULL);

  const double whole_seconds_us = 1e6 * (double) now.tv_sec;
  const double fraction_us = (double) now.tv_usec;
  return whole_seconds_us + fraction_us;
}

// Read exactly `len` bytes from `fd` into `buf`, looping over short reads.
// Returns 0 once the buffer is full, or -1 if recv() fails or the peer
// closes the connection before `len` bytes arrived.
static int tcp_server_recv_full(int fd, char *buf, size_t len) {
  size_t got = 0;
  while(got < len) {
    ssize_t n = recv(fd, buf + got, len - got, 0);
    if(n > 0) {
      got += (size_t) n;
    } else if(n < 0 && errno == EINTR) {
      continue;               // interrupted by a signal: just retry
    } else {
      return -1;              // error, or orderly shutdown (n == 0)
    }
  }
  return 0;
}

// Write exactly `len` bytes from `buf` to `fd`, looping over short writes.
// Returns 0 on success, -1 if send() fails.
static int tcp_server_send_full(int fd, const char *buf, size_t len) {
  size_t sent = 0;
  while(sent < len) {
    ssize_t n = send(fd, buf + sent, len - sent, 0);
    if(n > 0) {
      sent += (size_t) n;
    } else if(n < 0 && errno == EINTR) {
      continue;
    } else {
      return -1;
    }
  }
  return 0;
}

// TCP side of the benchmark: listen on port 8080, accept one connection,
// receive `pings` messages of `size` bytes each, then send back the number
// of messages received as a single native int.
//
// pings: number of messages to receive (values <= 0 receive nothing).
// size:  size of each message in bytes; must not be negative.
//
// Returns 0 on success and -1 on any failure. Diagnostics go to stdout,
// like the rest of this program's messages (stderr carries the client's
// timing line).
int server(int pings, int size) {
  printf("starting server\n");

  struct addrinfo hints, *res = NULL;
  struct sockaddr_storage client_addr;
  // accept() reads this as the capacity of client_addr, so it must be set.
  socklen_t addr_size = sizeof(client_addr);
  int server_socket = -1;
  int client_socket = -1;
  int counter = 0;
  int yes = 1;
  int error;
  int rc = -1;
  char *buf = NULL;

  if(size < 0) {
    printf("server error: invalid message size\n");
    return -1;
  }

  memset(&hints, 0, sizeof(hints));
  hints.ai_family   = PF_INET;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags    = AI_PASSIVE;

  error = getaddrinfo(NULL, "8080", &hints, &res);
  if(error) {
    printf("server error: %s\n", gai_strerror(error));
    return -1;
  }

  server_socket = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if(server_socket < 0) {
    printf("server error: could not create socket\n");
    goto out;
  }

  if(setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0) {
    printf("server error: could not set socket options\n");
    goto out;
  }

  if(bind(server_socket, res->ai_addr, res->ai_addrlen) < 0) {
    printf("server error: could not bind to socket\n");
    goto out;
  }

  if(listen(server_socket, 5) < 0) {
    printf("server error: could not listen on socket\n");
    goto out;
  }

  client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &addr_size);
  if(client_socket < 0) {
    printf("server error: could not accept connection: %s\n", strerror(errno));
    goto out;
  }

  // One receive buffer reused for every message; at least one byte so that
  // a zero-sized benchmark does not depend on what malloc(0) returns.
  buf = malloc(size > 0 ? (size_t) size : 1);
  if(buf == NULL) {
    printf("server error: out of memory\n");
    goto out;
  }

  for(int i = 0; i < pings; i++) {
    if(tcp_server_recv_full(client_socket, buf, (size_t) size) < 0) {
      printf("server error: connection failed while receiving\n");
      goto out;
    }
    counter++;
  }

  if(tcp_server_send_full(client_socket, (const char *)&counter, sizeof(counter)) < 0) {
    printf("server error: could not send reply: %s\n", strerror(errno));
    goto out;
  }

  rc = 0;

out:
  free(buf);
  if(client_socket >= 0) {
    close(client_socket);
  }
  if(server_socket >= 0) {
    close(server_socket);
  }
  freeaddrinfo(res);
  return rc;
}

// Write exactly `len` bytes from `buf` to `fd`, looping over short writes.
// Returns 0 on success, -1 if send() fails.
static int tcp_client_send_full(int fd, const char *buf, size_t len) {
  size_t sent = 0;
  while(sent < len) {
    ssize_t n = send(fd, buf + sent, len - sent, 0);
    if(n > 0) {
      sent += (size_t) n;
    } else if(n < 0 && errno == EINTR) {
      continue;               // interrupted by a signal: just retry
    } else {
      return -1;
    }
  }
  return 0;
}

// Read exactly `len` bytes from `fd` into `buf`, looping over short reads.
// Returns 0 on success, -1 if recv() fails or the peer closes early.
static int tcp_client_recv_full(int fd, char *buf, size_t len) {
  size_t got = 0;
  while(got < len) {
    ssize_t n = recv(fd, buf + got, len - got, 0);
    if(n > 0) {
      got += (size_t) n;
    } else if(n < 0 && errno == EINTR) {
      continue;
    } else {
      return -1;              // error, or orderly shutdown (n == 0)
    }
  }
  return 0;
}

// Open a socket for `addr` and connect it, retrying for about one second
// while the connection is refused (the server process may not be listening
// yet). A fresh socket is created for every attempt because the state of a
// socket after a failed connect() is unspecified.
// Returns the connected descriptor, or -1 after printing a diagnostic.
static int tcp_client_connect_retry(const struct addrinfo *addr) {
  enum { MAX_ATTEMPTS = 100 };
  const struct timespec pause = { .tv_sec = 0, .tv_nsec = 10 * 1000 * 1000 };
  int last_errno = 0;

  for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
    int fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    if(fd < 0) {
      printf("client error: could not create socket\n");
      return -1;
    }
    if(connect(fd, addr->ai_addr, addr->ai_addrlen) == 0) {
      return fd;
    }
    last_errno = errno;
    close(fd);
    if(last_errno != ECONNREFUSED) {
      break;
    }
    nanosleep(&pause, NULL);
  }

  printf("client error: could not connect: %s\n", strerror(last_errno));
  return -1;
}

// TCP side of the benchmark: connect to 127.0.0.1:8080, send `pings`
// messages of `size` bytes back to back, then wait for the server's int
// reply. The elapsed time in microseconds between the first send and the
// reply is written to stderr as "<size> <elapsed>".
//
// pings: number of messages to send (values <= 0 send nothing).
// size:  size of each message in bytes; must not be negative.
//
// Returns 0 on success and -1 on any failure (diagnostics go to stdout so
// that stderr only ever carries the timing line).
int client(int pings, int size) {
  printf("starting client\n");

  struct addrinfo hints, *res = NULL;
  int error;
  int client_socket = -1;
  int reply = 0;
  int rc = -1;
  char *msg = NULL;
  double timestamp_before, timestamp_after;

  if(size < 0) {
    printf("client error: invalid message size\n");
    return -1;
  }

  memset(&hints, 0, sizeof(hints));
  hints.ai_family   = PF_INET;
  hints.ai_socktype = SOCK_STREAM;

  error = getaddrinfo("127.0.0.1", "8080", &hints, &res);
  if(error) {
    printf("client error: %s\n", gai_strerror(error));
    return -1;
  }

  client_socket = tcp_client_connect_retry(res);
  if(client_socket < 0) {
    goto out;
  }

  // At least one byte so a zero-sized benchmark does not depend on malloc(0).
  msg = malloc(size > 0 ? (size_t) size : 1);
  if(msg == NULL) {
    printf("client error: out of memory\n");
    goto out;
  }
  for(int i = 0; i < size; i++) {
    msg[i] = (char) i;
  }

  timestamp_before = timestamp();
  for(int i = 0; i < pings; i++) {
    if(tcp_client_send_full(client_socket, msg, (size_t) size) < 0) {
      printf("client error: could not send message: %s\n", strerror(errno));
      goto out;
    }
  }

  // The reply is always sizeof(int) bytes, so it gets its own storage:
  // the message buffer may be smaller than that.
  if(tcp_client_recv_full(client_socket, (char *)&reply, sizeof(reply)) < 0) {
    printf("client error: connection failed while waiting for reply\n");
    goto out;
  }
  timestamp_after = timestamp();
  fprintf(stderr, "%i %lf\n", size, timestamp_after - timestamp_before);
  printf("client did %d pings\n", pings);

  rc = 0;

out:
  free(msg);
  if(client_socket >= 0) {
    close(client_socket);
  }
  freeaddrinfo(res);
  return rc;
}

// Print the command-line synopsis to stdout.
//
// argc: unused; kept so callers can pass main()'s arguments straight through.
// argv: argv[0] is printed as the program name.
//
// Always returns -1 so the caller can use the result as its exit status.
int usage(int argc, char** argv) {
  (void) argc;
  // Both arguments are mandatory: the program is only run when argc == 3.
  printf("usage: %s <number of pings> <packet size>\n", argv[0]);
  return -1;
}

// Entry point of the TCP benchmark.
//
// Expects two arguments, <number of pings> and <packet size>, both
// non-negative decimal integers. Forks once: the child runs client(), the
// parent runs server(). Each process returns the status of its role; bad
// arguments return usage()'s status and a failed fork returns -1.
int main(int argc, char** argv) {
  if(argc != 3) {
    return usage(argc, argv);
  }

  int pings = 0;
  int size  = 0;
  // Reject non-numeric or negative input instead of silently running the
  // benchmark with 0 (or a negative buffer size).
  if(sscanf(argv[1], "%d", &pings) != 1 || pings < 0 ||
     sscanf(argv[2], "%d", &size)  != 1 || size  < 0) {
    return usage(argc, argv);
  }

  pid_t pid = fork();
  if(pid < 0) {
    // Without this check a failed fork would start a server with no client
    // and block forever in accept().
    printf("error: could not fork: %s\n", strerror(errno));
    return -1;
  }

  if(pid == 0) {
    // TODO: we should wait until we know the server is ready
    return client(pings,size);
  }
  return server(pings,size);
}
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <math.h>
#include <zmq.h>

// Wall-clock "now" in microseconds, built from the two fields that
// gettimeofday() fills in. Only differences between two readings are
// meaningful to the benchmark.
// NOTE(review): the original author warns that readings may not be
// consistent across CPUs/threads -- confirm before comparing values taken
// on different threads.
double timestamp() {
  struct timeval tv;
  gettimeofday(&tv, NULL);

  double total_us = (double) tv.tv_sec;
  total_us *= 1e6;                  // seconds -> microseconds
  total_us += (double) tv.tv_usec;  // plus the sub-second part
  return total_us;
}

int server(void *ctx, int pings, int size) {
  printf("starting server\n");
  int rc = 0;

  void *pull = zmq_socket(ctx, ZMQ_PULL);
  rc = zmq_bind(pull, "tcp://127.0.0.1:5876");

  void *push = zmq_socket(ctx, ZMQ_PUSH);
  rc = zmq_connect(push, "tcp://127.0.0.1:5877");

  int counter = 0, i = 0;
  for(i=0;i<pings;i++) {
    char* buf = malloc(size);
    int read = zmq_recv(pull, buf, size, 0);
    counter++;
    // printf("server received '%s'\n", buf);
    free(buf);
  }
  zmq_send(push, &counter, sizeof(int), 0);
  return 0;
}

int client(void *ctx, int pings, int size) {
  int rc = 0;
  printf("starting client\n");
  
  void *pull = zmq_socket(ctx, ZMQ_PULL);
  rc = zmq_bind(pull, "tcp://127.0.0.1:5877");

  void *push = zmq_socket(ctx, ZMQ_PUSH);
  rc = zmq_connect(push, "tcp://127.0.0.1:5876");

  int i;
  char * msg = malloc(size);
  for (i=0;i < size; i++) {
      msg[i] = i;
  }
  double timestamp_before = timestamp();
  for(i = 0; i < pings; i++) {
    zmq_send(push, msg, size, 0);
    // printf("client received '%s'\n", buf);
  }
  zmq_send(push, "", 0, 0);
  zmq_recv(pull, msg, sizeof(int), 0);

  double timestamp_after = timestamp();
  fprintf(stderr, "%i %lf\n", size, timestamp_after - timestamp_before);

  printf("client did %d pings\n", pings);
  return 0;
}

int usage(int argc, char** argv) {
  printf("usage: %s <number of pings> <packet size>\n", argv[0]);
  return -1;
}

int main(int argc, char** argv) {
  if(argc != 3) {
    return usage(argc, argv);
  } 

  void* ctx = zmq_ctx_new();

  int pings = 0;
  int size  = 0;
  sscanf(argv[1], "%d", &pings);
  sscanf(argv[2], "%d", &size);

  if(fork() == 0) {
    // TODO: we should wait until we know the server is ready
    return client(ctx, pings, size);
  } else {
    return server(ctx, pings, size);
  }
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to