RPM Package Manager, CVS Repository
  http://rpm5.org/cvs/
  ____________________________________________________________________________

  Server: rpm5.org                         Name:   Jeff Johnson
  Root:   /v/rpm/cvs                       Email:  j...@rpm5.org
  Module: rpm                              Date:   01-Jun-2017 20:17:20
  Branch: rpm-5_4                          Handle: 2017060118172000

  Modified files:           (Branch: rpm-5_4)
    rpm/rpmio               aiocat.c

  Log:
    - aio: sanity.

  Summary:
    Revision    Changes     Path
    1.1.2.2     +105 -123   rpm/rpmio/aiocat.c
  ____________________________________________________________________________

  patch -p0 <<'@@ .'
  Index: rpm/rpmio/aiocat.c
  ============================================================================
  $ cvs diff -u -r1.1.2.1 -r1.1.2.2 aiocat.c
  --- rpm/rpmio/aiocat.c        29 May 2017 20:44:36 -0000      1.1.2.1
  +++ rpm/rpmio/aiocat.c        1 Jun 2017 18:17:20 -0000       1.1.2.2
  @@ -20,14 +20,27 @@
   || and concurrently using one sproc'd process per file.  Thus in a
   || multiprocessor concurrent input can be done.
   ============================================================================ */
  -#include "../config.h"
  -#define _SGI_MP_SOURCE  /* see the "Caveats" section of sproc(2) */
  -#include <sys/time.h>   /* for clock() */
  +
  +#include "system.h"
  +
  +#include <time.h>       /* for clock() */
   #include <errno.h>      /* for perror() */
   #include <stdio.h>      /* for printf() */
   #include <stdlib.h>     /* for getenv(), malloc(3c) */
  -#include <ulocks.h>     /* usinit() & friends */
  -#include <bstring.h>    /* for bzero() */
  +#include <string.h>
  +#include <fcntl.h>
  +#include <pthread.h>
  +#include <assert.h>
  +#define      Z(_a)   (assert((_a) == 0))
  +
  +#ifdef       NOTYET
  +
  +#else        /* NOTYET */
  +
  +#include <yarn.h>
  +
  +#endif       /* NOTYET */
  +
   #include <sys/resource.h> /* for prctl, get/setrlimit() */
   #include <sys/prctl.h>  /* for prctl() */
   #include <sys/types.h>  /* required by lseek(), prctl */
  @@ -36,11 +49,15 @@
   #include <sys/prctl.h>  /* ditto */
   #include <signal.h>     /* for signals - gets sys/signal and sys/siginfo */
   #include <aio.h>        /* async I/O */
  -#define BLOCKSIZE 2048  /* input units -- play with this number */
  -#define MAX_INFILES 10  /* max sprocs: anything from 4 to 20 or so */
  -#define DO_SPROCS 1     /* set 0 to do all I/O in a single process */
  +
  +#include "debug.h"
  +
  +#define BLOCKSIZE 2048    /* input units -- play with this number */
  +#define MAX_INFILES 1024  /* max sprocs: anything from 4 to 20 or so */
  +#define DO_SPROCS 0       /* set 0 to do all I/O in a single process */
   #define QUITIFNULL(PTR,MSG) if (NULL==PTR) {perror(MSG);return(errno);}
   #define QUITIFMONE(INT,MSG) if (-1==INT) {perror(MSG);return(errno);}
  +
   /*****************************************************************************
   || The following structure contains the info needed by one child proc.
   || The main program builds an array of MAX_INFILES of these.
  @@ -59,19 +76,18 @@
       int         procid;     /* process ID of child process */
       off_t       fsize;      /* size of this input file */
           /* read-write to child */
  -    usema_t*    sema;       /* semaphore used by methods 2 & 3 */
  +    yarnLock    sema;       /* semaphore used by methods 2 & 3 */
       off_t       outbase;    /* starting offset in output file */
       off_t       inbase;     /* current offset in input file */
       clock_t     etime;      /* sum of utime/stime to read file */
  -    aiocb_t     acb;        /* aiocb used for reading and writing */
  +    struct aiocb acb;       /* aiocb used for reading and writing */
   } child_t;
   /******************************************************************************
   || Globals, accessible to all processes
   */
   char*       ofName = NULL;  /* output file name string */
   int         outFD;          /* output file descriptor */
  -usptr_t*    arena;          /* arena where everything is built */
  -barrier_t*  convene;        /* barrier used to sync up */
  +pthread_barrier_t  convene; /* barrier used to sync up */
   int         nprocs = 1;     /* 1 + number of child procs */
   child_t*    array;          /* array of child_t structs in arena */
   int         errors = 0;     /* always incremented on an error */
  @@ -92,7 +108,6 @@
       int         argno;          /* loop counter */
       child_t*    pc;             /* ->child_t of current file */
       void (*method)(void *,size_t) = inProc0; /* ->chosen input method */
  -    char        arenaPath[128]; /* build area for arena pathname */
       char        outPath[128];   /* build area for output pathname */    
       /*
       || Ensure the name of a temporary directory.
  @@ -100,38 +115,14 @@
       tmpdir = getenv("TMPDIR");
       if (!tmpdir) tmpdir = "/var/tmp";
       /*
  -    || Build a name for the arena file.
  -    */
  -    strcpy(arenaPath,tmpdir);
  -    strcat(arenaPath,"/aiocat.wrk");
  -    /*
  -    || Create the arena. First, call usconfig() to establish the
  -    || minimum size (twice the buffer size per file, to allow for misc usage)
  -    || and the (maximum) number of processes that may later use
  -    || this arena.  For this program that is MAX_INFILES+10, allowing
  -    || for our sprocs plus those done by aio_sgi_init().
  -    || These values apply to any arenas made subsequently, until changed.
  -    */
  -    {
  -        ptrdiff_t ret;
  -        ret = usconfig(CONF_INITSIZE,2*BLOCKSIZE*MAX_INFILES);
  -        QUITIFMONE(ret,"usconfig size")
  -        ret = usconfig(CONF_INITUSERS,MAX_INFILES+10);
  -        QUITIFMONE(ret,"usconfig users")
  -        arena = usinit(arenaPath);
  -        QUITIFNULL(arena,"usinit")
  -    }
  -    /*
       || Allocate the barrier.
       */
  -    convene = new_barrier(arena);
  -    QUITIFNULL(convene,"new_barrier")
  +    Z(pthread_barrier_init(&convene, NULL, nprocs));
       /*
       || Allocate the array of child info structs and zero it.
       */
  -    array = (child_t*)usmalloc(MAX_INFILES*sizeof(child_t),arena);
  -    QUITIFNULL(array,"usmalloc")
  -    bzero((void *)array,MAX_INFILES*sizeof(child_t));
  +    array = (child_t *) calloc(MAX_INFILES, sizeof(child_t));
  +assert(array);
       /*
       || Loop over the arguments, setting up child structs and
       || counting input files.  Quit if a file won't open or seek,
  @@ -178,16 +169,14 @@
                   /*
                   || save the filename
                   */
  -                pc = &array[nfiles];
  +                pc = array + nfiles;
                   strcpy(pc->fname,argv[argno]);
                   /*
                   || allocate a buffer and a semaphore.  Not all
                   || child procs use the semaphore but so what?
                   */
  -                pc->buffer = usmalloc(BLOCKSIZE,arena);
  -                QUITIFNULL(pc->buffer,"usmalloc(buffer)")
  -                pc->sema = usnewsema(arena,0);
  -                QUITIFNULL(pc->sema,"usnewsema")
  +                pc->buffer = malloc(BLOCKSIZE);
  +             pc->sema = yarnNewLock(0);
                   /*
                   || open the file
                   */
  @@ -247,38 +236,14 @@
       */
       nprocs = 1+nfiles;
       /*
  -    || Initialize async I/O using aio_sgi_init(), in order to specify
  -    || a number of locks at least equal to the number of child procs
  -    || and in order to specify extra sproc users.
  -    */
  -    {
  -        aioinit_t ainit = {0}; /* all fields initially zero */
  -        /*
  -        || Go with the default 5 for the number of aio-created procs,
  -        || as we have no way of knowing the number of unique devices.
  -        */
  -#define AIO_PROCS 5
  -        ainit.aio_threads = AIO_PROCS;
  -        /*
  -        || Set the number of locks aio needs to the number of procs
  -        || we will start, minimum 3.
  -        */
  -        ainit.aio_locks = (nprocs > 2)?nprocs:3;
  -        /*
  -        || Warn aio of the number of user procs that will be
  -        || using its arena.
  -        */
  -        ainit.aio_numusers = nprocs;
  -        aio_sgi_init(&ainit);
  -    }
  -    /*
       || Process each input file, either in a child process or in
       || a subroutine call, as specified by the DO_SPROCS variable.
       */
       for (argno = 0; argno < nfiles; ++argno)
       {
  -        pc = &array[argno];
  +        pc = array + argno;
   #if DO_SPROCS
  +#ifdef       FIXME
   #define CHILD_STACK 64*1024
           /*
           || For each input file, start a child process as an instance
  @@ -293,6 +258,8 @@
                               ,CHILD_STACK);  /* max stack seg growth */
           QUITIFMONE(pc->procid,"sproc")
   #else
  +#endif
  +#else        /* DO_SPROCS */
           /*
           || For each input file, call the selected (-a) method as a
           || subroutine to copy its file.
  @@ -301,7 +268,7 @@
           method((void*)pc,0);
           if (errors) break;
           fprintf(stderr,"done\n");
  -#endif
  +#endif       /* DO_SPROCS */
       }
   #if DO_SPROCS
       /*
  @@ -309,15 +276,15 @@
       || When all have started and reached barrier(), all continue.
       || If any errors occurred in initialization, quit.
       */
  -    barrier(convene,nprocs);
  +    Z(pthread_barrier_wait(&convene));
       /*
       || Child processes are executing now. Reunite the family round the
       || old hearth one last time, when their processing is complete.
       || Each child ensures that all its output is complete before it
       || invokes barrier().
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
       /*
       || Close the output file and print some statistics.
       */
  @@ -329,22 +296,16 @@
           printf("    procid   time     fsize     filename\n");
           for(argno = 0, timesum = bytesum = 0 ; argno < nfiles ; ++argno)
           {
  -            pc = &array[argno];
  +         pc = array + argno;
               timesum += pc->etime;
               bytesum += pc->fsize;
  -            printf("%2d: %-8d %-8d %-8d  %s\n"
  +            printf("%2d: %-8d %-8ld %-8ld  %s\n"
                       ,argno,pc->procid,pc->etime,pc->fsize,pc->fname);
           }
           bperus = ((double)bytesum)/((double)timesum);
  -        printf("total time %d usec, total bytes %d, %g bytes/usec\n"
  +        printf("total time %ld usec, total bytes %ld, %g bytes/usec\n"
                        ,timesum            , bytesum , bperus);
       }
  -    /*
  -    || Unlink the arena file, so it won't exist when this progam runs
  -    || again. If it did exist, it would be used as the initial state of
  -    || the arena, which might or might not have any effect.
  -    */
  -    unlink(arenaPath);
       return 0;
   }
   /******************************************************************************
  @@ -356,17 +317,20 @@
   int inWait0(child_t *pch)
   {
       int ret;
  -    aiocb_t* pab = &pch->acb;
  +    struct aiocb * pab = &pch->acb;
       while (EINPROGRESS == (ret = aio_error(pab)))
       {
  -        sginap(0);
  +     /* Yield the CPU. */
  +     sched_yield();
  +     struct timespec ts = { 0, 100*1000 };
  +     Z(nanosleep(&ts, NULL));
       }
       return ret;
   }
   void inProc0(void *arg, size_t stk)
   {
       child_t *pch = arg;         /* starting arg is ->child_t for my file */
  -    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
  +    struct aiocb *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
       int ret;                    /* as long as this is 0, all is ok */
       int bytes;                  /* #bytes read on each input */
       /*
  @@ -378,8 +342,8 @@
       /*
       || Wait for the starting gun...
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
       pch->etime = clock();
       do /* read and write, read and write... */
       {
  @@ -389,7 +353,7 @@
           pab->aio_fildes = pch->fd;
           pab->aio_offset = pch->inbase;
           pab->aio_nbytes = BLOCKSIZE;
  -        if (ret = aio_read(pab))
  +        if ((ret = aio_read(pab)))
               break;  /* unable to schedule a read */
           ret = inWait0(pch);
           if (ret)
  @@ -409,7 +373,7 @@
           pab->aio_fildes = outFD;
           pab->aio_nbytes = bytes;
           pab->aio_offset = pch->outbase;
  -        if (ret = aio_write(pab))
  +        if ((ret = aio_write(pab)))
               break;
           ret = inWait0(pch);
           if (ret)
  @@ -435,8 +399,8 @@
       /*
       || Rendezvous with the rest of the family, then quit.
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
       return;
   } /* end inProc1 */
   /******************************************************************************
  @@ -447,7 +411,7 @@
   int inWait1(child_t *pch)
   {
       int ret;
  -    aiocb_t* susplist[1]; /* list of 1 aiocb for aio_suspend() */
  +    struct aiocb * susplist[1]; /* list of 1 aiocb for aio_suspend() */
       susplist[0] = &pch->acb;
       /*
       || Note: aio.h declares the 1st argument of aio_suspend() as "const."
  @@ -456,13 +420,13 @@
       || must be cast to that -- else cc gives a warning.  The cast
       || in the following statement is only to avoid this warning.
       */
  -    ret = aio_suspend( (const aiocb_t **) susplist,1,NULL);
  +    ret = aio_suspend( (const struct aiocb **) susplist,1,NULL);
       return ret;
   }
   void inProc1(void *arg, size_t stk)
   {
       child_t *pch = arg;         /* starting arg is ->child_t for my file */
  -    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
  +    struct aiocb *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
       int ret;                    /* as long as this is 0, all is ok */
       int bytes;                  /* #bytes read on each input */
       /*
  @@ -474,8 +438,8 @@
       /*
       || Wait for the starting gun...
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
       pch->etime = clock();
       do /* read and write, read and write... */
       {
  @@ -485,7 +449,7 @@
           pab->aio_fildes = pch->fd;
           pab->aio_offset = pch->inbase;
           pab->aio_nbytes = BLOCKSIZE;
  -        if (ret = aio_read(pab))
  +        if ((ret = aio_read(pab)))
               break;
           ret = inWait1(pch);
           /*
  @@ -512,7 +476,7 @@
           pab->aio_fildes = outFD;
           pab->aio_nbytes = bytes;
           pab->aio_offset = pch->outbase;
  -        if (ret = aio_write(pab))
  +        if ((ret = aio_write(pab)))
               break;
           ret = inWait1(pch);
           if (!ret) /* op is complete */
  @@ -540,23 +504,27 @@
       /*
       || Rendezvous with the rest of the family, then quit.
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
   } /* end inProc0 */
   /******************************************************************************
   || inProc2 requests a signal upon completion of an I/O. After starting
   || an operation, it P's a semaphore which is V'd from the signal handler.
   */
   #define AIO_SIGNUM SIGRTMIN+1 /* arbitrary choice of signal number */
  -void sigHandler2(const int signo, const struct siginfo *sif )
  +RPM_GNUC_CONST
  +void sigHandler2(int signo, siginfo_t *sif, void *ucontext)
   {
       /*
       || In this minimal signal handler we pick up the address of the
       || child_t info structure -- which was put in aio_sigevent.sigev_value
       || field during initialization -- and use it to find the semaphore.
       */
  -    child_t *pch = sif->si_value.sival_ptr ;
  -    usvsema(pch->sema);
  +    child_t *pch = sif->si_value.sival_ptr;
  +    /* usvsema(pch->sema); */
  +    yarnPossess(pch->sema);
  +    yarnWaitFor(pch->sema, TO_BE_LESS_THAN, 0);
  +    yarnTwist(pch->sema, BY, +1);
       return; /* stop here with dbx to print the above address */
   }
   int inWait2(child_t *pch)
  @@ -566,7 +534,10 @@
       || handler could have been entered before this function is called,
       || or it could be entered afterward.
       */
  -    uspsema(pch->sema);
  +    /* uspsema(pch->sema); */
  +    yarnPossess(pch->sema);
  +    yarnWaitFor(pch->sema, TO_BE_MORE_THAN, 0);
  +    yarnTwist(pch->sema, BY, -1);
       /*
       || Since this process executes only one aio operation at a time,
       || we can return the status of that operation.  In a more complicated
  @@ -578,7 +549,7 @@
   void inProc2(void *arg, size_t stk)
   {
       child_t *pch = arg;         /* starting arg is ->child_t for my file */
  -    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
  +    struct aiocb *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
       int ret;                    /* as long as this is 0, all is ok */
       int bytes;                  /* #bytes read on each input */
       /*
  @@ -594,18 +565,22 @@
       || Initialize -- set up a signal handler for AIO_SIGNUM.
       */
       {
  -        struct sigaction sa = {SA_SIGINFO,sigHandler2};
  -        ret = sigaction(AIO_SIGNUM,&sa,NULL);
  +        struct sigaction sa = {
  +         .sa_sigaction = sigHandler2,
  +         .sa_flags = SA_RESTART | SA_SIGINFO,
  +     };
  +     sigemptyset (&sa.sa_mask);
  +        ret = sigaction(AIO_SIGNUM, &sa, NULL);
           if (ret) ++errors; /* parent will shut down ASAP */
       }   
   #if DO_SPROCS
       /*
       || Wait for the starting gun...
       */
  -    barrier(convene,nprocs);
  -#else
  +    Z(pthread_barrier_wait(&convene));
  +#else        /* DO_SPROCS */
       if (ret) return;
  -#endif
  +#endif       /* DO_SPROCS */
       pch->etime = clock();
       do /* read and write, read and write... */
       {
  @@ -659,8 +634,8 @@
       /*
       || Rendezvous with the rest of the family, then quit.
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
   } /* end inProc2 */
    
   /******************************************************************************
  @@ -668,6 +643,7 @@
   || The callback function executes a V operation.  This may come before or
   || after the P operation.
   */
  +RPM_GNUC_CONST
   void callBack3(union sigval usv)
   {
       /*
  @@ -676,7 +652,10 @@
       || post the semaphore in the child_t struct.
       */
       child_t *pch = usv.sival_ptr;
  -    usvsema(pch->sema);
  +    /* usvsema(pch->sema); */
  +    yarnPossess(pch->sema);
  +    yarnWaitFor(pch->sema, TO_BE_LESS_THAN, 0);
  +    yarnTwist(pch->sema, BY, +1);
       return;
   }
   int inWait3(child_t *pch)
  @@ -685,7 +664,10 @@
       || Suspend, if necessary, by polling the semaphore.  The callback
       || function might be entered before we reach this point, or after.
       */
  -    uspsema(pch->sema);
  +    /* uspsema(pch->sema); */
  +    yarnPossess(pch->sema);
  +    yarnWaitFor(pch->sema, TO_BE_MORE_THAN, 0);
  +    yarnTwist(pch->sema, BY, -1);
       /*
       || Return the status of the aio operation associated with the sema.
       */
  @@ -694,7 +676,7 @@
   void inProc3(void *arg, size_t stk)
   {
       child_t *pch = arg;         /* starting arg is ->child_t for my file */
  -    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
  +    struct aiocb *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
       int ret;                    /* as long as this is 0, all is ok */
       int bytes;                  /* #bytes read on each input */
       /*
  @@ -702,16 +684,16 @@
       || the child_t struct is passed as the siginfo value to be passed
       || into the callback. 
       */
  -    pab->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
  -    pab->aio_sigevent.sigev_func = callBack3;
  +    pab->aio_sigevent.sigev_notify = SIGEV_THREAD;
  +    pab->aio_sigevent.sigev_notify_function = callBack3;
       pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch;
       pab->aio_buf = pch->buffer; /* always the same */
   #if DO_SPROCS
       /*
       || Wait for the starting gun...
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
       pch->etime = clock();
       do /* read and write, read and write... */
       {
  @@ -765,6 +747,6 @@
       /*
       || Rendezvous with the rest of the family, then quit.
       */
  -    barrier(convene,nprocs);
  -#endif
  +    Z(pthread_barrier_wait(&convene));
  +#endif       /* DO_SPROCS */
   } /* end inProc3 */
  @@ .
______________________________________________________________________
RPM Package Manager                                    http://rpm5.org
CVS Sources Repository                                rpm-cvs@rpm5.org

Reply via email to