"Gregory Stark" <[EMAIL PROTECTED]> writes:

> The two interfaces I'm aware of for this are posix_fadvise() and libaio.
> I've run tests with a synthetic benchmark which generates a large file then
> reads a random selection of blocks from within it using either synchronous
> reads like we do now or either of those interfaces. I saw impressive speed
> gains on a machine with only three drives in a raid array. I did this a
> while ago so I don't have the results handy. I'll rerun the tests again and
> post them.

Here are the results of running the synthetic test program on a 3-drive raid
array. Note that the results *exceeded* the 3x speedup I expected, even for
ordered blocks. Either the drive (or the OS) is able to reorder the block
requests better than the file offsets alone would suggest, or some other
effect is kicking in.

The test is with an 8GB file, picking 8,192 random 8k blocks from within it.
The pink diamonds represent the bandwidth obtained if the random blocks are
sorted before fetching (like a bitmap indexscan) and the blue if they're
unsorted.

<<inline: test-pfa-results.png>>



# Sweep the pfa2 method over a range of working-set (prefetch window) sizes,
# first with unsorted offsets ("false") and then with sorted offsets ("true").
# Everything the benchmark writes to stdout is appended to test-pfa-results.
# The size list is kept on a single line: a newline inside the word list of a
# `for` statement ends the list and makes the following `; do` a syntax error.
for sorted in false true ; do
  for i in 1 2 3 4 5 6 7 8 16 24 32 64 96 128 192 256 384 512 768 1024 2048 4096 8192 ; do
    ./a.out pfa2 /mnt/data/test.data 8388608 8192 "$i" 8192 "$sorted" ;
  done
done >> test-pfa-results

Attachment: test-pfa-results
Description: Binary data

#define _XOPEN_SOURCE 600
#define _GNU_SOURCE
#define _FILE_OFFSET_BITS 64
#define __EXTENSIONS__

#include <sys/types.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <time.h>
#include <sys/fcntl.h>

#include <errno.h>
#include <aio.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Aligned allocation: posix_memalign() is used when LINUX is defined to a
 * non-zero value by the build; otherwise <malloc.h> is pulled in so that
 * memalign() is declared.
 */
#if LINUX
#define HAVE_POSIX_MEMALIGN
#else
#include <malloc.h>
#endif

/* posix_fadvise() based scans are built only when both advice values exist. */
#if defined(POSIX_FADV_DONTNEED) && defined(POSIX_FADV_WILLNEED)
#define HAVE_PFA
#endif

/*
 * Direct I/O flavour used by the aio method.
 * WITH_DIO is appended to the human-readable method name and PLUS_DIO to the
 * machine-readable one, so every branch must define both of them.
 */
#if defined(DIRECTIO_ON) && defined(DIRECTIO_OFF)
#define HAVE_DIRECTIO
#define WITH_DIO " w/directio"
#define PLUS_DIO "+directio"
#elif defined(O_DIRECT)
#define WITH_DIO " w/O_DIRECT"
#define PLUS_DIO "+O_DIRECT"
#else
#define WITH_DIO " with buffered i/o"
#define PLUS_DIO ""
#endif

/* Read strategy selected by the first command-line argument in main(). */
enum method { seek, pfa, pfa2, aio} method;
/*
 * work_set_size: number of blocks prefetched / submitted ahead of the reads.
 * block_size:    size in bytes of every block written, read and verified.
 * Both are set once in main() from the command line (or its defaults).
 */
static unsigned work_set_size, block_size; 

/* One scan routine per method; each reads noffsets blocks at offset_list[]. */
static void seek_scan(int fd, off_t *offset_list, unsigned noffsets); 
#ifdef HAVE_PFA
static void pfa_scan(int fd, off_t *offset_list, unsigned noffsets); 
static void pfa_scan2(int fd, off_t *offset_list, unsigned noffsets); 
#endif
static void aio_scan(int fd, off_t *offset_list, unsigned noffsets); 
/* gen_buf fills a block with its expected pattern; check_buf verifies it. */
static void gen_buf(off_t offset, char *buf);
static void check_buf(off_t offset, const char *read_buf);

/*
 * qsort comparator for arrays of off_t.
 * Returns -1, 0 or 1 when the first value is respectively smaller than,
 * equal to, or larger than the second one (ascending order).
 */
static int cmp(const void *arg1, const void *arg2)
{
  const off_t lhs = *(const off_t *)arg1;
  const off_t rhs = *(const off_t *)arg2;

  /* Each comparison yields 0 or 1, so the difference is -1, 0 or 1. */
  return (lhs > rhs) - (lhs < rhs);
}


/*
 * Synthetic random-read benchmark.
 *
 * Arguments (all optional, positional):
 *   argv[1]  method: seek | pfa | pfa2 | aio        (default seek)
 *   argv[2]  data file name                         (default "test.data")
 *   argv[3]  file size in kB                        (default 1024*1024 bytes)
 *   argv[4]  number of blocks to sample             (default 1)
 *   argv[5]  working set: blocks requested ahead    (default 128)
 *   argv[6]  block size in bytes                    (default 8192)
 *   argv[7]  sort the offsets before reading: a value starting with 't' or a
 *            non-zero number enables sorting        (default unsorted)
 *
 * The data file is generated when it is empty, reused when it already has the
 * requested size, and rejected otherwise.  A human-readable summary goes to
 * stderr and one comma-separated result line to stdout.
 *
 * Exits with 0 on success, 1 on usage/I-O errors, 3 for an unhandled method.
 */
int main(int argc, char *argv[])
{
  off_t file_size, sample_size, *offset_list, existing_size;
  unsigned noffsets;
  unsigned sorted_offsets = 0;  /* unsorted unless argv[7] asks otherwise */
  const char *file_name;
  int fd;
  struct timeval before, after;
  unsigned i;
  double elapsed;

  if (argc <= 1)
    method = seek;
#ifdef HAVE_PFA
  else if (!strcmp(argv[1], "pfa"))
    method = pfa;
  else if (!strcmp(argv[1], "pfa2"))
    method = pfa2;
#endif
  else if (!strcmp(argv[1], "aio"))
    method = aio;
  else if (!strcmp(argv[1], "seek"))
    method = seek;
  else {
    fprintf(stderr, "usage: ./a.out [seek|pfa|pfa2|aio] [filename] [file kB] [sample blocks] [concurrent blocks] [block bytes] [sorted: true|false]\n");
    exit(1);
  }

  if (argc <= 2)
    file_name = "test.data";
  else
    file_name = argv[2];

  if (argc <= 3)
    file_size = 1024*1024;
  else
    file_size = (off_t)1024*atoi(argv[3]);

  if (argc <= 4)
    sample_size = 1;
  else
    sample_size = atoi(argv[4]);

  if (argc <= 5)
    work_set_size = 128;
  else
    work_set_size = atoi(argv[5]);

  if (argc <= 6)
    block_size = 8192;
  else
    block_size = atoi(argv[6]);

  /* Only look at argv[7] when it was actually supplied. */
  if (argc > 7) {
    if (*argv[7] == 't' || atoi(argv[7]))
      sorted_offsets = 1;
  }

  if (block_size <= 0) {
    fprintf(stderr, "bad block size %u\n", block_size);
    exit(1);
  }
  /* Round the file size down to a whole number of blocks. */
  file_size = file_size/block_size*block_size;
  if (file_size <= 0 || sample_size <= 0) {
    fprintf(stderr, "bad file/sample size %llu/%llu\n",
            (long long unsigned)file_size,
            (long long unsigned)sample_size);
    exit(1);
  }

  fprintf(stderr, "reading random %lu %s %uk blocks out of %luM using %s (working set %u)\n",
          (unsigned long) sample_size,
          sorted_offsets ? "sorted" : "unordered",
          block_size,
          (unsigned long) (file_size/1024/1024),
          (method == seek ? "lseek only" :
           method == pfa ? "posix_fadvise" :
           method == pfa2 ? "posix_fadvise v2" :
           method == aio ? "aio_read" WITH_DIO:
           "???"),
          work_set_size);

  fd = open(file_name, O_RDWR | O_CREAT, 0644);
  if (fd < 0) {
    perror("open");
    exit(1);
  }
  existing_size = lseek(fd, 0, SEEK_END);

  if (existing_size == file_size) {
    fprintf(stderr, "reusing %luM file\n", (unsigned long)(existing_size/1024/1024));
  } else if (existing_size == 0) {
    char *buf = malloc(block_size);
    if (buf == NULL) {
      perror("malloc");
      exit(1);
    }
    fprintf(stderr, "generating %luM file\n", (unsigned long)(file_size/1024/1024));
    for (i=0; i<file_size/block_size; i++) {
      ssize_t x;
      gen_buf((off_t)i*block_size, buf);
      x = write(fd, buf, block_size);
      if (x < 0) {
        perror("write");
        exit(1);
      } else if ((size_t)x != block_size) {
        fprintf(stderr, "short write %lu/%lu\n",
                (unsigned long)x,
                (unsigned long)block_size);
        exit(1);
      }
    }
    free(buf);
  } else {
    fprintf(stderr, "existing file %s in way, remove it\n", file_name);
    exit(1);
  }

  fsync(fd);
  close(fd);

  /* Dropping the page cache needs root; without it results may be cached. */
  if (getuid())
    fprintf(stderr, "WARNING: can't drop caches\n");
  else if (system("echo 1 > /proc/sys/vm/drop_caches"))
    fprintf(stderr, "WARNING: can't drop caches\n");

  srandom(getpid());
  offset_list = malloc(sizeof *offset_list * (size_t)sample_size);
  if (offset_list == NULL) {
    perror("malloc");
    exit(1);
  }
  for (i = 0; i < sample_size; i++) {
    /* Combine two random() results so offsets can exceed 2^31. */
    long int r1 = random(), r2 = random();
    off_t r = (off_t)r1<<32|r2;
    offset_list[i] = (r%file_size)/block_size*block_size;
  }
  noffsets = i;

  if (sorted_offsets)
    qsort(offset_list, noffsets, sizeof(offset_list[0]), &cmp);

  sleep(1);
  fd = open(file_name, O_RDONLY
#ifdef O_DIRECT
                                | (method==aio ? O_DIRECT : 0)
#endif
            );
  if (fd < 0) {
    perror("open");
    exit(1);
  }

#ifdef HAVE_DIRECTIO
  if (method == aio)
    directio(fd, DIRECTIO_ON);
#endif

  if (gettimeofday(&before, NULL) < 0) {
    perror("gettimeofday");
    exit(1);
  }

  switch(method)
    {
    case seek:
      seek_scan(fd, offset_list, noffsets);
      break;
#ifdef HAVE_PFA
    case pfa:
      pfa_scan(fd, offset_list, noffsets);
      break;
    case pfa2:
      pfa_scan2(fd, offset_list, noffsets);
      break;
#endif
    case aio:
      aio_scan(fd, offset_list, noffsets);
      break;
    default:
      exit(3);
    }


  if (gettimeofday(&after, NULL) < 0) {
    perror("gettimeofday");
    exit(1);
  }

  close(fd);

  elapsed = after.tv_sec - before.tv_sec;
  elapsed += (after.tv_usec - before.tv_usec)/1000000.0;

  fprintf(stderr, "Elapsed time: %.3fs   Bandwidth: %.3f MB/s\n", elapsed, noffsets * ( block_size / 1024.0 / 1024.0 ) / elapsed );

  printf("%lu, %s, %u, %lu, %s, %u, %.3f\n",
         (unsigned long) sample_size,
         sorted_offsets ? "S" : "U",
         block_size,
         (unsigned long) (file_size/1024/1024),
         (method == seek ? "lseek" :
          method == pfa ? "pfa" :
          method == pfa2 ? "pfa2" :
          method == aio ? "aio" PLUS_DIO:
          "???"),
         work_set_size,
         noffsets * ( block_size / 1024.0 / 1024.0 ) / elapsed);

  fflush(stdout);

  free(offset_list);
  exit(0);
}

/*
 * Baseline scan: for every entry of offset_list, seek to it and read one
 * block synchronously, then verify the contents with check_buf().
 *
 * fd           descriptor of the data file, open for reading
 * offset_list  byte offsets of the blocks to read, in the order to read them
 * noffsets     number of entries in offset_list
 *
 * Any allocation, seek or read failure (including a short read) terminates
 * the program with status 1.  The read buffer is reused for every block, so
 * an unnoticed failed read would leave the previous block's data in place
 * for check_buf() to inspect.
 */
static void 
seek_scan(int fd, off_t *offset_list, unsigned noffsets) 
{
  unsigned i;
  char *buf = malloc(block_size);

  if (buf == NULL) {
    perror("malloc");
    exit(1);
  }

  for (i=0; i<noffsets; i++) {
    ssize_t nread;

    if (lseek(fd, offset_list[i], SEEK_SET) == (off_t)-1) {
      perror("lseek");
      exit(1);
    }
    nread = read(fd, buf, block_size);
    if (nread < 0) {
      perror("read");
      exit(1);
    }
    if ((size_t)nread != block_size) {
      fprintf(stderr, "short read %lu/%lu\n",
              (unsigned long)nread,
              (unsigned long)block_size);
      exit(1);
    }
    check_buf(offset_list[i], buf);
  }

  free(buf);
}

#ifdef HAVE_PFA
/*
 * Batched posix_fadvise scan: at the start of every group of work_set_size
 * blocks, advise POSIX_FADV_WILLNEED for the whole group, then read the
 * blocks one by one with lseek()+read().  After each read the block is
 * advised POSIX_FADV_DONTNEED and verified with check_buf().
 *
 * fd           descriptor of the data file, open for reading
 * offset_list  byte offsets of the blocks to read, in the order to read them
 * noffsets     number of entries in offset_list
 *
 * A work_set_size of 0 is treated as 1 so the grouping arithmetic never
 * divides by zero.  Allocation, seek and read failures (including short
 * reads) terminate the program with status 1; posix_fadvise() results are
 * deliberately ignored because the calls are only hints.
 */
static void
pfa_scan(int fd, off_t *offset_list, unsigned noffsets)
{
  unsigned i,j;
  unsigned window = work_set_size ? work_set_size : 1;
  char *buf = malloc(block_size);

  if (buf == NULL) {
    perror("malloc");
    exit(1);
  }

  for (i=0; i<noffsets; i++) {
    ssize_t nread;

    /* First block of a group: announce the whole group up front. */
    if (i%window == 0)
      for (j = i; j <noffsets && j < i+window; j++)
        (void) posix_fadvise(fd, offset_list[j], block_size, POSIX_FADV_WILLNEED);

    if (lseek(fd, offset_list[i], SEEK_SET) == (off_t)-1) {
      perror("lseek");
      exit(1);
    }
    nread = read(fd, buf, block_size);
    if (nread < 0) {
      perror("read");
      exit(1);
    }
    if ((size_t)nread != block_size) {
      fprintf(stderr, "short read %lu/%lu\n",
              (unsigned long)nread,
              (unsigned long)block_size);
      exit(1);
    }

    (void) posix_fadvise(fd, offset_list[i], block_size, POSIX_FADV_DONTNEED);
    check_buf(offset_list[i], buf);
  }

  free(buf);
}

/*
 * Sliding-window posix_fadvise scan: before reading block i, advise
 * POSIX_FADV_WILLNEED for every not-yet-advised block below i+work_set_size,
 * so the advised range moves ahead one block per read instead of being
 * refilled in batches.  Each block is then read with lseek()+read(), advised
 * POSIX_FADV_DONTNEED and verified with check_buf().
 *
 * fd           descriptor of the data file, open for reading
 * offset_list  byte offsets of the blocks to read, in the order to read them
 * noffsets     number of entries in offset_list
 *
 * Allocation, seek and read failures (including short reads) terminate the
 * program with status 1; posix_fadvise() results are deliberately ignored
 * because the calls are only hints.
 */
static void
pfa_scan2(int fd, off_t *offset_list, unsigned noffsets)
{
  unsigned i,j;
  char *buf = malloc(block_size);

  if (buf == NULL) {
    perror("malloc");
    exit(1);
  }

  for (j=i=0; i<noffsets; i++) {
    ssize_t nread;

    /* j remembers how far the advice has progressed; it never moves back. */
    for(; j < noffsets && j < i+work_set_size; j++)
      (void) posix_fadvise(fd, offset_list[j], block_size, POSIX_FADV_WILLNEED);

    if (lseek(fd, offset_list[i], SEEK_SET) == (off_t)-1) {
      perror("lseek");
      exit(1);
    }
    nread = read(fd, buf, block_size);
    if (nread < 0) {
      perror("read");
      exit(1);
    }
    if ((size_t)nread != block_size) {
      fprintf(stderr, "short read %lu/%lu\n",
              (unsigned long)nread,
              (unsigned long)block_size);
      exit(1);
    }

    (void) posix_fadvise(fd, offset_list[i], block_size, POSIX_FADV_DONTNEED);
    check_buf(offset_list[i], buf);
  }

  free(buf);
}

#endif

/*
 * POSIX AIO scan: blocks are submitted with aio_read() in batches of
 * work_set_size requests, each request owning its own block_size slice of
 * one aligned buffer.  The requests are then waited for in submission order
 * with aio_suspend(), their status is collected with aio_error() and
 * aio_return(), and the data is verified with check_buf().
 *
 * fd           descriptor of the data file, open for reading
 * offset_list  byte offsets of the blocks to read, in the order to read them
 * noffsets     number of entries in offset_list
 *
 * A work_set_size of 0 is treated as 1 so the batch arithmetic never divides
 * by zero.  Allocation failures, submission failures, failed or cancelled
 * requests and short reads terminate the program with status 1.
 */
static void
aio_scan(int fd, off_t *offset_list, unsigned noffsets)
{
  unsigned i,j;
  unsigned window = work_set_size ? work_set_size : 1;
  char *buf;
  struct aiocb *op = calloc(window, sizeof *op);

  if (op == NULL) {
    perror("calloc");
    exit(1);
  }

#ifdef HAVE_POSIX_MEMALIGN
  {
    void *raw = NULL;
    /* posix_memalign reports failure through its return value, not errno. */
    int rc = posix_memalign(&raw, block_size, (size_t)block_size * window);
    if (rc != 0) {
      errno = rc;
      perror("posix_memalign");
      exit(1);
    }
    buf = raw;
  }
#else
  buf = memalign(block_size, (size_t)block_size * window);
  if (buf == NULL) {
    perror("memalign");
    exit(1);
  }
#endif

  for (i=0; i<noffsets; i++) {
    unsigned slot = i%window;
    const struct aiocb *op1;
    int aio_retval;
    ssize_t nread;

    /* First request of a batch: submit the whole batch before waiting. */
    if (slot == 0)
      for (j = 0; j < window && j+i < noffsets; j++) {
        /* Start every submission from a clean control block. */
        memset(&op[j], 0, sizeof op[j]);
        op[j].aio_fildes = fd;
        op[j].aio_buf = buf + (size_t)block_size * j;
        op[j].aio_nbytes = block_size;
        op[j].aio_offset = offset_list[i+j];
        op[j].aio_sigevent.sigev_notify = SIGEV_NONE;
        if (aio_read(&op[j]) < 0) {
          /* Waiting on a request that was never queued makes no sense. */
          perror("aio_read");
          exit(1);
        }
      }

    op1 = &op[slot];
    /* A signal may interrupt the wait; retry until the request is done. */
    while (aio_suspend(&op1, 1, NULL) < 0) {
      if (errno != EINTR) {
        perror("aio_suspend");
        exit(1);
      }
    }

    aio_retval = aio_error(&op[slot]);
    if (aio_retval == EINPROGRESS) {
      fprintf(stderr, "aio_retval == EINPROGRESS!\n");
      exit(1);
    }
    if (aio_retval == ECANCELED) {
      fprintf(stderr, "aio_retval == ECANCELED!\n");
      exit(1);
    }
    if (aio_retval == -1) {
      /* aio_error itself failed; errno already describes why. */
      perror("aio_error");
      exit(1);
    }
    if (aio_retval != 0) {
      errno = aio_retval;
      perror("aio_retval");
      exit(1);
    }

    nread = aio_return(&op[slot]);
    if (nread < 0) {
      perror("aio_return");
      exit(1);
    }
    if ((size_t)nread != block_size) {
      fprintf(stderr, "short read %lu/%lu\n",
              (unsigned long)nread,
              (unsigned long)block_size);
      exit(1);
    }
    check_buf(offset_list[i], buf + (size_t)block_size * slot);
  }

  free(buf);
  free(op);
}

/*
 * Fill one block with the pattern expected at the given file offset.
 *
 * offset  byte offset of the block inside the data file
 * buf     destination, at least block_size bytes long
 *
 * Every byte of the block is set to offset modulo 256.
 * NOTE(review): for offsets that are multiples of 256 the fill byte is
 * always 0, so such blocks are indistinguishable from one another.
 */
static void
gen_buf(off_t offset, char *buf)
{
  const int fill = offset%256;

  memset(buf, fill, block_size);
}

/*
 * Verify that a block read from the data file carries the expected pattern.
 *
 * offset    byte offset the block was read from
 * read_buf  the block_size bytes that were read
 *
 * Every byte must equal offset modulo 256; on the first mismatch the program
 * exits with status 2.  Returns normally when the whole block matches.
 */
static void
check_buf(off_t offset, const char *read_buf)
{
  const char expected = offset % 256;
  unsigned pos = 0;

  while (pos < block_size) {
    if (read_buf[pos] != expected)
      exit(2);
    pos++;
  }
}


-- 
  Gregory Stark
  EnterpriseDB          http://www.enterprisedb.com
  Ask me about EnterpriseDB's Slony Replication support!
---------------------------(end of broadcast)---------------------------
TIP 9: In versions below 8.0, the planner will ignore your desire to
       choose an index scan if your joining column's datatypes do not
       match

Reply via email to