RPM Package Manager, CVS Repository http://rpm5.org/cvs/ ____________________________________________________________________________
Server: rpm5.org Name: Jeff Johnson Root: /v/rpm/cvs Email: j...@rpm5.org Module: rpm Date: 01-Jun-2017 20:17:20 Branch: rpm-5_4 Handle: 2017060118172000 Modified files: (Branch: rpm-5_4) rpm/rpmio aiocat.c Log: - aio: sanity. Summary: Revision Changes Path 1.1.2.2 +105 -123 rpm/rpmio/aiocat.c ____________________________________________________________________________ patch -p0 <<'@@ .' Index: rpm/rpmio/aiocat.c ============================================================================ $ cvs diff -u -r1.1.2.1 -r1.1.2.2 aiocat.c --- rpm/rpmio/aiocat.c 29 May 2017 20:44:36 -0000 1.1.2.1 +++ rpm/rpmio/aiocat.c 1 Jun 2017 18:17:20 -0000 1.1.2.2 @@ -20,14 +20,27 @@ || and concurrently using one sproc'd process per file. Thus in a || multiprocessor concurrent input can be done. ============================================================================ */ -#include "../config.h" -#define _SGI_MP_SOURCE /* see the "Caveats" section of sproc(2) */ -#include <sys/time.h> /* for clock() */ + +#include "system.h" + +#include <time.h> /* for clock() */ #include <errno.h> /* for perror() */ #include <stdio.h> /* for printf() */ #include <stdlib.h> /* for getenv(), malloc(3c) */ -#include <ulocks.h> /* usinit() & friends */ -#include <bstring.h> /* for bzero() */ +#include <string.h> +#include <fcntl.h> +#include <pthread.h> +#include <assert.h> +#define Z(_a) (assert((_a) == 0)) + +#ifdef NOTYET + +#else /* NOTYET */ + +#include <yarn.h> + +#endif /* NOTYET */ + #include <sys/resource.h> /* for prctl, get/setrlimit() */ #include <sys/prctl.h> /* for prctl() */ #include <sys/types.h> /* required by lseek(), prctl */ @@ -36,11 +49,15 @@ #include <sys/prctl.h> /* ditto */ #include <signal.h> /* for signals - gets sys/signal and sys/siginfo */ #include <aio.h> /* async I/O */ -#define BLOCKSIZE 2048 /* input units -- play with this number */ -#define MAX_INFILES 10 /* max sprocs: anything from 4 to 20 or so */ -#define DO_SPROCS 1 /* set 0 to do all I/O in a single process */ + +#include "debug.h" + +#define BLOCKSIZE 2048 /* input units -- play with this number */ +#define MAX_INFILES 1024 /* max sprocs: anything from 4 to 20 or so */ +#define DO_SPROCS 0 /* set 0 to do all I/O in a single process */ #define QUITIFNULL(PTR,MSG) if (NULL==PTR) {perror(MSG);return(errno);} #define QUITIFMONE(INT,MSG) if (-1==INT) {perror(MSG);return(errno);} + /***************************************************************************** || The following structure contains the info needed by one child proc. || The main program builds an array of MAX_INFILES of these. @@ -59,19 +76,18 @@ int procid; /* process ID of child process */ off_t fsize; /* size of this input file */ /* read-write to child */ - usema_t* sema; /* semaphore used by methods 2 & 3 */ + yarnLock sema; /* semaphore used by methods 2 & 3 */ off_t outbase; /* starting offset in output file */ off_t inbase; /* current offset in input file */ clock_t etime; /* sum of utime/stime to read file */ - aiocb_t acb; /* aiocb used for reading and writing */ + struct aiocb acb; /* aiocb used for reading and writing */ } child_t; /****************************************************************************** || Globals, accessible to all processes */ char* ofName = NULL; /* output file name string */ int outFD; /* output file descriptor */ -usptr_t* arena; /* arena where everything is built */ -barrier_t* convene; /* barrier used to sync up */ +pthread_barrier_t convene; /* barrier used to sync up */ int nprocs = 1; /* 1 + number of child procs */ child_t* array; /* array of child_t structs in arena */ int errors = 0; /* always incremented on an error */ @@ -92,7 +108,6 @@ int argno; /* loop counter */ child_t* pc; /* ->child_t of current file */ void (*method)(void *,size_t) = inProc0; /* ->chosen input method */ - char arenaPath[128]; /* build area for arena pathname */ char outPath[128]; /* build area for output pathname */ /* || Ensure the name of a temporary directory. @@ -100,38 +115,14 @@ tmpdir = getenv("TMPDIR"); if (!tmpdir) tmpdir = "/var/tmp"; /* - || Build a name for the arena file. - */ - strcpy(arenaPath,tmpdir); - strcat(arenaPath,"/aiocat.wrk"); - /* - || Create the arena. First, call usconfig() to establish the - || minimum size (twice the buffer size per file, to allow for misc usage) - || and the (maximum) number of processes that may later use - || this arena. For this program that is MAX_INFILES+10, allowing - || for our sprocs plus those done by aio_sgi_init(). - || These values apply to any arenas made subsequently, until changed. - */ - { - ptrdiff_t ret; - ret = usconfig(CONF_INITSIZE,2*BLOCKSIZE*MAX_INFILES); - QUITIFMONE(ret,"usconfig size") - ret = usconfig(CONF_INITUSERS,MAX_INFILES+10); - QUITIFMONE(ret,"usconfig users") - arena = usinit(arenaPath); - QUITIFNULL(arena,"usinit") - } - /* || Allocate the barrier. */ - convene = new_barrier(arena); - QUITIFNULL(convene,"new_barrier") + Z(pthread_barrier_init(&convene, NULL, nprocs)); /* || Allocate the array of child info structs and zero it. */ - array = (child_t*)usmalloc(MAX_INFILES*sizeof(child_t),arena); - QUITIFNULL(array,"usmalloc") - bzero((void *)array,MAX_INFILES*sizeof(child_t)); + array = (child_t *) calloc(MAX_INFILES, sizeof(child_t)); +assert(array); /* || Loop over the arguments, setting up child structs and || counting input files. Quit if a file won't open or seek, @@ -178,16 +169,14 @@ /* || save the filename */ - pc = &array[nfiles]; + pc = array + nfiles; strcpy(pc->fname,argv[argno]); /* || allocate a buffer and a semaphore. Not all || child procs use the semaphore but so what? */ - pc->buffer = usmalloc(BLOCKSIZE,arena); - QUITIFNULL(pc->buffer,"usmalloc(buffer)") - pc->sema = usnewsema(arena,0); - QUITIFNULL(pc->sema,"usnewsema") + pc->buffer = malloc(BLOCKSIZE); + pc->sema = yarnNewLock(0); /* || open the file */ @@ -247,38 +236,14 @@ */ nprocs = 1+nfiles; /* - || Initialize async I/O using aio_sgi_init(), in order to specify - || a number of locks at least equal to the number of child procs - || and in order to specify extra sproc users. - */ - { - aioinit_t ainit = {0}; /* all fields initially zero */ - /* - || Go with the default 5 for the number of aio-created procs, - || as we have no way of knowing the number of unique devices. - */ -#define AIO_PROCS 5 - ainit.aio_threads = AIO_PROCS; - /* - || Set the number of locks aio needs to the number of procs - || we will start, minimum 3. - */ - ainit.aio_locks = (nprocs > 2)?nprocs:3; - /* - || Warn aio of the number of user procs that will be - || using its arena. - */ - ainit.aio_numusers = nprocs; - aio_sgi_init(&ainit); - } - /* || Process each input file, either in a child process or in || a subroutine call, as specified by the DO_SPROCS variable. */ for (argno = 0; argno < nfiles; ++argno) { - pc = &array[argno]; + pc = array + argno; #if DO_SPROCS +#ifdef FIXME #define CHILD_STACK 64*1024 /* || For each input file, start a child process as an instance @@ -293,6 +258,8 @@ ,CHILD_STACK); /* max stack seg growth */ QUITIFMONE(pc->procid,"sproc") #else +#endif +#else /* DO_SPROCS */ /* || For each input file, call the selected (-a) method as a || subroutine to copy its file. @@ -301,7 +268,7 @@ method((void*)pc,0); if (errors) break; fprintf(stderr,"done\n"); -#endif +#endif /* DO_SPROCS */ } #if DO_SPROCS /* @@ -309,15 +276,15 @@ || When all have started and reached barrier(), all continue. || If any errors occurred in initialization, quit. */ - barrier(convene,nprocs); + Z(pthread_barrier_wait(&convene)); /* || Child processes are executing now. Reunite the family round the || old hearth one last time, when their processing is complete. || Each child ensures that all its output is complete before it || invokes barrier(). */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ /* || Close the output file and print some statistics. */ @@ -329,22 +296,16 @@ printf(" procid time fsize filename\n"); for(argno = 0, timesum = bytesum = 0 ; argno < nfiles ; ++argno) { - pc = &array[argno]; + pc = array + argno; timesum += pc->etime; bytesum += pc->fsize; - printf("%2d: %-8d %-8d %-8d %s\n" + printf("%2d: %-8d %-8ld %-8ld %s\n" ,argno,pc->procid,pc->etime,pc->fsize,pc->fname); } bperus = ((double)bytesum)/((double)timesum); - printf("total time %d usec, total bytes %d, %g bytes/usec\n" + printf("total time %ld usec, total bytes %ld, %g bytes/usec\n" ,timesum , bytesum , bperus); } - /* - || Unlink the arena file, so it won't exist when this progam runs - || again. If it did exist, it would be used as the initial state of - || the arena, which might or might not have any effect. - */ - unlink(arenaPath); return 0; } /****************************************************************************** @@ -356,17 +317,20 @@ int inWait0(child_t *pch) { int ret; - aiocb_t* pab = &pch->acb; + struct aiocb * pab = &pch->acb; while (EINPROGRESS == (ret = aio_error(pab))) { - sginap(0); + /* Yield the CPU. */ + sched_yield(); + struct timespec ts = { 0, 100*1000 }; + Z(nanosleep(&ts, NULL)); } return ret; } void inProc0(void *arg, size_t stk) { child_t *pch = arg; /* starting arg is ->child_t for my file */ - aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + struct aiocb *pab = &pch->acb; /* base address of the aiocb_t in child_t */ int ret; /* as long as this is 0, all is ok */ int bytes; /* #bytes read on each input */ /* @@ -378,8 +342,8 @@ /* || Wait for the starting gun... */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ pch->etime = clock(); do /* read and write, read and write... */ { @@ -389,7 +353,7 @@ pab->aio_fildes = pch->fd; pab->aio_offset = pch->inbase; pab->aio_nbytes = BLOCKSIZE; - if (ret = aio_read(pab)) + if ((ret = aio_read(pab))) break; /* unable to schedule a read */ ret = inWait0(pch); if (ret) @@ -409,7 +373,7 @@ pab->aio_fildes = outFD; pab->aio_nbytes = bytes; pab->aio_offset = pch->outbase; - if (ret = aio_write(pab)) + if ((ret = aio_write(pab))) break; ret = inWait0(pch); if (ret) @@ -435,8 +399,8 @@ /* || Rendezvous with the rest of the family, then quit. */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ return; } /* end inProc1 */ /****************************************************************************** @@ -447,7 +411,7 @@ int inWait1(child_t *pch) { int ret; - aiocb_t* susplist[1]; /* list of 1 aiocb for aio_suspend() */ + struct aiocb * susplist[1]; /* list of 1 aiocb for aio_suspend() */ susplist[0] = &pch->acb; /* || Note: aio.h declares the 1st argument of aio_suspend() as "const." @@ -456,13 +420,13 @@ || must be cast to that -- else cc gives a warning. The cast || in the following statement is only to avoid this warning. */ - ret = aio_suspend( (const aiocb_t **) susplist,1,NULL); + ret = aio_suspend( (const struct aiocb **) susplist,1,NULL); return ret; } void inProc1(void *arg, size_t stk) { child_t *pch = arg; /* starting arg is ->child_t for my file */ - aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + struct aiocb *pab = &pch->acb; /* base address of the aiocb_t in child_t */ int ret; /* as long as this is 0, all is ok */ int bytes; /* #bytes read on each input */ /* @@ -474,8 +438,8 @@ /* || Wait for the starting gun... */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ pch->etime = clock(); do /* read and write, read and write... */ { @@ -485,7 +449,7 @@ pab->aio_fildes = pch->fd; pab->aio_offset = pch->inbase; pab->aio_nbytes = BLOCKSIZE; - if (ret = aio_read(pab)) + if ((ret = aio_read(pab))) break; ret = inWait1(pch); /* @@ -512,7 +476,7 @@ pab->aio_fildes = outFD; pab->aio_nbytes = bytes; pab->aio_offset = pch->outbase; - if (ret = aio_write(pab)) + if ((ret = aio_write(pab))) break; ret = inWait1(pch); if (!ret) /* op is complete */ @@ -540,23 +504,27 @@ /* || Rendezvous with the rest of the family, then quit. */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ } /* end inProc0 */ /****************************************************************************** || inProc2 requests a signal upon completion of an I/O. After starting || an operation, it P's a semaphore which is V'd from the signal handler. */ #define AIO_SIGNUM SIGRTMIN+1 /* arbitrary choice of signal number */ -void sigHandler2(const int signo, const struct siginfo *sif ) +RPM_GNUC_CONST +void sigHandler2(int signo, siginfo_t *sif, void *ucontext) { /* || In this minimal signal handler we pick up the address of the || child_t info structure -- which was put in aio_sigevent.sigev_value || field during initialization -- and use it to find the semaphore. */ - child_t *pch = sif->si_value.sival_ptr ; - usvsema(pch->sema); + child_t *pch = sif->si_value.sival_ptr; + /* usvsema(pch->sema); */ + yarnPossess(pch->sema); + yarnWaitFor(pch->sema, TO_BE_LESS_THAN, 0); + yarnTwist(pch->sema, BY, +1); return; /* stop here with dbx to print the above address */ } int inWait2(child_t *pch) @@ -566,7 +534,10 @@ || handler could have been entered before this function is called, || or it could be entered afterward. */ - uspsema(pch->sema); + /* uspsema(pch->sema); */ + yarnPossess(pch->sema); + yarnWaitFor(pch->sema, TO_BE_MORE_THAN, 0); + yarnTwist(pch->sema, BY, -1); /* || Since this process executes only one aio operation at a time, || we can return the status of that operation. In a more complicated @@ -578,7 +549,7 @@ void inProc2(void *arg, size_t stk) { child_t *pch = arg; /* starting arg is ->child_t for my file */ - aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + struct aiocb *pab = &pch->acb; /* base address of the aiocb_t in child_t */ int ret; /* as long as this is 0, all is ok */ int bytes; /* #bytes read on each input */ /* @@ -594,18 +565,22 @@ || Initialize -- set up a signal handler for AIO_SIGNUM. */ { - struct sigaction sa = {SA_SIGINFO,sigHandler2}; - ret = sigaction(AIO_SIGNUM,&sa,NULL); + struct sigaction sa = { + .sa_sigaction = sigHandler2, + .sa_flags = SA_RESTART | SA_SIGINFO, + }; + sigemptyset (&sa.sa_mask); + ret = sigaction(AIO_SIGNUM, &sa, NULL); if (ret) ++errors; /* parent will shut down ASAP */ } #if DO_SPROCS /* || Wait for the starting gun... */ - barrier(convene,nprocs); -#else + Z(pthread_barrier_wait(&convene)); +#else /* DO_SPROCS */ if (ret) return; -#endif +#endif /* DO_SPROCS */ pch->etime = clock(); do /* read and write, read and write... */ { @@ -659,8 +634,8 @@ /* || Rendezvous with the rest of the family, then quit. */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ } /* end inProc2 */ /****************************************************************************** @@ -668,6 +643,7 @@ || The callback function executes a V operation. This may come before or || after the P operation. */ +RPM_GNUC_CONST void callBack3(union sigval usv) { /* @@ -676,7 +652,10 @@ || post the semaphore in the child_t struct. */ child_t *pch = usv.sival_ptr; - usvsema(pch->sema); + /* usvsema(pch->sema); */ + yarnPossess(pch->sema); + yarnWaitFor(pch->sema, TO_BE_LESS_THAN, 0); + yarnTwist(pch->sema, BY, +1); return; } int inWait3(child_t *pch) @@ -685,7 +664,10 @@ || Suspend, if necessary, by polling the semaphore. The callback || function might be entered before we reach this point, or after. */ - uspsema(pch->sema); + /* uspsema(pch->sema); */ + yarnPossess(pch->sema); + yarnWaitFor(pch->sema, TO_BE_MORE_THAN, 0); + yarnTwist(pch->sema, BY, -1); /* || Return the status of the aio operation associated with the sema. */ @@ -694,7 +676,7 @@ void inProc3(void *arg, size_t stk) { child_t *pch = arg; /* starting arg is ->child_t for my file */ - aiocb_t *pab = &pch->acb; /* base address of the aiocb_t in child_t */ + struct aiocb *pab = &pch->acb; /* base address of the aiocb_t in child_t */ int ret; /* as long as this is 0, all is ok */ int bytes; /* #bytes read on each input */ /* @@ -702,16 +684,16 @@ || the child_t struct is passed as the siginfo value to be passed || into the callback. */ - pab->aio_sigevent.sigev_notify = SIGEV_CALLBACK; - pab->aio_sigevent.sigev_func = callBack3; + pab->aio_sigevent.sigev_notify = SIGEV_THREAD; + pab->aio_sigevent.sigev_notify_function = callBack3; pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch; pab->aio_buf = pch->buffer; /* always the same */ #if DO_SPROCS /* || Wait for the starting gun... */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ pch->etime = clock(); do /* read and write, read and write... */ { @@ -765,6 +747,6 @@ /* || Rendezvous with the rest of the family, then quit. */ - barrier(convene,nprocs); -#endif + Z(pthread_barrier_wait(&convene)); +#endif /* DO_SPROCS */ } /* end inProc3 */ @@ . ______________________________________________________________________ RPM Package Manager http://rpm5.org CVS Sources Repository rpm-cvs@rpm5.org