On Fri, Sep 16, 2022 at 02:54:14PM +0700, John Naylor wrote:
> v6 demonstrates why this should have been put off towards the end. (more 
> below)

Since the SIMD code is fresh in my mind, I wanted to offer my review for
0001 in the "Improve dead tuple storage for lazy vacuum" thread [0].
However, I agree with John that the SIMD part of that work should be left
for the end, and I didn't want to distract from the radix tree part too
much.  So, here is a new thread for just the SIMD part.

>> I've updated the radix tree patch. It's now separated into two patches.
>>
>> 0001 patch introduces pg_lsearch8() and pg_lsearch8_ge() (we may find
>> better names) that are similar to the pg_lfind8() family but they
>> return the index of the key in the vector instead of true/false. The
>> patch includes regression tests.

I don't think it's clear that the "lfind" functions return whether there is
a match while the "lsearch" functions return the index of the first match.
It might be better to call these something like "pg_lfind8_idx" and
"pg_lfind8_ge_idx" instead.

> +/*
> + * Return the index of the first element in the vector that is greater than
> + * or equal to the given scalar. Return sizeof(Vector8) if there is no such
> + * element.
>
> That's a bizarre API to indicate non-existence.

+1.  It should probably just return -1 in that case.
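
To make the two return conventions concrete, here is a rough scalar sketch.
It is not code from the patch, and the names find8_bool() and find8_idx()
are hypothetical stand-ins for the "lfind" and "_idx" styles.  The boolean
form only says whether a match exists, while the index form says where the
first match is and uses -1 for "no such element".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the pg_lfind8() style: reports existence only. */
static bool
find8_bool(uint8_t key, const uint8_t *base, int nelem)
{
	assert(nelem >= 0);

	for (int i = 0; i < nelem; i++)
	{
		if (base[i] == key)
			return true;
	}

	return false;
}

/* Hypothetical stand-in for the "_idx" style: first match, or -1 if none. */
static int
find8_idx(uint8_t key, const uint8_t *base, int nelem)
{
	assert(nelem >= 0);

	for (int i = 0; i < nelem; i++)
	{
		if (base[i] == key)
			return i;
	}

	/* -1 can never be a valid index, so it is unambiguous as "not found" */
	return -1;
}
```

With -1 as the sentinel, callers don't need to know the vector width to
interpret the result.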

> + *
> + * Note that this function assumes the elements in the vector are sorted.
> + */
>
> That is *completely* unacceptable for a general-purpose function.

+1

> +#else /* USE_NO_SIMD */
> + Vector8 r = 0;
> + uint8 *rp = (uint8 *) &r;
> +
> + for (Size i = 0; i < sizeof(Vector8); i++)
> + rp[i] = (((const uint8 *) &v1)[i] == ((const uint8 *) &v2)[i]) ? 0xFF : 0;
>
> I don't think we should try to force the non-simd case to adopt the
> special semantics of vector comparisons. It's much easier to just use
> the same logic as the assert builds.

+1

> +#ifdef USE_SSE2
> + return (uint32) _mm_movemask_epi8(v);
> +#elif defined(USE_NEON)
> + static const uint8 mask[16] = {
> +        1 << 0, 1 << 1, 1 << 2, 1 << 3,
> +        1 << 4, 1 << 5, 1 << 6, 1 << 7,
> +        1 << 0, 1 << 1, 1 << 2, 1 << 3,
> +        1 << 4, 1 << 5, 1 << 6, 1 << 7,
> +      };
> +
> +    uint8x16_t masked = vandq_u8(vld1q_u8(mask), (uint8x16_t) vshrq_n_s8(v, 7));
> +    uint8x16_t maskedhi = vextq_u8(masked, masked, 8);
> +
> +    return (uint32) vaddvq_u16((uint16x8_t) vzip1q_u8(masked, maskedhi));
>
> For Arm, we need to be careful here. This article goes into a lot of
> detail for this situation:
>
> https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/porting-x86-vector-bitmask-optimizations-to-arm-neon

The technique demonstrated in this article seems to work nicely.
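
For anyone who hasn't read the article, here is a portable emulation of the
idea as I understand it.  This is only a sketch and not the Neon code
itself: nibble_mask() mimics what a narrowing right shift by 4 over 16-bit
lanes produces (4 mask bits per input byte, assuming little-endian lanes),
and rightmost_one_pos64() is a hypothetical loop-based stand-in for a
count-trailing-zeros helper.  All of the names here are made up for
illustration.

```c
#include <assert.h>
#include <stdint.h>

#define VEC_LEN 16

/* Hypothetical count-trailing-zeros helper; the input must be nonzero. */
static int
rightmost_one_pos64(uint64_t word)
{
	int			pos = 0;

	assert(word != 0);
	while ((word & 1) == 0)
	{
		word >>= 1;
		pos++;
	}
	return pos;
}

/*
 * Emulate a narrowing shift right by 4 over eight 16-bit lanes.  Each input
 * byte (0x00 or 0xFF) contributes one nibble to the 64-bit result.
 */
static uint64_t
nibble_mask(const uint8_t cmp[VEC_LEN])
{
	uint64_t	mask = 0;

	for (int lane = 0; lane < VEC_LEN / 2; lane++)
	{
		/* assemble a little-endian 16-bit lane from two adjacent bytes */
		uint16_t	wide = (uint16_t) (cmp[2 * lane] | (cmp[2 * lane + 1] << 8));
		uint8_t		narrow = (uint8_t) (wide >> 4);

		mask |= (uint64_t) narrow << (8 * lane);
	}
	return mask;
}

/* Index of the first byte in v equal to c, or -1 if there is none. */
static int
first_match(const uint8_t v[VEC_LEN], uint8_t c)
{
	uint8_t		cmp[VEC_LEN];
	uint64_t	mask;

	/* byte-wise equality with all-ones/all-zeros results, like a SIMD compare */
	for (int i = 0; i < VEC_LEN; i++)
		cmp[i] = (v[i] == c) ? 0xFF : 0x00;

	mask = nibble_mask(cmp);
	if (mask == 0)
		return -1;

	/* each byte owns 4 bits of the mask, so divide the bit position by 4 */
	return rightmost_one_pos64(mask) >> 2;
}
```

The shift by 2 at the end is the only difference from the SSE2 path, where
the mask has one bit per byte and the bit position is the index.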

For these kinds of patches, I find the best way to review them is to try
out my proposed changes as I'm reading through the patch.  I hope you don't
mind that I've done so here and attached a new version of the patch.  In
addition to addressing the aforementioned feedback, I made the following
changes:

* I renamed the vector8_search_* functions to vector8_find() and
vector8_find_ge().  IMO this is more in the spirit of existing function
names like vector8_has().

* I simplified vector8_find_ge() by essentially making it do the opposite
of what vector8_has_le() does (i.e., using saturating subtraction to find
matching bytes).  This removes the need for vector8_min(), and since
vector8_find_ge() can just call vector8_find() to find any 0 bytes,
vector8_highbit_mask() can be removed as well.

* I simplified the test for pg_lfind8_ge_idx() by making it look a little
more like the test for pg_lfind32().  I wasn't sure about the use of rand()
and qsort(), and overall it just felt a little too complicated to me.
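
The saturating subtraction trick mentioned above can be shown with a scalar
sketch.  Again, this is only an illustration under my own assumptions and
not the patch's code: ssub8() and find_ge() are hypothetical names, and the
loop stands in for what the vector instructions do on all bytes at once.
The point is that c - v[i] saturates to 0 exactly when v[i] >= c, so
"greater than or equal" reduces to searching for a zero byte, and the input
does not need to be sorted.

```c
#include <assert.h>
#include <stdint.h>

#define VEC_LEN 16

/* Unsigned saturating subtraction of one byte: a - b, clamped at 0. */
static uint8_t
ssub8(uint8_t a, uint8_t b)
{
	return (a > b) ? (uint8_t) (a - b) : 0;
}

/* Index of the first byte in v that is >= c, or -1 if there is none. */
static int
find_ge(const uint8_t v[VEC_LEN], uint8_t c)
{
	for (int i = 0; i < VEC_LEN; i++)
	{
		uint8_t		sub = ssub8(c, v[i]);

		/* cross-check the trick against a plain comparison */
		assert((sub == 0) == (v[i] >= c));

		if (sub == 0)
			return i;
	}
	return -1;
}
```

For example, with the unsorted bytes {1, 0, 5, 2, 200, 0, ...}, searching
for 3 returns index 2 and searching for 201 returns -1.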

I've tested all three code paths (i.e., SSE2, Neon, and USE_NO_SIMD), but I
haven't done any performance analysis yet.

[0] https://postgr.es/m/CAD21AoD3w76wERs_Lq7_uA6%2BgTaoOERPji%2BYz8Ac6aui4JwvTg%40mail.gmail.com

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
From fcbb68b7d2bb9df63c92bc773240873e1e27a5a8 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Fri, 16 Sep 2022 20:44:03 -0700
Subject: [PATCH v1 1/1] introduce pg_lfind8_idx and pg_lfind8_ge_idx

---
 src/include/port/pg_lfind.h                   |  72 +++++++++++++
 src/include/port/simd.h                       | 100 ++++++++++++++++++
 .../test_lfind/expected/test_lfind.out        |  12 +++
 .../modules/test_lfind/sql/test_lfind.sql     |   2 +
 .../modules/test_lfind/test_lfind--1.0.sql    |   8 ++
 src/test/modules/test_lfind/test_lfind.c      |  81 +++++++++++++-
 6 files changed, 274 insertions(+), 1 deletion(-)

diff --git a/src/include/port/pg_lfind.h b/src/include/port/pg_lfind.h
index 0625cac6b5..34cf30e591 100644
--- a/src/include/port/pg_lfind.h
+++ b/src/include/port/pg_lfind.h
@@ -48,6 +48,42 @@ pg_lfind8(uint8 key, uint8 *base, uint32 nelem)
 	return false;
 }
 
+/*
+ * pg_lfind8_idx
+ *
+ * Return index of the first element in 'base' that equals 'key'.  Return -1 if
+ * there is no such element.
+ */
+static inline int
+pg_lfind8_idx(uint8 key, uint8 *base, int nelem)
+{
+	int			i = 0;
+
+#ifndef USE_NO_SIMD
+	/* round down to multiple of vector length */
+	int			tail_idx = nelem & ~(sizeof(Vector8) - 1);
+	Vector8		chunk;
+
+	for (; i < tail_idx; i += sizeof(Vector8))
+	{
+		int			idx;
+
+		vector8_load(&chunk, &base[i]);
+		if ((idx = vector8_find(chunk, key)) != -1)
+			return i + idx;
+	}
+#endif
+
+	/* Process the remaining elements one at a time. */
+	for (; i < nelem; i++)
+	{
+		if (key == base[i])
+			return i;
+	}
+
+	return -1;
+}
+
 /*
  * pg_lfind8_le
  *
@@ -80,6 +116,42 @@ pg_lfind8_le(uint8 key, uint8 *base, uint32 nelem)
 	return false;
 }
 
+/*
+ * pg_lfind8_ge_idx
+ *
+ * Return index of the first element in 'base' that is greater than or equal to
+ * 'key'.  Return -1 if there is no such element.
+ */
+static inline int
+pg_lfind8_ge_idx(uint8 key, uint8 *base, int nelem)
+{
+	int			i = 0;
+
+#ifndef USE_NO_SIMD
+	/* round down to multiple of vector length */
+	int			tail_idx = nelem & ~(sizeof(Vector8) - 1);
+	Vector8		chunk;
+
+	for (; i < tail_idx; i += sizeof(Vector8))
+	{
+		int			idx;
+
+		vector8_load(&chunk, &base[i]);
+		if ((idx = vector8_find_ge(chunk, key)) != -1)
+			return i + idx;
+	}
+#endif
+
+	/* Process the remaining elements one at a time. */
+	for (; i < nelem; i++)
+	{
+		if (base[i] >= key)
+			return i;
+	}
+
+	return -1;
+}
+
 /*
  * pg_lfind32
  *
diff --git a/src/include/port/simd.h b/src/include/port/simd.h
index 61ae4ecf60..e79d2ad5e4 100644
--- a/src/include/port/simd.h
+++ b/src/include/port/simd.h
@@ -60,6 +60,15 @@ typedef uint32x4_t Vector32;
 typedef uint64 Vector8;
 #endif
 
+/*
+ * Some of the functions with SIMD implementations use bitwise operations
+ * available in pg_bitutils.h.  There are currently no non-SIMD implementations
+ * that require these bitwise operations.
+ */
+#ifndef USE_NO_SIMD
+#include "port/pg_bitutils.h"
+#endif
+
 /* load/store operations */
 static inline void vector8_load(Vector8 *v, const uint8 *s);
 #ifndef USE_NO_SIMD
@@ -79,6 +88,8 @@ static inline bool vector8_has_le(const Vector8 v, const uint8 c);
 static inline bool vector8_is_highbit_set(const Vector8 v);
 #ifndef USE_NO_SIMD
 static inline bool vector32_is_highbit_set(const Vector32 v);
+static inline int vector8_find(const Vector8 v, const uint8 c);
+static inline int vector8_find_ge(const Vector8 v, const uint8 c);
 #endif
 
 /* arithmetic operations */
@@ -299,6 +310,95 @@ vector32_is_highbit_set(const Vector32 v)
 }
 #endif							/* ! USE_NO_SIMD */
 
+/*
+ * Return index of the first element in the vector that is equal to the given
+ * scalar.  Return -1 if there is no such element.
+ */
+#ifndef USE_NO_SIMD
+static inline int
+vector8_find(const Vector8 v, const uint8 c)
+{
+	Vector8		cmp;
+	int			result = -1;
+#if defined(USE_SSE2)
+	uint32		mask;
+#elif defined(USE_NEON)
+	uint64		mask;
+#endif
+
+	/* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+	int			assert_result = -1;
+
+	for (Size i = 0; i < sizeof(Vector8); i++)
+	{
+		if (((const uint8 *) &v)[i] == c)
+		{
+			assert_result = i;
+			break;
+		}
+	}
+#endif							/* USE_ASSERT_CHECKING */
+
+	cmp = vector8_eq(v, vector8_broadcast(c));
+
+#if defined(USE_SSE2)
+	mask = _mm_movemask_epi8(cmp);
+	if (mask)
+		result = pg_rightmost_one_pos32(mask);
+#elif defined(USE_NEON)
+	/*
+	 * Adapted from
+	 * https://community.arm.com/arm-community-blogs/b/infrastructure-solutions-blog/posts/porting-x86-vector-bitmask-optimizations-to-arm-neon
+	 */
+	mask = vget_lane_u64((uint64x1_t) vshrn_n_u16((uint16x8_t) cmp, 4), 0);
+	if (mask)
+		result = pg_rightmost_one_pos64(mask) >> 2;
+#endif
+
+	Assert(assert_result == result);
+	return result;
+}
+#endif							/* ! USE_NO_SIMD */
+
+/*
+ * Return index of the first element in the vector that is greater than or
+ * equal to the given scalar.  Return -1 if there is no such element.
+ */
+#ifndef USE_NO_SIMD
+static inline int
+vector8_find_ge(const Vector8 v, const uint8 c)
+{
+	Vector8		sub;
+	int			result;
+
+	/* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+	int			assert_result = -1;
+
+	for (Size i = 0; i < sizeof(Vector8); i++)
+	{
+		if (((const uint8 *) &v)[i] >= c)
+		{
+			assert_result = i;
+			break;
+		}
+	}
+#endif							/* USE_ASSERT_CHECKING */
+
+	/*
+	 * Use saturating subtraction to find bytes >= c, which will present as
+	 * NUL bytes.  This approach is a workaround for the lack of unsigned
+	 * comparison instructions on some architectures.
+	 */
+	sub = vector8_ssub(vector8_broadcast(c), v);
+	result = vector8_find(sub, 0);
+
+	Assert(assert_result == result);
+	return result;
+}
+#endif							/* ! USE_NO_SIMD */
+
 /*
  * Return the bitwise OR of the inputs
  */
diff --git a/src/test/modules/test_lfind/expected/test_lfind.out b/src/test/modules/test_lfind/expected/test_lfind.out
index 1d4b14e703..30ecad4e9e 100644
--- a/src/test/modules/test_lfind/expected/test_lfind.out
+++ b/src/test/modules/test_lfind/expected/test_lfind.out
@@ -22,3 +22,15 @@ SELECT test_lfind32();
  
 (1 row)
 
+SELECT test_lfind8_idx();
+ test_lfind8_idx 
+-----------------
+ 
+(1 row)
+
+SELECT test_lfind8_ge_idx();
+ test_lfind8_ge_idx 
+--------------------
+ 
+(1 row)
+
diff --git a/src/test/modules/test_lfind/sql/test_lfind.sql b/src/test/modules/test_lfind/sql/test_lfind.sql
index 766c640831..0c01497aef 100644
--- a/src/test/modules/test_lfind/sql/test_lfind.sql
+++ b/src/test/modules/test_lfind/sql/test_lfind.sql
@@ -8,3 +8,5 @@ CREATE EXTENSION test_lfind;
 SELECT test_lfind8();
 SELECT test_lfind8_le();
 SELECT test_lfind32();
+SELECT test_lfind8_idx();
+SELECT test_lfind8_ge_idx();
diff --git a/src/test/modules/test_lfind/test_lfind--1.0.sql b/src/test/modules/test_lfind/test_lfind--1.0.sql
index 81801926ae..50b635794d 100644
--- a/src/test/modules/test_lfind/test_lfind--1.0.sql
+++ b/src/test/modules/test_lfind/test_lfind--1.0.sql
@@ -14,3 +14,11 @@ CREATE FUNCTION test_lfind8()
 CREATE FUNCTION test_lfind8_le()
 	RETURNS pg_catalog.void
 	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_lfind8_idx()
+	RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_lfind8_ge_idx()
+	RETURNS pg_catalog.void
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_lfind/test_lfind.c b/src/test/modules/test_lfind/test_lfind.c
index 82673d54c6..6aa33edb3b 100644
--- a/src/test/modules/test_lfind/test_lfind.c
+++ b/src/test/modules/test_lfind/test_lfind.c
@@ -115,11 +115,90 @@ test_lfind8_le(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* workhorse for test_lfind8_idx */
+static void
+test_lfind8_idx_internal(uint8 key)
+{
+	uint8		charbuf[LEN_WITH_TAIL(Vector8)];
+	const int	len_no_tail = LEN_NO_TAIL(Vector8);
+	const int	len_with_tail = LEN_WITH_TAIL(Vector8);
+	int			keypos;
+
+	memset(charbuf, 0xFF, len_with_tail);
+	/* search tail to test one-byte-at-a-time path */
+	keypos = len_with_tail - 1;
+	charbuf[keypos] = key;
+	if (key > 0x00 && (pg_lfind8_idx(key - 1, charbuf, len_with_tail) != -1))
+		elog(ERROR, "pg_lfind8_idx() found nonexistent element '0x%x'", key - 1);
+	if (key < 0xFF && (pg_lfind8_idx(key, charbuf, len_with_tail) != keypos))
+		elog(ERROR, "pg_lfind8_idx() did not find existing element '0x%x'", key);
+	if (key < 0xFE && (pg_lfind8_idx(key + 1, charbuf, len_with_tail) != -1))
+		elog(ERROR, "pg_lfind8_idx() found nonexistent element '0x%x'", key + 1);
+
+	memset(charbuf, 0xFF, len_with_tail);
+	/* search with vector operations */
+	keypos = len_no_tail - 1;
+	charbuf[keypos] = key;
+	if (key > 0x00 && (pg_lfind8_idx(key - 1, charbuf, len_no_tail) != -1))
+		elog(ERROR, "pg_lfind8_idx() found nonexistent element '0x%x'", key - 1);
+	if (key < 0xFF && (pg_lfind8_idx(key, charbuf, len_no_tail) != keypos))
+		elog(ERROR, "pg_lfind8_idx() did not find existing element '0x%x'", key);
+	if (key < 0xFE && (pg_lfind8_idx(key + 1, charbuf, len_no_tail) != -1))
+		elog(ERROR, "pg_lfind8_idx() found nonexistent element '0x%x'", key + 1);
+}
+
+PG_FUNCTION_INFO_V1(test_lfind8_idx);
+Datum
+test_lfind8_idx(PG_FUNCTION_ARGS)
+{
+	test_lfind8_idx_internal(0);
+	test_lfind8_idx_internal(1);
+	test_lfind8_idx_internal(0x7F);
+	test_lfind8_idx_internal(0x80);
+	test_lfind8_idx_internal(0x81);
+	test_lfind8_idx_internal(0xFD);
+	test_lfind8_idx_internal(0xFE);
+	test_lfind8_idx_internal(0xFF);
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(test_lfind8_ge_idx);
+Datum
+test_lfind8_ge_idx(PG_FUNCTION_ARGS)
+{
+#define TEST_ARRAY_SIZE 135
+	uint8		test_array[TEST_ARRAY_SIZE] = {0};
+
+	test_array[8] = 1;
+	test_array[64] = 3;
+	test_array[TEST_ARRAY_SIZE - 1] = 5;
+
+	if (pg_lfind8_ge_idx(1, test_array, 4) != -1)
+		elog(ERROR, "pg_lfind8_ge_idx found nonexistent element");
+	if (pg_lfind8_ge_idx(1, test_array, TEST_ARRAY_SIZE) != 8)
+		elog(ERROR, "pg_lfind8_ge_idx did not find existing element");
+
+	if (pg_lfind8_ge_idx(2, test_array, 32) != -1)
+		elog(ERROR, "pg_lfind8_ge_idx found nonexistent element");
+	if (pg_lfind8_ge_idx(2, test_array, TEST_ARRAY_SIZE) != 64)
+		elog(ERROR, "pg_lfind8_ge_idx did not find existing element");
+
+	if (pg_lfind8_ge_idx(4, test_array, 96) != -1)
+		elog(ERROR, "pg_lfind8_ge_idx found nonexistent element");
+	if (pg_lfind8_ge_idx(4, test_array, TEST_ARRAY_SIZE) != TEST_ARRAY_SIZE - 1)
+		elog(ERROR, "pg_lfind8_ge_idx did not find existing element");
+
+	if (pg_lfind8_ge_idx(6, test_array, TEST_ARRAY_SIZE) != -1)
+		elog(ERROR, "pg_lfind8_ge_idx found nonexistent element");
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(test_lfind32);
 Datum
 test_lfind32(PG_FUNCTION_ARGS)
 {
-#define TEST_ARRAY_SIZE 135
 	uint32		test_array[TEST_ARRAY_SIZE] = {0};
 
 	test_array[8] = 1;
-- 
2.25.1
