I wrote: > I'll post a patch soon that builds on that, so you can see what I mean.
I've attached where I was imagining this heading, as a text file to avoid distracting the cfbot. Here are the numbers I got with your test on the attached, as well as on your 0001, on x86-64 Clang 10, default siglen: master: 739ms; v3-0001: 692ms; attached POC: 665ms. The small additional speedup is not worth it, given the code churn and complexity, so I don't want to go this route after all. I think the way to go is a simplified version of your 0001 (not 0002), with only a single function, for gist and intarray only, and a style that better matches the surrounding code. If you look at my xor functions in the attached text file, you'll get an idea of what it should look like. Note that it got the above performance without ever trying to massage the pointer alignment. I'm a bit uncomfortable with the fact that we can't rely on alignment, but maybe there's a simple fix somewhere in the gist code. -- John Naylor EDB: http://www.enterprisedb.com
src/backend/access/heap/visibilitymap.c | 13 +- src/backend/nodes/bitmapset.c | 23 +-- src/backend/utils/adt/tsgistidx.c | 14 +- src/include/port/pg_bitutils.h | 12 +- src/port/pg_bitutils.c | 240 ++++++++++++++++++++++++++------ 5 files changed, 214 insertions(+), 88 deletions(-) diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c index e198df65d8..d910eb2875 100644 --- a/src/backend/access/heap/visibilitymap.c +++ b/src/backend/access/heap/visibilitymap.c @@ -388,7 +388,6 @@ visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_fro { Buffer mapBuffer; uint64 *map; - int i; /* * Read till we fall off the end of the map. We assume that any extra @@ -409,17 +408,11 @@ visibilitymap_count(Relation rel, BlockNumber *all_visible, BlockNumber *all_fro StaticAssertStmt(MAPSIZE % sizeof(uint64) == 0, "unsupported MAPSIZE"); if (all_frozen == NULL) - { - for (i = 0; i < MAPSIZE / sizeof(uint64); i++) - nvisible += pg_popcount64(map[i] & VISIBLE_MASK64); - } + nvisible = pg_popcount_mask64(map, MAPSIZE, VISIBLE_MASK64); else { - for (i = 0; i < MAPSIZE / sizeof(uint64); i++) - { - nvisible += pg_popcount64(map[i] & VISIBLE_MASK64); - nfrozen += pg_popcount64(map[i] & FROZEN_MASK64); - } + nvisible = pg_popcount_mask64(map, MAPSIZE, VISIBLE_MASK64); + nfrozen = pg_popcount_mask64(map, MAPSIZE, FROZEN_MASK64); } ReleaseBuffer(mapBuffer); diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c index 649478b0d4..5368c72255 100644 --- a/src/backend/nodes/bitmapset.c +++ b/src/backend/nodes/bitmapset.c @@ -452,7 +452,6 @@ bms_is_member(int x, const Bitmapset *a) int bms_member_index(Bitmapset *a, int x) { - int i; int bitnum; int wordnum; int result = 0; @@ -466,14 +465,8 @@ bms_member_index(Bitmapset *a, int x) bitnum = BITNUM(x); /* count bits in preceding words */ - for (i = 0; i < wordnum; i++) - { - bitmapword w = a->words[i]; - - /* No need to count the bits in a zero word */ - if (w != 0) 
- result += bmw_popcount(w); - } + result = pg_popcount((const char *) a->words, + wordnum * BITS_PER_BITMAPWORD / BITS_PER_BYTE); /* * Now add bits of the last word, but only those before the item. We can @@ -645,22 +638,14 @@ bms_get_singleton_member(const Bitmapset *a, int *member) int bms_num_members(const Bitmapset *a) { - int result = 0; int nwords; - int wordnum; if (a == NULL) return 0; nwords = a->nwords; - for (wordnum = 0; wordnum < nwords; wordnum++) - { - bitmapword w = a->words[wordnum]; - /* No need to count the bits in a zero word */ - if (w != 0) - result += bmw_popcount(w); - } - return result; + return pg_popcount((const char *) a->words, + nwords * BITS_PER_BITMAPWORD / BITS_PER_BYTE); } /* diff --git a/src/backend/utils/adt/tsgistidx.c b/src/backend/utils/adt/tsgistidx.c index c09eefdda2..0602bb0b7f 100644 --- a/src/backend/utils/adt/tsgistidx.c +++ b/src/backend/utils/adt/tsgistidx.c @@ -489,17 +489,9 @@ sizebitvec(BITVECP sign, int siglen) static int hemdistsign(BITVECP a, BITVECP b, int siglen) { - int i, - diff, - dist = 0; - - LOOPBYTE(siglen) - { - diff = (unsigned char) (a[i] ^ b[i]); - /* Using the popcount functions here isn't likely to win */ - dist += pg_number_of_ones[diff]; - } - return dist; + return pg_popcount_xor((const unsigned char *) a, + (const unsigned char *) b, + siglen); } static int diff --git a/src/include/port/pg_bitutils.h b/src/include/port/pg_bitutils.h index f9b77ec278..73c923c2aa 100644 --- a/src/include/port/pg_bitutils.h +++ b/src/include/port/pg_bitutils.h @@ -208,11 +208,17 @@ pg_ceil_log2_64(uint64 num) } /* Count the number of one-bits in a uint32 or uint64 */ -extern int (*pg_popcount32) (uint32 word); -extern int (*pg_popcount64) (uint64 word); +extern int pg_popcount32 (uint32 word); +extern int pg_popcount64 (uint64 word); /* Count the number of one-bits in a byte array */ -extern uint64 pg_popcount(const char *buf, int bytes); +extern uint64 (*pg_popcount) (const char *buf, int bytes); + +/* Count the 
number of one-bits in the result of xor operation */ +extern uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes); + +/* Count the number of one-bits in an array of uint64, with the mask applied */ +extern uint64 pg_popcount_mask64(const uint64 *buf, int bytes, uint64 mask); /* * Rotate the bits of "word" to the right by n bits. diff --git a/src/port/pg_bitutils.c b/src/port/pg_bitutils.c index 2252021854..dccb6785a1 100644 --- a/src/port/pg_bitutils.c +++ b/src/port/pg_bitutils.c @@ -116,21 +116,21 @@ const uint8 pg_number_of_ones[256] = { #endif #endif -static int pg_popcount32_slow(uint32 word); -static int pg_popcount64_slow(uint64 word); +static uint64 pg_popcount_slow(const char *buf, int bytes); +static uint64 pg_popcount_xor_slow(const unsigned char *a, const unsigned char *b, int bytes); #ifdef USE_POPCNT_ASM static bool pg_popcount_available(void); -static int pg_popcount32_choose(uint32 word); -static int pg_popcount64_choose(uint64 word); -static int pg_popcount32_asm(uint32 word); -static int pg_popcount64_asm(uint64 word); +static uint64 pg_popcount_choose(const char *buf, int bytes); +static uint64 pg_popcount_asm(const char *buf, int bytes); +static uint64 pg_popcount_xor_choose(const unsigned char *a, const unsigned char *b, int bytes); +static uint64 pg_popcount_xor_asm(const unsigned char *a, const unsigned char *b, int bytes); -int (*pg_popcount32) (uint32 word) = pg_popcount32_choose; -int (*pg_popcount64) (uint64 word) = pg_popcount64_choose; +uint64 (*pg_popcount) (const char *buf, int bytes) = pg_popcount_choose; +uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes) = pg_popcount_xor_choose; #else -int (*pg_popcount32) (uint32 word) = pg_popcount32_slow; -int (*pg_popcount64) (uint64 word) = pg_popcount64_slow; +uint64 (*pg_popcount) (const char *buf, int bytes) = pg_popcount_slow; +uint64 (*pg_popcount_xor) (const unsigned char *a, const unsigned char *b, int bytes) = 
pg_popcount_xor_slow; #endif /* USE_POPCNT_ASM */ #ifdef USE_POPCNT_ASM @@ -155,50 +155,38 @@ pg_popcount_available(void) } /* - * These functions get called on the first call to pg_popcount32 etc. + * These functions get called on the first call to pg_popcount etc. * They detect whether we can use the asm implementations, and replace * the function pointers so that subsequent calls are routed directly to * the chosen implementation. */ -static int -pg_popcount32_choose(uint32 word) +static uint64 +pg_popcount_choose(const char *buf, int bytes) { if (pg_popcount_available()) - { - pg_popcount32 = pg_popcount32_asm; - pg_popcount64 = pg_popcount64_asm; - } + pg_popcount = pg_popcount_asm; else - { - pg_popcount32 = pg_popcount32_slow; - pg_popcount64 = pg_popcount64_slow; - } + pg_popcount = pg_popcount_slow; - return pg_popcount32(word); + return pg_popcount(buf, bytes); } -static int -pg_popcount64_choose(uint64 word) +static uint64 +pg_popcount_xor_choose(const unsigned char *a, const unsigned char *b, int bytes) { if (pg_popcount_available()) - { - pg_popcount32 = pg_popcount32_asm; - pg_popcount64 = pg_popcount64_asm; - } + pg_popcount_xor = pg_popcount_xor_asm; else - { - pg_popcount32 = pg_popcount32_slow; - pg_popcount64 = pg_popcount64_slow; - } + pg_popcount_xor = pg_popcount_xor_slow; - return pg_popcount64(word); + return pg_popcount_xor(a, b, bytes); } /* * pg_popcount32_asm * Return the number of 1 bits set in word */ -static int +static inline int pg_popcount32_asm(uint32 word) { uint32 res; @@ -211,7 +199,7 @@ __asm__ __volatile__(" popcntl %1,%0\n":"=q"(res):"rm"(word):"cc"); * pg_popcount64_asm * Return the number of 1 bits set in word */ -static int +static inline int pg_popcount64_asm(uint64 word) { uint64 res; @@ -222,13 +210,12 @@ __asm__ __volatile__(" popcntq %1,%0\n":"=q"(res):"rm"(word):"cc"); #endif /* USE_POPCNT_ASM */ - /* - * pg_popcount32_slow + * pg_popcount32 * Return the number of 1 bits set in word */ -static int 
-pg_popcount32_slow(uint32 word) +int +pg_popcount32(uint32 word) { #ifdef HAVE__BUILTIN_POPCOUNT return __builtin_popcount(word); @@ -246,11 +233,11 @@ pg_popcount32_slow(uint32 word) } /* - * pg_popcount64_slow + * pg_popcount64 * Return the number of 1 bits set in word */ -static int -pg_popcount64_slow(uint64 word) +int +pg_popcount64(uint64 word) { #ifdef HAVE__BUILTIN_POPCOUNT #if defined(HAVE_LONG_INT_64) @@ -273,13 +260,12 @@ pg_popcount64_slow(uint64 word) #endif /* HAVE__BUILTIN_POPCOUNT */ } - /* - * pg_popcount + * pg_popcount_slow * Returns the number of 1-bits in buf */ uint64 -pg_popcount(const char *buf, int bytes) +pg_popcount_slow(const char *buf, int bytes) { uint64 popcnt = 0; @@ -319,3 +305,167 @@ pg_popcount(const char *buf, int bytes) return popcnt; } + +/* + * pg_xorcount_slow + * Count the number of 1-bits in the result of xor operation. + */ +uint64 +pg_popcount_xor_slow(const unsigned char *a, const unsigned char *b, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + + /* + * We can process 64-bit chunks only if both are aligned. + */ + if (PointerIsAligned(a, uint64) && PointerIsAligned(b, uint64)) + { + const uint64 *a_words = (const uint64 *) a; + const uint64 *b_words = (const uint64 *) b; + + while (bytes >= 8) + { + popcnt += pg_popcount64(*a_words++ ^ *b_words++); + + bytes -= 8; + } + + a = (const unsigned char *) a_words; + b = (const unsigned char *) b_words; + } +#endif + // TODO: 32-bit + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[*a++ ^ *b++]; + + return popcnt; +} + +#ifdef USE_POPCNT_ASM + +/* + * pg_popcount_asm + * Returns the number of 1-bits in buf using POPCNT + */ +uint64 +pg_popcount_asm(const char *buf, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + /* Process in 64-bit chunks if the buffer is aligned. 
*/ + if (buf == (const char *) TYPEALIGN(8, buf)) + { + const uint64 *words = (const uint64 *) buf; + + /* + * Manually unroll the loop, since the compiler won't do it for us. + */ + while (bytes >= 4 * 8) + { + popcnt += pg_popcount64_asm(words[0]); + popcnt += pg_popcount64_asm(words[1]); + popcnt += pg_popcount64_asm(words[2]); + popcnt += pg_popcount64_asm(words[3]); + + bytes -= 4 * 8; + words += 4; + } + + while (bytes >= 8) + { + popcnt += pg_popcount64_asm(*words++); + bytes -= 8; + } + + buf = (const char *) words; + } +#else + /* Process in 32-bit chunks if the buffer is aligned. */ + if (buf == (const char *) TYPEALIGN(4, buf)) + { + const uint32 *words = (const uint32 *) buf; + + while (bytes >= 4) + { + popcnt += pg_popcount32_asm(*words++); + bytes -= 4; + } + + buf = (const char *) words; + } +#endif + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[(unsigned char) *buf++]; + + return popcnt; +} + +/* + * pg_xorcount_slow + * Count the number of 1-bits in the result of xor operation. + */ +uint64 +pg_popcount_xor_asm(const unsigned char *a, const unsigned char *b, int bytes) +{ + uint64 popcnt = 0; + +#if SIZEOF_VOID_P >= 8 + + /* + * We can process 64-bit chunks only if both are aligned. + */ + if (PointerIsAligned(a, uint64) && PointerIsAligned(b, uint64)) + { + const uint64 *a_words = (const uint64 *) a; + const uint64 *b_words = (const uint64 *) b; + + while (bytes >= 8) + { + popcnt += pg_popcount64_asm(*a_words++ ^ *b_words++); + + bytes -= 8; + } + + a = (const unsigned char *) a_words; + b = (const unsigned char *) b_words; + } +#endif + // TODO: 32-bit + + /* Process any remaining bytes */ + while (bytes--) + popcnt += pg_number_of_ones[*a++ ^ *b++]; + + return popcnt; +} + +#endif /* USE_POPCNT_ASM */ + +/* + * pg_popcount_mask64 + * Returns the number of 1-bits in buf after applying the mask + * + * Note: We currently only need a 64-bit version for the visibility map. 
+ */ +uint64 +pg_popcount_mask64(const uint64 *buf, int bytes, uint64 mask) +{ + uint64 popcnt = 0; + + Assert(bytes % sizeof(uint64) == 0); + + while (bytes >= 8) + { + popcnt += pg_popcount64(*buf++ & mask); + bytes -= 8; + } + + return popcnt; +}