Dear,
As the guide said, the workers really were running in parallel, if there are
more than one of them. In my example, the parent process acts as the ventilator
and sink, a number of its child processes act as worker. The fact is that
always only one child work in serial.
I look forward to hearing more from you。
best regards
Yanming Wu
#include "zhelpers.h"
#include <pthread.h>
/////////////// libev
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <ev.h>
////////////////
#define MAXLEN 1023
#define PORT 9999
#define ADDR_IP "127.0.0.1"
void worker()
{
printf("[%d] start\n", getpid());
void *context = zmq_init (1);
// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");
// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");
//sleep(20);
// Process tasks forever
while (1) {
char *string = s_recv (receiver);
// Simple progress indicator for the viewer
fflush (stdout);
printf ("[%d]%s --------------------------------------\n", getpid(),
string);
// Do the work
s_sleep (atoi (string));
free (string);
char bout[100] = {0};
sprintf(bout, "[%d]%s", getpid(), string);
// Send results to sink
s_send (sender, bout);
}
zmq_close (receiver);
zmq_close (sender);
zmq_term (context);
exit(0);
}
static void *
send_routine (void *context) {
printf("start send...\n");
// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Initialize random number generator
srandom ((unsigned) time (NULL));
// Send 100 tasks
int task_nbr_total = 100;
int task_nbr = 0;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < task_nbr_total; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
zmq_close (sender);
}
static void *
recv_routine (void *context) {
printf("start recv...\n");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PULL);
zmq_bind (sink, "tcp://*:5558");
int64_t start_time = s_clock ();
// Process 100 confirmations
for ( int task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (sink);
printf("%s +++++++++++++++++++++++++++++++++++++++++\n", string);
free (string);
//if ((task_nbr / 10) * 10 == task_nbr)
// printf (":");
//else
// printf (".");
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",(int) (s_clock () - start_time));
zmq_close (sink);
}
int main (int argc, char**argv)
{
/////////////////////////////////////////////////////////////
//spawn child
pid_t pid;
for(int i = 0; i < 5; i++)
{
if( (pid = fork()) < 0 )
{
printf("error child\n");
return -1;
}
else if( pid == 0)
{
worker();
}
}
////////////////////////////////////////////////////////////
void *context = zmq_init (1);
////////////////////////////////////////////////////////////
pthread_t send_t;
pthread_create (&send_t, NULL, send_routine, context);
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
pthread_t recv_t;
pthread_create (&recv_t, NULL, recv_routine, context);
////////////////////////////////////////////////////////////
pthread_join(recv_t,NULL);
pthread_join(send_t,NULL);
zmq_term (context);
return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev