From 015afdfa5b1eccc039bc1c276dd7a51d3729257a Mon Sep 17 00:00:00 2001
From: Chiranmoy Bhattacharya <chiranmoy.bhattacharya@fujitsu.com>
Date: Tue, 4 Feb 2025 11:26:41 +0530
Subject: [PATCH v3] SVE support for hex encode and hex decode

---
 config/c-compiler.m4           |  58 +++++++++
 configure                      |  79 ++++++++++++
 configure.ac                   |   9 ++
 meson.build                    |  56 +++++++++
 src/backend/utils/adt/encode.c | 212 ++++++++++++++++++++++++++++++++-
 src/include/pg_config.h.in     |   3 +
 src/include/utils/builtins.h   |  55 ++++++++-
 7 files changed, 466 insertions(+), 6 deletions(-)

diff --git a/config/c-compiler.m4 b/config/c-compiler.m4
index 8534cc54c1..d99ecfb2a7 100644
--- a/config/c-compiler.m4
+++ b/config/c-compiler.m4
@@ -704,3 +704,61 @@ if test x"$Ac_cachevar" = x"yes"; then
 fi
 undefine([Ac_cachevar])dnl
 ])# PGAC_AVX512_POPCNT_INTRINSICS
+
+# PGAC_ARM_SVE_HEX_INTRINSICS
+# ---------------------------
+# Check if the compiler can compile and link the ARM SVE intrinsics required
+# for hex coding: svtbl, svlsr_x, svand_z, svcreate2, etc.  Nothing is run.
+#
+# If the intrinsics are supported, sets pgac_arm_sve_hex_intrinsics to yes.
+AC_DEFUN([PGAC_ARM_SVE_HEX_INTRINSICS],
+[define([Ac_cachevar], [AS_TR_SH([pgac_cv_arm_sve_hex_intrinsics])])dnl
+AC_CACHE_CHECK([for svtbl, svlsr_x, svand_z, svcreate2, etc], [Ac_cachevar],
+[AC_LINK_IFELSE([AC_LANG_PROGRAM([#include <arm_sve.h>
+    #if defined(__has_attribute) && __has_attribute (target)
+        __attribute__((target("arch=armv8-a+sve")))
+    #endif
+    static int hex_coding_test(void)
+    {
+      int vec_len = svcntb();
+      char input@<:@32@:>@;
+      char output@<:@32@:>@;
+      svbool_t pred = svptrue_b8(), cmp1, cmp2;
+      svuint8_t bytes, hextbl_vec;
+      svuint8x2_t	merged;
+
+      if (vec_len >= 16)
+      {
+        /* intrinsics used in hex_encode_sve */
+        hextbl_vec = svld1(svwhilelt_b8(0, 16), (uint8_t *) "0123456789ABCDEF");
+        bytes = svld1(pred, (uint8_t *) input);
+        bytes = svlsr_x(pred, bytes, 4);
+        bytes = svand_x(pred, bytes, 0xF);
+        merged = svcreate2(svtbl(hextbl_vec, bytes), svtbl(hextbl_vec, bytes));
+        svst2(pred, (uint8_t *) output, merged);
+
+        /* intrinsics used in hex_decode_sve */
+        bytes = svget2(svld2(pred, (uint8_t *) output), 0);
+        bytes = svsub_x(pred, bytes, 48);
+        cmp1 = svcmplt(pred, bytes, 16);
+        cmp2 = svcmpgt(pred, bytes, 9);
+        if (svptest_any(pred, svnot_z(pred, svorr_z(pred, cmp1, cmp2))))
+          return 0;
+        bytes = svsel(svand_z(pred, cmp1, cmp2), bytes, bytes);
+        bytes = svlsl_x(pred, bytes, svcntp_b8(pred, pred));
+        svst1(pred, output, bytes);
+
+        /* return computed value, to prevent the above being optimized away */
+        return output@<:@0@:>@ == 0;
+      }
+
+      return 0;
+    }],
+  [return hex_coding_test();])],
+  [Ac_cachevar=yes],
+  [Ac_cachevar=no])])
+if test x"$Ac_cachevar" = x"yes"; then
+  pgac_arm_sve_hex_intrinsics=yes
+fi
+undefine([Ac_cachevar])dnl
+])# PGAC_ARM_SVE_HEX_INTRINSICS
diff --git a/configure b/configure
index ceeef9b091..e445cb1451 100755
--- a/configure
+++ b/configure
@@ -17168,6 +17168,85 @@ $as_echo "#define USE_AVX512_POPCNT_WITH_RUNTIME_CHECK 1" >>confdefs.h
   fi
 fi
 
+# Check for ARM SVE intrinsics for hex coding
+#
+if test x"$host_cpu" = x"aarch64"; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for svtbl, svlsr_x, svand_z, svcreate2, etc" >&5
+$as_echo_n "checking for svtbl, svlsr_x, svand_z, svcreate2, etc... " >&6; }
+if ${pgac_cv_arm_sve_hex_intrinsics+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+#include <arm_sve.h>
+    #if defined(__has_attribute) && __has_attribute (target)
+        __attribute__((target("arch=armv8-a+sve")))
+    #endif
+    static int hex_coding_test(void)
+    {
+      int vec_len = svcntb();
+      char input[32];
+      char output[32];
+      svbool_t pred = svptrue_b8(), cmp1, cmp2;
+      svuint8_t bytes, hextbl_vec;
+      svuint8x2_t	merged;
+
+      if (vec_len >= 16)
+      {
+        /* intrinsics used in hex_encode_sve */
+        hextbl_vec = svld1(svwhilelt_b8(0, 16), (uint8_t *) "0123456789ABCDEF");
+        bytes = svld1(pred, (uint8_t *) input);
+        bytes = svlsr_x(pred, bytes, 4);
+        bytes = svand_x(pred, bytes, 0xF);
+        merged = svcreate2(svtbl(hextbl_vec, bytes), svtbl(hextbl_vec, bytes));
+        svst2(pred, (uint8_t *) output, merged);
+
+        /* intrinsics used in hex_decode_sve */
+        bytes = svget2(svld2(pred, (uint8_t *) output), 0);
+        bytes = svsub_x(pred, bytes, 48);
+        cmp1 = svcmplt(pred, bytes, 16);
+        cmp2 = svcmpgt(pred, bytes, 9);
+        if (svptest_any(pred, svnot_z(pred, svorr_z(pred, cmp1, cmp2))))
+          return 0;
+        bytes = svsel(svand_z(pred, cmp1, cmp2), bytes, bytes);
+        bytes = svlsl_x(pred, bytes, svcntp_b8(pred, pred));
+        svst1(pred, output, bytes);
+
+        /* return computed value, to prevent the above being optimized away */
+        return output[0] == 0;
+      }
+
+      return 0;
+    }
+int
+main ()
+{
+return hex_coding_test();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  pgac_cv_arm_sve_hex_intrinsics=yes
+else
+  pgac_cv_arm_sve_hex_intrinsics=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv_arm_sve_hex_intrinsics" >&5
+$as_echo "$pgac_cv_arm_sve_hex_intrinsics" >&6; }
+if test x"$pgac_cv_arm_sve_hex_intrinsics" = x"yes"; then
+  pgac_arm_sve_hex_intrinsics=yes
+fi
+
+  if test x"$pgac_arm_sve_hex_intrinsics" = x"yes"; then
+
+$as_echo "#define USE_SVE_WITH_RUNTIME_CHECK 1" >>confdefs.h
+
+  fi
+fi
+
 # Check for Intel SSE 4.2 intrinsics to do CRC calculations.
 #
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for _mm_crc32_u8 and _mm_crc32_u32" >&5
diff --git a/configure.ac b/configure.ac
index d713360f34..2dbb678cae 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2021,6 +2021,15 @@ if test x"$host_cpu" = x"x86_64"; then
   fi
 fi
 
+# Check for ARM SVE intrinsics for hex coding
+#
+if test x"$host_cpu" = x"aarch64"; then
+  PGAC_ARM_SVE_HEX_INTRINSICS()
+  if test x"$pgac_arm_sve_hex_intrinsics" = x"yes"; then
+    AC_DEFINE(USE_SVE_WITH_RUNTIME_CHECK, 1, [Define to 1 to use SVE instructions for hex coding with a runtime check.])
+  fi
+fi
+
 # Check for Intel SSE 4.2 intrinsics to do CRC calculations.
 #
 PGAC_SSE42_CRC32_INTRINSICS()
diff --git a/meson.build b/meson.build
index 8e128f4982..6a10331acf 100644
--- a/meson.build
+++ b/meson.build
@@ -2194,6 +2194,62 @@ int main(void)
 endif
 
 
+###############################################################
+# Check the availability of ARM SVE intrinsics for hex coding.
+###############################################################
+
+if host_cpu == 'aarch64'
+
+  prog = '''
+#include <arm_sve.h>
+#if defined(__has_attribute) && __has_attribute (target)
+    __attribute__((target("arch=armv8-a+sve")))
+#endif
+int main(void)
+{
+    int vec_len = svcntb();
+    char input[64] = {0};
+    char output[64] = {0};
+    svbool_t pred = svptrue_b8(), cmp1, cmp2;
+    svuint8_t bytes, hextbl_vec;
+    svuint8x2_t	merged;
+
+    if (vec_len >= 16)
+    {
+      /* intrinsics used in hex_encode_sve */
+      hextbl_vec = svld1(svwhilelt_b8(0, 16), (uint8_t *) "0123456789ABCDEF");
+      bytes = svld1(pred, (uint8_t *) input);
+      bytes = svlsr_x(pred, bytes, 4);
+      bytes = svand_x(pred, bytes, 0xF);
+      merged = svcreate2(svtbl(hextbl_vec, bytes), svtbl(hextbl_vec, bytes));
+      svst2(pred, (uint8_t *) output, merged);
+
+      /* intrinsics used in hex_decode_sve */
+      bytes = svget2(svld2(pred, (uint8_t *) output), 0);
+      bytes = svsub_x(pred, bytes, 48);
+      cmp1 = svcmplt(pred, bytes, 16);
+      cmp2 = svcmpgt(pred, bytes, 9);
+      if (svptest_any(pred, svnot_z(pred, svorr_z(pred, cmp1, cmp2))))
+        return 0;
+      bytes = svsel(svand_z(pred, cmp1, cmp2), bytes, bytes);
+      bytes = svlsl_x(pred, bytes, svcntp_b8(pred, pred));
+      svst1(pred, (uint8_t *) output, bytes);
+
+      /* return computed value, to prevent the above being optimized away */
+      return output[0] == 0;
+    }
+
+    return 0;
+}
+'''
+
+  if cc.links(prog, name: 'ARM SVE hex coding', args: test_c_args)
+    cdata.set('USE_SVE_WITH_RUNTIME_CHECK', 1)
+  endif
+
+endif
+
+
 ###############################################################
 # Select CRC-32C implementation.
 #
diff --git a/src/backend/utils/adt/encode.c b/src/backend/utils/adt/encode.c
index 4ccaed815d..cf0137a1f1 100644
--- a/src/backend/utils/adt/encode.c
+++ b/src/backend/utils/adt/encode.c
@@ -20,6 +20,12 @@
 #include "utils/memutils.h"
 #include "varatt.h"
 
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+#include <arm_sve.h>
+#if defined(HAVE_ELF_AUX_INFO) || defined(HAVE_GETAUXVAL)
+#include <sys/auxv.h>
+#endif
+#endif
 
 /*
  * Encoding conversion API.
@@ -177,8 +183,81 @@ static const int8 hexlookup[128] = {
 	-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
 };
 
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+/* SVE variants, first-call selectors, and the pointers callers go through. */
+static uint64 hex_encode_sve(const char *src, size_t len, char *dst);
+static uint64 hex_decode_sve(const char *src, size_t len, char *dst);
+static uint64 hex_decode_safe_sve(const char *src, size_t len, char *dst,
+								  Node *escontext);
+static uint64 hex_encode_choose(const char *src, size_t len, char *dst);
+static uint64 hex_decode_choose(const char *src, size_t len, char *dst);
+static uint64 hex_decode_safe_choose(const char *src, size_t len, char *dst,
+									 Node *escontext);
+uint64		(*hex_encode_optimized) (const char *src, size_t len, char *dst) =
+	hex_encode_choose;
+uint64		(*hex_decode_optimized) (const char *src, size_t len, char *dst) =
+	hex_decode_choose;
+uint64		(*hex_decode_safe_optimized) (const char *src, size_t len, char *dst,
+										  Node *escontext) = hex_decode_safe_choose;
+
+/* Returns true if the CPU supports SVE instructions, per AT_HWCAP. */
+static inline bool
+check_sve_support(void)
+{
+#if defined(HAVE_ELF_AUX_INFO) && defined(__aarch64__)  /* FreeBSD */
+	unsigned long hwcap = 0;
+
+	if (elf_aux_info(AT_HWCAP, &hwcap, sizeof(hwcap)) != 0)
+		return false;			/* the AT_HWCAP lookup failed */
+	return (hwcap & HWCAP_SVE) != 0;
+#elif defined(HAVE_GETAUXVAL) && defined(__aarch64__)   /* Linux */
+	return (getauxval(AT_HWCAP) & HWCAP_SVE) != 0;
+#else
+	return false;
+#endif
+}
+
+/*
+ * Bind hex_encode_optimized, hex_decode_optimized and
+ * hex_decode_safe_optimized to the SVE routines if check_sve_support()
+ * reports SVE, and to the scalar routines otherwise.
+ */
+static inline void
+choose_hex_functions(void)
+{
+	bool		use_sve = check_sve_support();
+
+	/* one probe result decides all three pointers */
+	hex_encode_optimized = use_sve ? hex_encode_sve : hex_encode_scalar;
+	hex_decode_optimized = use_sve ? hex_decode_sve : hex_decode_scalar;
+	hex_decode_safe_optimized =
+		use_sve ? hex_decode_safe_sve : hex_decode_safe_scalar;
+}
+
+static uint64
+hex_encode_choose(const char *src, size_t len, char *dst)
+{
+	choose_hex_functions();		/* rebinds hex_encode_optimized */
+	return hex_encode_optimized(src, len, dst);	/* now SVE or scalar */
+}
+
+static uint64
+hex_decode_choose(const char *src, size_t len, char *dst)
+{
+	choose_hex_functions();		/* rebinds hex_decode_optimized */
+	return hex_decode_optimized(src, len, dst);	/* now SVE or scalar */
+}
+
+static uint64
+hex_decode_safe_choose(const char *src, size_t len, char *dst, Node *escontext)
+{
+	choose_hex_functions();		/* rebinds hex_decode_safe_optimized */
+	return hex_decode_safe_optimized(src, len, dst, escontext);
+}
+#endif							/* USE_SVE_WITH_RUNTIME_CHECK */
+
 uint64
-hex_encode(const char *src, size_t len, char *dst)
+hex_encode_scalar(const char *src, size_t len, char *dst)
 {
 	const char *end = src + len;
 
@@ -208,13 +287,13 @@ get_hex(const char *cp, char *out)
 }
 
 uint64
-hex_decode(const char *src, size_t len, char *dst)
+hex_decode_scalar(const char *src, size_t len, char *dst)
 {
-	return hex_decode_safe(src, len, dst, NULL);
+	return hex_decode_safe_scalar(src, len, dst, NULL);
 }
 
 uint64
-hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext)
+hex_decode_safe_scalar(const char *src, size_t len, char *dst, Node *escontext)
 {
 	const char *s,
 			   *srcend;
@@ -254,6 +333,133 @@ hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext)
 	return p - dst;
 }
 
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+/*
+ * SVE implementation of hex_encode and hex_decode.
+ */
+
+/*
+ * Hex-encode len bytes at src into 2 * len lowercase hex digits at dst,
+ * one SVE vector of input per iteration.  Returns the output length.
+ */
+pg_attribute_target("arch=armv8-a+sve")
+static uint64
+hex_encode_sve(const char *src, size_t len, char *dst)
+{
+	const char	hextbl[] = "0123456789abcdef";
+	svuint8_t	hextbl_vec = svld1(svwhilelt_b8(0, 16), (const uint8 *) hextbl);
+	uint32		vec_len = svcntb();	/* byte lanes per SVE vector */
+
+	for (size_t i = 0; i < len; i += vec_len)
+	{
+		/* activate only the lanes that still have input (partial tail) */
+		svbool_t	pred = svwhilelt_b8((uint64) i, (uint64) len);
+		svuint8_t	bytes = svld1(pred, (const uint8 *) src);
+		svuint8_t	high = svlsr_x(pred, bytes, 4);	/* high nibble */
+		svuint8_t	low = svand_z(pred, bytes, 0xF);	/* low nibble */
+
+		/*
+		 * Convert the nibbles to hex digits by indexing into hextbl_vec,
+		 * for example, a nibble value of 10 indexed into hextbl_vec gives 'a'.
+		 * Then interleave the high and low digits and store them.
+		 */
+		svst2(pred, (uint8 *) dst,
+			  svcreate2(svtbl(hextbl_vec, high), svtbl(hextbl_vec, low)));
+
+		dst += 2 * vec_len;
+		src += vec_len;
+	}
+
+	return (uint64) len * 2;
+}
+
+/*
+ * Convert a vector of ASCII hex digits to nibble values (0-15) in *res.
+ * Returns false, without writing *res, if any lane active in pred is invalid.
+ */
+pg_attribute_target("arch=armv8-a+sve")
+static inline bool
+get_hex_sve(svbool_t pred, svuint8_t vec, svuint8_t *res)
+{
+	/* Rebase so that '0' (48) maps to 0, and 'A' (65) and 'a' (97) to 10. */
+	svuint8_t	dgt_vec = svsub_x(pred, vec, 48),
+				cap_vec = svsub_x(pred, vec, 55),
+				sml_vec = svsub_x(pred, vec, 87),
+				ltr_vec;
+	/*
+	 * Values 0-9 are valid only in dgt_vec, while values 10-15 are valid
+	 * only in cap_vec and sml_vec.
+	 */
+	svbool_t	valid_dgt = svcmplt(pred, dgt_vec, 10),
+				valid_ltr;
+
+	/* Combine cap_vec and sml_vec and mark the valid range 10-15. */
+	ltr_vec = svsel(svcmplt(pred, cap_vec, 16), cap_vec, sml_vec);
+	valid_ltr = svand_z(pred, svcmpgt(pred, ltr_vec, 9),
+							  svcmplt(pred, ltr_vec, 16));
+	/*
+	 * Check for invalid hexadecimal digits. Each value must fall
+	 * within the range 0-9 (true in valid_dgt) or 10-15 (true in valid_ltr).
+	 */
+	if (svptest_any(pred, svnot_z(pred, svorr_z(pred, valid_dgt, valid_ltr))))
+		return false;
+
+	/* Finally, combine dgt_vec and ltr_vec */
+	*res = svsel(valid_dgt, dgt_vec, ltr_vec);
+	return true;
+}
+
+static uint64
+hex_decode_sve(const char *src, size_t len, char *dst)
+{
+	return hex_decode_safe_sve(src, len, dst, NULL);	/* no escontext */
+}
+
+/*
+ * Decode len hex digits at src into dst, returning the bytes written.  From
+ * the first non-clean chunk on, the rest is left to hex_decode_safe_scalar().
+ */
+pg_attribute_target("arch=armv8-a+sve")
+static uint64
+hex_decode_safe_sve(const char *src, size_t len, char *dst, Node *escontext)
+{
+	size_t		i = 0,
+				loop_bytes = len & ~1;	/* handles inputs of odd length */
+	const char *p = dst;
+
+	while (i < loop_bytes)
+	{
+		svbool_t	pred = svwhilelt_b8((uint64) i / 2, (uint64) len / 2);
+		svuint8x2_t bytes = svld2(pred, (const uint8 *) src);
+		svuint8_t	high = svget2(bytes, 0);	/* hex digits for high nibble */
+		svuint8_t	low = svget2(bytes, 1);	/* hex digits for low nibble */
+		uint32		processed;
+
+		/* fallback if a character below ASCII '0' is found. */
+		if (svptest_any(pred, svorr_z(pred, svcmplt(pred, high, '0'),
+											svcmplt(pred, low, '0'))))
+			break;
+
+		/* fallback if invalid hexadecimal digit is found */
+		if (!get_hex_sve(pred, high, &high) || !get_hex_sve(pred, low, &low))
+			break;
+
+		/* left-shift high and perform bitwise OR with low to form the byte */
+		svst1(pred, (uint8 *) dst, svorr_z(pred, svlsl_x(pred, high, 4), low));
+
+		processed = svcntp_b8(pred, pred) * 2;
+		src += processed;
+		i += processed;
+		dst += processed / 2;
+	}
+
+	if (i < len)	/* fall back */
+		return dst - p + hex_decode_safe_scalar(src, len - i, dst, escontext);
+
+	return dst - p;
+}
+#endif							/* USE_SVE_WITH_RUNTIME_CHECK */
+
 static uint64
 hex_enc_len(const char *src, size_t srclen)
 {
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 07b2f798ab..b5096c11f4 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -648,6 +648,9 @@
 /* Define to 1 to use AVX-512 popcount instructions with a runtime check. */
 #undef USE_AVX512_POPCNT_WITH_RUNTIME_CHECK
 
+/* Define to 1 to use SVE instructions for hex coding with a runtime check. */
+#undef USE_SVE_WITH_RUNTIME_CHECK
+
 /* Define to 1 to build with Bonjour support. (--with-bonjour) */
 #undef USE_BONJOUR
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 1c98c7d225..e9b1f963dd 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -35,11 +35,60 @@ extern int	errdatatype(Oid datatypeOid);
 extern int	errdomainconstraint(Oid datatypeOid, const char *conname);
 
 /* encode.c */
-extern uint64 hex_encode(const char *src, size_t len, char *dst);
-extern uint64 hex_decode(const char *src, size_t len, char *dst);
-extern uint64 hex_decode_safe(const char *src, size_t len, char *dst,
+extern uint64 hex_encode_scalar(const char *src, size_t len, char *dst);
+extern uint64 hex_decode_scalar(const char *src, size_t len, char *dst);
+extern uint64 hex_decode_safe_scalar(const char *src, size_t len, char *dst,
 							  Node *escontext);
 
+/*
+ * SVE intrinsics may be used for hex coding only after a runtime check has
+ * verified CPU support, so callers dispatch through these function pointers.
+ */
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+extern PGDLLIMPORT uint64 (*hex_encode_optimized)
+	   (const char *src, size_t len, char *dst);
+extern PGDLLIMPORT uint64 (*hex_decode_optimized)
+	   (const char *src, size_t len, char *dst);
+extern PGDLLIMPORT uint64 (*hex_decode_safe_optimized)
+	   (const char *src, size_t len, char *dst, Node *escontext);
+#endif		/* USE_SVE_WITH_RUNTIME_CHECK */
+
+/* Hex-encode len bytes of src into dst; the routines live in encode.c. */
+static inline uint64
+hex_encode(const char *src, size_t len, char *dst)
+{
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+	/* compare in size_t; inputs under 16 bytes stay on the scalar path */
+	if (len >= (size_t) 16)
+		return hex_encode_optimized(src, len, dst);
+#endif
+	return hex_encode_scalar(src, len, dst);
+}
+
+/* Hex-decode len digits of src into dst; the routines live in encode.c. */
+static inline uint64
+hex_decode(const char *src, size_t len, char *dst)
+{
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+	/* compare in size_t; inputs under 32 bytes stay on the scalar path */
+	if (len >= (size_t) 32)
+		return hex_decode_optimized(src, len, dst);
+#endif
+	return hex_decode_scalar(src, len, dst);
+}
+
+/* As hex_decode(), additionally passing escontext through to the routine. */
+static inline uint64
+hex_decode_safe(const char *src, size_t len, char *dst, Node *escontext)
+{
+#ifdef USE_SVE_WITH_RUNTIME_CHECK
+	/* compare in size_t; inputs under 32 bytes stay on the scalar path */
+	if (len >= (size_t) 32)
+		return hex_decode_safe_optimized(src, len, dst, escontext);
+#endif
+	return hex_decode_safe_scalar(src, len, dst, escontext);
+}
+
 /* int.c */
 extern int2vector *buildint2vector(const int16 *int2s, int n);
 
-- 
2.34.1

