RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   28-May-2017 22:18:16
  Branch: rpm-5_4                          Handle: 2017052820181501

  Modified files:           (Branch: rpm-5_4)
    rpm                     CHANGES
    rpm/rpmio               msqio.c rpmaio.c rpmmsq.h

  Log:
    - msqio: abstract out the AIO queue (AIOQ_t for now).

  Summary:
    Revision    Changes     Path
    1.3501.2.566+1  -0      rpm/CHANGES
    1.1.2.17    +87 -97     rpm/rpmio/msqio.c
    1.1.2.2     +1  -1      rpm/rpmio/rpmaio.c
    1.1.2.14    +1  -3      rpm/rpmio/rpmmsq.h
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/CHANGES
  ============================================================================
  $ cvs diff -u -r1.3501.2.565 -r1.3501.2.566 CHANGES
  --- rpm/CHANGES       28 May 2017 18:39:22 -0000      1.3501.2.565
  +++ rpm/CHANGES       28 May 2017 20:18:15 -0000      1.3501.2.566
  @@ -1,4 +1,5 @@
   5.4.17 -> 5.4.18:
  +    - jbj: msqio: abstract out the AIO queue (AIOQ_t for now).
       - jbj: rpmaio: stub in an AIO aiocb pool.
       - jbj: msqio: replace pthreads monitor with yarn.
       - jbj: rpmio: wire up the fooInit callback, pass the message to fooDbug.
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/msqio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.16 -r1.1.2.17 msqio.c
  --- rpm/rpmio/msqio.c 28 May 2017 18:39:22 -0000      1.1.2.16
  +++ rpm/rpmio/msqio.c 28 May 2017 20:18:16 -0000      1.1.2.17
  @@ -29,6 +29,13 @@
   
   #define      _RPMAIO_INTERNAL
   #include "rpmaio.h"
  +
  +typedef      struct AIOQ_s * AIOQ_t;
  +struct AIOQ_s {
  +    rpmioItem head;             /*!< AIO cb queue head. */
  +    rpmioItem * tail;           /*!< AIO cb queue tail. */
  +};
  +
   #define      _RPMMSQ_INTERNAL
   #include "rpmmsq.h"
   
  @@ -47,30 +54,6 @@
   
   #define Z(_rc)  assert((_rc) == 0)
   
  -static int _lockdebug = 0;
  -static int _conddebug = -1;
  -#define PEEK(_bolt)  yarnPeekLock(_bolt)
  -#define      POSSESS(_bolt) \
  -    {        yarnPossess(_bolt); \
  -     if (_lockdebug) \
  -         rpmzLogAdd(msq->zlog, "***\t  POSSESS(%p)\t%ld", _bolt, 
PEEK(_bolt)); \
  -    }
  -#define      RELEASE(_bolt) \
  -    {        if (_lockdebug) \
  -         rpmzLogAdd(msq->zlog, "***\t  RELEASE(%p)\t%ld", _bolt, 
PEEK(_bolt)); \
  -     yarnRelease(_bolt); \
  -    }
  -#define      TWIST(_bolt, _op, _val) \
  -    {        if (_conddebug) \
  -         rpmzlogAdd(msq->zlog, "***\t    TWIST(%p, %d, %d)\t%ld", _bolt, 
_op, _val, PEEK(_bolt)); \
  -     yarnTwist(_bolt, _op, _val); \
  -    }
  -#define      WAITFOR(_bolt, _op, _val) \
  -    {        if (_conddebug) \
  -         rpmzLogAdd(msq->zlog, "***\t  WAITFOR(%p, %d, %d)\t%ld", _bolt, 
_op, _val, PEEK(_bolt));\
  -     yarnWaitFor(_bolt, _op, _val); \
  -    }
  -
   #ifdef __cplusplus
   GENfree(rpmmsq)
   #endif       /* __cplusplus */
  @@ -340,14 +323,81 @@
            PRINT(d, ntimeout);
            PRINT(d, nagain);
        }
  -     PRINT(p, head);
  -     PRINT(p, tail);
  +     PRINT(p, aioq.head);
  +     PRINT(p, aioq.tail);
        PRINT(d, nqueued);
   #undef       PRINT
       }
   }
   
   /* =============================================================== */
  +static int _lockdebug = 0;
  +static int _conddebug = -1;
  +#define PEEK(_bolt)  yarnPeekLock(_bolt)
  +#define      POSSESS(_bolt) \
  +    {        yarnPossess(_bolt); \
  +     if (zlog && _lockdebug) \
  +         rpmzLogAdd(zlog, "***\t  POSSESS(%p)\t%ld", _bolt, PEEK(_bolt)); \
  +    }
  +#define      RELEASE(_bolt) \
  +    {        if (zlog && _lockdebug) \
  +         rpmzLogAdd(zlog, "***\t  RELEASE(%p)\t%ld", _bolt, PEEK(_bolt)); \
  +     yarnRelease(_bolt); \
  +    }
  +#define      TWIST(_bolt, _op, _val) \
  +    {        if (zlog && _conddebug) \
  +         rpmzlogAdd(zlog, "***\t    TWIST(%p, %d, %d)\t%ld", _bolt, _op, 
_val, PEEK(_bolt)); \
  +     yarnTwist(_bolt, _op, _val); \
  +    }
  +#define      WAITFOR(_bolt, _op, _val) \
  +    {        if (zlog && _conddebug) \
  +         rpmzLogAdd(zlog, "***\t  WAITFOR(%p, %d, %d)\t%ld", _bolt, _op, 
_val, PEEK(_bolt));\
  +     yarnWaitFor(_bolt, _op, _val); \
  +    }
  +
  +static inline
  +void aioqInit(AIOQ_t aioq)
  +{
  +    RPM_GNUC_TM_ATOMIC {
  +     aioq->head = NULL;
  +     aioq->tail = &aioq->head;
  +    }
  +}
  +
  +static inline
  +rpmioItem aioqGetHead(AIOQ_t aioq)
  +{
  +    rpmioItem item;
  +    RPM_GNUC_TM_ATOMIC {
  +     if ((item = aioq->head) != NULL) {
  +         aioq->head = item->next;
  +         item->next = NULL;          /* XXX rpmmalloc.c ? */
  +         if (aioq->head == NULL)
  +             aioq->tail = &aioq->head;
  +     }
  +    }
  +    return item;
  +}
  +
  +static inline
  +void aioqPutTail(AIOQ_t aioq, rpmioItem item)
  +{
  +    RPM_GNUC_TM_ATOMIC {
  +     *aioq->tail = item;
  +     aioq->tail = (rpmioItem *) &item->next;
  +    }
  +}
  +
  +static inline
  +void aioqPutWait(AIOQ_t aioq, rpmioItem item, rpmzLog zlog)
  +{
  +    POSSESS(item->use);
  +    aioqPutTail(aioq, item);
  +    WAITFOR(item->use, TO_BE, 1);
  +    RELEASE(item->use);
  +}
  +
  +/* =============================================================== */
   static char * rpmmsqDbug(void *_msq, char *b, size_t nb)
   {
       rpmmsq msq = (rpmmsq) _msq;
  @@ -401,8 +451,8 @@
            PRINT_SIGEV(p, sigev_notify_attributes);
            break;
        }
  -     PRINT(p, head);
  -     PRINT(p, tail);
  +     PRINT(p, aioq.head);
  +     PRINT(p, aioq.tail);
        PRINT(d, nqueued);
   
   #undef       PRINT_SIGEV
  @@ -446,15 +496,8 @@
        /* Drain the queue. */
        while (1) {
            rpmioItem item;
  -         RPM_GNUC_TM_ATOMIC {
  -             if ((item = msq->head) == NULL)
  -                 break;
  -             item = msq->head;
  -             msq->head = item->next;
  -             item->next = NULL;              /* XXX rpmmalloc.c ? */
  -             if (msq->head == NULL)
  -                 msq->tail = &msq->head;
  -         }
  +         if ((item = aioqGetHead(&msq->aioq)) == NULL)
  +             break;
            while (item)
                item = rpmioFreePoolItem(item,
                        __FUNCTION__, __FILE__, __LINE__);
  @@ -571,11 +614,8 @@
       msq->msgmax = MSQ_MSGMAX;
       msq->msgsize = MSQ_MSGSIZE;
   
  -    /* XXX TODO: use POSIX shared mutexes. */
  -    RPM_GNUC_TM_ATOMIC {
  -     msq->head = NULL;
  -     msq->tail = &msq->head;
  -    }
  +    /* XXX TODO: use POSIX shared mutexes? */
  +    aioqInit(&msq->aioq);
       msq->nqueued = 0;
       msq->nsent = 0;
       msq->nrecv = 0;
  @@ -699,13 +739,7 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
        /* Queue the read. */
  -     POSSESS(item->use);
  -     RPM_GNUC_TM_ATOMIC {
  -         *msq->tail = item;
  -         msq->tail = (rpmioItem *) &item->next;
  -     }
  -     WAITFOR(item->use, TO_BE, 1);
  -     RELEASE(item->use);
  +     aioqPutWait(&msq->aioq, item, msq->zlog);
   
        rc = aio->cb.__return_value;            /* XXX aio_return */
        if (rc >= 0)
  @@ -777,13 +811,7 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
            /* Queue the write. */
  -         POSSESS(item->use);
  -         RPM_GNUC_TM_ATOMIC {
  -             *msq->tail = item;
  -             msq->tail = (rpmioItem *) &item->next;
  -         }
  -         WAITFOR(item->use, TO_BE, 1);
  -         RELEASE(item->use);
  +         aioqPutWait(&msq->aioq, item, msq->zlog);
   
            rc = aio->cb.__return_value;        /* XXX aio_return */
            if (rc < 0)
  @@ -854,31 +882,6 @@
            if (MSQF_ISSET(INFO) || _rpmmsq_debug)
                rpmmsqDump(NULL, msq, NULL);
   
  -#ifdef       DYING
  -         /* Wait for the queue to drain. */
  -         while (1) {
  -             rpmioItem item;
  -             RPM_GNUC_TM_ATOMIC {
  -                 if ((item = msq->head) == NULL)
  -                     break;
  -             }
  -             POSSESS(item->use);
  -             WAITFOR(item->use, TO_BE, 0);
  -             RELEASE(item->use);
  -         }
  -
  -         /* Turn off the sigev detached thread. */
  -         int qid = msq->qid;
  -         msq->qid = -1;
  -         rc = rpmmsqNotify(msq, NULL);
  -
  -         /* Yield the CPU. */
  -         sched_yield();
  -         struct timespec ts = { 0, 100*1000 };
  -         Z(nanosleep(&ts, NULL));
  -         /* Wait for the reader thread. */
  -
  -#else
            if (msq->nsent == 0)  {
                rc = rpmmsqNotify(msq, NULL);
                rc = Mq_close(msq->qid);
  @@ -895,19 +898,12 @@
                        __FUNCTION__, __FILE__, __LINE__);
   
            /* Queue the write. */
  -         POSSESS(item->use);
  -         RPM_GNUC_TM_ATOMIC {
  -             *msq->tail = item;
  -             msq->tail = (rpmioItem *) &item->next;
  -         }
  -         WAITFOR(item->use, TO_BE, 1);
  -         RELEASE(item->use);
  +         aioqPutWait(&msq->aioq, item, msq->zlog);
   
            rc = aio->cb.__return_value;        /* XXX aio_return */
            if (rc < 0)
                errno = aio_error(&aio->cb);
            aio = rpmaioFree(aio);
  -#endif
   
        }
   
  @@ -1092,14 +1088,8 @@
            rpmioItem item;
   
            /* Get next request from queue. */
  -         RPM_GNUC_TM_ATOMIC {
  -             if ((item = msq->head) == NULL)
  -                 break;
  -             msq->head = item->next;
  -             item->next = NULL;              /* XXX rpmmalloc.c ? */
  -             if (msq->head == NULL)
  -                 msq->tail = &msq->head;
  -         }
  +         if ((item = aioqGetHead(&msq->aioq)) == NULL)
  +             break;
   
            rpmaio aio = (rpmaio) item;
            char *b = (char *)aio->cb.aio_buf;
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmaio.c
  ============================================================================
  $ cvs diff -u -r1.1.2.1 -r1.1.2.2 rpmaio.c
  --- rpm/rpmio/rpmaio.c        28 May 2017 18:39:22 -0000      1.1.2.1
  +++ rpm/rpmio/rpmaio.c        28 May 2017 20:18:16 -0000      1.1.2.2
  @@ -27,7 +27,7 @@
   
   #define Z(_rc)  assert((_rc) == 0)
   
  -#define PEEK(_bolt)     yarnPeekLock(_bolt)
  +#define PEEK(_bolt)  yarnPeekLock(_bolt)
   
   #ifdef __cplusplus
   GENfree(rpmaio)
  @@ .
  patch -p0 <<'@@ .'
  Index: rpm/rpmio/rpmmsq.h
  ============================================================================
  $ cvs diff -u -r1.1.2.13 -r1.1.2.14 rpmmsq.h
  --- rpm/rpmio/rpmmsq.h        28 May 2017 18:39:22 -0000      1.1.2.13
  +++ rpm/rpmio/rpmmsq.h        28 May 2017 20:18:16 -0000      1.1.2.14
  @@ -39,8 +39,7 @@
   #if defined(_RPMMSQ_INTERNAL)
   struct rpmmsq_s {
       struct rpmioItem_s _item;        /*!< usage mutex and pool identifier. */
  -    rpmioItem head;          /*!< message queue head. */
  -    rpmioItem * tail;                /*!< message queue tail. */
  +    struct AIOQ_s aioq;              /*!< message inflight queue */
       const char * qname;              /*!> message queue path. */
       const char * fmode;
       int fdno;
  @@ -71,7 +70,6 @@
       struct msginfo mi;
   #endif       /* WITH_MSQ */
   
  -
       rpmzLog zlog;
   };
   #endif       /* _RPMMSQ_INTERNAL */
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to