RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: [email protected] Module: rpm Date: 28-May-2017 20:01:53 Branch: rpm-5_4 Handle: 2017052818015201 Modified files: (Branch: rpm-5_4) rpm CHANGES rpm/rpmio msqio.c poptIO.c rpmio.c rpmmalloc.c rpmmsq.h tmq.c Log: - msqio: replace pthreads monitor with yarn. Summary: Revision Changes Path 1.3501.2.564+1 -0 rpm/CHANGES 1.1.2.15 +422 -194 rpm/rpmio/msqio.c 1.94.2.31 +6 -4 rpm/rpmio/poptIO.c 1.230.2.53 +1 -3 rpm/rpmio/rpmio.c 1.29.2.8 +1 -1 rpm/rpmio/rpmmalloc.c 1.1.2.12 +66 -10 rpm/rpmio/rpmmsq.h 1.1.2.11 +65 -51 rpm/rpmio/tmq.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/CHANGES ============================================================================ $ cvs diff -u -r1.3501.2.563 -r1.3501.2.564 CHANGES --- rpm/CHANGES 27 May 2017 14:22:17 -0000 1.3501.2.563 +++ rpm/CHANGES 28 May 2017 18:01:52 -0000 1.3501.2.564 @@ -1,4 +1,5 @@ 5.4.17 -> 5.4.18: + - jbj: msqio: replace pthreads monitor with yarn. - jbj: rpmio: wire up the fooInit callback, pass the message to fooDbug. - jbj: rpmio: stub in fooDbug and fooInit callbacks for pools. - jbj: msqio: add a monitor to handle multiple readers/writers. @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/msqio.c ============================================================================ $ cvs diff -u -r1.1.2.14 -r1.1.2.15 msqio.c --- rpm/rpmio/msqio.c 27 May 2017 14:22:18 -0000 1.1.2.14 +++ rpm/rpmio/msqio.c 28 May 2017 18:01:52 -0000 1.1.2.15 @@ -22,6 +22,7 @@ #include <rpmmacro.h> #include <rpmcb.h> /* XXX rpmIsDebug() */ #include <rpmzlog.h> /* XXX rpmzLog type */ +#include <yarn.h> #define _RPMMSQ_INTERNAL #include "rpmmsq.h" @@ -39,50 +40,31 @@ #define MSQONLY(fd) assert(fdGetIo(fd) == msqio) -static int _lockdebug = 0; -static int _conddebug = -1; #define Z(_rc) assert((_rc) == 0) -#define LOCK(_m) \ - { if (_lockdebug) \ - rpmzLogAdd(msq->zlog, "*** LOCKING(%p) %s", &_m, #_m); \ - Z(pthread_mutex_lock(&_m)); \ - } -#define UNLOCK(_m) \ - { Z(pthread_mutex_unlock(&_m)); \ - if (_lockdebug) \ - rpmzLogAdd(msq->zlog, "*** UNLOCKED(%p) %s", &_m, #_m); \ - } -#define SIGNAL(_c) \ - { if (_conddebug) \ - rpmzLogAdd(msq->zlog, "*** SIGNAL(%p) %s", &_c, #_c); \ - Z(pthread_cond_broadcast(&_c)); \ - } -#define WAIT(_c,_m) \ - { if (_conddebug) \ - rpmzLogAdd(msq->zlog, "*** WAIT(%p) %s", &_c, #_c); \ - Z(pthread_cond_wait(&_c, &_m)); \ - } -#ifdef NOTYET -static int _yarndebug = -1; +static int _lockdebug = 0; +static int _conddebug = -1; #define PEEK(_bolt) yarnPeekLock(_bolt) #define POSSESS(_bolt) \ { yarnPossess(_bolt); \ - if (_yarndebug) fprintf(stderr, "*** POSSESS(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\ + if (_lockdebug) \ + rpmzLogAdd(msq->zlog, "***\t POSSESS(%p)\t%ld", _bolt, PEEK(_bolt)); \ } #define RELEASE(_bolt) \ - { if (_yarndebug) fprintf(stderr, "*** RELEASE(%p)\t\t%ld\n", _bolt, PEEK(_bolt));\ + { if (_lockdebug) \ + rpmzLogAdd(msq->zlog, "***\t RELEASE(%p)\t%ld", _bolt, PEEK(_bolt)); \ yarnRelease(_bolt); \ } #define TWIST(_bolt, _op, _val) \ - { if (_yarndebug) fprintf(stderr, "*** TWIST(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\ + { if (_conddebug) \ + rpmzlogAdd(msq->zlog, "***\t TWIST(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt)); \ yarnTwist(_bolt, _op, _val); \ } #define WAITFOR(_bolt, _op, _val) \ - { if (_yarndebug) fprintf(stderr, "*** WAITFOR(%p, %d, %d)\t%ld\n", _bolt, _op, _val, PEEK(_bolt));\ + { if (_conddebug) \ + rpmzLogAdd(msq->zlog, "***\t WAITFOR(%p, %d, %d)\t%ld", _bolt, _op, _val, PEEK(_bolt));\ yarnWaitFor(_bolt, _op, _val); \ } -#endif #ifdef __cplusplus GENfree(rpmmsq) @@ -347,66 +329,142 @@ PRINT(d, msgsize); #endif /* WITH_MSQ */ if (stats) { - LOCK(msq->m); - PRINT(d, inflight); + PRINT(d, nqueued); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); PRINT(d, nagain); - UNLOCK(msq->m); } + PRINT(p, head); + PRINT(p, tail); + PRINT(d, nqueued); #undef PRINT } } /* =============================================================== */ -static void rpmmsqFini(void *_msq) +int _rpmaio_debug = 0; + +static char *LIO_[] = { "READ", "WRITE", "NOP", "DSYNC", "SYNC", "CLOSE" }; +static char * rpmaioDbug(void *_aio, char *b, size_t nb) { - rpmmsq msq = (rpmmsq) _msq; + rpmaio aio = (rpmaio) _aio; + size_t len = strlen(b); + char * be = b + len; + rpmioItem item = (rpmioItem) aio; + long use; - if (msq) { -SPEW("%s: inflight %d:%d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->inflight, msq->msgmax, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain); - msq->msgmax = MSQ_MSGMAX; - msq->msgsize = MSQ_MSGSIZE; + if (aio && (use = PEEK(item->use)) > 1) { + int colorize = isatty(fileno(stderr)); +#define ANSI_BRIGHT_BLUE "\x1b[34;1m" +#define ANSI_RESET "\x1b[0m" + *be++ = '\n'; + if (colorize) be = stpcpy(be, ANSI_BRIGHT_BLUE); + be += sprintf(be, "========================== aio(%p) use %ld ix %d\n", + aio, use, aio->ix); +#define PRINT_AIO(_fmt, _foo) \ + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb._foo); } +#define PRINT_AIO_SIGEV(_fmt, _foo) \ + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, aio->cb.aio_sigevent._foo); } + PRINT_AIO(d, aio_fildes); + be += sprintf(be, "%25s: %s\n", "aio_lio_opcode", + LIO_[aio->cb.aio_lio_opcode % (sizeof(LIO_)/sizeof(LIO_[0]))]); + if (aio->cb.aio_reqprio) + PRINT_AIO(d, aio_reqprio); + PRINT_AIO(p, aio_buf); + PRINT_AIO(zd, aio_nbytes); + switch (aio->cb.aio_sigevent.sigev_notify) { + default: + break; + case SIGEV_THREAD: + PRINT_AIO_SIGEV(p, sigev_value.sival_ptr); + PRINT_AIO_SIGEV(d, sigev_signo); + PRINT_AIO_SIGEV(d, sigev_notify); + PRINT_AIO_SIGEV(p, sigev_notify_function); + PRINT_AIO_SIGEV(p, sigev_notify_attributes); + break; + } +#undef PRINT_AIO_SIGEV +#undef PRINT_AIO + be--; + if (colorize) be = stpcpy(be, ANSI_RESET); + *be = '\0'; + } + return b; +} - Z(pthread_mutex_destroy(&msq->m)); - Z(pthread_cond_destroy(&msq->e)); - Z(pthread_cond_destroy(&msq->f)); - msq->inflight = 0; - msq->nsent = 0; - msq->nrecv = 0; - msq->ntimeout = 0; - msq->nagain = 0; +static void rpmaioInit(void *_aio) +{ + rpmaio aio = (rpmaio) _aio; - msq->flags = 0; - msq->qname = _free(msq->qname); - msq->fmode = _free(msq->fmode); - msq->fdno = -1; - msq->qid = -1; - msq->oflags = 0; - msq->omode = 0; - msq->key = 0; - msq->mtype = 0; + if (aio) { + aio->cb.aio_fildes = 0; + aio->cb.aio_lio_opcode = 0; + aio->cb.aio_reqprio = 0; + aio->cb.aio_buf = NULL; + aio->cb.aio_nbytes = 0; + aio->ix = -1; + } +} - if (msq->zlog) - msq->zlog = rpmzLogDump(msq->zlog, NULL); +static void rpmaioFini(void *_aio) +{ + rpmaio aio = (rpmaio) _aio; + + if (aio) { + aio->cb.aio_fildes = 0; + aio->cb.aio_lio_opcode = 0; + aio->cb.aio_reqprio = 0; + aio->cb.aio_buf = NULL; + aio->cb.aio_nbytes = 0; + aio->ix = -1; } } +RPMIOPOOL_MODULE(aio) + +rpmaio rpmaioNew(int fdno, int op, int prio, void *b, size_t nb) +{ + rpmaio aio = rpmaioGetPool(_rpmaioPool); + + aio->_item.next = NULL; /* XXX rpmmalloc.c ? */ + aio->cb.aio_fildes = fdno; + aio->cb.aio_lio_opcode = op; + aio->cb.aio_reqprio = prio; + aio->cb.aio_buf = b; + aio->cb.aio_nbytes = nb; + + aio->cb.__error_code = EINPROGRESS; /* XXX */ + aio->cb.__return_value = 0; /* XXX */ + + aio->ix = -1; + + return rpmaioLink(aio); +} + +/* =============================================================== */ static char * rpmmsqDbug(void *_msq, char *b, size_t nb) { rpmmsq msq = (rpmmsq) _msq; size_t len = strlen(b); char * be = b + len; + rpmioItem item = (rpmioItem) msq; + long use; - if (msq) { + if (msq && (use = PEEK(item->use)) > 0) { int colorize = isatty(fileno(stderr)); #define ANSI_BRIGHT_BLUE "\x1b[34;1m" #define ANSI_RESET "\x1b[0m" *be++ = '\n'; if (colorize) be = stpcpy(be, ANSI_BRIGHT_BLUE); + be += sprintf(be, "========================== msq(%p) use %ld\n", + msq, use); #define PRINT(_fmt, _foo) \ - { be += sprintf(be, "%19s: %"#_fmt"\n", #_foo, msq->_foo); } + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->_foo); } +#define PRINT_ATTRS(_fmt, _foo) \ + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->oattrs._foo); } +#define PRINT_SIGEV(_fmt, _foo) \ + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, msq->sigev._foo); } PRINT(s, qname); PRINT(s, fmode); PRINT(d, fdno); @@ -418,13 +476,32 @@ PRINT(ld, mtype); PRINT(d, msgmax); PRINT(d, msgsize); - LOCK(msq->m); - PRINT(d, inflight); + PRINT(d, nqueued); PRINT(d, nsent); PRINT(d, nrecv); PRINT(d, ntimeout); PRINT(d, nagain); - UNLOCK(msq->m); + PRINT_ATTRS(ld, mq_flags); + PRINT_ATTRS(ld, mq_maxmsg); + PRINT_ATTRS(ld, mq_msgsize); + PRINT_ATTRS(ld, mq_curmsgs); + switch (msq->sigev.sigev_notify) { + default: + break; + case SIGEV_THREAD: + PRINT_SIGEV(p, sigev_value.sival_ptr); + PRINT_SIGEV(d, sigev_signo); + PRINT_SIGEV(d, sigev_notify); + PRINT_SIGEV(p, sigev_notify_function); + PRINT_SIGEV(p, sigev_notify_attributes); + break; + } + PRINT(p, head); + PRINT(p, tail); + PRINT(d, nqueued); + +#undef PRINT_SIGEV +#undef PRINT_ATTRS #undef PRINT be--; if (colorize) be = stpcpy(be, ANSI_RESET); @@ -435,8 +512,53 @@ static void rpmmsqInit(void *_msq) { +} + +static void rpmmsqFini(void *_msq) +{ rpmmsq msq = (rpmmsq) _msq; -fprintf(stderr, "<== %s(%p)\n", __FUNCTION__, msq); + + if (msq) { +SPEW("%s: queued %d sent %d recv %d timeout %d again %d\n", __FUNCTION__, msq->nqueued, msq->nsent, msq->nrecv, msq->ntimeout, msq->nagain); + msq->msgmax = MSQ_MSGMAX; + msq->msgsize = MSQ_MSGSIZE; + + msq->nsent = 0; + msq->nrecv = 0; + msq->ntimeout = 0; + msq->nagain = 0; + + msq->flags = 0; + msq->qname = _free(msq->qname); + msq->fmode = _free(msq->fmode); + msq->fdno = -1; + msq->qid = -1; + msq->oflags = 0; + msq->omode = 0; + msq->key = 0; + msq->mtype = 0; + + /* Drain the queue. */ + while (1) { + rpmioItem item; + RPM_GNUC_TM_ATOMIC { + if ((item = msq->head) == NULL) + break; + item = msq->head; + msq->head = item->next; + item->next = NULL; /* XXX rpmmalloc.c ? */ + if (msq->head == NULL) + msq->tail = &msq->head; + } + while (item) + item = rpmioFreePoolItem(item, + __FUNCTION__, __FILE__, __LINE__); + } + msq->nqueued = 0; + + if (msq->zlog) + msq->zlog = rpmzLogDump(msq->zlog, NULL); + } } RPMIOPOOL_MODULE(msq) @@ -486,10 +608,6 @@ oflags |= O_CLOEXEC; continue; break; - case 'l': /* XXX loopback mode */ - flags |= RPMMSQ_FLAGS_LOOP; - continue; - break; case 'n': /* XXX O_NONBLOCK */ oflags |= O_NONBLOCK; continue; @@ -548,12 +666,12 @@ msq->msgmax = MSQ_MSGMAX; msq->msgsize = MSQ_MSGSIZE; - /* Initialize the monitor. */ /* XXX TODO: use POSIX shared mutexes. */ - Z(pthread_mutex_init(&msq->m, NULL)); - Z(pthread_cond_init(&msq->e, NULL)); - Z(pthread_cond_init(&msq->f, NULL)); - msq->inflight = 0; + RPM_GNUC_TM_ATOMIC { + msq->head = NULL; + msq->tail = &msq->head; + } + msq->nqueued = 0; msq->nsent = 0; msq->nrecv = 0; msq->ntimeout = 0; @@ -572,13 +690,11 @@ msq->qname = rpmGetPath("/", path, NULL); key_t key = 0; - if (!strcmp(msq->qname, "/private") - || !strcmp(msq->qname, "/IPC_PRIVATE")) { + if (!strcmp(msq->qname, "/private")) { key = IPC_PRIVATE; msq->flags |= RPMMSQ_FLAGS_DELETE; /* XXX delete on close. */ } else - if (!strcmp(msq->qname, "/program") - || !strcmp(msq->qname, "/__progname_key")) { + if (!strcmp(msq->qname, "/program")) { key = __progname_key; } else { const char * lpath = msq->qname; @@ -586,7 +702,7 @@ if (!strcmp(lpath, "/rpm")) lpath = "/usr/lib/rpm"; key = ftok(lpath, projid); - if (key == -1) { + if (key == -1) { int lvl = RPMLOG_WARNING; rpmlog(lvl, "%s: ftok(%s,0x%02x) failed: %m\n", __FUNCTION__, path, (projid & 0xff)); @@ -627,26 +743,20 @@ { #if defined(WITH_MQ) /* XXX put *open in a monitor. */ - struct mq_attr _attrs = { - .mq_flags = (oflags & O_NONBLOCK), - .mq_maxmsg = msq->msgmax, - .mq_msgsize = msq->msgsize, - .mq_curmsgs = 0, - }, *attrs = &_attrs; - msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, attrs); - - if (msq->qid != -1 && MSQF_ISSET(LOOP)) { - pthread_attr_t attr; - Z(pthread_getattr_default_np(&attr)); - Z(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); - struct sigevent sigev = { - .sigev_notify = SIGEV_THREAD, - .sigev_notify_function = rpmmsqReader, - .sigev_notify_attributes = &attr, - .sigev_value.sival_ptr = msq, - }; - Z(rpmmsqNotify(msq, &sigev)); - } + msq->oattrs.mq_flags = (oflags & O_NONBLOCK); + msq->oattrs.mq_maxmsg = msq->msgmax; + msq->oattrs.mq_msgsize = msq->msgsize; + msq->oattrs.mq_curmsgs = 0; + msq->qid = Mq_open(msq->qname, msq->oflags, msq->omode, &msq->oattrs); + + Z(pthread_getattr_default_np(&msq->attr)); + Z(pthread_attr_setdetachstate(&msq->attr, PTHREAD_CREATE_DETACHED)); + msq->sigev.sigev_notify = SIGEV_THREAD; + msq->sigev.sigev_notify_function = rpmmsqReader; + msq->sigev.sigev_notify_attributes = &msq->attr; + msq->sigev.sigev_value.sival_ptr = msq; + if (msq->qid != -1) + Z(rpmmsqNotify(msq, &msq->sigev)); #endif /* WITH_MQ */ } break; case RPMMSQ_TYPE_SYSV: @@ -675,20 +785,29 @@ case RPMMSQ_TYPE_POSIX: { #if defined(WITH_MQ) - if (MSQF_ISSET(LOOP)) return 0; /* XXX EOF with loopback service. */ - unsigned int prio = 0; - /* Consumer in monitor. */ - LOCK(msq->m); - while (msq->inflight == 0) - WAIT(msq->e, msq->m); - rc = Mq_receive(msq->qid, buf, count, &prio); + rpmaio aio = rpmaioNew(msq->qid, LIO_READ, 0, buf, count); + aio->ix = ++msq->nqueued; + + rpmioItem item = rpmioLinkPoolItem(&aio->_item, + __FUNCTION__, __FILE__, __LINE__); + + /* Queue the read. */ + POSSESS(item->use); + RPM_GNUC_TM_ATOMIC { + *msq->tail = item; + msq->tail = (rpmioItem *) &item->next; + } + WAITFOR(item->use, TO_BE, 1); + RELEASE(item->use); + + rc = aio->cb.__return_value; /* XXX aio_return */ if (rc >= 0) - msq->nrecv++; - if (msq->inflight-- == msq->msgmax) - SIGNAL(msq->f); - UNLOCK(msq->m); + prio = aio->cb.aio_reqprio; /* XXX prio */ + else + errno = aio_error(&aio->cb); + aio = rpmaioFree(aio); if (rc >= 0) { if (priop) @@ -719,7 +838,7 @@ break; } -SPEW("<== %s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", __FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0)); +SPEW("<==\t%s(%p,%p[%lu],%p) qid %d rc %ld *priop %lu\n", __FUNCTION__, msq, buf, count, priop, (msq ? msq->qid : -1), rc, (priop ? *priop : 0)); return rc; } @@ -734,24 +853,38 @@ { #if defined(WITH_MQ) - /* Producer in monitor. */ - LOCK(msq->m); - while (msq->inflight == msq->msgmax) - WAIT(msq->f, msq->m); - rc = Mq_send(msq->qid, buf, count, prio); - if (msq->inflight++ == 0) - SIGNAL(msq->e); - UNLOCK(msq->m); + if (msq->nsent == 0) { + rc = Mq_send(msq->qid, buf, count, prio); + if (rc == 0) + msq->nsent++; - /* (loopback mode) Wait for the reader thread. */ - if (msq->nsent == 0 && MSQF_ISSET(LOOP)) { /* Yield the CPU. */ + sched_yield(); struct timespec ts = { 0, 100*1000 }; Z(nanosleep(&ts, NULL)); - } + /* Wait for the reader thread. */ + } else { + rpmaio aio = rpmaioNew(msq->qid, LIO_WRITE, prio, (char *)buf, count); + aio->ix = ++msq->nqueued; + aio->cb.aio_reqprio = prio; /* XXX prio */ + + rpmioItem item = rpmioLinkPoolItem(&aio->_item, + __FUNCTION__, __FILE__, __LINE__); + + /* Queue the write. */ + POSSESS(item->use); + RPM_GNUC_TM_ATOMIC { + *msq->tail = item; + msq->tail = (rpmioItem *) &item->next; + } + WAITFOR(item->use, TO_BE, 1); + RELEASE(item->use); - if (rc == 0) - msq->nsent++; + rc = aio->cb.__return_value; /* XXX aio_return */ + if (rc < 0) + errno = aio_error(&aio->cb); + aio = rpmaioFree(aio); + } if (rc == 0) /* XXX remap to write(2) return */ rc = count; @@ -781,7 +914,7 @@ break; } -SPEW("<== %s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, buf, count, prio, (msq ? msq->qid : -1), rc); +SPEW("<==\t%s(%p,%p[%lu],%lu) qid %d rc %ld\n", __FUNCTION__, msq, buf, count, prio, (msq ? msq->qid : -1), rc); return rc; } @@ -796,7 +929,7 @@ off_t p = pos; #endif int rc = -2; /* assume failure */ -SPEW("<== %s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc); +SPEW("<==\t%s(%p, %ld, SEEK_%s) qid %d rc %d\n", __FUNCTION__, msq, p, SEEK_[whence&0x3], (msq ? msq->qid : -1), rc); return rc; } @@ -804,7 +937,7 @@ { int rc = -2; /* assume failure */ -SPEW("==> %s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1)); +SPEW("==>\t%s(%p,%d) qid %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1)); if (msq) switch (msq->flags & RPMMSQ_TYPE_MASK) { case RPMMSQ_TYPE_DEFAULT: @@ -812,21 +945,64 @@ { #if defined(WITH_MQ) if (msq->qid != -1) { + if (MSQF_ISSET(INFO) || _rpmmsq_debug) rpmmsqDump(NULL, msq, NULL); - /* Producer in monitor. */ - LOCK(msq->m); - while (msq->inflight == msq->msgmax) - WAIT(msq->f, msq->m); +#ifdef DYING + /* Wait for the queue to drain. */ + while (1) { + rpmioItem item; + RPM_GNUC_TM_ATOMIC { + if ((item = msq->head) == NULL) + break; + } + POSSESS(item->use); + WAITFOR(item->use, TO_BE, 0); + RELEASE(item->use); + } + /* Turn off the sigev detached thread. */ - if (MSQF_ISSET(LOOP)) - rc = rpmmsqNotify(msq, NULL); - rc = Mq_close(msq->qid); + int qid = msq->qid; msq->qid = -1; - if (msq->inflight++ == 0) - SIGNAL(msq->e); - UNLOCK(msq->m); + rc = rpmmsqNotify(msq, NULL); + + /* Yield the CPU. */ + sched_yield(); + struct timespec ts = { 0, 100*1000 }; + Z(nanosleep(&ts, NULL)); + /* Wait for the reader thread. */ + +#else + if (msq->nsent == 0) { + rc = rpmmsqNotify(msq, NULL); + rc = Mq_close(msq->qid); + if (rc == 0) + msq->qid = -1; + break; + } + unsigned prio = 0; + rpmaio aio = rpmaioNew(msq->qid, LIO_CLOSE, prio, NULL, 0); + aio->ix = ++msq->nqueued; + aio->cb.aio_reqprio = prio; /* XXX prio */ + + rpmioItem item = rpmioLinkPoolItem(&aio->_item, + __FUNCTION__, __FILE__, __LINE__); + + /* Queue the write. */ + POSSESS(item->use); + RPM_GNUC_TM_ATOMIC { + *msq->tail = item; + msq->tail = (rpmioItem *) &item->next; + } + WAITFOR(item->use, TO_BE, 1); + RELEASE(item->use); + + rc = aio->cb.__return_value; /* XXX aio_return */ + if (rc < 0) + errno = aio_error(&aio->cb); + aio = rpmaioFree(aio); +#endif } @@ -850,7 +1026,7 @@ default: break; } -SPEW("<== %s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1), rc); +SPEW("<==\t%s(%p,%d) qid %d rc %d\n", __FUNCTION__, msq, delete, (msq ? msq->qid : -1), rc); return rc; } @@ -969,6 +1145,33 @@ return rc; } +static ssize_t rpmmsqReaderRead(rpmmsq msq, char *b, size_t nb, unsigned *priop) +{ + ssize_t rc; + + errno = 0; + do { + struct timespec ts = { 0, 1000 }; + + rc = Mq_receive(msq->qid, b, nb, priop); + if (rc < 0) { + switch (errno) { + case ETIMEDOUT: /* XXX Mq_timedreceive() */ + msq->ntimeout++; + Z(nanosleep(&ts, NULL)); + continue; + case EAGAIN: /* XXX O_NONBLOCK */ + msq->nagain++; + Z(nanosleep(&ts, NULL)); + continue; + } + } + break; + } while (rc < 0); + + return rc; +} + void rpmmsqReader(union sigval sv) { #if defined(WITH_MQ) @@ -976,50 +1179,66 @@ assert(msq = rpmmsqLink(msq)); SPEW("==> %s(%p) qid %d\n", __FUNCTION__, msq, msq->qid); - char b[BUFSIZ]; - size_t nb = sizeof(b); - int rc; + ssize_t rc; while (msq->qid != -1) { - unsigned int prio = 0; - int rc; - /* Consumer in monitor. */ - LOCK(msq->m); - while (msq->inflight == 0) - WAIT(msq->e, msq->m); - errno = 0; - do { - struct timespec ts = { 0, 1000 }; - rc = Mq_receive(msq->qid, b, nb, &prio); - if (rc < 0) { - switch (errno) { - case ETIMEDOUT: /* XXX Mq_timedreceive() */ - msq->ntimeout++; - Z(nanosleep(&ts, NULL)); - continue; - case EAGAIN: /* XXX O_NONBLOCK */ - msq->nagain++; - Z(nanosleep(&ts, NULL)); - continue; + while (1) { + rpmioItem item; + + /* Get next request from queue. */ + RPM_GNUC_TM_ATOMIC { + if ((item = msq->head) == NULL) + break; + msq->head = item->next; + item->next = NULL; /* XXX rpmmalloc.c ? */ + if (msq->head == NULL) + msq->tail = &msq->head; + } + + rpmaio aio = (rpmaio) item; + char *b = (char *)aio->cb.aio_buf; + size_t nb = aio->cb.aio_nbytes; + + errno = 0; + switch (aio->cb.aio_lio_opcode) { + case LIO_READ: + { unsigned int prio = 0; + rc = rpmmsqReaderRead(msq, b, nb, &prio); + if (rc >= 0) { + msq->nrecv++; + aio->cb.aio_reqprio = prio; /* XXX prio */ + } + } break; + case LIO_WRITE: + { unsigned int prio = aio->cb.aio_reqprio; /* XXX prio */ + rc = Mq_send(msq->qid, (const char *)b, nb, prio); + if (rc == 0) + msq->nsent++; + } break; + case LIO_CLOSE: + { + if (b == NULL && nb == 0) { + rc = Mq_close(msq->qid); + if (rc == 0) + msq->qid = -1; } + } break; + case LIO_DSYNC: + case LIO_SYNC: + case LIO_READ64: + case LIO_WRITE64: + default: + break; } - if (rc >= 0) - msq->nrecv++; - break; - } while (rc < 0); - if (msq->inflight-- == msq->msgmax) - SIGNAL(msq->f); - UNLOCK(msq->m); - if (rc >= 0) { - /* Deliver the mssage through a callback. */ - int xx = rpmmsqDeliver(msq, b, rc, prio); - (void)xx; + aio->cb.__error_code = errno; /* XXX */ + aio->cb.__return_value = rc; /* XXX */ + aio = rpmaioFree(aio); } } -SPEW("<== %s(%p) qid %d rc %d\n", __FUNCTION__, msq, msq->qid, rc); +SPEW("<== %s(%p) qid %d rc %ld\n", __FUNCTION__, msq, msq->qid, rc); msq = rpmmsqFree(msq); @@ -1029,24 +1248,36 @@ static void rpmmsqDumpST(const char * msg, void * _ptr, FILE * fp) { struct stat *st = (struct stat *) _ptr; + char b[BUFSIZ]; + char * be = b; + int colorize = isatty(fileno(stderr)); if (fp == NULL) fp = stderr; - if (msg) fprintf(fp, "================ %s\n", msg); +#define ANSI_BRIGHT_BLUE "\x1b[34;1m" +#define ANSI_RESET "\x1b[0m" + if (colorize) be = stpcpy(be, ANSI_BRIGHT_BLUE); + if (msg) be += sprintf(be, "========================== %s\n", msg); if (st) { - fprintf(fp, "\t dev: 0x%lx\n", (unsigned long)st->st_dev); - fprintf(fp, "\t ino: %lu\n", (unsigned long)st->st_ino); - fprintf(fp, "\t mode: 0%o\n", st->st_mode); - fprintf(fp, "\t nlink: %lu\n", (unsigned long)st->st_nlink); - fprintf(fp, "\t uid: %d\n", st->st_uid); - fprintf(fp, "\t gid: %d\n", st->st_gid); - fprintf(fp, "\t rdev: 0x%lx\n", (unsigned long)st->st_rdev); - fprintf(fp, "\t size: %lu\n", (unsigned long)st->st_size); - fprintf(fp, "\tblksize: %lu\n", (unsigned long)st->st_blksize); - fprintf(fp, "\t blocks: %lu\n", (unsigned long)st->st_blocks); - fprintf(fp, "\t atime: %lu\n", (unsigned long)st->st_atime); - fprintf(fp, "\t mtime: %lu\n", (unsigned long)st->st_mtime); - fprintf(fp, "\t ctime: %lu\n", (unsigned long)st->st_mtime); +#define PRINT_STAT(_fmt, _foo) \ + { be += sprintf(be, "%25s: %"#_fmt"\n", #_foo, st->st_##_foo); } + PRINT_STAT(lx, dev); + PRINT_STAT(lu, ino); + PRINT_STAT( o, mode); + PRINT_STAT(lu, nlink); + PRINT_STAT( d, uid); + PRINT_STAT( d, gid); + PRINT_STAT(lx, rdev); + PRINT_STAT(zu, size); + PRINT_STAT(zu, blksize); + PRINT_STAT(zu, blocks); + PRINT_STAT(lu, atime); + PRINT_STAT(lu, mtime); + PRINT_STAT(lu, ctime); +#undef PRINT_STAT + *(--be) = '\0'; + if (colorize) be = stpcpy(be, ANSI_RESET); + fprintf(fp, "%s\n", b); } } @@ -1123,11 +1354,10 @@ break; const char * lpath = rpmGetPath("/dev/mqueue/", msq->qname, NULL); - struct stat sb; - rc = Stat(lpath, &sb); + rc = Stat(lpath, &msq->sb); if (!rc) - rpmmsqDumpST(lpath, &sb, NULL); + rpmmsqDumpST(lpath, &msq->sb, NULL); /* XXX .fdio avoids select/poll issues on /dev/mqueue with .ufdio. */ { FD_t fd = Fopen(lpath, "r.fdio"); @@ -1149,16 +1379,14 @@ case RPMMSQ_TYPE_SYSV: { #if defined(WITH_MSQ) - struct msqid_ds ds; - rc = rpmmsqCtl(msq, IPC_STAT, &ds); + rc = rpmmsqCtl(msq, IPC_STAT, &msq->ds); if (!rc) - rpmmsqDumpDS(NULL, &ds, NULL); + rpmmsqDumpDS(NULL, &msq->ds, NULL); #if defined(IPC_INFO) /* XXX linux-only? */ - struct msginfo mi; - rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&mi); + rc = rpmmsqCtl(msq, IPC_INFO, (struct msqid_ds *)&msq->mi); if (!rc) - rpmmsqDumpMI(NULL, &mi, NULL); + rpmmsqDumpMI(NULL, &msq->mi, NULL); #endif #endif /* WITH_MSQ */ } break; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/poptIO.c ============================================================================ $ cvs diff -u -r1.94.2.30 -r1.94.2.31 poptIO.c --- rpm/rpmio/poptIO.c 25 May 2017 19:44:41 -0000 1.94.2.30 +++ rpm/rpmio/poptIO.c 28 May 2017 18:01:52 -0000 1.94.2.31 @@ -865,18 +865,20 @@ { "rpmiobdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmiob_debug, -1, N_("Debug rpmio I/O buffers"), NULL}, + { "rpmaiodebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmaio_debug, -1, + N_("Debug AIO wrapper"), NULL}, { "rpmasndebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmasn_debug, -1, N_("Debug embedded ASN.1 interpreter"), NULL}, { "rpmbagdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmbag_debug, -1, N_("Debug depsolver wrappers "), NULL}, { "rpmcvsdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmcvs_debug, -1, - N_("Debug CVS wrappers "), NULL}, + N_("Debug CVS wrappers"), NULL}, { "rpmgitdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmgit_debug, -1, - N_("Debug GIT wrappers "), NULL}, + N_("Debug GIT wrappers"), NULL}, { "rpmsetdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmset_debug, -1, - N_("Debug SET-VERSION wrappers "), NULL}, + N_("Debug SET-VERSION wrappers"), NULL}, { "rpmsvndebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmsvn_debug, -1, - N_("Debug Subversion wrappers "), NULL}, + N_("Debug Subversion wrappers"), NULL}, { "rpmtpmdebug", '\0', POPT_ARG_VAL|POPT_ARGFLAG_DOC_HIDDEN, &_rpmtpm_debug, -1, N_("Debug TPM emulator"), NULL}, @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmio.c ============================================================================ $ cvs diff -u -r1.230.2.52 -r1.230.2.53 rpmio.c --- rpm/rpmio/rpmio.c 27 May 2017 14:22:18 -0000 1.230.2.52 +++ rpm/rpmio/rpmio.c 28 May 2017 18:01:53 -0000 1.230.2.53 @@ -2659,7 +2659,6 @@ * - bzopen: 'q' sets verbosity to 0 * - bzopen: 'v' does verbosity++ (up to 4) * - HACK: '.' terminates, rest is type of I/O - * - HACK: 'l' loopback mode (msqio) * - HACK: 'n' non-blocking (O_NONBLOCK) * - HACK: 't' truncate (O_TRUNC) * - HACK: 'D' sync (O_DIRECT) @@ -2732,8 +2731,6 @@ if (--nstdio > 0) *stdio++ = c; continue; break; - case 'l': /* XXX loopback mode (rpmmsqNew) */ - goto other; case 'n': flags |= O_NONBLOCK; goto other; @@ -3779,6 +3776,7 @@ RPMIOPOOL_FREE(sp) RPMIOPOOL_INTERP_FREE(sx) + RPMIOPOOL_FREE(aio) RPMIOPOOL_FREE(msq) RPMIOPOOL_FREE(html) @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmalloc.c ============================================================================ $ cvs diff -u -r1.29.2.7 -r1.29.2.8 rpmmalloc.c --- rpm/rpmio/rpmmalloc.c 27 May 2017 14:22:18 -0000 1.29.2.7 +++ rpm/rpmio/rpmmalloc.c 28 May 2017 18:01:53 -0000 1.29.2.8 @@ -115,7 +115,7 @@ pool->size = size; pool->limit = limit; pool->flags = flags; - pool->dbug = (char * (*)(void *item, char *b, size_t nb)) dbug; + pool->dbug = dbug; pool->init = init; pool->fini = fini; pool->reused = 0; @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/rpmmsq.h ============================================================================ $ cvs diff -u -r1.1.2.11 -r1.1.2.12 rpmmsq.h --- rpm/rpmio/rpmmsq.h 26 May 2017 19:53:49 -0000 1.1.2.11 +++ rpm/rpmio/rpmmsq.h 28 May 2017 18:01:53 -0000 1.1.2.12 @@ -26,20 +26,68 @@ RPMMSQ_FLAGS_SYSV = RPMMSQ_TYPE_SYSV, /* bits 2-7 reserved */ RPMMSQ_FLAGS_INFO = _MFB( 8), - RPMMSQ_FLAGS_LOOP = _MFB( 9), - RPMMSQ_FLAGS_RESET = _MFB(10), - RPMMSQ_FLAGS_DELETE = _MFB(11), - RPMMSQ_FLAGS_DEBUG = _MFB(12), - /* bits 13-31 unused */ + RPMMSQ_FLAGS_RESET = _MFB( 9), + RPMMSQ_FLAGS_DELETE = _MFB(10), + RPMMSQ_FLAGS_DEBUG = _MFB(11), + /* bits 14-31 unused */ } rpmmsqFlags; #undef _MFB #undef _KFB /** */ +#include <aio.h> + +enum { + LIO_DSYNC = LIO_NOP+1, + LIO_SYNC, + LIO_CLOSE, + LIO_READ64 = LIO_READ | 128, + LIO_WRITE64 = LIO_WRITE | 128, +}; + +extern int _rpmaio_debug; + +typedef struct rpmaio_s * rpmaio; + +struct rpmaio_s { + struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ + struct aiocb cb; + int ix; +}; + +/** + * Unreference a aio wrapper instance. + * @param aio aio wrapper + * @return NULL on last dereference + */ +rpmaio rpmaioUnlink (rpmaio aio); +#define rpmaioUnlink(_aio) \ + ((rpmaio)rpmioUnlinkPoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__)) + +/** + * Reference a aio wrapper instance. + * @param aio aio wrapper + * @return new aio wrapper reference + */ +rpmaio rpmaioLink (rpmaio aio); +#define rpmaioLink(_aio) \ + ((rpmaio)rpmioLinkPoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__)) + +/** + * Destroy a aio wrapper. + * @param aio aio wrapper + * @return NULL on last dereference + */ +rpmaio rpmaioFree(rpmaio aio); +#define rpmaioFree(_aio) \ + ((rpmaio)rpmioFreePoolItem((rpmioItem)(_aio), __FUNCTION__, __FILE__, __LINE__)) + #if defined(_RPMMSQ_INTERNAL) struct rpmmsq_s { struct rpmioItem_s _item; /*!< usage mutex and pool identifier. */ + rpmioItem head; /*!< message queue head. */ + rpmioItem * tail; /*!< message queue tail. */ const char * qname; /*!> message queue path. */ const char * fmode; int fdno; @@ -52,17 +100,25 @@ long mtype; /*!< SysV: message type. */ int msgmax; /*!< max. inflight messages. */ - int msgsize; /*!< max. message size. */ + int msgsize; /*!< max. message size. */ - pthread_mutex_t m; /*!< monitor mutex. */ - pthread_cond_t e; /*!< empty condition. */ - pthread_cond_t f; /*!< full condition. */ - int inflight; /*!< no. of inflight messages. */ + int nqueued; /*!< no. messages queued. */ int nsent; /*!< no. messages sent. */ int nrecv; /*!< no. messages received. */ int ntimeout; /*!< no. of receive timeouts. */ int nagain; /*!< no. of waits (O_NONBLOCK). */ + struct mq_attr oattrs; /*!< mq_open attributes. */ + pthread_attr_t attr; /*!< mq_notify thread attr */ + struct sigevent sigev; /*!< mq_notify sigevent_t */ + + struct stat sb; +#if defined(WITH_MSQ) + struct msqid_ds ds; + struct msginfo mi; +#endif /* WITH_MSQ */ + + rpmzLog zlog; }; #endif /* _RPMMSQ_INTERNAL */ @@ . patch -p0 <<'@@ .' Index: rpm/rpmio/tmq.c ============================================================================ $ cvs diff -u -r1.1.2.10 -r1.1.2.11 tmq.c --- rpm/rpmio/tmq.c 26 May 2017 13:22:18 -0000 1.1.2.10 +++ rpm/rpmio/tmq.c 28 May 2017 18:01:53 -0000 1.1.2.11 @@ -62,7 +62,6 @@ RPMMSQ_FLAGS_DEBUG | RPMMSQ_FLAGS_DELETE | RPMMSQ_FLAGS_RESET | - RPMMSQ_FLAGS_LOOP | RPMMSQ_FLAGS_INFO | RPMMSQ_FLAGS_SYSV | RPMMSQ_FLAGS_POSIX | @@ -337,14 +336,16 @@ static int doMSQ(ARGV_t av, int ac) { int rc = -2; /* assume failure */ - char b[BUFSIZ]; - size_t nb = sizeof(b); + char wb[BUFSIZ]; + size_t nwb = sizeof(wb); + size_t nw; + char rb[BUFSIZ]; + size_t nrb = sizeof(rb); + size_t nr; size_t blen; int xx; - (void)b; - (void)nb; - (void)blen; + (void)nrb; #define F_ISSET(_f, _FLAG) ((_f) & RPMMSQ_FLAGS_##_FLAG) fprintf(stderr, "*** %s: flags 0x%x\n", __FUNCTION__, flags); @@ -360,7 +361,6 @@ if (F_ISSET(flags, RESET)) *te++ = 'R'; if (F_ISSET(flags, INFO)) *te++ = 'I'; if (F_ISSET(flags, DELETE)) *te++ = 'D'; - if (F_ISSET(flags, LOOP)) *te++ = 'l'; if (oflags & O_CLOEXEC) *te++ = 'e'; if (oflags & O_NONBLOCK) *te++ = 'n'; if (oflags & O_TRUNC) *te++ = 't'; @@ -378,28 +378,35 @@ #ifdef DISABLE rpmmsq msq = rpmmsqNew(qname, fmode, -1, flags); - memset(b, 0, nb); + memset(wb, 0, nwb); if (msq) { - - strcpy(b, "yadda yadda"); - blen = strlen(b); - xx = rpmmsqSend(msq, b, blen, priority); - - strcpy(b, "bing bang boom"); - blen = strlen(b); - xx = rpmmsqSend(msq, b, blen, priority); - - memset(b, 0, nb); - xx = rpmmsqRecv(msq, b, nb, NULL); - memset(b, 0, nb); - xx = rpmmsqRecv(msq, b, nb, NULL); + const char msg1[] = "yadda yadda"; + size_t nmsg1 = sizeof(msg1) - 1; + const char msg2[] = "bing bang boom"; + size_t nmsg2 = sizeof(msg2) - 1; + + strcpy(wb, msg1); + blen = strlen(wb); + nw = rpmmsqSend(msq, wb, blen, priority); + assert(nw == blen); + + strcpy(wb, msg2); + blen = strlen(wb); + nw = rpmmsqSend(msq, wb, blen, priority); + assert(nw == blen); + + memset(rb, 0, nrb); + nr = rpmmsqRecv(msq, rb, nrb, NULL); + assert(nr == nmsg1); + memset(rb, 0, nrb); + nr = rpmmsqRecv(msq, rb, nrb, NULL); + assert(nr == nmsg2); for (int i = 0; i < count; i++) { - snprintf(b, nb, "%d", i); - blen = strlen(b); - xx = rpmmsqSend(msq, b, blen, priority); - memset(b, 0, nb); - xx = rpmmsqRecv(msq, b, nb, NULL); + blen = snprintf(wb, nwb, "%d", i); + nw = rpmmsqSend(msq, wb, blen, priority); + memset(rb, 0, blen); + nr = rpmmsqRecv(msq, rb, nrb, NULL); } } @@ -410,37 +417,48 @@ FD_t fd = Fopen(qname, fmode); if (fd) { -#ifndef DISABLE +#ifdef DISABLE xx = Ftell(fd); - strcpy(b, "foo bar baz"); - blen = strlen(b); - xx = Fwrite(b, 1, blen, fd); +#endif /* DISABLE */ + + strcpy(wb, "foo bar baz"); + blen = strlen(wb); + nw = Fwrite(wb, 1, blen, fd); + +#ifdef DISABLE xx = Ferror(fd); xx = Fflush(fd); xx = Ftell(fd); +#endif /* DISABLE */ + + memset(rb, 0, blen); + nr = Fread(rb, 1, blen, fd); + assert(nr == nw && !strncmp(rb, wb, blen)); - memset(b, 0, blen); - xx = Fread(b, 1, blen, fd); +#ifdef DISABLE xx = Ftell(fd); xx = Fileno(fd); if (_rpmio_debug) fprintf(stderr, "<== Fileno(%p) rc %d\n", fd, xx); +#endif /* DISABLE */ - strcpy(b, "blah blah blah"); - blen = strlen(b); - xx = Fwrite(b, 1, blen, fd); - memset(b, 0, blen); - xx = Fread(b, 1, blen, fd); + strcpy(wb, "blah blah blah"); + blen = strlen(wb); + nw = Fwrite(wb, 1, blen, fd); + memset(rb, 0, blen); + nr = Fread(rb, 1, blen, fd); + assert(nr == nw && !strncmp(rb, wb, blen)); for (int i = 0; i < count; i++) { - snprintf(b, nb, "%d", i); - blen = strlen(b); - xx = Fwrite(b, 1, blen, fd); - memset(b, 0, blen); - xx = Fread(b, 1, blen, fd); + blen = snprintf(wb, nwb, "%d", i); + nw = Fwrite(wb, 1, blen, fd); + memset(rb, 0, blen); + nr = Fread(rb, 1, blen, fd); + assert(nr == nw && !strncmp(rb, wb, blen)); } +#ifdef DISABLE #ifdef NOTYET xx = Feof(fd); xx = Clearerr(fd); @@ -450,14 +468,13 @@ xx = Fgetpos(fd, &pos); xx = Fsetpos(fd, &pos); -#else FD_t nfd = Fdopen(fd, fmode); if (nfd) { - strcpy(b, "foo bar baz"); - blen = strlen(b); - xx = Fwrite(b, 1, blen, nfd); - memset(b, 0, blen); - xx = Fread(b, 1, blen, nfd); + strcpy(wb, "foo bar baz"); + blen = strlen(wb); + nw = Fwrite(wb, 1, blen, nfd); + memset(rb, 0, blen); + nr = Fread(rb, 1, blen, nfd); xx = Fclose(nfd); } #endif /* DISABLE */ @@ -494,9 +511,6 @@ N_("Display queue info on close"), NULL }, { "delete", 'D', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE, &flags, _MSQBIT(DELETE), N_("Remove queue after closing"), NULL }, - /* XXX -L no workie */ - { "loopback", 'L', POPT_BIT_SET|POPT_ARGFLAG_TOGGLE,&flags, _MSQBIT(LOOP), - N_("Read messages after sending"), NULL }, { "count", 'c', POPT_ARG_INT, &count, 0, N_("Send COUNT messages"), N_("COUNT") }, @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository [email protected]
