Hi,
   Has anyone encountered this problem?  The process still occupies a large 
amount of physical memory when no packets are being transmitted, after a round 
of heavy traffic.  It seems some memory used by ZMQ is not freed back to the 
system.  I'm wondering whether this is normal, or am I doing something wrong?  
Thanks.

# ./zmq-mem-test -c -p 169.254.0.10 -l 512 -n 10000000
Client started on tcp://169.254.0.10:5999.
Actual sent msg count: 60000.

Total 9971945 freed in customized freer.

20012 root      20   0 1950.1m 214.7m   0.0  1.8   0:05.03 S              `- 
zmq-mem-test
20012 root      20   0 1950.1m 214.7m   0.0  1.8   0:05.03 S              `- 
zmq-mem-test

The attached file is example code using the zero-copy calls zmq_msg_init_data 
and zmq_msg_send.  I'm just testing whether all memory malloc'ed from the heap 
is freed by the asynchronous ZMQ handler.  The HWM is set to 30000 with 
non-blocking sends, so many sends fail because the queue is full.

./zmq-mem-test -c -p 169.254.0.10 -l 512 -n 10000000
./zmq-mem-test -s -p 169.254.0.10

--------^_^--------
Best Regards
Yan Limin

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>

/* ZMQ context created in main() and used to create the sockets below. */
void *g_context = NULL; /* ZMQ_CONTEXT*/
/* Server-side socket, created as ZMQ_ROUTER in __run_server(). */
void *g_router = NULL; /* ZMQ_ROUTER */
/* Client-side socket, created as ZMQ_DEALER in __run_client(). */
void *g_dealer = NULL; /* ZMQ_DEALER */
/* Run mode from the command line: 1 = server (-s), 0 = client (-c, default). */
int g_as_server = 0;
/* Number of messages the client attempts to send (-n). */
int g_send_num = 1;
/* Payload size in bytes of each message (-l). */
int g_msg_size = 100;
/* Endpoint string of the form "tcp://<ip>:5999", built from -p. */
char g_server_ip[32] = {0};
/* Number of payload buffers released through __customized_freer(). */
int g_freer_count = 0;

/* Atomically add 1 to *x and yield the new value (GCC __sync builtin). */
#define atomic_inc(x)   __sync_add_and_fetch((x),1)

/* Size in bytes of every payload buffer the sender allocates. */
#define MAX_MSG_LEN (65536 + 40)
/* Held around the multi-frame send sequence in __do_msg_sending(). */
pthread_mutex_t msg_ext_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * Destination identifier sent by the client as the second frame of each
 * message. Packed to 1-byte alignment so the struct carries no padding
 * (2 + 4 + 2 = 8 bytes).
 * NOTE(review): the field meanings below are inferred from their names only;
 * nothing in this file reads them back -- confirm against the real protocol.
 */
#pragma pack(1)
typedef struct peer_id{
    uint16_t node_addr;   /* presumably the destination node address */
    uint32_t app_id;      /* presumably the destination application id */
    uint16_t queue_id;    /* presumably the destination queue id */
}peer_id_t;
#pragma pack()

/*
 * Deallocation callback registered with zmq_msg_init_data().
 *
 * Frees the payload buffer and bumps the global freed-buffer counter.
 * The counter update is atomic because this callback should be
 * multi-thread safe. The hint argument is not used.
 */
static void __customized_freer(void* data, void* hint)
{
    (void)hint;

    free(data);
    __sync_add_and_fetch(&g_freer_count, 1);
}

/*
 * Client send loop: attempts g_send_num messages on g_dealer (at least one,
 * because the loop is do/while). Each message has three frames:
 *
 *   1. a 5-byte fake source header,
 *   2. a peer_id_t destination (zero-filled),
 *   3. a g_msg_size-byte payload passed to ZMQ zero-copy; the buffer is
 *      released through __customized_freer().
 *
 * A message is abandoned as soon as one of its frames is rejected, so a
 * rejected header frame (e.g. send queue full) is never followed by the
 * remaining frames, which would otherwise reach the peer as a mis-framed
 * message. Prints the number of fully sent messages when done.
 */
static void __do_msg_sending(void)
{
    /* fake ZMQ 5 bytes src */
    static const unsigned char fake_src[] = {0, 0x55, 0xaa, 0x55, 0xaa};
    peer_id_t dst;
    int actual_sent_count = 0;

    /* The payload is carved out of a MAX_MSG_LEN allocation; never read past it. */
    if (g_msg_size < 0 || g_msg_size > MAX_MSG_LEN) {
        fprintf(stderr, "Invalid message size %d (allowed: 0..%d).\n",
                g_msg_size, MAX_MSG_LEN);
        return;
    }

    /* Do not put uninitialized stack bytes on the wire. */
    memset(&dst, 0, sizeof(dst));

    do {
        zmq_msg_t msg;
        int rc;
        char *buffer = malloc(MAX_MSG_LEN);

        if (!buffer) {
            usleep(1000);
            continue;
        }
        /* Do not put uninitialized heap bytes on the wire either. */
        memset(buffer, 0, (size_t)g_msg_size);

        /*
         * Wrap the buffer before sending anything: from here on the message
         * owns it, and zmq_msg_close() below releases it through
         * __customized_freer() whenever ZMQ did not take the message.
         */
        rc = zmq_msg_init_data(&msg, buffer, (size_t)g_msg_size,
                               __customized_freer, NULL);
        if (rc != 0) {
            free(buffer);
            continue;
        }

        pthread_mutex_lock(&msg_ext_mutex);
        rc = zmq_send(g_dealer, fake_src, sizeof(fake_src), ZMQ_SNDMORE);
        if (rc != -1) {
            rc = zmq_send(g_dealer, &dst, sizeof(dst), ZMQ_SNDMORE);
        }
        if (rc != -1) {
            rc = zmq_msg_send(&msg, g_dealer, 0);
        }
        pthread_mutex_unlock(&msg_ext_mutex);

        if (rc != -1) {
            actual_sent_count++;
        }

        /* No-op after a successful send; frees the payload otherwise. */
        zmq_msg_close(&msg);

    } while (--g_send_num > 0);

    printf("Actual sent msg count: %d. \n", actual_sent_count);
}

/*
 * Client mode: creates the DEALER socket, configures it for non-blocking
 * sends (send timeout 0) with a 30000-message send high-water mark and zero
 * linger, connects to g_server_ip, runs the send loop, closes the socket and
 * finally reports how many payload buffers the freer callback released.
 *
 * Returns 0; configuration failures trip an assert.
 */
static int __run_client(void)
{
    int rc;
    int hwm_limit = 30000;
    int linger_time = 0;    /* ZMQ_LINGER takes an int */
    int send_timeout = 0;

    uint16_t id = 0x55AA;

    g_dealer = zmq_socket(g_context, ZMQ_DEALER);
    assert(NULL != g_dealer);

    rc = zmq_setsockopt(g_dealer, ZMQ_IDENTITY, &id, sizeof(id));
    assert(rc == 0);
    rc = zmq_setsockopt(g_dealer, ZMQ_SNDHWM, &hwm_limit, sizeof(hwm_limit));
    assert(rc == 0);
    rc = zmq_setsockopt(g_dealer, ZMQ_LINGER, &linger_time,
                        sizeof(linger_time));
    assert(rc == 0);
    rc = zmq_setsockopt(g_dealer, ZMQ_SNDTIMEO, &send_timeout,
                        sizeof(send_timeout));
    assert(rc == 0);

    rc = zmq_connect(g_dealer, g_server_ip);
    assert(rc == 0);
    printf("Client started on %s.\n", g_server_ip);

    __do_msg_sending();

    /*
     * zmq_close() takes the socket handle itself, not the address of the
     * variable holding it; passing &g_dealer is rejected as "not a socket"
     * and leaves the socket (and its queued messages) alive.
     */
    rc = zmq_close(g_dealer);
    assert(rc == 0);
    g_dealer = NULL;

    /* Wait any key press just for memory checking */
    getchar();
    printf("Total %d freed in customized freer.\n", g_freer_count);

    return 0;
}

static int __run_server(void)
{
    int rc;
    int hand_over = 1;
    uint32_t linger_time = 0;
    int hwm_limit = 30000;

    g_router = zmq_socket(g_context, ZMQ_ROUTER);
    assert(NULL != g_router);

    rc = zmq_setsockopt(g_router, ZMQ_ROUTER_HANDOVER, &hand_over, 
sizeof(hand_over));
    assert(rc == 0);
    rc = zmq_setsockopt(g_router, ZMQ_LINGER, &linger_time, 
sizeof(linger_time));
    assert(rc == 0);
    rc = zmq_setsockopt(g_router, ZMQ_RCVHWM, &hwm_limit, sizeof(hwm_limit));
    assert(rc == 0);

    rc = zmq_bind(g_router, g_server_ip);
    assert(rc == 0);
    printf("Server started on %s.\n", g_server_ip);

    do {
        zmq_msg_t msg;
        rc = zmq_msg_recv(&msg, g_router, 0);
        if (rc != -1) {
            zmq_msg_close(&msg);
        }
    } while (1);

    zmq_close(&g_router);
    return 0;
}

/* Writes the command-line usage summary, followed by a blank line, to stdout. */
static void __helpinfo(void)
{
    /* Adjacent literals are concatenated, so the whole text goes out in one call. */
    fputs("-h: Show help info.\n"
          "-c: Start as client.\n"
          "-s: Start as server.\n"
          "-n: Message number to be sent. Only for client. Default:1.\n"
          "-p: Server IP address.\n"
          "-l: Message size. Default:100.\n"
          "\n",
          stdout);
}

static int __parse_args(char argc, char* argv[])
{
    int opt = 0;

    if (argc < 3) {
        __helpinfo();
        return -1;
    }
    while ((opt = getopt(argc, argv, "hcsp:n:l:")) != -1) {
        switch (opt) {
            case 'c':
                g_as_server = 0;
                break;
            case 's':
                g_as_server = 1;
                break;
            case 'p':
                memset(g_server_ip, 0, 32);
                sprintf(g_server_ip, "tcp://%s:5999", optarg);
                break;
            case 'n':
                g_send_num = atoi(optarg);
                break;
            case 'l':
                g_msg_size = atoi(optarg);
                break;
            case 'h':
                __helpinfo();
                break;
            default:
                break;
        }
    }

    return 0;
}

/*
 * Entry point: parses the command line, creates the ZMQ context with two
 * I/O threads, runs either the client or the server, then shuts the context
 * down.
 *
 * Returns the parser's error code when the arguments are invalid, else 0.
 */
int main(int argc, char* argv[])
{
    int rc;

    rc = __parse_args(argc, argv);
    if (rc != 0) {
        return rc;
    }

    g_context = zmq_ctx_new();
    assert(NULL != g_context);
    rc = zmq_ctx_set(g_context, ZMQ_IO_THREADS, 2);
    assert(rc == 0);

    if (g_as_server == 0) {
        __run_client();
    }
    else {
        __run_server();
    }

    /*
     * Pass the context handle itself, not the address of the variable that
     * holds it. zmq_ctx_shutdown() does not block; zmq_ctx_term() would also
     * release the context, but it blocks until every socket created from the
     * context has been closed.
     */
    zmq_ctx_shutdown(g_context);

    return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
https://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to