Dear,

As the guide said, the workers really were running in parallel, if there are 
more than one of them. In my example, the parent process acts as the ventilator 
and sink, a number of its child processes act as worker. The fact is that 
always only one child work in serial. 


I look forward to hearing more from you。


best regards
Yanming Wu






#include "zhelpers.h"
#include <pthread.h>

///////////////  libev
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <ev.h>
////////////////

#define MAXLEN 1023
#define PORT 9999
#define ADDR_IP "127.0.0.1"


void worker()
{
        printf("[%d] start\n", getpid());
        void *context = zmq_init (1);

    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    //  Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    //sleep(20);

    //  Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        //  Simple progress indicator for the viewer
        fflush (stdout);
        printf ("[%d]%s --------------------------------------\n", getpid(), 
string);

        //  Do the work
        s_sleep (atoi (string));
        free (string);

                char bout[100] = {0};
                sprintf(bout, "[%d]%s", getpid(), string);
        //  Send results to sink
        s_send (sender, bout);
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_term (context);

        exit(0);
}

static void *
send_routine (void *context) {
        printf("start send...\n");

    //  Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");
    //  Initialize random number generator
    srandom ((unsigned) time (NULL));

    //  Send 100 tasks
    int task_nbr_total = 100;
    int task_nbr = 0;
    int total_msec = 0;     //  Total expected cost in msecs


    for (task_nbr = 0; task_nbr < task_nbr_total; task_nbr++) {
        int workload;
        //  Random workload from 1 to 100msecs
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);
    }

    printf ("Total expected cost: %d msec\n", total_msec);

        zmq_close (sender);
}

static void *
recv_routine (void *context) {

        printf("start recv...\n");

    //  Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PULL);
    zmq_bind (sink, "tcp://*:5558");

    int64_t start_time = s_clock ();

    //  Process 100 confirmations
    for ( int task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (sink);
        printf("%s +++++++++++++++++++++++++++++++++++++++++\n", string);
                free (string);
                //if ((task_nbr / 10) * 10 == task_nbr)
        //    printf (":");
        //else
        //    printf ("."); 
        }
    
        //  Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n",(int) (s_clock () - start_time));

        zmq_close (sink);
}



int main (int argc, char**argv)
{
  /////////////////////////////////////////////////////////////
        //spawn child
  pid_t pid;

    for(int i = 0; i < 5; i++)
    {
        if( (pid = fork()) < 0 )
        {
            printf("error child\n");
            return -1;
        }
        else if( pid == 0)
        {
            worker();
        }
    }


        ////////////////////////////////////////////////////////////
        void *context = zmq_init (1);


        
        ////////////////////////////////////////////////////////////
        pthread_t send_t;
        pthread_create (&send_t, NULL, send_routine, context);  

        ////////////////////////////////////////////////////////////


        ////////////////////////////////////////////////////////////    
    pthread_t recv_t;
    pthread_create (&recv_t, NULL, recv_routine, context); 
        ////////////////////////////////////////////////////////////    
 

        pthread_join(recv_t,NULL);
        pthread_join(send_t,NULL);


    zmq_term (context);
    return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to