OK, so I went back, fixed a couple of issues, and reattached the two
modified test programs. I added RCV/SND buffer sizing, and the sender now
uses zmq_msg_init_data (zero-copy) for better performance. I'm getting about
2.5GB/s on average at best, which is a lot better than with the stock
remote_thr/local_thr, but still about 25% less than the 3.4GB/s (at least)
that I'm expecting.
When I launch 4 simultaneous processes (not threads) for each of the client
and the server, using separate ports, the total does add up to ~3.3GB/s as it
should. The trouble is that for this to work I need to bind 4 ports, and
traditionally the whole point of using accept() is to have multiple
connections on the same port.
Is there a way to achieve the desired throughput with 0MQ without using a
separate port for each socket? I thought that using multiple connections (via
separate threads) to the same ZMQ socket would naturally do it, but according
to the results it doesn't.
On Mon, Jan 7, 2013 at 7:16 PM, A. Mark <[email protected]> wrote:
> Hello,
>
> I'm very interested in porting my current transfer engine to 0MQ. The
> current engine is written with plain BSD sockets and has certain limitations
> that would be easily overcome by 0MQ's intelligent and versatile design.
> However, my main concern is performance with very long messages, in excess of
> 1MB. The current multithreaded (MT) backbone design is the following:
>
>
> control node (client) <---> server A --- worker node 1 <---> worker node 1 --- server B
>                                |                                                   |
>                                |-------- worker node 2 <---> worker node 2 --------|
>                                |                                                   |
>                                +-------- worker node N <---> worker node N --------+
>
> So the control client controls whatever task needs to be performed by
> submitting requests to a server; the actual work is done by the worker
> nodes, each in a separate thread on the server. The worker nodes are
> synchronized across the two servers but work independently, since they
> are working on the same task. Each worker node has its own FD, but they all
> connect to the same TCP address and port. The main task of each node is to
> perform some transformation on a large data buffer from a buffer pool and
> then push the finished result to the other server. My current benchmark
> gives me 3.5GBytes/s using TCP over the loopback interface when simply
> pushing the buffers without doing any work.
>
> I ran the 0MQ benchmarks local_thr and remote_thr, and the performance is
> only 1.5GB/s at best with large buffers (messages), and lower with small
> ones. I'm also concerned after looking at the benchmark results for the 10GE
> test. My current engine can sustain a steady 1.1GBytes/s with large buffers
> over 10GE.
>
> I've also tried a modified version of the two benchmarks to try to emulate
> the above situation, but the performance is about the same. The modified MT
> code is attached.
>
> Is there something else I need to do to get the best performance out of
> 0MQ using MT for this work flow engine?
>
>
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq.h"
#include "zmq_utils.h"
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Benchmark parameters: assigned from the command line in main() and read
   by every worker_routine() thread. */
const char *connect_to;          /* endpoint each worker's socket connects to */
int message_count, message_size; /* messages sent per worker, bytes per message */
int threads = 1;                 /* 0MQ I/O threads requested for the context */
int workers = 1;                 /* number of sender threads to start */
/* Callback with the zmq_free_fn signature that intentionally releases
   nothing: both arguments are ignored (the old free(data) is disabled). */
void my_free (void *data, void *hint)
{
    (void) data;
    (void) hint;
}
/* Payload shared by every zero-copy message that one worker sends.
   0MQ may keep (and read) a message after zmq_msg_send() returns, and even
   after zmq_close() returns: queued messages are flushed by an I/O thread,
   subject to ZMQ_LINGER, until zmq_term().  The buffer therefore must not be
   freed by the worker directly.  It is reference counted and released from
   the message free callback once the last message is done with it. */
struct shared_payload {
    pthread_mutex_t lock;   /* guards refs; the free callback may run on a 0MQ I/O thread */
    int refs;               /* the worker's own reference + one per live message */
    void *buf;              /* message_size bytes sent by every message */
};

/* Take an extra reference to P on behalf of a message about to be built. */
static void shared_payload_retain (struct shared_payload *p)
{
    pthread_mutex_lock (&p->lock);
    p->refs++;
    pthread_mutex_unlock (&p->lock);
}

/* Drop one reference to P; the last one frees the buffer and P itself. */
static void shared_payload_release (struct shared_payload *p)
{
    int remaining;

    pthread_mutex_lock (&p->lock);
    remaining = --p->refs;
    pthread_mutex_unlock (&p->lock);
    if (remaining == 0) {
        pthread_mutex_destroy (&p->lock);
        free (p->buf);
        free (p);
    }
}

/* zmq_free_fn for messages built on a shared_payload; HINT is the payload. */
static void shared_payload_free_fn (void *data, void *hint)
{
    (void) data;
    shared_payload_release (hint);
}

/* Sender thread.  Creates a PUSH socket on the 0MQ context CTX, raises its
   SNDBUF to 32 MiB, connects it to connect_to, and sends message_count
   zero-copy messages of message_size bytes that all point at one shared
   buffer.  Errors are printed and end the thread early, but the socket is
   always closed so that zmq_term() in main() cannot block on it.
   Always returns NULL. */
static void *worker_routine (void *ctx) {
    struct shared_payload *payload;
    void *s = NULL;
    int sndbuflen;
    size_t sndbuflenlen = sizeof sndbuflen;
    int rc, i;

    payload = malloc (sizeof *payload);
    if (!payload) { perror ("malloc"); return NULL; }
    payload->buf = malloc (message_size);
    if (!payload->buf) { perror ("malloc"); free (payload); return NULL; }
    if (pthread_mutex_init (&payload->lock, NULL) != 0) {
        printf ("error in pthread_mutex_init\n");
        free (payload->buf);
        free (payload);
        return NULL;
    }
    payload->refs = 1;   /* held by this thread until the send loop is over */

    s = zmq_socket (ctx, ZMQ_PUSH);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        goto out;
    }

    // Add your socket options here.
    // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }
    printf ("SNDBUF=%d before\n", sndbuflen);
    sndbuflen = 1024*1024*32;
    rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sizeof sndbuflen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }
    printf ("SNDBUF=%d after\n", sndbuflen);

    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        goto out;
    }

    for (i = 0; i < message_count; i++) {
        zmq_msg_t msg;

        shared_payload_retain (payload);
        rc = zmq_msg_init_data (&msg, payload->buf, message_size,
                                shared_payload_free_fn, payload);
        if (rc != 0) {
            printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
            shared_payload_release (payload);   /* 0MQ never took this reference */
            goto out;
        }
        rc = zmq_msg_send (&msg, s, 0);
        if (rc < 0) {
            printf ("error in zmq_send: %s\n", zmq_strerror (errno));
            zmq_msg_close (&msg);   /* still ours after a failed send; drops its reference */
            goto out;
        }
        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            exit (1);
        }
    }

out:
    if (s && zmq_close (s) != 0)
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
    /* Frees the buffer now, or later once 0MQ releases the last queued message. */
    shared_payload_release (payload);
    return NULL;
}
int main (int argc, char *argv [])
{
void *ctx;
int rc;
int i;
void *p;
if (argc != 6) {
printf ("usage: remote_thr <connect-to> <message-size> <zmq-threads> <workers>"
"<message-count>\n");
return 1;
}
connect_to = argv [1];
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
threads = atoi (argv [4]);
workers = atoi (argv [5]);
ctx = zmq_init( threads);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
pthread_t worker[128];
for (i = 0; i < workers; i++) {
pthread_create (&worker[i], NULL, worker_routine, ctx);
printf("Worker %d spawned\n", i);
}
sleep(1);
for (i = 0; i < workers; i++) {
pthread_join( worker[i], &p);
printf("Worker %d joined\n", i);
}
rc = zmq_term (ctx);
if (rc != 0) {
printf ("error in zmq_term: %s\n", zmq_strerror (errno));
return -1;
}
return 0;
}
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>
/* Minimal stopwatch: time_was marks the start of the interval being
   measured and time_now holds the latest clock reading. */
typedef struct US_TIMER {
    struct timeval time_was;   /* start of the current interval */
    struct timeval time_now;   /* most recent gettimeofday() sample */
} US_TIMER;
/* Start a new measurement on T: sample the wall clock into t->time_now and
   copy that sample into t->time_was.  A gettimeofday() failure is only
   reported through perror(); it is not returned to the caller. */
void tm_init( US_TIMER *t){
    struct timeval *now = &t->time_now;

    if (gettimeofday (now, NULL) < 0)
        perror ("d_timer_init()");
    t->time_was = *now;
}
/* Return the wall-clock time elapsed on T, in seconds with microsecond
   resolution, since the previous tm_init() or tm_secs() call, and make the
   current time the start of the next interval.  A gettimeofday() failure is
   reported via perror() under this function's own name. */
float tm_secs( US_TIMER *t){
    double seconds;

    if (gettimeofday (&t->time_now, NULL) < 0) {
        perror ("tm_secs()");
    }
    seconds = (double) (t->time_now.tv_sec - t->time_was.tv_sec)
            + (double) (t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;
    t->time_was = t->time_now;
    return (float) seconds;
}
/* zmq_free_fn-shaped callback that deliberately leaves DATA untouched (its
   former free(data) is disabled); HINT is ignored as well. */
void my_free (void *data, void *hint)
{
    (void) hint;
    (void) data;
    return;
}
/* Parse TEXT as a complete base-10 integer in [MIN, INT_MAX] and store it
   in *OUT.  Returns 0 on success, -1 if TEXT is malformed or out of range. */
static int parse_int_option (const char *text, long min, int *out)
{
    char *end;
    long value;

    errno = 0;
    value = strtol (text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE
            || value < min || value > INT_MAX)
        return -1;
    *out = (int) value;
    return 0;
}

/* Receive one message from S into MSG and check that it is EXPECTED bytes
   long; INDEX only appears in the diagnostic.  Returns 0 on success, -1
   (after printing the reason) on failure. */
static int recv_expected (void *s, zmq_msg_t *msg, size_t expected, int index)
{
    size_t got;

    if (zmq_msg_recv (msg, s, 0) < 0) {
        printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
        return -1;
    }
    got = zmq_msg_size (msg);
    if (got != expected) {
        printf ("message of incorrect size (%zu) received in loop %d\n", got, index);
        return -1;
    }
    return 0;
}

/* local_thr: binds a PULL socket (RCVBUF raised to 32 MiB) to <bind-to>,
   receives <message-count> messages of <message-size> bytes and prints the
   throughput.  The clock starts only once the first message has arrived,
   so the time spent waiting for the sender to start and connect is not
   measured.  The reported volume and rate therefore cover the remaining
   message-count - 1 messages. */
int main (int argc, char *argv [])
{
    US_TIMER timer;
    const char *bind_to;
    int message_count;
    int size_arg;
    size_t message_size;
    int threads;
    void *ctx;
    void *s;
    int rc;
    int i;
    int rcvbuflen;
    size_t rcvbuflenlen = sizeof rcvbuflen;
    zmq_msg_t msg;

    if (argc != 5) {
        printf ("usage: local_thr <bind-to> <message-size> <message-count> <threads>\n");
        return 1;
    }
    bind_to = argv [1];
    if (parse_int_option (argv [2], 1, &size_arg) != 0
            || parse_int_option (argv [3], 1, &message_count) != 0
            || parse_int_option (argv [4], 0, &threads) != 0) {
        printf ("invalid arguments: message-size and message-count must be >= 1, "
                "threads >= 0\n");
        return 1;
    }
    message_size = (size_t) size_arg;

    ctx = zmq_init (threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }
    s = zmq_socket (ctx, ZMQ_PULL);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    // Add your socket options here.
    // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d before\n", rcvbuflen);
    rcvbuflen = 1024*1024*32;
    rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, sizeof rcvbuflen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d after\n", rcvbuflen);
    printf ("Threads: %d\n", zmq_ctx_get (ctx, ZMQ_IO_THREADS));

    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_msg_init (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    /* Wait for the first message before starting the clock, so that the
       time until the sender is launched and connected is not counted. */
    if (recv_expected (s, &msg, message_size, 0) != 0)
        return -1;
    tm_init (&timer);
    for (i = 1; i < message_count; i++) {
        if (recv_expected (s, &msg, message_size, i) != 0)
            return -1;
    }
    float secs = tm_secs (&timer);

    double total = (double) (message_count - 1) * (double) message_size / 1000000000.0;
    printf ("message size: %d Bytes, time: %f secs\n", (int) message_size, secs);
    printf ("Through %.1f GB @ %.1f GB/s\n", total, secs > 0 ? total / secs : 0.0);

    rc = zmq_msg_close (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }
    return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev