On Tue, Jan 24, 2017 at 12:48:49AM +0100, Marko Tiikkaja wrote:
> Did you forget the attachment?

I guess I must have.  Attached this time.
/*
 * Copyright (c) 2016 Two Sigma Open Source, LLC.
 * All Rights Reserved
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without a written agreement
 * is hereby granted, provided that the above copyright notice and this
 * paragraph and the following two paragraphs appear in all copies.
 *
 * IN NO EVENT SHALL TWO SIGMA OPEN SOURCE, LLC BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
 * EVEN IF TWO SIGMA OPEN SOURCE, LLC HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * TWO SIGMA OPEN SOURCE, LLC SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING,
 * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS"
 * BASIS, AND TWO SIGMA OPEN SOURCE, LLC HAS NO OBLIGATIONS TO PROVIDE
 * MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 */

/*
 * Based on src/test/examples/testlibpq2.c from Postgres 9.4.4
 *
 * pqasyncnotifier - LISTENs and reports notifications without polling
 *
 * Usage: pqasyncnotifier [options] CONNINFO TABLE-NAME ...
 */

#include <twosigma.h>
#ifdef WIN32
#include <windows.h>
#endif

#include <sys/time.h>
#include <sys/types.h>
#include <errno.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <time.h>
#include <libpq-fe.h>

/*
 * Report a fatal error on stderr, close the connection (if any), and exit.
 *
 * conn - libpq connection, or NULL if there is none
 * rcc  - connection status last observed by the caller
 * rce  - command status last observed by the caller
 * msg  - short description of the operation that failed
 *
 * When there is no connection, or both statuses indicate success, the
 * failure is treated as a system-call failure and strerror(errno) is
 * printed; otherwise libpq's own error message is printed.
 *
 * Exits with the errno value seen on entry, or 1 if that was 0.
 * Never returns.
 */
static void
err(PGconn *conn, ConnStatusType rcc, ExecStatusType rce, const char *msg)
{
    /* Capture errno first: the calls below may overwrite it. */
    int e = errno;

    fprintf(stderr, "Error: %s: ", msg);
    if (conn == NULL || (rcc == CONNECTION_OK && rce == PGRES_COMMAND_OK))
        fprintf(stderr, "%s", strerror(e));
    else
        fprintf(stderr, "%s", PQerrorMessage(conn));
    fprintf(stderr, "\n");
    if (conn != NULL)
        PQfinish(conn);
    exit(e ? e : 1);
}

/*
 * Long options accepted by main(); `val` is the equivalent short option.
 * The all-zero entry at the end is the terminator getopt_long() requires;
 * without it an unrecognised long option makes getopt_long() read past the
 * end of the array.
 */
static const struct option lopts[] = {
    {"help", no_argument, NULL, 'h'},
    {"verbose", no_argument, NULL, 'v'},
    {"daemonize", no_argument, NULL, 'D'},
    {"include-payload", no_argument, NULL, 'd'},
    {"include-pid", no_argument, NULL, 'p'},
    {"include-time", no_argument, NULL, 't'},
    {"include-channel-name", no_argument, NULL, 'c'},
    {NULL, 0, NULL, 0},
};

/*
 * Print a usage summary, listing every option in lopts, to stderr and exit.
 *
 * prog - program name as invoked; leading directory components are stripped
 *        for display unless the name ends in '/'
 * e    - process exit status
 *
 * Never returns.
 */
static void
usage(const char *prog, int e)
{
    const struct option *opt;
    const char *slash = strrchr(prog, '/');

    if (slash != NULL && slash[1] != '\0')
        prog = slash + 1;

    fprintf(stderr, "Usage: %s [options] CONNINFO TABLE-NAME ...\n", prog);
    fprintf(stderr, "    Options:\n");
    /* Walk up to, but not including, the terminator entry. */
    for (opt = lopts; opt->name != NULL; opt++)
        fprintf(stderr, "\t-%c, --%s\n", opt->val, opt->name);
    exit(e);
}

/*
 * Two-phase helper for running in the background.
 *
 * daemonize(0): create a pipe and fork.  The child closes the read end and
 *     returns to the caller.  The parent closes the write end, blocks
 *     reading one byte from the pipe, and then _exit()s with that byte as
 *     its status -- or with 1 if the pipe reached EOF or failed before a
 *     byte arrived (i.e. the child went away without signalling).
 *
 * daemonize(1): if a pipe from an earlier daemonize(0) is still open, send
 *     a single zero byte to the waiting parent (so it exits with status 0)
 *     and close the pipe.  In all cases print "READY: <pid>" on stdout.
 *
 * ready - 0 to fork, non-zero to signal readiness
 *
 * Failures of pipe(), fork(), write() or close() are fatal (via err()).
 */
static
void
daemonize(int ready)
{
    static int pipe_fds[2] = {-1, -1};
    pid_t pid;
    ssize_t n;
    char code = 1;

    if (ready) {
        if (pipe_fds[1] != -1) {
            /* Retry only when interrupted by a signal; anything else is final. */
            do {
                n = write(pipe_fds[1], "", 1);
            } while (n == -1 && errno == EINTR);
            if (n != 1)
                err(NULL, 0, 0, "write() failed while daemonizing");
            if (close(pipe_fds[1]) != 0)
                err(NULL, 0, 0, "close() failed while daemonizing");
            pipe_fds[1] = -1;
        }
        printf("READY: %jd\n", (intmax_t)getpid());
        return;
    }

    if (pipe(pipe_fds) == -1)
        err(NULL, 0, 0, "pipe() failed");
    pid = fork();
    if (pid == -1)
        err(NULL, 0, 0, "fork() failed");
    if (pid == 0) {
        /* Child: keep only the write end, used later by daemonize(1). */
        (void) close(pipe_fds[0]);
        pipe_fds[0] = -1;
        return;
    }

    /*
     * Parent: drop the write end so that read() sees EOF if the child exits
     * without signalling, then wait for the readiness byte.
     */
    (void) close(pipe_fds[1]);
    pipe_fds[1] = -1;
    do {
        n = read(pipe_fds[0], &code, sizeof(code));
    } while (n == -1 && errno == EINTR);
    if (n != 1)
        code = 1;   /* EOF or read error: report failure */
    _exit(code);
}

int
main(int argc, char **argv)
{
    const char          *prog = argv[0];
    const char          *conninfo;
    PGconn              *conn = NULL;
    PGresult            *res;
    PGnotify            *notify;
    ConnStatusType      rcc = CONNECTION_OK;
    ExecStatusType      rce = PGRES_COMMAND_OK;
    int                 include_relname = 0;
    int                 include_payload = 0;
    int                 include_pid = 0;
    int                 include_time = 0;
    int                 verbose = 0;
    char                c;

    setlinebuf(stdout);

    while ((c = getopt_long(argc, argv, "+Dcdhptv", lopts, NULL)) != -1) {
        switch (c) {
        case 'D':
            daemonize(0);
        case 'c':
            include_relname = 1;
            break;
        case 'd':
            include_payload = 1;
            break;
        case 'h':
            usage(prog, 0);
            break;
        case 'p':
            include_pid = 1;
            break;
        case 't':
            include_time = 1;
            break;
        case 'v':
            verbose = 1;
            break;
        default:
            usage(prog, 1);
            break;
        }
    }

    argc -= optind;
    argv += optind;

    if (argc < 2)
        usage(prog, 1);

    conninfo = argv[0];

    argc--;
    argv++;

    conn = PQconnectdb(conninfo);
    if (conn == NULL || (rcc = PQstatus(conn)) != CONNECTION_OK)
        err(conn, rcc, rce, "Connection to database failed");

    for (; argc > 0; argc--, argv++) {
        char *listen_cmd;

        if (asprintf(&listen_cmd, "LISTEN %s", argv[0]) == -1 ||
            listen_cmd == NULL)
            err(conn, rcc, rce, "Could not format SQL statement");

        if (verbose)
            printf("Executing %s\n", listen_cmd);

        res = PQexec(conn, listen_cmd);
        free(listen_cmd);
        if (res == NULL)
            err(conn, rcc, rce, "PQexec failed (out of memory?)");

        rce = PQresultStatus(res);
        PQclear(res);
        if (rce != PGRES_COMMAND_OK)
            err(conn, rcc, rce, "LISTEN command failed");
    }

    daemonize(1);

    for (;;) {
        int     sock;
        fd_set  input_mask;

        sock = PQsocket(conn);
        if (sock < 0)
            err(conn, rcc, rce, "PQsocket failed");

        FD_ZERO(&input_mask);
        FD_SET(sock, &input_mask);

        if (select(sock + 1, &input_mask, NULL, NULL, NULL) < 0)
            err(conn, rcc, rce, "select() failed");

        PQconsumeInput(conn);
        while ((notify = PQnotifies(conn)) != NULL) {
            if (include_time)
                printf("%jd ", (long)time(NULL));
            printf("NOTIFY");
            if (include_relname)
                printf(" %s", notify->relname);
            if (include_pid)
                printf(" %d", notify->be_pid);
            if (include_payload)
                printf(": %s", notify->extra);
            printf("\n");
            PQfreemem(notify);
        }
    }

    PQfinish(conn);
    return 0;
}

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to